#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright: Nis Wechselberg
# MIT License (see https://spdx.org/licenses/MIT.html)

from __future__ import absolute_import, print_function

# NOTE(review): DOCUMENTATION names the module 'forgejo_auth' while EXAMPLES
# calls 'enbewe.forgejo.forgejo_oauth' - confirm which one matches the file name.
DOCUMENTATION = r'''
---
module: forgejo_auth

short_description: Module to manage forgejo oauth authentication sources

version_added: "1.0.0"

description:
    - This module uses the forgejo command line interface to manage the configured oauth authentication sources.
    - It can add, remove or update an authentication source.
    - This module only works for forgejo running inside containers, created through this collections role.
    - There are several assumptions regarding names and ids.

notes:
    - All commands are run through C(podman exec) as user C(1000:1000) in the container C(forgejo-app).
    - Check mode is not supported.

options:
    state:
        description: Whether the authentication source should be C(present) or C(absent).
        type: str
        choices: [present, absent]
        default: present
    update:
        description:
            - Whether an already existing OAuth2 source of the same name is updated with the given configuration.
            - When C(false), an existing source is left untouched.
            - An update is always reported as changed.
        type: bool
        default: false
    name:
        description: Name of the authentication source. Used to look up an existing OAuth2 source.
        type: str
        required: true
    provider:
        description: Passed to forgejo as C(--provider).
        type: str
        required: true
    key:
        description: Passed to forgejo as C(--key).
        type: str
        required: true
    secret:
        description: Passed to forgejo as C(--secret). The value is not logged by Ansible.
        type: str
        required: true
    auto_discover_url:
        description: Passed to forgejo as C(--auto-discover-url). An empty string is sent when unset.
        type: str
    use_custom_urls:
        description: When C(true), C(--use-custom-urls) is passed to forgejo.
        type: bool
        default: false
    custom_tenant_id:
        description: Passed to forgejo as C(--custom-tenant-id). An empty string is sent when unset.
        type: str
    custom_auth_url:
        description: Passed to forgejo as C(--custom-auth-url). An empty string is sent when unset.
        type: str
    custom_token_url:
        description: Passed to forgejo as C(--custom-token-url). An empty string is sent when unset.
        type: str
    custom_profile_url:
        description: Passed to forgejo as C(--custom-profile-url). An empty string is sent when unset.
        type: str
    custom_email_url:
        description: Passed to forgejo as C(--custom-email-url). An empty string is sent when unset.
        type: str
    icon_url:
        description: Passed to forgejo as C(--icon-url). An empty string is sent when unset.
        type: str
    skip_local_2fa:
        description: When C(true), C(--skip-local-2fa) is passed to forgejo.
        type: bool
        default: false
    scopes:
        description: Whitespace separated list of scopes. Each scope is passed as its own C(--scopes) argument.
        type: str
    required_claim_name:
        description: Passed to forgejo as C(--required-claim-name). An empty string is sent when unset.
        type: str
    required_claim_value:
        description: Passed to forgejo as C(--required-claim-value). An empty string is sent when unset.
        type: str
    group_claim_name:
        description: Passed to forgejo as C(--group-claim-name). An empty string is sent when unset.
        type: str
    admin_group:
        description: Passed to forgejo as C(--admin-group). An empty string is sent when unset.
        type: str
    restricted_group:
        description: Passed to forgejo as C(--restricted-group). An empty string is sent when unset.
        type: str
    group_team_map:
        description: Passed to forgejo as C(--group-team-map). An empty string is sent when unset.
        type: str
    group_team_map_removal:
        description: When C(true), C(--group-team-map-removal) is passed to forgejo.
        type: bool
        default: false

author:
    - Nis Wechselberg (@eNBeWe)
'''

EXAMPLES = r'''
# Create a new auth provider, or update it if it already exists
- name: 'Add authentication source'
  enbewe.forgejo.forgejo_oauth:
    state: 'present'
    update: true
    name: 'eNBeWe.eu SSO'
    provider: 'openidConnect'
    key: 'clientIdConfiguredInAuthProvider'
    secret: 'secretKeyOfTheClientIdInAuthProvider'
    auto_discover_url: 'https://'
    skip_local_2fa: true
'''

RETURN = r'''
msg:
    description: Human readable description of the performed action, or the command output on failure.
    type: str
    returned: always
'''

from ansible.module_utils.basic import AnsibleModule


class ForgejoOAuth:
    '''
    Handles the management of forgejo authentication sources.

    Every operation is executed through the forgejo command line interface
    inside the 'forgejo-app' podman container.
    '''

    module = None

    # Common prefix of every forgejo cli call issued by this class.
    # NOTE(review): '-it' requests an interactive TTY although the command is
    # run non-interactively - kept as-is, confirm whether it is still needed.
    _AUTH_COMMAND = [
        'podman', 'exec', '-u', '1000:1000', '-it', 'forgejo-app',
        'forgejo', 'admin', 'auth',
    ]

    # Optional string parameters as (cli flag, module parameter), in the order
    # they are put on the command line. Unset parameters are sent as "".
    _OPTIONAL_FLAGS = (
        ('--auto-discover-url', 'auto_discover_url'),
        ('--custom-tenant-id', 'custom_tenant_id'),
        ('--custom-auth-url', 'custom_auth_url'),
        ('--custom-token-url', 'custom_token_url'),
        ('--custom-profile-url', 'custom_profile_url'),
        ('--custom-email-url', 'custom_email_url'),
        ('--icon-url', 'icon_url'),
        ('--required-claim-name', 'required_claim_name'),
        ('--required-claim-value', 'required_claim_value'),
        ('--group-claim-name', 'group_claim_name'),
        ('--admin-group', 'admin_group'),
        ('--restricted-group', 'restricted_group'),
        ('--group-team-map', 'group_team_map'),
    )

    # Boolean parameters as (cli flag, module parameter). The flag is only
    # appended when the parameter is truthy.
    _SWITCHES = (
        ('--use-custom-urls', 'use_custom_urls'),
        ('--skip-local-2fa', 'skip_local_2fa'),
        ('--group-team-map-removal', 'group_team_map_removal'),
    )

    def __init__(self, module: AnsibleModule):
        '''
        Create a new OAuth Handler.

        :param module: The module instance providing the parameters and
                       the command execution.
        '''
        self.module = module
        self.name = module.params.get('name')

    def run(self) -> dict:
        '''
        Apply the configuration.

        :return: Result dictionary with the keys 'failed' and 'msg' and,
                 unless a cli command failed, 'changed'.
        '''
        state = self.module.params.get('state')
        if state not in ('present', 'absent'):
            # Checked first so no command is executed for an invalid request.
            return {
                'failed': True,
                'changed': False,
                'msg': 'Misconfigured plugin, unknown state.'
            }

        source_exists, source_id = self.get_oauth_source_with_name()

        if state == 'absent':
            if source_exists:
                return self.remove_auth_source(source_id)
            return {
                'failed': False,
                'changed': False,
                'msg': f'OAuth2 authorization source {self.name} is not configured.'
            }

        if not source_exists:
            return self.add_oauth_source()
        if self.module.params.get('update'):
            return self.update_oauth_source(source_id)
        return {
            'failed': False,
            'changed': False,
            'msg': f'OAuth2 authorization source {self.name} already exists.'
        }

    def get_oauth_source_with_name(self) -> tuple:
        '''
        Check the list of configured authentication sources for an existing
        OAuth source of the given name.

        The module run is failed when the list command itself fails, as an
        empty output would otherwise be mistaken for "source not configured".

        :return: Tuple (True, id) if a matching source exists, (False, -1) otherwise.
        '''
        list_rc, list_stdout, list_stderr = self.module.run_command(
            self._AUTH_COMMAND + ['list', '--vertical-bars']
        )
        if list_rc != 0:
            self.module.fail_json(
                msg=f'Unable to list authentication sources: {list_stdout or list_stderr}',
                rc=list_rc,
            )

        # The first line is the table header. Every other line is expected to
        # be '<id> | <name> | <type> | ...'; anything else (blank or truncated
        # lines) is skipped instead of raising.
        for table_row in list_stdout.splitlines()[1:]:
            columns = [column.strip() for column in table_row.split('|')]
            if len(columns) < 3 or not columns[0].isdecimal():
                continue
            if columns[2] == 'OAuth2' and columns[1] == self.name:
                return (True, int(columns[0]))

        return (False, -1)

    def add_oauth_source(self) -> dict:
        '''
        Add a new OAuth2 source with the given configuration.

        :return: Result dictionary of the executed command.
        '''
        return self._run_auth_command(
            ['add-oauth'] + self.create_command_arguments(),
            f'OAuth2 authorization source {self.name} successful created.',
        )

    def remove_auth_source(self, source_id) -> dict:
        '''
        Removes the authentication source with the given id from the config.

        :param source_id: Id of the authentication source to delete.
        :return: Result dictionary of the executed command.
        '''
        return self._run_auth_command(
            ['delete', '--id', str(source_id)],
            f'OAuth2 authorization source {self.name} removed.',
        )

    def update_oauth_source(self, source_id) -> dict:
        '''
        Updates the authentication source with the given id to match the
        passed config.

        :param source_id: Id of the authentication source to update.
        :return: Result dictionary of the executed command.
        '''
        return self._run_auth_command(
            ['update-oauth', '--id', str(source_id)] + self.create_command_arguments(),
            f'OAuth2 authorization source {self.name} has been updated',
        )

    def create_command_arguments(self) -> list:
        '''
        Create the command arguments for cli commands from the passed config.

        :return: List of cli arguments: the required values first, then all
                 optional string values, the scopes and finally the switches.
        '''
        params = self.module.params
        arguments = [
            '--name', self.name,
            '--provider', params.get('provider'),
            '--key', params.get('key'),
            '--secret', params.get('secret'),
        ]

        for flag, option in self._OPTIONAL_FLAGS:
            arguments += [flag, params.get(option) or '']

        # 'scopes' is optional and None when not set.
        for scope in (params.get('scopes') or '').split():
            arguments += ['--scopes', scope]

        arguments += [flag for flag, option in self._SWITCHES if params.get(option)]

        return arguments

    def _run_auth_command(self, arguments, success_msg) -> dict:
        '''
        Run a modifying 'forgejo admin auth' sub command and build the result.

        :param arguments: Sub command and its arguments.
        :param success_msg: Message reported when the command succeeds.
        :return: Changed result on return code 0, failed result with the
                 command output otherwise.
        '''
        command_rc, command_stdout, command_stderr = self.module.run_command(
            self._AUTH_COMMAND + arguments
        )
        if command_rc == 0:
            return {
                'failed': False,
                'changed': True,
                'msg': success_msg
            }
        return {
            'failed': True,
            # Fall back to stderr so a failure is never reported without a reason.
            'msg': command_stdout or command_stderr
        }


def main():
    '''
    Run the module code.
    '''
    # Types are given by name so Ansible applies its own conversion rules;
    # the plain callable bool would turn the string "false" into True.
    module_args = {
        'state': {'type': 'str', 'default': 'present', 'choices': ['present', 'absent']},
        'update': {'type': 'bool', 'default': False},
        'name': {'type': 'str', 'required': True},
        'provider': {'type': 'str', 'required': True},
        'key': {'type': 'str', 'required': True},
        'secret': {'type': 'str', 'required': True, 'no_log': True},
        'auto_discover_url': {'type': 'str', 'required': False},
        'use_custom_urls': {'type': 'bool', 'default': False},
        'custom_tenant_id': {'type': 'str', 'required': False},
        'custom_auth_url': {'type': 'str', 'required': False},
        'custom_token_url': {'type': 'str', 'required': False},
        'custom_profile_url': {'type': 'str', 'required': False},
        'custom_email_url': {'type': 'str', 'required': False},
        'icon_url': {'type': 'str', 'required': False},
        'skip_local_2fa': {'type': 'bool', 'default': False},
        'scopes': {'type': 'str', 'required': False},
        'required_claim_name': {'type': 'str', 'required': False},
        'required_claim_value': {'type': 'str', 'required': False},
        'group_claim_name': {'type': 'str', 'required': False},
        'admin_group': {'type': 'str', 'required': False},
        'restricted_group': {'type': 'str', 'required': False},
        'group_team_map': {'type': 'str', 'required': False},
        'group_team_map_removal': {'type': 'bool', 'default': False},
    }

    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=False,
    )

    result = ForgejoOAuth(module).run()

    if result.get('failed'):
        module.fail_json(**result)
    module.exit_json(**result)


if __name__ == '__main__':
    main()