ansible-collection-forgejo/plugins/modules/forgejo_oauth.py

300 lines
10 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: Nis Wechselberg <enbewe@enbewe.de>
# MIT License (see https://spdx.org/licenses/MIT.html)
from __future__ import absolute_import, print_function
# Module documentation consumed by ansible-doc (YAML inside raw strings).
DOCUMENTATION = r'''
---
module: forgejo_oauth
short_description: Module to manage forgejo oauth authentication sources
version_added: "1.0.0"
description:
    - This module uses the forgejo command line interface (forgejo admin auth)
      to manage the configured oauth authentication sources. It can add, remove
      or update an authentication source.
    - The commands are executed through podman exec inside the forgejo-app container.
    - This module only works for forgejo running inside containers, created through
      this collection's role. There are several assumptions regarding names and ids.
author:
    - Nis Wechselberg (@eNBeWe)
'''

EXAMPLES = r'''
# Create a new auth provider, or update the existing source of the same name
- name: 'Add authentication source'
  enbewe.forgejo.forgejo_oauth:
    state: 'present'
    update: true
    name: 'eNBeWe.eu SSO'
    provider: 'openidConnect'
    key: 'clientIdConfiguredInAuthProvider'
    secret: 'secretKeyOfTheClientIdInAuthProvider'
    auto_discover_url: 'https://<autodiscover-url>'
    skip_local_2fa: true
'''

RETURN = r'''
# Return values
msg:
    description: Summary of the performed action, or the output of the forgejo command if it failed.
    returned: always
    type: str
'''
from ansible.module_utils.basic import AnsibleModule
class ForgejoOAuth:
    '''
    Handles the management of forgejo authentication sources.

    Every operation is carried out by running ``forgejo admin auth ...``
    inside the ``forgejo-app`` container through ``podman exec``.
    '''

    module = None

    # Command prefix shared by every call into the forgejo container.
    _AUTH_COMMAND = (
        'podman', 'exec',
        '-u', '1000:1000',
        '-it', 'forgejo-app',
        'forgejo', 'admin', 'auth',
    )

    # Optional parameters that are always passed together with a value; an
    # empty string is passed when the parameter is unset. The CLI flag is the
    # parameter name with '_' replaced by '-' and prefixed with '--'.
    _VALUE_PARAMS = (
        'auto_discover_url',
        'custom_tenant_id',
        'custom_auth_url',
        'custom_token_url',
        'custom_profile_url',
        'custom_email_url',
        'icon_url',
        'required_claim_name',
        'required_claim_value',
        'group_claim_name',
        'admin_group',
        'restricted_group',
        'group_team_map',
    )

    # Boolean parameters that map to value-less CLI switches.
    _SWITCH_PARAMS = (
        'use_custom_urls',
        'skip_local_2fa',
        'group_team_map_removal',
    )

    def __init__(self, module: 'AnsibleModule'):
        '''
        Create a new OAuth Handler

        :param module: the AnsibleModule providing ``params`` and ``run_command``.
        '''
        self.module = module
        self.name = module.params.get('name')

    def run(self):
        '''
        Apply the configuration.

        :returns: a result dict with ``failed``, ``msg`` and (except for
            failed commands) ``changed``, suitable for ``exit_json``.
        '''
        state = self.module.params.get('state')
        source_exists, source_id = self.get_oauth_source_with_name()

        if state == 'absent':
            if source_exists:
                return self.remove_auth_source(source_id)
            return {
                'failed': False,
                'changed': False,
                'msg': f'OAuth2 authorization source {self.name} is not configured.'
            }

        if state == 'present':
            if not source_exists:
                return self.add_oauth_source()
            if self.module.params.get('update'):
                return self.update_oauth_source(source_id)
            return {
                'failed': False,
                'changed': False,
                'msg': f'OAuth2 authorization source {self.name} already exists.'
            }

        return {
            'failed': True,
            'changed': False,
            'msg': 'Misconfigured plugin, unknown state.'
        }

    def get_oauth_source_with_name(self):
        '''
        Check the list of configured authentication sources for
        an existing OAuth source of the given name.

        :returns: ``(True, id)`` for the first row of type ``OAuth2`` whose
            name equals ``self.name``, otherwise ``(False, -1)``.

        Calls ``fail_json`` on the module if the list command itself fails,
        because the existence of the source cannot be determined then.
        '''
        list_rc, list_stdout, list_stderr = self._run_auth_command(
            'list', '--vertical-bars')
        if list_rc != 0:
            self.module.fail_json(
                msg='Unable to list forgejo authentication sources: '
                    f'{list_stdout or list_stderr}')

        # splitlines() copes with both "\n" and "\r\n" endings and with a
        # missing trailing newline, so the last row is never dropped.
        for table_row in list_stdout.splitlines():
            columns = [column.strip() for column in table_row.split('|')]
            if len(columns) < 3:
                continue
            try:
                source_id = int(columns[0])
            except ValueError:
                # Header line or anything else that is not a data row.
                continue
            if columns[2] == 'OAuth2' and columns[1] == self.name:
                return (True, source_id)
        return (False, -1)

    def add_oauth_source(self):
        '''
        Add a new OAuth2 source with the given configuration.

        :returns: result dict describing success or failure of the command.
        '''
        command_result = self._run_auth_command(
            'add-oauth', *self.create_command_arguments())
        return self._command_result(
            command_result,
            f'OAuth2 authorization source {self.name} successful created.')

    def remove_auth_source(self, source_id):
        '''
        Removes the authentication source with the given id from the config.

        :param source_id: id of the source as reported by the list command.
        :returns: result dict describing success or failure of the command.
        '''
        command_result = self._run_auth_command(
            'delete', '--id', str(source_id))
        return self._command_result(
            command_result,
            f'OAuth2 authorization source {self.name} removed.')

    def update_oauth_source(self, source_id):
        '''
        Updates the authentication source with the given id to match the passed config.

        The current configuration is not compared first, so a successful
        update is always reported as changed.

        :param source_id: id of the source as reported by the list command.
        :returns: result dict describing success or failure of the command.
        '''
        command_result = self._run_auth_command(
            'update-oauth', '--id', str(source_id),
            *self.create_command_arguments())
        return self._command_result(
            command_result,
            f'OAuth2 authorization source {self.name} has been updated')

    def create_command_arguments(self):
        '''
        Create the command arguments for cli commands from the passed config.

        :returns: list of CLI arguments: the mandatory name/provider/key/secret
            pairs, every optional value flag (empty string when unset), one
            ``--scopes`` pair per whitespace separated scope and finally the
            boolean switches that are enabled.
        '''
        params = self.module.params
        result = [
            '--name', self.name,
            '--provider', params.get('provider'),
            '--key', params.get('key'),
            '--secret', params.get('secret'),
        ]
        for param_name in self._VALUE_PARAMS:
            result += [self._flag_for(param_name), params.get(param_name) or ""]
        # 'scopes' is optional and has no default, so it may be None.
        for scope in (params.get('scopes') or "").split():
            result += ['--scopes', scope]
        for param_name in self._SWITCH_PARAMS:
            if params.get(param_name):
                result.append(self._flag_for(param_name))
        return result

    @staticmethod
    def _flag_for(param_name):
        '''
        Translate a module parameter name into its CLI flag.
        '''
        return '--' + param_name.replace('_', '-')

    def _run_auth_command(self, *arguments):
        '''
        Run ``forgejo admin auth <arguments>`` inside the container.

        :returns: the ``(rc, stdout, stderr)`` tuple of ``run_command``.
        '''
        return self.module.run_command(list(self._AUTH_COMMAND) + list(arguments))

    @staticmethod
    def _command_result(command_result, success_msg):
        '''
        Convert a ``(rc, stdout, stderr)`` tuple of a modifying command
        into a module result dict.
        '''
        command_rc, command_stdout, command_stderr = command_result
        if command_rc == 0:
            return {
                'failed': False,
                'changed': True,
                'msg': success_msg
            }
        return {
            'failed': True,
            # Fall back to stderr so the failure reason is not lost when
            # the command printed nothing on stdout.
            'msg': command_stdout or command_stderr
        }
def main():
    '''
    Run the module code.

    Declares the argument spec, lets ForgejoOAuth apply the requested state
    and hands the resulting dict to ``exit_json``.
    '''
    # Types are given as Ansible type names so that Ansible's own coercion is
    # used (e.g. the string "false" becomes False for 'bool' parameters).
    module_args = {
        'state': {'type': 'str', 'default': 'present', 'choices': ['present', 'absent']},
        'update': {'type': 'bool', 'default': False},
        'name': {'type': 'str', 'required': True},
        'provider': {'type': 'str', 'required': True},
        'key': {'type': 'str', 'required': True},
        # The client secret must never show up in logs or task results.
        'secret': {'type': 'str', 'required': True, 'no_log': True},
        'auto_discover_url': {'type': 'str', 'required': False},
        'use_custom_urls': {'type': 'bool', 'default': False},
        'custom_tenant_id': {'type': 'str', 'required': False},
        'custom_auth_url': {'type': 'str', 'required': False},
        'custom_token_url': {'type': 'str', 'required': False},
        'custom_profile_url': {'type': 'str', 'required': False},
        'custom_email_url': {'type': 'str', 'required': False},
        'icon_url': {'type': 'str', 'required': False},
        'skip_local_2fa': {'type': 'bool', 'default': False},
        'scopes': {'type': 'str', 'required': False},
        'required_claim_name': {'type': 'str', 'required': False},
        'required_claim_value': {'type': 'str', 'required': False},
        'group_claim_name': {'type': 'str', 'required': False},
        'admin_group': {'type': 'str', 'required': False},
        'restricted_group': {'type': 'str', 'required': False},
        'group_team_map': {'type': 'str', 'required': False},
        'group_team_map_removal': {'type': 'bool', 'default': False},
    }
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=False,
    )
    forgejo_auth_module = ForgejoOAuth(module)
    result = forgejo_auth_module.run()
    module.exit_json(**result)


# Only run the module when executed as a script, not when imported.
if __name__ == '__main__':
    main()