Initial version of the forgejo collection

* Forgejo role for quick deployment in podman
* Forgejo_OAuth module to manage authentication source

Signed-off-by: Nis Wechselberg <enbewe@enbewe.de>
This commit is contained in:
Nis Wechselberg 2024-06-23 15:23:51 +02:00
parent 70646c7ba8
commit 47467b3169
Signed by: eNBeWe
GPG key ID: 7B25171F921B9E57
12 changed files with 923 additions and 0 deletions

View file

@ -0,0 +1,299 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: Nis Wechselberg <enbewe@enbewe.de>
# MIT License (see https://spdx.org/licenses/MIT.html)
from __future__ import absolute_import, print_function
DOCUMENTATION = r'''
---
module: forgejo_auth
short_description: Module to manage forgejo oauth authentication sources
version_added: "1.0.0"
description: This module uses the forgejo api to manage the configured oauth
authentication sources. It can add, remove or update an authentication source.
This module only works for forgejo running inside containers, created through
this collections role. There are several assumptions regardings names and ids.
author:
- Nis Wechselberg (@eNBeWe)
'''
EXAMPLES = r'''
# Create a new auth provider, but don't touch existing source
- name: 'Add authentication source'
enbewe.forgejo.forgejo_oauth:
state: 'present'
update: true
name: 'eNBeWe.eu SSO'
provider: 'openidConnect'
key: 'clientIdConfiguredInAuthProvider'
secret: 'secretKeyOfTheClientIdInAuthProvider'
auto_discover_url: 'https://<autodiscover-url>'
skip_local_2fa: true
'''
RETURN = r'''
# Return values
'''
from ansible.module_utils.basic import AnsibleModule
class ForgejoOAuth:
'''
Handles the management of forgejo authentication sources.
'''
module = None
def __init__(self, module: AnsibleModule):
'''
Create a new OAuth Handler
'''
self.module = module
self.name = module.params.get('name')
def run(self):
'''
Apply the configuration.
'''
oauth_source_already_exists, oauth_source_id = self.get_oauth_source_with_name()
if self.module.params.get('state') == 'absent':
if oauth_source_already_exists:
return self.remove_auth_source(oauth_source_id)
return {
'failed': False,
'changed': False,
'msg': f'OAuth2 authorization source {self.name} is not configured.'
}
if self.module.params.get('state') == 'present':
if oauth_source_already_exists and self.module.params.get('update'):
return self.update_oauth_source(oauth_source_id)
if oauth_source_already_exists:
return {
'failed': False,
'changed': False,
'msg': f'OAuth2 authorization source {self.name} already exists.'
}
return self.add_oauth_source()
return {
'failed': True,
'changed': False,
'msg': 'Misconfigured plugin, unknown state.'
}
def get_oauth_source_with_name(self):
'''
Check the list of configured authentication sources for
an existing OAuth source of the given name.
'''
args_list = [
'podman', 'exec',
'-u', '1000:1000',
'-it', 'forgejo-app',
'forgejo', 'admin', 'auth', 'list',
'--vertical-bars'
]
_, auth_list_stdout, _ = self.module.run_command(args_list)
auth_table_rows = auth_list_stdout.split("\n")
if len(auth_table_rows) > 2:
for auth_id in range(1, len(auth_table_rows) - 1):
auth_row = auth_table_rows[auth_id].split('|')
auth_id = int(auth_row[0].strip())
auth_name = auth_row[1].strip()
auth_type = auth_row[2].strip()
if auth_type == 'OAuth2' and auth_name == self.name:
return (True, auth_id)
return (False, -1)
def add_oauth_source(self):
'''
Add a new OAuth2 source with the given configuration.
'''
args_list = [
'podman', 'exec',
'-u', '1000:1000',
'-it', 'forgejo-app',
'forgejo', 'admin', 'auth', 'add-oauth',
]
args_list += self.create_command_arguments()
auth_add_rc, auth_add_stdout, _ = self.module.run_command(args_list)
if auth_add_rc == 0:
return {
'failed': False,
'changed': True,
'msg': f'OAuth2 authorization source {self.name} successful created.'
}
return {
'failed': True,
'msg': auth_add_stdout
}
def remove_auth_source(self, source_id):
'''
Removes the authentication source with the given id from the config.
'''
args_list = [
'podman', 'exec',
'-u', '1000:1000',
'-it', 'forgejo-app',
'forgejo', 'admin', 'auth', 'delete',
'--id', str(source_id),
]
auth_delete_rc, auth_delete_stdout, _ = self.module.run_command(args_list)
if auth_delete_rc == 0:
return {
'failed': False,
'changed': True,
'msg': f'OAuth2 authorization source {self.name} removed.'
}
return {
'failed': True,
'msg': auth_delete_stdout
}
def update_oauth_source(self, source_id):
'''
Updates the authentication source with the given id to match the passed config.
'''
args_list = [
'podman', 'exec',
'-u', '1000:1000',
'-it', 'forgejo-app',
'forgejo', 'admin', 'auth', 'update-oauth',
'--id', str(source_id),
]
args_list += self.create_command_arguments()
auth_update_rc, auth_update_stdout, _ = self.module.run_command(args_list)
if auth_update_rc == 0:
return {
'failed': False,
'changed': True,
'msg': f'OAuth2 authorization source {self.name} has been updated'
}
return {
'failed': True,
'msg': auth_update_stdout
}
def create_command_arguments(self):
'''
Create the command arguments for cli commands from the passed config.
'''
result = [
'--name', self.name,
'--provider', self.module.params.get('provider'),
'--key', self.module.params.get('key'),
'--secret', self.module.params.get('secret'),
'--auto-discover-url',
(self.module.params.get('auto_discover_url')
if self.module.params.get('auto_discover_url') else ""),
'--custom-tenant-id',
(self.module.params.get('custom_tenant_id')
if self.module.params.get('custom_tenant_id') else ""),
'--custom-auth-url',
(self.module.params.get('custom_auth_url')
if self.module.params.get('custom_auth_url') else ""),
'--custom-token-url',
(self.module.params.get('custom_token_url')
if self.module.params.get('custom_token_url') else ""),
'--custom-profile-url',
(self.module.params.get('custom_profile_url')
if self.module.params.get('custom_profile_url') else ""),
'--custom-email-url',
(self.module.params.get('custom_email_url')
if self.module.params.get('custom_email_url') else ""),
'--icon-url',
(self.module.params.get('icon_url')
if self.module.params.get('icon_url') else ""),
'--required-claim-name',
(self.module.params.get('required_claim_name')
if self.module.params.get('required_claim_name') else ""),
'--required-claim-value',
(self.module.params.get('required_claim_value')
if self.module.params.get('required_claim_value') else ""),
'--group-claim-name',
(self.module.params.get('group_claim_name')
if self.module.params.get('group_claim_name') else ""),
'--admin-group',
(self.module.params.get('admin_group')
if self.module.params.get('admin_group') else ""),
'--restricted-group',
(self.module.params.get('restricted_group')
if self.module.params.get('restricted_group') else ""),
'--group-team-map',
(self.module.params.get('group_team_map')
if self.module.params.get('group_team_map') else ""),
]
for scope in self.module.params.get('scopes').split():
result.append('--scopes')
result.append(scope)
if self.module.params.get('use_custom_urls'):
result.append('--use-custom-urls')
if self.module.params.get('skip_local_2fa'):
result.append('--skip-local-2fa')
if self.module.params.get('group_team_map_removal'):
result.append('--group-team-map-removal')
return result
def main():
'''
Run the module code.
'''
module_args = {
'state': {'type': str, 'default': 'present', 'choices': ['present', 'absent']},
'update': {'type': bool, 'default': False},
'name': {'type': str, 'required': True},
'provider': {'type': str, 'required': True},
'key': {'type': str, 'required': True},
'secret': {'type': str, 'required': True},
'auto_discover_url': {'type': str, 'required': False},
'use_custom_urls': {'type': bool, 'default': False},
'custom_tenant_id': {'type': str, 'required': False},
'custom_auth_url': {'type': str, 'required': False},
'custom_token_url': {'type': str, 'required': False},
'custom_profile_url': {'type': str, 'required': False},
'custom_email_url': {'type': str, 'required': False},
'icon_url': {'type': str, 'required': False},
'skip_local_2fa': {'type': bool, 'default': False},
'scopes': {'type': str, 'required': False},
'required_claim_name': {'type': str, 'required': False},
'required_claim_value': {'type': str, 'required': False},
'group_claim_name': {'type': str, 'required': False},
'admin_group': {'type': str, 'required': False},
'restricted_group': {'type': str, 'required': False},
'group_team_map': {'type': str, 'required': False},
'group_team_map_removal': {'type': bool, 'default': False},
}
module = AnsibleModule(
argument_spec = module_args,
supports_check_mode = False,
)
forgejo_auth_module = ForgejoOAuth(module)
result = forgejo_auth_module.run()
module.exit_json(**result)
if __name__ == '__main__':
main()