uffd-ldapd-container/uffd-ldapd
Nis Wechselberg 27d78f4719
Added initial container definition for uffd-ldapd
Signed-off-by: Nis Wechselberg <enbewe@enbewe.de>
2025-06-09 16:50:47 +02:00

339 lines
14 KiB
Python
Executable file

#!/usr/bin/python3
import os
import sys
import socketserver
import logging
import socket
import re
import signal
import click
import requests
from cachecontrol import CacheControl
from cachecontrol.heuristics import ExpiresAfter
import ldapserver
from ldapserver.exceptions import LDAPInvalidCredentials, LDAPInsufficientAccessRights, LDAPUnwillingToPerform
from ldapserver.schema import RFC2307BIS_SCHEMA, RFC2798_SCHEMA
# Module-level logger, used by the signal handler below.
logger = logging.getLogger(__name__)

# Schema served to clients: the union of the RFC 2307bis and RFC 2798 schemas,
# extended with the "memberOf" attribute type (OID 1.2.840.113556.1.2.102),
# which user entries use to reference the groups they belong to.
CUSTOM_SCHEMA = (RFC2307BIS_SCHEMA | RFC2798_SCHEMA).extend(
    attribute_type_definitions=[
        # pylint: disable=line-too-long
        "( 1.2.840.113556.1.2.102 NAME 'memberOf' DESC 'Group that the entry belongs to' EQUALITY distinguishedNameMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.12 USAGE dSAOperation )",
    ],
)
class UffdAPI:
    """Small client for the uffd HTTP API (version 1 endpoints).

    Every request is sent through a ``requests`` session that authenticates
    with HTTP basic auth (client id / client secret). When ``cache_ttl`` is
    truthy the session is wrapped in ``CacheControl`` with an
    ``ExpiresAfter`` heuristic of that many seconds.
    """

    def __init__(self, baseurl, client_id, client_secret, cache_ttl=60):
        """Store the connection settings and build the HTTP session.

        Args:
            baseurl: Base URL that endpoint paths are appended to.
            client_id: API user name used for basic auth.
            client_secret: API secret used for basic auth.
            cache_ttl: Cache lifetime in seconds; a falsy value disables
                the caching wrapper.
        """
        self.baseurl = baseurl
        self.client_id = client_id
        self.client_secret = client_secret
        http = requests.Session()
        http.auth = (client_id, client_secret)
        if cache_ttl:
            http = CacheControl(http, heuristic=ExpiresAfter(seconds=cache_ttl))
        self.session = http

    @staticmethod
    def _decode(response):
        """Raise for HTTP error status codes, otherwise return the JSON body."""
        response.raise_for_status()
        return response.json()

    def get(self, endpoint, **kwargs):
        """GET ``baseurl + endpoint`` with ``kwargs`` as query parameters."""
        url = self.baseurl + endpoint
        return self._decode(self.session.get(url, params=kwargs))

    def post(self, endpoint, **kwargs):
        """POST ``kwargs`` as form data to ``baseurl + endpoint``."""
        url = self.baseurl + endpoint
        return self._decode(self.session.post(url, data=kwargs))

    # pylint: disable=invalid-name,redefined-builtin
    def get_users(self, id=None, loginname=None, group=None):
        """Query ``/api/v1/getusers`` with the given filter parameters."""
        filters = {'id': id, 'loginname': loginname, 'group': group}
        return self.get('/api/v1/getusers', **filters)

    def get_groups(self, id=None, name=None, member=None):
        """Query ``/api/v1/getgroups`` with the given filter parameters."""
        filters = {'id': id, 'name': name, 'member': member}
        return self.get('/api/v1/getgroups', **filters)

    def check_password(self, loginname, password):
        """Submit a login name and password to ``/api/v1/checkpassword``."""
        credentials = {'loginname': loginname, 'password': password}
        return self.post('/api/v1/checkpassword', **credentials)
def normalize_user_loginname(loginname):
    """Normalize an LDAP assertion value so it can be compared to a loginname.

    Returns the stringprep-prepared value without the surrounding SPACE
    characters, or ``None`` if the value contains prohibited characters.
    """
    # The equality matching rule for uid is caseIgnoreMatch. It prepares
    # attribute and assertion value according to LDAP stringprep with
    # case-folding.
    #
    # Uffd restricts loginnames to lower-case ASCII letters, digits,
    # underscores and dashes. None of these characters are changed or
    # rejected by stringprep with case-folding. The only effect stringprep
    # has on loginnames is that it adds a leading and a final SPACE character.
    #
    # The assertion value (the argument to this function) could however
    # contain characters that are mapped to SPACE or to nothing, for example.
    # So we apply stringprep to the assertion value and then strip the added
    # leading and final SPACE characters. Stringprep case-folds the input
    # string to lower-case.
    #
    # The resulting string can be compared to an actual loginname with simple
    # byte-for-byte or codepoint-for-codepoint comparison with or without
    # case-folding. It matches if and only if the loginname matches the input
    # value according to caseIgnoreMatch.
    stringprep = ldapserver.rfc4518_stringprep
    try:
        prepared = stringprep.prepare(loginname, stringprep.MatchingType.CASE_IGNORE_STRING)
    except ValueError:  # Input value contains prohibited characters
        return None
    return prepared.strip(' ')
def normalize_group_name(name):
    """Normalize a group name assertion value; ``None`` if it is invalid.

    Group names are normalized exactly like loginnames.
    """
    # Currently uffd has no restrictions for group names, but it is planned
    # to add restrictions similar to loginname restrictions.
    # See https://git.cccv.de/uffd/uffd/-/issues/127
    normalized = normalize_user_loginname(name)
    return normalized
class UffdLDAPRequestHandler(ldapserver.LDAPRequestHandler):
    """LDAP request handler that answers binds and searches from the uffd API.

    The class attributes ``api``, ``dn_base``, ``bind_password`` and
    ``group_filter_regex`` are placeholders; they are expected to be set on a
    subclass before the handler is used.
    """

    subschema = ldapserver.SubschemaSubentry(CUSTOM_SCHEMA, 'cn=Subschema')

    # Overwritten before use
    api = None
    dn_base = None
    bind_password = None  # if None anonymous reads are allowed
    group_filter_regex = None

    def _check_api_password(self, loginname, password):
        """Verify a user password via the API and map failures to LDAP errors.

        Returns:
            ``True`` if the API reports a truthy result.

        Raises:
            LDAPInsufficientAccessRights: the API answered with HTTP 403.
            LDAPUnwillingToPerform: the API answered with HTTP 429.
            LDAPInvalidCredentials: the API reported a falsy result.
            requests.exceptions.HTTPError: any other HTTP error status.
        """
        try:
            if self.api.check_password(loginname=loginname, password=password):
                return True
        except requests.exceptions.HTTPError as exc:
            if exc.response.status_code == 403:  # We don't have "checkpassword" scope
                raise LDAPInsufficientAccessRights() from exc
            if exc.response.status_code == 429:  # Ratelimited
                raise LDAPUnwillingToPerform('Too Many Requests') from exc
            raise
        raise LDAPInvalidCredentials()

    def do_bind_simple_authenticated(self, dn, password):
        """Handle a simple bind with a DN and password.

        The service DN ``cn=service,ou=system,<base>`` is accepted when the
        password equals ``bind_password``. Otherwise the DN must be a direct
        child of ``ou=users,<base>`` with a single ``uid`` RDN, and the
        password is checked through the API.
        """
        dn = self.subschema.DN.from_str(dn)
        if dn == self.subschema.DN('cn=service,ou=system') + self.dn_base and password == self.bind_password:
            return True
        if not dn.is_direct_child_of(self.subschema.DN('ou=users') + self.dn_base) or len(dn[0]) != 1 or dn[0][0].attribute != 'uid':
            raise LDAPInvalidCredentials()
        return self._check_api_password(dn[0][0].value, password)

    supports_sasl_plain = True

    def do_bind_sasl_plain(self, identity, password, authzid=None):
        """Handle a SASL PLAIN bind; an authzid differing from identity is rejected."""
        if authzid is not None and identity != authzid:
            raise LDAPInvalidCredentials()
        return self._check_api_password(identity, password)

    def do_search(self, baseobj, scope, filterobj):
        """Yield candidate entries for a search request.

        Entries of the parent implementation are always yielded. Static, user
        and group entries are only yielded if the connection is bound or no
        bind password is configured.
        """
        yield from super().do_search(baseobj, scope, filterobj)
        if self.bind_object or self.bind_password is None:
            yield from self.do_search_static()
            yield from self.do_search_users(baseobj, scope, filterobj)
            yield from self.do_search_groups(baseobj, scope, filterobj)

    def do_search_static(self):
        """Yield the fixed entries: base object, the three OUs and the service entry."""
        base_attrs = {
            'objectClass': ['top', 'dcObject', 'organization'],
            'structuralObjectClass': ['organization'],
        }
        # Mirror the RDN attributes of the base DN in the base entry itself.
        for rdnassertion in self.dn_base[0]:  # pylint: disable=unsubscriptable-object
            base_attrs[rdnassertion.attribute] = [rdnassertion.value]
        yield self.subschema.ObjectEntry(self.dn_base, **base_attrs)
        yield self.subschema.ObjectEntry(
            self.subschema.DN('ou=users') + self.dn_base,
            ou=['users'],
            objectClass=['top', 'organizationalUnit'],
            structuralObjectClass=['organizationalUnit'],
        )
        yield self.subschema.ObjectEntry(
            self.subschema.DN('ou=groups') + self.dn_base,
            ou=['groups'],
            objectClass=['top', 'organizationalUnit'],
            structuralObjectClass=['organizationalUnit'],
        )
        yield self.subschema.ObjectEntry(
            self.subschema.DN('ou=system') + self.dn_base,
            ou=['system'],
            objectClass=['top', 'organizationalUnit'],
            structuralObjectClass=['organizationalUnit'],
        )
        yield self.subschema.ObjectEntry(
            self.subschema.DN('cn=service,ou=system') + self.dn_base,
            cn=['service'],
            objectClass=['top', 'organizationalRole', 'simpleSecurityObject'],
            structuralObjectClass=['organizationalRole'],
        )

    def do_search_users(self, baseobj, scope, filterobj):
        """Yield user entries that may match the search request.

        Constraints extracted from the request (uid, uidNumber or memberOf)
        are turned into API query parameters to narrow the lookup. Groups not
        matching ``group_filter_regex`` are hidden from ``memberOf``.
        """
        users_base = self.subschema.DN(self.dn_base, ou='users')
        groups_base = self.subschema.DN(self.dn_base, ou='groups')
        template = self.subschema.EntryTemplate(
            users_base, 'uid',
            structuralObjectClass=['inetorgperson'],
            objectClass=['top', 'inetorgperson', 'organizationalperson', 'person', 'posixaccount'],
            cn=ldapserver.WILDCARD,
            displayname=ldapserver.WILDCARD,
            givenname=ldapserver.WILDCARD,
            homeDirectory=ldapserver.WILDCARD,
            mail=ldapserver.WILDCARD,
            sn=[' '],
            uid=ldapserver.WILDCARD,
            uidNumber=ldapserver.WILDCARD,
            memberOf=ldapserver.WILDCARD,
        )
        if not template.match_search(baseobj, scope, filterobj):
            return
        constraints = template.extract_search_constraints(baseobj, scope, filterobj)
        request_params = {}
        if 'uid' in constraints:
            loginname = normalize_user_loginname(constraints['uid'][0])
            if loginname is None:
                # The assertion value contains prohibited characters, so it
                # cannot equal any loginname: no user entry can match.
                return
            request_params = {'loginname': loginname}
        elif 'uidnumber' in constraints:
            request_params = {'id': constraints['uidnumber'][0]}
        elif 'memberof' in constraints:
            for value in constraints['memberof']:
                if value.is_direct_child_of(groups_base) and value.object_attribute == 'cn':
                    group_name = normalize_group_name(value.object_value)
                    if group_name is None:
                        # Un-normalizable group name: previously this ended up
                        # as regex.match(None) and raised TypeError.
                        return
                    request_params = {'group': group_name}
                    break
        if 'group' in request_params and not self.group_filter_regex.match(request_params['group']):
            return
        for user in self.api.get_users(**request_params):
            yield template.create_entry(
                user['loginname'],
                cn=[user['displayname']],
                displayname=[user['displayname']],
                givenname=[user['displayname']],
                homeDirectory=['/home/'+user['loginname']],
                mail=[user['email']],
                uid=[user['loginname']],
                uidNumber=[user['id']],
                memberOf=[self.subschema.DN(groups_base, cn=group)
                          for group in user['groups']
                          if self.group_filter_regex.match(group)],
            )

    def do_search_groups(self, baseobj, scope, filterobj):
        """Yield group entries that may match the search request.

        Constraints extracted from the request (cn, gidNumber or uniqueMember)
        are turned into API query parameters to narrow the lookup. Groups not
        matching ``group_filter_regex`` are never yielded.
        """
        users_base = self.subschema.DN(self.dn_base, ou='users')
        groups_base = self.subschema.DN(self.dn_base, ou='groups')
        template = self.subschema.EntryTemplate(
            groups_base, 'cn',
            structuralObjectClass=['groupOfUniqueNames'],
            objectClass=['top', 'groupOfUniqueNames', 'posixGroup'],
            cn=ldapserver.WILDCARD,
            description=[' '],
            gidNumber=ldapserver.WILDCARD,
            uniqueMember=ldapserver.WILDCARD,
        )
        if not template.match_search(baseobj, scope, filterobj):
            return
        constraints = template.extract_search_constraints(baseobj, scope, filterobj)
        request_params = {}
        if 'cn' in constraints:
            group_name = normalize_group_name(constraints['cn'][0])
            if group_name is None:
                # Un-normalizable group name cannot equal any group's cn;
                # previously this ended up as regex.match(None) -> TypeError.
                return
            request_params = {'name': group_name}
        elif 'gidnumber' in constraints:
            request_params = {'id': constraints['gidnumber'][0]}
        elif 'uniquemember' in constraints:
            for value in constraints['uniquemember']:
                if value.is_direct_child_of(users_base) and value.object_attribute == 'uid':
                    member = normalize_user_loginname(value.object_value)
                    if member is None:
                        # The member value cannot equal any loginname, so no
                        # group can have it as a member.
                        return
                    request_params = {'member': member}
                    break
        if 'name' in request_params and not self.group_filter_regex.match(request_params['name']):
            return
        for group in self.api.get_groups(**request_params):
            if not self.group_filter_regex.match(group['name']):
                continue
            yield template.create_entry(
                group['name'],
                cn=[group['name']],
                gidNumber=[group['id']],
                uniqueMember=[self.subschema.DN(users_base, uid=user) for user in group['members']],
            )
def make_requesthandler(api, dn_base, bind_password=None, group_filter_regex=None):
    """Create a configured ``UffdLDAPRequestHandler`` subclass.

    Args:
        api: API client stored on the handler class.
        dn_base: Base DN as a string; parsed with the handler's subschema.
        bind_password: Service bind password; a falsy value is stored as
            ``None``, otherwise the encoded bytes are stored.
        group_filter_regex: Pattern that group names must match; a falsy
            value results in the empty pattern, which matches every name.

    Returns:
        The new handler class.
    """
    class RequestHandler(UffdLDAPRequestHandler):
        pass

    parsed_base = RequestHandler.subschema.DN.from_str(dn_base)
    encoded_password = None if not bind_password else bind_password.encode()
    RequestHandler.api = api
    RequestHandler.dn_base = parsed_base
    RequestHandler.bind_password = encoded_password
    RequestHandler.group_filter_regex = re.compile(group_filter_regex or '')
    return RequestHandler
class FilenoUnixStreamServer(socketserver.UnixStreamServer):
    """UNIX stream server that uses an existing file descriptor as its socket."""

    def __init__(self, fd, RequestHandlerClass, bind_and_activate=True):
        """Remember ``fd`` and initialise the base server without an address."""
        self.server_fd = fd
        super().__init__(None, RequestHandlerClass, bind_and_activate=bind_and_activate)

    def server_bind(self):
        """Swap the freshly created socket for one built from ``server_fd``."""
        # UnixStreamServer.__init__ creates an unbound socket
        self.socket.close()
        inherited = socket.fromfd(self.server_fd, socket.AF_UNIX, socket.SOCK_STREAM)
        self.socket = inherited
        self.server_address = inherited.getsockname()
class ThreadingFilenoUnixStreamServer(socketserver.ThreadingMixIn, FilenoUnixStreamServer):
    """``FilenoUnixStreamServer`` combined with ``socketserver.ThreadingMixIn``."""
def cleanup_unix_socket(path):
    """Remove a stale UNIX domain socket file at ``path``.

    Nothing happens if ``path`` does not exist. Otherwise a connection attempt
    is made: if it is refused (nobody is listening), the file is removed. If
    the connection succeeds, the file is left in place.

    The probing socket is always closed, including when ``connect`` fails
    with an error other than ``ConnectionRefusedError`` (which propagates).
    """
    if not os.path.exists(path):
        return
    # The context manager closes the probe socket on every exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        try:
            conn.connect(path)
        except ConnectionRefusedError:
            os.remove(path)
def parse_network_address(addr):
    """Split a listen address into ``(host, port)``; both are strings.

    Accepted forms:
        * ``host`` or ``host:port``
        * ``[ipv6]`` or ``[ipv6]:port``
        * a bare IPv6 literal without brackets (more than one colon), which
          is returned unchanged with the default port

    The port defaults to ``'389'`` when none is given.

    Raises:
        ValueError: if an opening ``[`` has no matching ``]``.
    """
    port = '389'
    if addr.startswith('['):
        host, bracket, remainder = addr[1:].partition(']')
        if not bracket:
            raise ValueError('Missing closing "]" in address: ' + repr(addr))
        if remainder.startswith(':'):
            port = remainder[1:]
        return host, port
    if addr.count(':') == 1:
        # Exactly one colon: unambiguous "host:port".
        host, port = addr.split(':')
        return host, port
    # No colon (plain host) or several colons (unbracketed IPv6 literal, where
    # a port cannot be told apart from the address): use the default port.
    return addr, port
class StdoutFilter(logging.Filter):
    """Logging filter that only passes records of level INFO or lower."""

    def filter(self, record):
        """Return ``True`` if the record's level is at most ``logging.INFO``."""
        at_most_info = record.levelno <= logging.INFO
        return at_most_info
def sigterm_handler(signum, frame):
    """Signal handler: log the shutdown and exit with status 0.

    ``signum`` and ``frame`` are required by the signal handler signature but
    are not used.
    """
    logger.info("Received SIGTERM, shutting down gracefully ...")
    raise SystemExit(0)
# pylint: disable=line-too-long
@click.command(help='LDAP proxy for integrating LDAP service with uffd SSO. Supports user and group searches and as well as binds with user passwords.')
@click.option('--socket-address', help='Host and port "ip:port" to listen on')
@click.option('--socket-path', type=click.Path(), help='Path for UNIX domain socket')
@click.option('--socket-fd', type=int, help='Use fd number as server socket (alternative to --socket-path)')
@click.option('--api-url', required=True, help='Uffd base URL without API prefix or trailing slash (e.g. https://example.com)')
@click.option('--api-user', required=True, help='API user/client id')
@click.option('--api-secret', required=True, help='API secret, do not set this on the command-line, use environment variable SERVER_API_SECRET instead')
@click.option('--cache-ttl', default=60, help='Time-to-live for API response caching in seconds')
@click.option('--base-dn', required=True, help='Base DN for user, group and system objects. E.g. "dc=example,dc=com"')
@click.option('--bind-password', help='Authentication password for the service connection to LDAP. Bind DN is always "cn=service,ou=system,BASEDN". If set, anonymous access is disabled.')
@click.option('--group-filter-regex', help='Python regular expression that group names must match for the group to be visible to LDAP clients')
def main(socket_address, socket_path, socket_fd, api_url, api_user, api_secret, cache_ttl, base_dn, bind_password, group_filter_regex):
    """Command-line entry point: configure logging, build the server and serve.

    Exactly one of ``socket_address``, ``socket_path`` and ``socket_fd`` must
    be given; it selects a TCP server, a UNIX socket server bound to a path,
    or a UNIX socket server on an inherited file descriptor.
    """
    # pylint: disable=too-many-locals
    # Register signal handler for proper SIGTERM handling
    signal.signal(signal.SIGTERM, sigterm_handler)

    listen_options = (socket_address, socket_path, socket_fd)
    if sum(option is not None for option in listen_options) != 1:
        raise click.ClickException('Either --socket-address, --socket-path or --socket-fd must be specified')

    # Records up to INFO go to stdout, WARNING and above go to stderr.
    to_stdout = logging.StreamHandler(sys.stdout)
    to_stdout.setLevel(logging.INFO)
    to_stdout.addFilter(StdoutFilter())
    to_stderr = logging.StreamHandler(sys.stderr)
    to_stderr.setLevel(logging.WARNING)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    for handler in (to_stdout, to_stderr):
        root.addHandler(handler)

    handler_class = make_requesthandler(
        UffdAPI(api_url, api_user, api_secret, cache_ttl),
        base_dn,
        bind_password,
        group_filter_regex,
    )

    if socket_address is not None:
        host, port = parse_network_address(socket_address)
        server = socketserver.ThreadingTCPServer((host, int(port)), handler_class)
    elif socket_path is not None:
        # Remove a stale socket file left behind by a previous run.
        cleanup_unix_socket(socket_path)
        server = socketserver.ThreadingUnixStreamServer(socket_path, handler_class)
    else:
        server = ThreadingFilenoUnixStreamServer(socket_fd, handler_class)
    server.serve_forever()
if __name__ == '__main__':
    # Script entry point. auto_envvar_prefix='SERVER' is passed to click so
    # that options can also be supplied through SERVER_* environment
    # variables (e.g. SERVER_API_SECRET, as the --api-secret help text says).
    # Pylint does not seem to understand click's decorators
    # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
    main(auto_envvar_prefix='SERVER')