ansible-collection-duplicity/plugins/action/gpg_key.py
2024-02-11 16:29:29 +01:00

1556 lines
53 KiB
Python

#!/usr/bin/python
# Copyright: (c) 2019, Rinck H. Sonnenberg - Netson <r.sonnenberg@netson.nl>
# License: MIT
ANSIBLE_METADATA = {
'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'
}
DOCUMENTATION = '''
---
module: gpg_key
short_description: Module to install and trust GPG keys
version_added: "2.7"
description: |
Module to install and trust GPG keys from files and keyservers.
I shouldn't have to tell you that it is a BAD idea to store your
secret keys inside a playbook or role! Please take approriate measures
to protect your sensitive information from falling into the wrong hands!
options:
fpr:
description: |
Key Fingerprint to install from keyserver, to delete from target
machine, or to get info on. To get info on all installed keys,
use * as the value for fpr. Using any shorter ID than the full
fingerprint will fail. Using the short ID's isn't recommended
anyways, due to possible collisions.
required: false
type: str
keyserver:
description: Keyserver to download key from
default: keyserver.ubuntu.com
type: str
file:
description: |
File on target machine containing the key(s) to install;
be aware that a file can contain more than 1 key; if this
is the case, all keys will be imported and all keys will
receive the same trust level. The module auto-detects if
the given key is a public or secret key.
required: false
type: path
content:
description: |
Contents of keyfile to install on target machine
just like the file, the contents can contain more than 1 key
and all keys will receive the same trust level. The module
auto-detects if the given key is a public or secret key.
The content parameter simply creates a temporary file on the
target host and then performs the same actions as the file
parameter. It is just an easy method to not have to create
a keyfile on the target machine first.
required: false
type: str
manage_trust:
description: |
Setting controls wether or not the module controls the trust levels
of the (imported) keys. If set to false, no changes will be made to
the trust level regardless of the 'trust' setting.
default: true
type: bool
trust:
description: |
Trust level to apply to newly imported keys or existing keys;
please keep in mind that keys with a trust level other than 5
need to be signed by a fully trusted key in order to effectively
set the trust level. If your key is not signed by a fully trusted
key and the trust level is 2, 3 or 4, the module will report a
changed state on each run due to the fact that GnuPG will report
an 'Unknown' trust level.
choices:
- 1
- 2
- 3
- 4
- 5
default: 1
type: str
state:
description: |
Key should be present, absent, latest (keyserver only) or info.
Info only shows info for key given via fpr. Alternatively, you
can use the special value * for the fpr to get a list of all
installed keys and their relevant info.
default: present
type: str
choices:
- present
- absent
- latest
- info
gpgbin:
description: Full path to GnuPG binary on target host
default: uses get_bin_path method to find gpg
type: path
homedir:
description: |
Full path to the gpg homedir you wish to use; If none is provided,
gpg will use the default homedir of ~/.gnupg
Please be aware that this will be the user executing the module
on the target host! So there will likely be a difference between
running the module with and without become:yes! If you don't want to
be surprised, set the path to the homedir with the variable. For more
information on the GnuPG homedir, check
https://www.gnupg.org/gph/en/manual/r1616.html
default: None
type: path
author:
- Rinck H. Sonnenberg (r.sonnenberg@netson.nl)
'''
EXAMPLES = '''
# install key from keyfile on target host and set trust level to 5
- name: add key(s) from file and set trust
gpg_key:
file: "/tmp/testkey.asc"
trust: '5'
# make sure all keys in a file are NOT present on the keychain
- name: remove keys inside file from the keychain
gpg_key:
file: "/tmp/testkey.asc"
state: absent
# install keys on the target host from a keyfile on the ansible master
- name: install keys on the target host from a keyfile on the ansible master
gpg_key:
content: "{{ lookup('file', '/my/tmp/file/on/host') }}"
# alternatively, you can simply provide the key contents directly
- name: install keys from key contents
content: "-----BEGIN PGP PUBLIC KEY BLOCK-----........."
# install key from keyserver on target machine
- name: install key from default keyserver on target machine
gpg_key:
fpr: 0D69E11F12BDBA077B3726AB4E1F799AA4FF2279
# install key from keyserver on target machine and set trust level
- name: install key from alternate keyserver on target machine and set trust level 5
gpg_key:
fpr: 0D69E11F12BDBA077B3726AB4E1F799AA4FF2279
keyserver: eu.pool.sks-keyservers.net
trust: '5'
# delete a key from the target machine
- name: remove a key from the target machine
gpg_key:
fpr: 0D69E11F12BDBA077B3726AB4E1F799AA4FF2279
state: absent
# get keyinfo for a specific key; will also return success if key not installed
- name: get keyinfo
gpg_key:
fpr: 0D69E11F12BDBA077B3726AB4E1F799AA4FF2279
state: info
# get keyinfo for all installed keys, public and secret
- name: get keyinfo for all keys
gpg_key:
fpr: '*'
state: info
'''
RETURN = '''
keys:
description: |
list of keys touched by the module;
list contains dicts of fingerprint, keytype, capabilities and trust level for each key
an exmaple output would looke like:
A0880EC90DD07F5968CEE3B6C6B3D8E7A7CD2528:
changed: false
creationdate: '1576698396'
curve_name: ed25519
expirationdate: ''
fingerprint: A0880EC90DD07F5968CEE3B6C6B3D8E7A7CD2528
hash_algorithm: ''
key_capabilities: cSC
key_length: '256'
keyid: C6B3D8E7A7CD2528
pubkey_algorithm: Ed25519
state: present
trust_level: u
trust_level_desc: The key is ultimately trusted
type: pub
userid: 'somekey <test@example.org>'
If you set the state to absent, and the key was already absent, obviously
not all info will be available; it would look similar to:
A0880EC90DD07F5968CEE3B6C6B3D8E7A7CD2528:
changed: false
fingerprint: A0880EC90DD07F5968CEE3B6C6B3D8E7A7CD2528
state: absent
type: list
returned: always
debug:
description: contains debug information
type: list
returned: when verbosity >= 2
'''
import re
import os
import time
from ansible.module_utils.basic import AnsibleModule
from packaging import version
# class to import GPG keys
class GpgKey(object):
def __init__(self, module):
"""
init method
"""
# set ansible module
self.module = module
self.debugmsg = []
self.installed_keys = {}
self.changed = False
# seed the result dict in the object
# we primarily care about changed and state
# change is if this module effectively modified the target
# state will include any data that you want your module to pass back
# for consumption, for example, in a subsequent task
self.result = dict(
changed=False,
keys={},
msg="",
)
# set gpg binary none was provided
if not self.module.params["gpgbin"] or self.module.params["gpgbin"] is None:
self.module.params["gpgbin"] = self.module.get_bin_path('gpg')
def _vv(self, msg):
"""
debug info
"""
# add debug message
self.debugmsg.append("{}".format(msg))
def has_method(self, name):
"""
method to check if other methods exist
"""
return callable(getattr(self, name, None))
def run(self):
"""
run module with given parameters
"""
# check versions of gnupg and libgcrypt
# check homedir
self.check_versions()
self.check_homedir()
# determine and run action
if self.module.params["file"]:
run_action = "file"
elif self.module.params["fpr"]:
run_action = "fpr"
elif self.module.params["content"]:
run_action = "content"
else:
self.module.fail_json(msg="You shouldn't be here; no valid action could be determined")
# determine action and method
run_state = self.module.params["state"]
run_method = "run_{}_{}".format(run_action, run_state)
self._vv("determined action [{}] with state [{}]".format(run_action, run_state))
# always check installed keys first
self.check_installed_keys()
#self.result["installed_keys"] = self.installed_keys
# check if run method exists, and if not fail with an error
if self.has_method(run_method):
getattr(self, run_method)()
else:
self.module.fail_json(msg="Action [{}] is not supported with state [{}]".format(run_action, run_state))
# check verbosity and add debug messages
if self.module._verbosity >= 2:
self.result['debug'] = "\n".join(self.debugmsg)
# return result
return self.result
def run_file_present(self):
"""
import key from file
"""
# first, check if the file is OK
keyinfo = self.check_file()
self._vv("import new keys from file")
# import count
impcnt = 0
trucnt = 0
# then see if the key is already installed
# fk = file key
# ik = installed key
for index, fk in enumerate(keyinfo["keys"]):
# check expiration by checking trust
if fk["trust_level"] in ['i','d','r','e']:
self.module.fail_json(msg="key is either expired or invalid [trust={}] [expiration={}]".format(fk["trust_level"], fk["expirationdate"]))
# check if key is installed
installed = False
for ik in self.installed_keys["keys"]:
if (fk["fingerprint"] == ik["fingerprint"] and
fk["type"] == ik["type"] and
fk["key_capabilities"] == ik["key_capabilities"]
):
self._vv("fingerprint [{}] already installed".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "present"
keyinfo["keys"][index]["changed"] = False
installed = True
# check trust
if not self.compare_trust(fk["trust_level"], self.module.params["trust"]):
# update trust level
self.set_trust(fk["fingerprint"], self.module.params["trust"])
trucnt += 1
# get trust level as displayed by gpg
tru_level, tru_desc = self.get_trust(self.module.params["trust"])
keyinfo["keys"][index]["changed"] = True
keyinfo["keys"][index]["trust_level"] = tru_level
keyinfo["keys"][index]["trust_level_desc"] = tru_desc
continue
if not installed:
self._vv("fingerprint [{}] not yet installed".format(fk["fingerprint"]))
# import file
cmd = self.prepare_command("file", "present")
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self._vv("fingerprint [{}] successfully imported".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "present"
keyinfo["keys"][index]["changed"] = True
impcnt += 1
# check trust
if not self.compare_trust(fk["trust_level"], self.module.params["trust"]):
# update trust level
self.set_trust(fk["fingerprint"], self.module.params["trust"])
trucnt += 1
# get trust level as displayed by gpg
tru_level, tru_desc = self.get_trust(self.module.params["trust"])
keyinfo["keys"][index]["changed"] = True
keyinfo["keys"][index]["trust_level"] = tru_level
keyinfo["keys"][index]["trust_level_desc"] = tru_desc
# set keyinfo
self.set_keyinfo(keyinfo)
# check import count
if impcnt > 0 or trucnt > 0:
self.result["changed"] = True
# set message and return
self.result["msg"] = "[{}] keys were imported; [{}] trust levels updated".format(impcnt, trucnt)
return True
def run_file_absent(self):
"""
remove key(s) present in file
"""
# first, check if the file is OK
keyinfo = self.check_file()
self._vv("remove keys identified in file")
# key count
keycnt = 0
# then see if the key is installed or not
# fk = file key
# ik = installed key
for index, fk in enumerate(keyinfo["keys"]):
installed = False
for ik in self.installed_keys["keys"]:
if (fk["fingerprint"] == ik["fingerprint"] and
fk["type"] == ik["type"] and
fk["key_capabilities"] == ik["key_capabilities"]
):
installed = True
continue
if not installed:
self._vv("fingerprint [{}] not installed; nothing to remove".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "absent"
keyinfo["keys"][index]["changed"] = False
else:
self._vv("fingerprint [{}] installed; will be removed".format(fk["fingerprint"]))
# remove file
cmd = self.prepare_command("file", "absent")
# add fingerprint as argument
cmd += [fk["fingerprint"]]
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self._vv("fingerprint [{}] successfully removed".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "absent"
keyinfo["keys"][index]["changed"] = True
keycnt += 1
# re-run check installed command to prevent attempting to remove same
# fingerprint again (for example after removing pub/sec counterpart
# with the same fpr
self.check_installed_keys()
# set keyinfo
self.set_keyinfo(keyinfo)
# check import count
if keycnt > 0:
self.result["changed"] = True
# return
self.result["msg"] = "[{}] keys were removed".format(keycnt)
return True
def run_file_info(self):
"""
method to only retrive current status of keys
wether from file, content or fpr
"""
# first, check if the file is OK
keyinfo = self.check_file()
self._vv("showing key info from file")
# then see if the key is already installed
# fk = file key
# ik = installed key
for index, fk in enumerate(keyinfo["keys"]):
# check if key is installed
installed = False
for ik in self.installed_keys["keys"]:
if (fk["fingerprint"] == ik["fingerprint"] and
fk["type"] == ik["type"] and
fk["key_capabilities"] == ik["key_capabilities"]
):
self._vv("fingerprint [{}] installed".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "present"
keyinfo["keys"][index]["changed"] = False
installed = True
continue
if not installed:
# set state
self._vv("fingerprint [{}] not installed".format(fk["fingerprint"]))
keyinfo["keys"][index]["state"] = "absent"
keyinfo["keys"][index]["changed"] = False
# set keyinfo
self.set_keyinfo(keyinfo)
# set message and return
return True
def run_content_present(self):
"""
import keys from content
"""
# prepare content
filename = self.prepare_content(self.module.params["content"])
# set file parameter and run file present
self.module.params["file"] = filename
self.run_file_present()
# delete content
self.delete_content(filename)
def run_content_absent(self):
"""
remove keys from content
"""
# prepare content
filename = self.prepare_content(self.module.params["content"])
# set file parameter and run file present
self.module.params["file"] = filename
self.run_file_absent()
# delete content
self.delete_content(filename)
def run_content_info(self):
"""
get key info from content
"""
# prepare content
filename = self.prepare_content(self.module.params["content"])
# set file parameter and run file present
self.module.params["file"] = filename
self.run_file_info()
# delete content
self.delete_content(filename)
def run_fpr_present(self):
"""
import key from keyserver
"""
self._vv("import new keys from keyserver")
# set fpr shorthand
fpr = self.module.params["fpr"]
# set base values
installed = False
impcnt = 0
trucnt = 0
keyinfo = {
'fprs': [],
'keys': [],
}
# check if key is installed
for ik in self.installed_keys["keys"]:
if (fpr == ik["fingerprint"]):
# set keyinfo
self._vv("fingerprint [{}] already installed".format(fpr))
keyinfo["fprs"].append(fpr)
keyinfo["keys"].append(ik)
keyinfo["keys"][0]["state"] = "present"
keyinfo["keys"][0]["changed"] = False
installed = True
# check trust
if not self.compare_trust(ik["trust_level"], self.module.params["trust"]):
# update trust level
self.set_trust(fpr, self.module.params["trust"])
trucnt += 1
# get trust level as displayed by gpg
tru_level, tru_desc = self.get_trust(self.module.params["trust"])
keyinfo["keys"][0]["changed"] = True
keyinfo["keys"][0]["trust_level"] = tru_level
keyinfo["keys"][0]["trust_level_desc"] = tru_desc
continue
if not installed:
self._vv("fingerprint [{}] not yet installed".format(fpr))
# import file
cmd = self.prepare_command("fpr", "present")
cmd += [fpr]
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self._vv("fingerprint [{}] successfully imported from keyserver".format(fpr))
# get info from specific key; keyservers only contain public keys
# so no point in checking the secret keys
cmd = self.prepare_command("check", "installed_public")
cmd += [fpr]
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
keyinfo = self.process_colons(stdout)
# check expiration by checking trust
if keyinfo["keys"][0]["trust_level"] in ['i','d','r','e']:
# deleted the expired key and fail
cmd = self.prepare_command("fpr", "absent")
cmd += [fpr]
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self.module.fail_json(msg="key is either expired or invalid [trust={}] [expiration={}]".format(keyinfo["keys"][0]["trust_level"], keyinfo["keys"][0]["expirationdate"]))
# update key info
keyinfo["keys"][0]["state"] = "present"
keyinfo["keys"][0]["changed"] = True
impcnt += 1
# check trust
if not self.compare_trust(keyinfo["keys"][0]["trust_level"], self.module.params["trust"]):
# update trust level
self.set_trust(fpr, self.module.params["trust"])
trucnt += 1
# get trust level as displayed by gpg
tru_level, tru_desc = self.get_trust(self.module.params["trust"])
keyinfo["keys"][0]["changed"] = True
keyinfo["keys"][0]["trust_level"] = tru_level
keyinfo["keys"][0]["trust_level_desc"] = tru_desc
# set keyinfo
self.set_keyinfo(keyinfo)
# check import count
if impcnt > 0 or trucnt > 0:
self.result["changed"] = True
# set message and return
self.result["msg"] = "[{}] keys were imported; [{}] trust levels updated".format(impcnt, trucnt)
return True
def run_fpr_absent(self):
"""
remove key(s)
"""
self._vv("delete keys based on fingerprint")
# set fpr shorthand
fpr = self.module.params["fpr"]
# set base values
installed = False
keycnt = 0
keyinfo = {
'fprs': [],
'keys': [],
}
# see if the key is installed or not
# ik = installed key
for ik in self.installed_keys["keys"]:
if fpr == ik["fingerprint"]:
if ("state" in ik and ik["state"] != "absent") or ("state" not in ik):
keyinfo["fprs"].append(fpr)
keyinfo["keys"].append(ik)
installed = True
continue
if not installed:
self._vv("fingerprint [{}] not installed; nothing to remove".format(fpr))
key = {}
key[fpr] = {
"state" : "absent",
"changed" : False,
"fingerprint" : fpr,
}
keyinfo["fprs"].append(fpr)
keyinfo["keys"].append(key)
else:
self._vv("fingerprint [{}] installed; will be removed".format(fpr))
# remove file
cmd = self.prepare_command("fpr", "absent")
# add fingerprint as argument
cmd += [fpr]
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self._vv("fingerprint [{}] successfully removed".format(fpr))
keyinfo["keys"][0]["state"] = "absent"
keyinfo["keys"][0]["changed"] = True
keycnt += 1
# re-run check installed command to prevent attempting to remove same
# fingerprint again (for example after removing pub/sec counterpart
# with the same fpr
self.check_installed_keys()
# set keyinfo
self.set_keyinfo(keyinfo)
# check import count
if keycnt > 0:
self.result["changed"] = True
# return
self.result["msg"] = "[{}] keys were removed".format(keycnt)
return True
def run_fpr_latest(self):
"""
get the latest key from the keyserver
"""
self._vv("get latest key from keyserver")
# set fpr shorthand
fpr = self.module.params["fpr"]
# set base values
installed = False
updated = False
updcnt = 0
trucnt = 0
keyinfo = {
'fprs': [],
'keys': [],
}
# check if key is installed
for ik in self.installed_keys["keys"]:
if (fpr == ik["fingerprint"]):
# set keyinfo
self._vv("fingerprint [{}] installed; updating from server".format(fpr))
keyinfo["fprs"].append(fpr)
keyinfo["keys"].append(ik)
keyinfo["keys"][0]["state"] = "present"
keyinfo["keys"][0]["changed"] = False
installed = True
continue
if not installed:
self._vv("fingerprint [{}] not yet installed; install first".format(fpr))
# import from keyserver
self.run_fpr_present()
return True
else:
self._vv("fetching updates from keyserver")
# get updates from keyserver
cmd = self.prepare_command("fpr", "latest")
cmd += [fpr]
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
# see if any updates were downloaded or not
# for some reason, gpg outputs these messages to stderr
updated = re.search('gpg:\s+unchanged: 1\n', stderr) is None
if updated:
updcnt += 1
# if key was updated, refresh info
if updated:
self._vv("key was updated on server")
# get info from specific key; keyservers only contain public keys
# so no point in checking the secret keys
cmd = self.prepare_command("check", "installed_public")
cmd += [fpr]
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
keyinfo = self.process_colons(stdout)
# check expiration by checking trust
if keyinfo["keys"][0]["trust_level"] in ['i','d','r','e']:
# deleted the expired key and fail
cmd = self.prepare_command("fpr", "absent")
cmd += [fpr]
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
self.module.fail_json(msg="key is either expired or invalid [trust={}] [expiration={}]".format(keyinfo["keys"][0]["trust_level"], keyinfo["keys"][0]["expirationdate"]))
# update key info
keyinfo["keys"][0]["state"] = "present"
keyinfo["keys"][0]["changed"] = True
# check trust
if not self.compare_trust(keyinfo["keys"][0]["trust_level"], self.module.params["trust"]):
# update trust level
self.set_trust(fpr, self.module.params["trust"])
# get trust level as displayed by gpg
tru_level, tru_desc = self.get_trust(self.module.params["trust"])
keyinfo["keys"][0]["changed"] = True
keyinfo["keys"][0]["trust_level"] = tru_level
keyinfo["keys"][0]["trust_level_desc"] = tru_desc
trucnt += 1
# set keyinfo
self.set_keyinfo(keyinfo)
# check import count
if updcnt > 0 or trucnt > 0:
self.result["changed"] = True
# set message and return
self.result["msg"] = "[{}] keys were updated; [{}] trust levels updated".format(updcnt, trucnt)
return True
def run_fpr_info(self):
"""
method to only return current key info
will never report changed as it doesn't change anything on the target
"""
# frp shorthand
fpr = self.module.params["fpr"]
keycount = 0
# check if the request is for a single key or all
if fpr == "*":
keyinfo = self.installed_keys
keycount = len(self.installed_keys["keys"])
else:
# then see if the key is already installed
# ik = installed key
installed = False
keycount = 1
keyinfo = {
"fprs": [],
"keys": [],
}
for ik in self.installed_keys["keys"]:
if (fpr == ik["fingerprint"]):
self._vv("fingerprint [{}] installed".format(fpr))
keyinfo["fprs"].append(fpr)
keyinfo["fprs"].append(ik)
keyinfo["keys"][0]["state"] = "present"
keyinfo["keys"][0]["changed"] = False
installed = True
continue
if not installed:
# set state
self._vv("fingerprint [{}] not installed".format(fpr))
keyinfo["fprs"].append(fpr)
keyinfo["keys"].append({})
keyinfo["keys"][0]["fingerprint"] = fpr
keyinfo["keys"][0]["state"] = "absent"
keyinfo["keys"][0]["changed"] = False
# set keyinfo
self.set_keyinfo(keyinfo)
self.result["msg"] = "listing info for [{}] key(s)".format(keycount)
def prepare_content(self, content):
"""
prepare content
"""
# create temporary file and write contents
filename = "tmp-gpg-{}.asc".format(time.time())
self._vv("writing content to temporary file [{}]".format(filename))
tmpfile = open("{}".format(filename),"w+")
tmpfile.write(content)
tmpfile.close()
# return filename
return filename
def delete_content(self, filename):
"""
delete temporary content
"""
# cleanup
self._vv("deleting temporary file [{}]".format(filename))
os.remove(filename)
def prepare_command(self, action, state):
"""
prepare any gpg command
"""
# set base command
cmd = [self.module.params["gpgbin"]]
# determine dry run / check mode
if self.module.check_mode:
cmd.append("--dry-run")
# determine if homedir was set
if self.module.params["homedir"]:
cmd.append("--homedir")
cmd.append(self.module.params["homedir"])
# check versions
if action == "check" and state == "versions":
args = ["--version"]
# check installed public keys
if action == "check" and state == "installed_public":
args = [
"--with-colons",
"--list-keys",
]
# check installed secret keys
if action == "check" and state == "installed_secret":
args = [
"--with-colons",
"--list-secret-keys",
]
# check file
if action == "check" and state == "file":
args = [
"--with-colons",
"--import-options",
"show-only",
"--import",
self.module.params["file"],
]
# file present
if action == "file" and state == "present":
args = [
"--import",
self.module.params["file"],
]
# file absent
if action == "file" and state == "absent":
args = [
"--batch",
"--yes",
"--delete-secret-and-public-key",
]
# set ownertrust
if action == "set" and state == "trust":
args = ["--import-ownertrust"]
# fpr present
if action == "fpr" and state == "present":
args = ["--recv-keys"]
# determine if keyserver
if self.module.params["keyserver"]:
cmd.append("--keyserver")
cmd.append(self.module.params["keyserver"])
# fpr absent
if action == "fpr" and state == "absent":
args = [
"--batch",
"--yes",
"--delete-secret-and-public-key",
]
# fpr latest
if action == "fpr" and state == "latest":
args = ["--refresh-keys"]
# determine if keyserver
if self.module.params["keyserver"]:
cmd.append("--keyserver")
cmd.append(self.module.params["keyserver"])
# merge cmd and args and return
cmd += args
self._vv("running command [{}]".format(" ".join(cmd)))
return cmd
def check_versions(self):
"""
function to verify we have the right gnupg2 version
"""
self._vv("checking gnupg and libgcrypt versions")
# set command
cmd = self.prepare_command("check", "versions")
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
# stdout lines - run_command returns a single string and we need the first and second line only
lines = stdout.splitlines()
# find gpg version
regex_gpg = r"gpg\s+\(GnuPG[^)]*\)\s+(\d+\.\d+\.?\d*)$"
match_gpg = re.search(regex_gpg, lines[0])
# sanity check
if match_gpg is None or match_gpg.group(1) is None:
self.module.fail_json(msg="could not find a valid gpg version number in string [{}]".format(lines[0]))
# find libgcrypt version
regex_libgcrypt = r"libgcrypt\s+(\d+\.\d+\.?\d*)"
match_libgcrypt = re.match(regex_libgcrypt, lines[1])
# sanity check
if match_libgcrypt is None or match_libgcrypt.group(1) is None:
self.module.fail_json(msg="could not find a valid libgcrypt version number in string [{}]".format(lines[1]))
# check versions
versions = {'gpg' : match_gpg.group(1),
'libgcrypt' : match_libgcrypt.group(1),
}
req_gpg = '2.1.17'
req_libgcrypt = '1.8.1'
# display minimum versions
self._vv("gpg_key module requires at least gnupg version [{}] and libgcrypt version [{}]".format(versions['gpg'], versions['libgcrypt']))
# sanity check
if version.parse(versions['gpg']) < version.parse(req_gpg) or version.parse(versions['libgcrypt']) < version.parse(req_libgcrypt):
self.module.fail_json(msg="gpg version [{}] and libgcrypt version [{}] are required; [{}] and [{}] given".format(req_gpg, req_libgcrypt, versions['gpg'], versions['libgcrypt']))
else:
self._vv("gnupg version [{}] and libgcrypt version [{}] detected".format(versions['gpg'], versions['libgcrypt']))
return True
def check_installed_keys(self):
"""
get list of keyfiles from current gpg homedir
"""
self._vv("checking installed public keys on target host")
# set command
cmd = self.prepare_command("check", "installed_public")
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
# get public key info
pubkeyinfo = self.process_colons(stdout)
self._vv("found a total of [{}] public keys on target host".format(len(pubkeyinfo["fprs"])))
self._vv("checking installed secret keys on target host")
# set command
cmd = self.prepare_command("check", "installed_secret")
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
# get public key info
seckeyinfo = self.process_colons(stdout)
self._vv("found a total of [{}] secret keys on target host".format(len(seckeyinfo["fprs"])))
# merge keys
keyinfo = {
'fprs': pubkeyinfo["fprs"]+seckeyinfo["fprs"],
'keys': pubkeyinfo["keys"]+seckeyinfo["keys"],
}
# remove any duplicate fingerprints which may occur in both pub and sec keys
keyinfo["fprs"] = list(dict.fromkeys(keyinfo["fprs"]))
# set keyinfo
self.installed_keys = keyinfo
def check_homedir(self):
"""
check homedir
"""
# check if homedir exists, if not, fail
if self.module.params["homedir"] and not os.path.isdir(self.module.params["homedir"]):
self.module.fail_json(msg="given homedir [{}] does not exist or not accessible by current ansible user".format(self.module.params["homedir"]))
self._vv("homedir set to [{}]".format(self.module.params["homedir"]))
return True
def check_file(self):
"""
check if param file exists on target machine
check if file is a valid keyfile
check for fingerprints
"""
self._vv("checking keyfile on target host")
# sanity check
if not os.path.isfile(self.module.params["file"]):
self.module.fail_json(msg="the keyfile [{}] does not exist on the target machine".format(self.module.params["file"]))
else:
self._vv("keyfile [{}] exists on target host".format(self.module.params["file"]))
# get key info from file
cmd = self.prepare_command("check", "file")
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
keyinfo = self.process_colons(stdout)
return keyinfo
def process_colons(self, cinfo):
"""
fetch key information from colon output
"""
#
# SAMPLE DATA
#
# sec:u:256:22:41343326127FD34F:1566067845:::u:::cC:::+::ed25519:::0:
# fpr:::::::::0D18E4B6B2698560729D00CE41343326127FD34F:
# grp:::::::::54AA357FD85BA4D4B7CE86016A3734F00B1BDD07:
# uid:u::::1566067845::00B9F0DC33EE293CC1E687FFA54A5EA805FD78F8::testing145 (TESTINGCOMM) <test@netson.nld>::::::::::0:
#
#
# line types
#
# *** Field 1 - Type of record
#
# - pub :: Public key
# - crt :: X.509 certificate
# - crs :: X.509 certificate and private key available
# - sub :: Subkey (secondary key)
# - sec :: Secret key
# - ssb :: Secret subkey (secondary key)
# - uid :: User id
# - uat :: User attribute (same as user id except for field 10).
# - sig :: Signature
# - rev :: Revocation signature
# - rvs :: Revocation signature (standalone) [since 2.2.9]
# - fpr :: Fingerprint (fingerprint is in field 10)
# - pkd :: Public key data [*]
# - grp :: Keygrip
# - rvk :: Revocation key
# - tfs :: TOFU statistics [*]
# - tru :: Trust database information [*]
# - spk :: Signature subpacket [*]
# - cfg :: Configuration data [*]
#
# Records marked with an asterisk are described at [[*Special%20field%20formats][*Special fields]].
#
#
# *** Field 12 - Key capabilities
#
# The defined capabilities are:
#
# - e :: Encrypt
# - s :: Sign
# - c :: Certify
# - a :: Authentication
# - ? :: Unknown capability
#
# A key may have any combination of them in any order. In addition
# to these letters, the primary key has uppercase versions of the
# letters to denote the _usable_ capabilities of the entire key, and
# a potential letter 'D' to indicate a disabled key.
#
#
# FIELD TYPES:
#
# - Field 1 - Type of record
# - Field 2 - Validity
# - Field 3 - Key length
# - Field 4 - Public key algorithm
# - Field 5 - KeyID
# - Field 6 - Creation date
# - Field 7 - Expiration date
# - Field 8 - Certificate S/N, UID hash, trust signature info
# - Field 9 - Ownertrust
# - Field 10 - User-ID
# - Field 11 - Signature class
# - Field 12 - Key capabilities
# - Field 13 - Issuer certificate fingerprint or other info
# - Field 14 - Flag field
# - Field 15 - S/N of a token
# - Field 16 - Hash algorithm
# - Field 17 - Curve name
# - Field 18 - Compliance flags
# - Field 19 - Last update
# - Field 20 - Origin
# - Field 21 - Comment
#
# determine the correct line
main_lines = ['sec','ssb','pub','sub']
follow_lines = ['fpr','grp','uid']
# indexes start at 0
# main parts are for main_lines only
mainparts = {
'type' : 0,
'trust_level' : 1,
'key_length' : 2,
'pubkey_algorithm' : 3,
'keyid' : 4,
'creationdate' : 5,
'expirationdate' : 6,
'key_capabilities' : 11,
'hash_algorithm' : 15,
'curve_name' : 16,
}
# indexes start at 0
# follow parts for follow_lines only
followparts = {
'type' : 0,
'userid' : 9, # this is the fingerprint for fpr records and the keygrip for grp records
}
#
# 9.1. Public-Key Algorithms
#
# ID Algorithm
# -- ---------
# 1 - RSA (Encrypt or Sign) [HAC]
# 2 - RSA Encrypt-Only [HAC]
# 3 - RSA Sign-Only [HAC]
# 16 - Elgamal (Encrypt-Only) [ELGAMAL] [HAC]
# 17 - DSA (Digital Signature Algorithm) [FIPS186] [HAC]
# 18 - Reserved for Elliptic Curve
# 19 - Reserved for ECDSA
# 20 - Reserved (formerly Elgamal Encrypt or Sign)
# 21 - Reserved for Diffie-Hellman (X9.42,
# as defined for IETF-S/MIME)
# 22 - Ed25519
# 100 to 110 - Private/Experimental algorithm
#
pubkeys = {
'1' : 'RSA (Encrypt or Sign)',
'2' : 'RSA Encrypt-Only',
'3' : 'RSA Sign-Only',
'16' : 'Elgamal (Encrypt-Only)',
'17' : 'DSA [FIPS186]',
'18' : 'Cv25519',
'22' : 'Ed25519',
}
#
# 2. Field: A letter describing the calculated trust. This is a single
# letter, but be prepared that additional information may follow
# in some future versions. (not used for secret keys)
#
# o = Unknown (this key is new to the system)
# i = The key is invalid (e.g. due to a missing self-signature)
# d = The key has been disabled
# r = The key has been revoked
# e = The key has expired
# - = Unknown trust (i.e. no value assigned)
# q = Undefined trust; '-' and 'q' may safely be treated as the same value for most purposes
# n = Don't trust this key at all
# m = There is marginal trust in this key
# f = The key is full trusted.
# u = The key is ultimately trusted; this is only used for
# keys for which the secret key is also available.
#
trustlevels = {
'o' : 'Unknown/new',
'i' : 'The key is invalid',
'd' : 'The key has been disabled',
'r' : 'The key has been revoked',
'e' : 'The key has expired',
'-' : 'Unknown trust',
'q' : 'Undefined trust',
'n' : 'Dont trust this key at all',
'm' : 'There is marginal trust in this key',
'f' : 'The key is fully trusted',
'u' : 'The key is ultimately trusted',
}
# set list of keys and list of fingerprints
keys = []
fprs = []
# set empty key dict
curKey = {}
# loop through lines
for l in cinfo.splitlines():
# split line into pieces
pieces = l.split(":")
# get current line type
curType = pieces[mainparts.get('type')]
# check for usage/capabilities
if curType in main_lines:
# check if curKey has values; if so add them to keys list first
if "type" in curKey:
self._vv("found [{}] key with fingerprint [{}]".format(curKey["type"], curKey["fingerprint"]))
keys.append(curKey)
# get pubkey algorithm
p = pieces[mainparts.get('pubkey_algorithm')]
z = pubkeys.get(p) if p is not None else ''
# get trustlevel description
p = pieces[mainparts.get('trust_level')]
t = trustlevels.get(p) if p is not None else ''
curKey = {
'type': pieces[mainparts.get('type')],
'trust_level': pieces[mainparts.get('trust_level')],
'trust_level_desc': t,
'key_length': pieces[mainparts.get('key_length')],
'pubkey_algorithm': z,
'keyid': pieces[mainparts.get('keyid')],
'creationdate': pieces[mainparts.get('creationdate')],
'expirationdate': pieces[mainparts.get('expirationdate')],
'key_capabilities': pieces[mainparts.get('key_capabilities')],
'hash_algorithm': pieces[mainparts.get('hash_algorithm')],
'curve_name': pieces[mainparts.get('curve_name')],
}
elif curType in follow_lines:
# check follow line type
if curType == "fpr":
curKey["fingerprint"] = pieces[followparts.get('userid')]
fprs.append(curKey["fingerprint"])
elif curType == "grp":
curKey["keygrip"] = pieces[followparts.get('userid')]
elif curType == "uid":
curKey["userid"] = pieces[followparts.get('userid')]
# if we make it here we have encountered an unknown linetype
# we should add the key info we have gathered so far to the keylist
# and reset the key dict so it won't get added again in case more
# keys will follow in the next lines
else:
if "type" in curKey:
self._vv("found [{}] key with fingerprint [{}]".format(curKey["type"], curKey["fingerprint"]))
keys.append(curKey)
curKey = {}
# after the last line, see if any keys remain which need to be added
if "type" in curKey:
self._vv("found [{}] key with fingerprint [{}]".format(curKey["type"], curKey["fingerprint"]))
keys.append(curKey)
#
# set and return results
#
return {
'keys': keys,
'fprs': fprs,
}
def compare_trust(self, trust1, trust2):
"""
method to compare 2 trust levels
"""
# check if we are managing trust
if not self.module.params["manage_trust"]:
self._vv("we're not managing trust")
return True
#
# trust level returned by GnuPG
# 'o' : 'Unknown/new',
# 'i' : 'The key is invalid',
# 'd' : 'The key has been disabled',
# 'r' : 'The key has been revoked',
# 'e' : 'The key has expired',
# '-' : 'Unknown trust',
# 'q' : 'Undefined trust',
# 'n' : 'Dont trust this key at all',
# 'm' : 'There is marginal trust in this key',
# 'f' : 'The key is fully trusted',
# 'u' : 'The key is ultimately trusted',
#
trust_map = {
'o' : "0",
'i' : "0",
'd' : "0",
'r' : "0",
'e' : "0",
'-' : "1",
'q' : "1",
'n' : "2",
'm' : "3",
'f' : "4",
'u' : "5",
}
# convert trust if necessary
if trust1 in trust_map.keys():
trust1 = trust_map[trust1]
if trust2 in trust_map.keys():
trust2 = trust_map[trust2]
self._vv("comparing trust [{}] and [{}]".format(trust1, trust2))
# compare trust
return trust1 == trust2
def get_trust(self, trust):
"""
method to get trust indicator from value
"""
gpg_map = {
'1' : '-',
'2' : 'n',
'3' : 'm',
'4' : 'f',
'5' : 'u',
}
trust_map = {
'-' : 'Unknown trust',
'n' : 'Dont trust this key at all',
'm' : 'There is marginal trust in this key',
'f' : 'The key is fully trusted',
'u' : 'The key is ultimately trusted',
}
# return trust value
return gpg_map[trust], trust_map[gpg_map[trust]]
def set_trust(self, fingerprint, trust):
"""
method to set ownertrust
"""
#
# Trust | Description | Value ownertrust | Value with colons
# 1 | I don't know or won't say | 2 | -|q|o
# 2 | I do NOT trust | 3 | n
# 3 | I trust marginally | 4 | m
# 4 | I trust fully | 5 | f
# 5 | I trust ultimately | 6 | u
#
self._vv("update trust level to [{}]".format(trust))
# IMPORTANT: please keep in mind that with trust levels other than 5
# the keys you import will need to be signed by a fully trusted key,
# or be signed using the web of trust; see:
# Using trust to validate keys: https://www.gnupg.org/gph/en/manual/x334.html
# signing keys is not handled by this module and should be done by yourself
# setting the trust.
# trust map
trust_map = {
'1' : '2',
'2' : '3',
'3' : '4',
'4' : '5',
'5' : '6',
}
# create temporary owner trust file
# the newline at the end is required to prevent a 'gpg: line too long' error
content = "{}:{}:\n".format(fingerprint, trust_map[trust])
filename = self.prepare_content(content)
# prepare command
cmd = self.prepare_command("set", "trust")
cmd += [filename]
# run subprocess
rc, stdout, stderr = self.module.run_command(args=cmd, check_rc=True)
# delete content
self.delete_content(filename)
# return
return True
def set_keyinfo(self, keyinfo):
"""
sets the keyinfo in an easy to process format
starting with the fingerprint as the key,
then the value is a dict with the key details
"""
self._vv("setting key info to return to playbook")
# loop through keyinfo and set fprs as dict key
for key in keyinfo["keys"]:
if "fingerprint" in key:
self.result["keys"][key["fingerprint"]] = key
def main():
# define available arguments/parameters a user can pass to the module
module_args = dict(
fpr=dict(type='str', required=False),
keyserver=dict(type='str', default='keyserver.ubuntu.com'),
file=dict(type='path', required=False),
content=dict(type='str', required=False),
trust=dict(type='str', default='1', choices=['1','2','3','4','5']),
manage_trust=dict(type='bool', default=True),
state=dict(type='str', default='present', choices=['info', 'present', 'absent', 'latest']),
gpgbin=dict(type='path', default=None),
homedir=dict(type='path', default=None),
)
# set mutually exclusive params
mutually_exclusive = [
['fpr', 'file', 'content'],
]
# set at least one required field
required_one_of = [
['fpr', 'file', 'content']
]
# the AnsibleModule object will be our abstraction working with Ansible
# this includes instantiation, a couple of common attr would be the
# args/params passed to the execution, as well as if the module
# supports check mode
module = AnsibleModule(
argument_spec=module_args,
mutually_exclusive=mutually_exclusive,
required_one_of=required_one_of,
supports_check_mode=True
)
# run module
gpgkey = GpgKey(module)
result = gpgkey.run()
# in the event of a successful module execution, you will want to
# simple AnsibleModule.exit_json(), passing the key/value results
module.exit_json(**result)
# if the user is working with this module in only check mode we do not
# want to make any changes to the environment, just return the current
# state with no modifications
if module.check_mode:
module.exit_json(**result)
# module.get_bin_path / def get_bin_path
# module.run_command / def run_command
if __name__ == '__main__':
main()