#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright: Nis Wechselberg
# MIT License (see https://spdx.org/licenses/MIT.html)

from __future__ import absolute_import, print_function

DOCUMENTATION = r'''
---
module: forgejo_oauth
short_description: Module to manage forgejo oauth authentication sources
description:
    - This module uses the forgejo api to manage the configured oauth authentication sources.
    - It can add, remove or update an authentication source.
    - This module only works for forgejo running inside containers, created through this collections role.
    - There are several assumptions regarding names and ids.
version_added: '1.0.0'
author: Nis Wechselberg (@eNBeWe)
options:
    state:
        description: Controls whether the OAuth source should be added or removed.
        default: 'present'
        choices:
            - 'present'
            - 'absent'
        type: str
    update:
        description: Update the OAuth source if it already exists.
        default: false
        type: bool
    name:
        description: The name of the OAuth source in the Forgejo configuration.
        required: true
        type: str
    provider:
        description: The type of OAuth source to configure.
        required: true
        type: str
    key:
        description: The OAuth2 client id configured in the auth source.
        required: true
        type: str
    secret:
        description: The OAuth2 client secret key configured in the auth source.
        required: true
        type: str
    auto_discover_url:
        description: The autoconfiguration url to use for openIdConnect.
        type: str
    use_custom_urls:
        description: Whether to use detailed urls or use autoconfiguration.
        default: false
        type: bool
    custom_tenant_id:
        description: Custom configuration of the OAuth source.
        type: str
    custom_auth_url:
        description: Custom configuration of the OAuth source.
        type: str
    custom_token_url:
        description: Custom configuration of the OAuth source.
        type: str
    custom_profile_url:
        description: Custom configuration of the OAuth source.
        type: str
    custom_email_url:
        description: Custom configuration of the OAuth source.
        type: str
    icon_url:
        description: Url of the icon to use with the source.
        type: str
    skip_local_2fa:
        description: Whether local two-factor-authentication should be ignored when using the source.
        default: false
        type: bool
    scopes:
        description: The scopes to request in the authentication process.
        type: str
    required_claim_name:
        description: The OAuth claim to check when deciding if the access should be granted.
        type: str
    required_claim_value:
        description: The value the OAuth claim should fulfil to grant access.
        type: str
    group_claim_name:
        description: The OAuth claim to check for group membership.
        type: str
    admin_group:
        description: The OAuth group that will automatically grant admin privileges.
        type: str
    restricted_group:
        description: The OAuth group that will automatically restrict users.
        type: str
    group_team_map:
        description: Mapping configuration that matches OAuth groups to organisation teams.
        type: str
    group_team_map_removal:
        description: Whether team memberships should be removed automatically depending on the OAuth groups.
        default: false
        type: bool
'''

EXAMPLES = r'''
# Create a new auth provider or update it if the source already exists
- name: 'Add authentication source'
  enbewe.forgejo.forgejo_oauth:
    state: 'present'
    update: true
    name: 'eNBeWe.eu SSO'
    provider: 'openidConnect'
    key: 'clientIdConfiguredInAuthProvider'
    secret: 'secretKeyOfTheClientIdInAuthProvider'
    auto_discover_url: 'https://'
    skip_local_2fa: true
'''

RETURN = r'''
# Return values
msg:
    description: Description of the action that was taken, or the command output if the action failed.
    returned: always
    type: str
'''

from ansible.module_utils.basic import AnsibleModule


class ForgejoOAuth:
    '''
    Handles the management of forgejo authentication sources.

    All operations shell out to the ``forgejo admin auth`` CLI inside the
    ``forgejo-app`` podman container, running as uid/gid 1000.
    '''

    module = None

    # Common prefix of every CLI invocation issued by this class.
    _AUTH_COMMAND = [
        'podman', 'exec',
        '-u', '1000:1000',
        '-it', 'forgejo-app',
        'forgejo', 'admin', 'auth',
    ]

    # Optional string parameters. Each one is always passed to the CLI as
    # ``--<name-with-dashes> <value>``, using an empty string when unset.
    _OPTIONAL_VALUE_PARAMS = (
        'auto_discover_url',
        'custom_tenant_id',
        'custom_auth_url',
        'custom_token_url',
        'custom_profile_url',
        'custom_email_url',
        'icon_url',
        'required_claim_name',
        'required_claim_value',
        'group_claim_name',
        'admin_group',
        'restricted_group',
        'group_team_map',
    )

    # Boolean parameters that map to a bare ``--<name-with-dashes>`` switch.
    _SWITCH_PARAMS = (
        'use_custom_urls',
        'skip_local_2fa',
        'group_team_map_removal',
    )

    def __init__(self, module: AnsibleModule):
        '''
        Create a new OAuth Handler

        :param module: The AnsibleModule providing the parameters and
            the command execution facilities.
        '''
        self.module = module
        self.name = module.params.get('name')

    @staticmethod
    def _flag_for(param_name):
        ''' Translate a module parameter name into its CLI flag. '''
        return '--' + param_name.replace('_', '-')

    def _run_auth_command(self, sub_args, success_msg):
        '''
        Run a modifying ``forgejo admin auth`` sub command.

        :param sub_args: Arguments appended to the common command prefix.
        :param success_msg: Message reported if the command exits with 0.
        :return: Result dictionary suitable for exit_json / fail_json.
        '''
        return_code, stdout, stderr = self.module.run_command(
            self._AUTH_COMMAND + sub_args
        )
        if return_code == 0:
            return {
                'failed': False,
                'changed': True,
                'msg': success_msg
            }
        # With a tty attached the error text normally ends up on stdout;
        # fall back to stderr so a failure never yields an empty message.
        return {
            'failed': True,
            'msg': stdout or stderr
        }

    def run(self):
        '''
        Apply the configuration.

        :return: Result dictionary with the keys ``failed``, ``changed``
            (absent on command failure) and ``msg``.
        '''
        state = self.module.params.get('state')
        source_exists, source_id = self.get_oauth_source_with_name()

        if state == 'absent':
            if source_exists:
                return self.remove_auth_source(source_id)
            return {
                'failed': False,
                'changed': False,
                'msg': f'OAuth2 authorization source {self.name} is not configured.'
            }

        if state == 'present':
            if not source_exists:
                return self.add_oauth_source()
            if self.module.params.get('update'):
                return self.update_oauth_source(source_id)
            return {
                'failed': False,
                'changed': False,
                'msg': f'OAuth2 authorization source {self.name} already exists.'
            }

        return {
            'failed': True,
            'changed': False,
            'msg': 'Misconfigured plugin, unknown state.'
        }

    def get_oauth_source_with_name(self):
        '''
        Check the list of configured authentication sources for an existing
        OAuth source of the given name.

        Fails the module if the list command itself does not succeed, since
        an empty listing would otherwise be mistaken for a missing source.

        :return: Tuple ``(True, id)`` if a matching OAuth2 source exists,
            ``(False, -1)`` otherwise.
        '''
        list_rc, list_stdout, list_stderr = self.module.run_command(
            self._AUTH_COMMAND + ['list', '--vertical-bars']
        )
        if list_rc != 0:
            self.module.fail_json(
                msg='Unable to list the configured authentication sources.',
                rc=list_rc,
                stdout=list_stdout,
                stderr=list_stderr,
            )

        for line in list_stdout.splitlines():
            columns = [column.strip() for column in line.split('|')]
            if len(columns) < 3:
                # Blank or otherwise unrelated output line.
                continue
            try:
                source_id = int(columns[0])
            except ValueError:
                # Table header or any other row without a numeric id.
                continue
            if columns[2] == 'OAuth2' and columns[1] == self.name:
                return (True, source_id)

        return (False, -1)

    def add_oauth_source(self):
        '''
        Add a new OAuth2 source with the given configuration.

        :return: Result dictionary describing the outcome.
        '''
        return self._run_auth_command(
            ['add-oauth'] + self.create_command_arguments(),
            f'OAuth2 authorization source {self.name} successful created.'
        )

    def remove_auth_source(self, source_id):
        '''
        Removes the authentication source with the given id from the config.

        :param source_id: Numeric id of the source as reported by the list command.
        :return: Result dictionary describing the outcome.
        '''
        return self._run_auth_command(
            ['delete', '--id', str(source_id)],
            f'OAuth2 authorization source {self.name} removed.'
        )

    def update_oauth_source(self, source_id):
        '''
        Updates the authentication source with the given id to match the
        passed config.

        The update is always reported as changed, the current settings of
        the source are not compared with the requested ones.

        :param source_id: Numeric id of the source as reported by the list command.
        :return: Result dictionary describing the outcome.
        '''
        return self._run_auth_command(
            ['update-oauth', '--id', str(source_id)] + self.create_command_arguments(),
            f'OAuth2 authorization source {self.name} has been updated'
        )

    def create_command_arguments(self):
        '''
        Create the command arguments for cli commands from the passed config.

        :return: List of CLI arguments shared by add-oauth and update-oauth.
        '''
        params = self.module.params

        result = [
            '--name', self.name,
            '--provider', params.get('provider'),
            '--key', params.get('key'),
            '--secret', params.get('secret'),
        ]

        for param_name in self._OPTIONAL_VALUE_PARAMS:
            result.append(self._flag_for(param_name))
            result.append(params.get(param_name) or '')

        # 'scopes' is optional and therefore None when it is not supplied.
        for scope in (params.get('scopes') or '').split():
            result.append('--scopes')
            result.append(scope)

        for param_name in self._SWITCH_PARAMS:
            if params.get(param_name):
                result.append(self._flag_for(param_name))

        return result


def main():
    '''
    Run the module code.
    '''
    # Types are given by name so that Ansible applies its own conversions.
    # Passing the python callables instead would turn any non-empty string,
    # including 'false', into True for the boolean options.
    module_args = {
        'state': {'type': 'str', 'default': 'present', 'choices': ['present', 'absent']},
        'update': {'type': 'bool', 'default': False},
        'name': {'type': 'str', 'required': True},
        'provider': {'type': 'str', 'required': True},
        'key': {'type': 'str', 'required': True},
        # Keep the client secret out of logs and the returned invocation.
        'secret': {'type': 'str', 'required': True, 'no_log': True},
        'auto_discover_url': {'type': 'str', 'required': False},
        'use_custom_urls': {'type': 'bool', 'default': False},
        'custom_tenant_id': {'type': 'str', 'required': False},
        'custom_auth_url': {'type': 'str', 'required': False},
        'custom_token_url': {'type': 'str', 'required': False},
        'custom_profile_url': {'type': 'str', 'required': False},
        'custom_email_url': {'type': 'str', 'required': False},
        'icon_url': {'type': 'str', 'required': False},
        'skip_local_2fa': {'type': 'bool', 'default': False},
        'scopes': {'type': 'str', 'required': False},
        'required_claim_name': {'type': 'str', 'required': False},
        'required_claim_value': {'type': 'str', 'required': False},
        'group_claim_name': {'type': 'str', 'required': False},
        'admin_group': {'type': 'str', 'required': False},
        'restricted_group': {'type': 'str', 'required': False},
        'group_team_map': {'type': 'str', 'required': False},
        'group_team_map_removal': {'type': 'bool', 'default': False},
    }

    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=False,
    )

    result = ForgejoOAuth(module).run()

    # Report failures through fail_json so the module exits non-zero.
    if result.get('failed'):
        module.fail_json(**result)
    module.exit_json(**result)


if __name__ == '__main__':
    main()