300 lines
10 KiB
Python
300 lines
10 KiB
Python
|
#!/usr/bin/python
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
# Copyright: Nis Wechselberg <enbewe@enbewe.de>
|
||
|
# MIT License (see https://spdx.org/licenses/MIT.html)
|
||
|
from __future__ import absolute_import, print_function
|
||
|
|
||
|
DOCUMENTATION = r'''
|
||
|
---
|
||
|
module: forgejo_auth
|
||
|
|
||
|
short_description: Module to manage forgejo oauth authentication sources
|
||
|
|
||
|
version_added: "1.0.0"
|
||
|
|
||
|
description: This module uses the forgejo api to manage the configured oauth
|
||
|
authentication sources. It can add, remove or update an authentication source.
|
||
|
This module only works for forgejo running inside containers, created through
|
||
|
this collections role. There are several assumptions regardings names and ids.
|
||
|
|
||
|
author:
|
||
|
- Nis Wechselberg (@eNBeWe)
|
||
|
'''
|
||
|
|
||
|
EXAMPLES = r'''
|
||
|
# Create a new auth provider, but don't touch existing source
|
||
|
- name: 'Add authentication source'
|
||
|
enbewe.forgejo.forgejo_oauth:
|
||
|
state: 'present'
|
||
|
update: true
|
||
|
name: 'eNBeWe.eu SSO'
|
||
|
provider: 'openidConnect'
|
||
|
key: 'clientIdConfiguredInAuthProvider'
|
||
|
secret: 'secretKeyOfTheClientIdInAuthProvider'
|
||
|
auto_discover_url: 'https://<autodiscover-url>'
|
||
|
skip_local_2fa: true
|
||
|
'''
|
||
|
|
||
|
RETURN = r'''
|
||
|
# Return values
|
||
|
'''
|
||
|
|
||
|
from ansible.module_utils.basic import AnsibleModule
|
||
|
|
||
|
class ForgejoOAuth:
|
||
|
'''
|
||
|
Handles the management of forgejo authentication sources.
|
||
|
'''
|
||
|
module = None
|
||
|
|
||
|
def __init__(self, module: AnsibleModule):
|
||
|
'''
|
||
|
Create a new OAuth Handler
|
||
|
'''
|
||
|
self.module = module
|
||
|
|
||
|
self.name = module.params.get('name')
|
||
|
|
||
|
def run(self):
|
||
|
'''
|
||
|
Apply the configuration.
|
||
|
'''
|
||
|
oauth_source_already_exists, oauth_source_id = self.get_oauth_source_with_name()
|
||
|
|
||
|
if self.module.params.get('state') == 'absent':
|
||
|
if oauth_source_already_exists:
|
||
|
return self.remove_auth_source(oauth_source_id)
|
||
|
|
||
|
return {
|
||
|
'failed': False,
|
||
|
'changed': False,
|
||
|
'msg': f'OAuth2 authorization source {self.name} is not configured.'
|
||
|
}
|
||
|
|
||
|
if self.module.params.get('state') == 'present':
|
||
|
if oauth_source_already_exists and self.module.params.get('update'):
|
||
|
return self.update_oauth_source(oauth_source_id)
|
||
|
|
||
|
if oauth_source_already_exists:
|
||
|
return {
|
||
|
'failed': False,
|
||
|
'changed': False,
|
||
|
'msg': f'OAuth2 authorization source {self.name} already exists.'
|
||
|
}
|
||
|
|
||
|
return self.add_oauth_source()
|
||
|
|
||
|
return {
|
||
|
'failed': True,
|
||
|
'changed': False,
|
||
|
'msg': 'Misconfigured plugin, unknown state.'
|
||
|
}
|
||
|
|
||
|
def get_oauth_source_with_name(self):
|
||
|
'''
|
||
|
Check the list of configured authentication sources for
|
||
|
an existing OAuth source of the given name.
|
||
|
'''
|
||
|
args_list = [
|
||
|
'podman', 'exec',
|
||
|
'-u', '1000:1000',
|
||
|
'-it', 'forgejo-app',
|
||
|
'forgejo', 'admin', 'auth', 'list',
|
||
|
'--vertical-bars'
|
||
|
]
|
||
|
_, auth_list_stdout, _ = self.module.run_command(args_list)
|
||
|
auth_table_rows = auth_list_stdout.split("\n")
|
||
|
|
||
|
if len(auth_table_rows) > 2:
|
||
|
for auth_id in range(1, len(auth_table_rows) - 1):
|
||
|
auth_row = auth_table_rows[auth_id].split('|')
|
||
|
auth_id = int(auth_row[0].strip())
|
||
|
auth_name = auth_row[1].strip()
|
||
|
auth_type = auth_row[2].strip()
|
||
|
if auth_type == 'OAuth2' and auth_name == self.name:
|
||
|
return (True, auth_id)
|
||
|
return (False, -1)
|
||
|
|
||
|
def add_oauth_source(self):
|
||
|
'''
|
||
|
Add a new OAuth2 source with the given configuration.
|
||
|
'''
|
||
|
args_list = [
|
||
|
'podman', 'exec',
|
||
|
'-u', '1000:1000',
|
||
|
'-it', 'forgejo-app',
|
||
|
'forgejo', 'admin', 'auth', 'add-oauth',
|
||
|
]
|
||
|
args_list += self.create_command_arguments()
|
||
|
|
||
|
auth_add_rc, auth_add_stdout, _ = self.module.run_command(args_list)
|
||
|
|
||
|
if auth_add_rc == 0:
|
||
|
return {
|
||
|
'failed': False,
|
||
|
'changed': True,
|
||
|
'msg': f'OAuth2 authorization source {self.name} successful created.'
|
||
|
}
|
||
|
|
||
|
return {
|
||
|
'failed': True,
|
||
|
'msg': auth_add_stdout
|
||
|
}
|
||
|
|
||
|
def remove_auth_source(self, source_id):
|
||
|
'''
|
||
|
Removes the authentication source with the given id from the config.
|
||
|
'''
|
||
|
args_list = [
|
||
|
'podman', 'exec',
|
||
|
'-u', '1000:1000',
|
||
|
'-it', 'forgejo-app',
|
||
|
'forgejo', 'admin', 'auth', 'delete',
|
||
|
'--id', str(source_id),
|
||
|
]
|
||
|
auth_delete_rc, auth_delete_stdout, _ = self.module.run_command(args_list)
|
||
|
|
||
|
if auth_delete_rc == 0:
|
||
|
return {
|
||
|
'failed': False,
|
||
|
'changed': True,
|
||
|
'msg': f'OAuth2 authorization source {self.name} removed.'
|
||
|
}
|
||
|
|
||
|
return {
|
||
|
'failed': True,
|
||
|
'msg': auth_delete_stdout
|
||
|
}
|
||
|
|
||
|
def update_oauth_source(self, source_id):
|
||
|
'''
|
||
|
Updates the authentication source with the given id to match the passed config.
|
||
|
'''
|
||
|
args_list = [
|
||
|
'podman', 'exec',
|
||
|
'-u', '1000:1000',
|
||
|
'-it', 'forgejo-app',
|
||
|
'forgejo', 'admin', 'auth', 'update-oauth',
|
||
|
'--id', str(source_id),
|
||
|
]
|
||
|
args_list += self.create_command_arguments()
|
||
|
|
||
|
auth_update_rc, auth_update_stdout, _ = self.module.run_command(args_list)
|
||
|
|
||
|
if auth_update_rc == 0:
|
||
|
return {
|
||
|
'failed': False,
|
||
|
'changed': True,
|
||
|
'msg': f'OAuth2 authorization source {self.name} has been updated'
|
||
|
}
|
||
|
|
||
|
return {
|
||
|
'failed': True,
|
||
|
'msg': auth_update_stdout
|
||
|
}
|
||
|
|
||
|
def create_command_arguments(self):
|
||
|
'''
|
||
|
Create the command arguments for cli commands from the passed config.
|
||
|
'''
|
||
|
result = [
|
||
|
'--name', self.name,
|
||
|
'--provider', self.module.params.get('provider'),
|
||
|
'--key', self.module.params.get('key'),
|
||
|
'--secret', self.module.params.get('secret'),
|
||
|
'--auto-discover-url',
|
||
|
(self.module.params.get('auto_discover_url')
|
||
|
if self.module.params.get('auto_discover_url') else ""),
|
||
|
'--custom-tenant-id',
|
||
|
(self.module.params.get('custom_tenant_id')
|
||
|
if self.module.params.get('custom_tenant_id') else ""),
|
||
|
'--custom-auth-url',
|
||
|
(self.module.params.get('custom_auth_url')
|
||
|
if self.module.params.get('custom_auth_url') else ""),
|
||
|
'--custom-token-url',
|
||
|
(self.module.params.get('custom_token_url')
|
||
|
if self.module.params.get('custom_token_url') else ""),
|
||
|
'--custom-profile-url',
|
||
|
(self.module.params.get('custom_profile_url')
|
||
|
if self.module.params.get('custom_profile_url') else ""),
|
||
|
'--custom-email-url',
|
||
|
(self.module.params.get('custom_email_url')
|
||
|
if self.module.params.get('custom_email_url') else ""),
|
||
|
'--icon-url',
|
||
|
(self.module.params.get('icon_url')
|
||
|
if self.module.params.get('icon_url') else ""),
|
||
|
'--required-claim-name',
|
||
|
(self.module.params.get('required_claim_name')
|
||
|
if self.module.params.get('required_claim_name') else ""),
|
||
|
'--required-claim-value',
|
||
|
(self.module.params.get('required_claim_value')
|
||
|
if self.module.params.get('required_claim_value') else ""),
|
||
|
'--group-claim-name',
|
||
|
(self.module.params.get('group_claim_name')
|
||
|
if self.module.params.get('group_claim_name') else ""),
|
||
|
'--admin-group',
|
||
|
(self.module.params.get('admin_group')
|
||
|
if self.module.params.get('admin_group') else ""),
|
||
|
'--restricted-group',
|
||
|
(self.module.params.get('restricted_group')
|
||
|
if self.module.params.get('restricted_group') else ""),
|
||
|
'--group-team-map',
|
||
|
(self.module.params.get('group_team_map')
|
||
|
if self.module.params.get('group_team_map') else ""),
|
||
|
]
|
||
|
for scope in self.module.params.get('scopes').split():
|
||
|
result.append('--scopes')
|
||
|
result.append(scope)
|
||
|
if self.module.params.get('use_custom_urls'):
|
||
|
result.append('--use-custom-urls')
|
||
|
if self.module.params.get('skip_local_2fa'):
|
||
|
result.append('--skip-local-2fa')
|
||
|
if self.module.params.get('group_team_map_removal'):
|
||
|
result.append('--group-team-map-removal')
|
||
|
|
||
|
return result
|
||
|
|
||
|
def main():
|
||
|
'''
|
||
|
Run the module code.
|
||
|
'''
|
||
|
module_args = {
|
||
|
'state': {'type': str, 'default': 'present', 'choices': ['present', 'absent']},
|
||
|
'update': {'type': bool, 'default': False},
|
||
|
'name': {'type': str, 'required': True},
|
||
|
'provider': {'type': str, 'required': True},
|
||
|
'key': {'type': str, 'required': True},
|
||
|
'secret': {'type': str, 'required': True},
|
||
|
'auto_discover_url': {'type': str, 'required': False},
|
||
|
'use_custom_urls': {'type': bool, 'default': False},
|
||
|
'custom_tenant_id': {'type': str, 'required': False},
|
||
|
'custom_auth_url': {'type': str, 'required': False},
|
||
|
'custom_token_url': {'type': str, 'required': False},
|
||
|
'custom_profile_url': {'type': str, 'required': False},
|
||
|
'custom_email_url': {'type': str, 'required': False},
|
||
|
'icon_url': {'type': str, 'required': False},
|
||
|
'skip_local_2fa': {'type': bool, 'default': False},
|
||
|
'scopes': {'type': str, 'required': False},
|
||
|
'required_claim_name': {'type': str, 'required': False},
|
||
|
'required_claim_value': {'type': str, 'required': False},
|
||
|
'group_claim_name': {'type': str, 'required': False},
|
||
|
'admin_group': {'type': str, 'required': False},
|
||
|
'restricted_group': {'type': str, 'required': False},
|
||
|
'group_team_map': {'type': str, 'required': False},
|
||
|
'group_team_map_removal': {'type': bool, 'default': False},
|
||
|
}
|
||
|
|
||
|
module = AnsibleModule(
|
||
|
argument_spec = module_args,
|
||
|
supports_check_mode = False,
|
||
|
)
|
||
|
|
||
|
forgejo_auth_module = ForgejoOAuth(module)
|
||
|
result = forgejo_auth_module.run()
|
||
|
|
||
|
module.exit_json(**result)
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|