Skip to content

Commit

Permalink
Add SHARED_ACCOUNTS
Browse files Browse the repository at this point in the history
  • Loading branch information
lvps committed Apr 13, 2024
1 parent a196656 commit 7f3dd1b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 16 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ Three ways to pass options are possible:
2. Environment variables
3. A `.env` file, see `.env.example`

If a variable is set multiple times, the one higher in this list will take precedence.
All UPPER_CASE names in `caco_mela.py --help` correspond to an environment variable with the same name.
If a variable is set multiple times, the one higher in this list will take precedence.

All UPPER_CASE names in `caco_mela.py --help` correspond to an environment variable with the same name.
If a variable takes a list, the values are space separated when using command line arguments and comma separated when using environment variables.

## Run

Expand Down
65 changes: 51 additions & 14 deletions caco-mela.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ def set_boolean(config: dict, var: str):
config[var] = config[var].lower() not in ("0", "no", "false", "n", "off")


def set_comma(config: dict, var: str):
if isinstance(config[var], list):
config[var] = set(config[var])
else:
config[var] = set(os.environ.get(var, default="").split(","))


def parse_args(tests_env: Optional[str] = None) -> dict[str, Union[str, bool]]:
parser = argparse.ArgumentParser(description="Provision SSH keys from a LDAP server, without syncing UIDs.", prog="caco-mela")
parser.add_argument("--version", action="version", version="%(prog)s 1.0.0")
Expand All @@ -31,12 +38,13 @@ def parse_args(tests_env: Optional[str] = None) -> dict[str, Union[str, bool]]:
parser.add_argument("--uid", dest="LDAP_SEARCH_SSH_UID_ATTR", type=str, help="Attribute containing the username")
parser.add_argument("-a", "--authorized", dest="SSH_AUTHORIZED_KEYS_FILES", type=str, help="Value of sshd option AuthorizedKeysFile")
parser.add_argument("--user-owns-file", dest="SSH_USER_OWNS_FILE", action="store_true", help="Users are set to owners of their authorized_keys file, if the file is created")
parser.add_argument("-i", "--ignored", dest="IGNORED_ACCOUNTS", nargs="+", type=str, help="Accounts to ignore")
parser.add_argument("--shared", dest="SHARED_ACCOUNTS", nargs="+", type=str, help="Shared accounts: add everyone's keys to them")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument("IGNORED_ACCOUNTS", nargs="*", type=str, help="Accounts to ignore")
parser.add_argument("--dry-run", action="store_true", help="Do not write to authorized_keys file, just print what would have been written")
if tests_env is not None:
args = parser.parse_args([])
load_dotenv(tests_env, override=True)
print(os.environ)
else:
args = parser.parse_args()
load_dotenv()
Expand All @@ -48,10 +56,8 @@ def parse_args(tests_env: Optional[str] = None) -> dict[str, Union[str, bool]]:
set_default(config, "SSH_AUTHORIZED_KEYS_FILES", "")
set_boolean(config, "LDAP_STARTTLS")
set_boolean(config, "SSH_USER_OWNS_FILE")
if len(config["IGNORED_ACCOUNTS"]) == 0:
config["IGNORED_ACCOUNTS"] = set(os.environ.get("IGNORED_ACCOUNTS", default="").split(","))
else:
config["IGNORED_ACCOUNTS"] = set(config["IGNORED_ACCOUNTS"])
set_comma(config, 'IGNORED_ACCOUNTS')
set_comma(config, 'SHARED_ACCOUNTS')

return config

Expand Down Expand Up @@ -154,19 +160,41 @@ def ssh_authorized_keys_file(config, user: pwd.struct_passwd, create: bool = Tru
return path


def update_file(ssh_file, text) -> bool:
def update_file(ssh_file, text: str, dry_run: bool) -> bool:
with open(ssh_file, "r") as file:
current = file.read()
if text != current:
with open(ssh_file, "w") as file:
file.write(text)
if dry_run:
print(f"Dry run, would have written this to {ssh_file}:")
print(text)
return True
else:
with open(ssh_file, "w") as file:
file.write(text)
return True
return False


def generate_text(keys: list[str]):
def _keys_to_text(keys):
keys_text = "\n".join(keys) if len(keys) else "# No SSH keys for this user"
write_this = f"#\n# This file is managed by Caco mela ({__file__})\n# All manual changes will be overwritten.\n#\n{keys_text}\n"
return keys_text


def _warning_text():
return f"#\n# This file is managed by Caco mela ({__file__})\n# All manual changes will be overwritten.\n#"


def generate_text(keys: list[str]):
write_this = f"{_warning_text()}\n{_keys_to_text(keys)}\n"
return write_this


def generate_text_shared(results: dict[str, list[str]]):
write_almost_this = [_warning_text()]
for user in results:
write_almost_this.append(f"# Keys for {user}:")
write_almost_this.append(_keys_to_text(results[user]))
write_this = "\n".join(write_almost_this)
return write_this


Expand All @@ -181,10 +209,19 @@ def main(tests_env: Optional[str] = None):
if config["verbose"]:
print(f"Ignoring user {user.pw_name} due to IGNORED_ACCOUNTS")
continue
if user.pw_name in results:
if user.pw_name in config["SHARED_ACCOUNTS"]:
if config["verbose"]:
print(f"User {user.pw_name} is a shared account")
text = generate_text_shared(results)
ssh_file = ssh_authorized_keys_file(config, user)
if update_file(ssh_file, text, config['dry_run']):
print(f"Updated user {user.pw_name} with all available SSH keys")
elif config["verbose"]:
print(f"No change for user {user.pw_name} with all SSH keys")
elif user.pw_name in results:
text = generate_text(results[user.pw_name])
ssh_file = ssh_authorized_keys_file(config, user)
if update_file(ssh_file, text):
if update_file(ssh_file, text, config['dry_run']):
print(f"Updated user {user.pw_name} with {str(len(results[user.pw_name]))} SSH keys")
elif config["verbose"]:
print(f"No change for user {user.pw_name} with {str(len(results[user.pw_name]))} SSH keys")
Expand All @@ -194,7 +231,7 @@ def main(tests_env: Optional[str] = None):
if config["verbose"]:
print(f"User {user.pw_name} not found in LDAP server, removing keys")
text = generate_text([])
if update_file(ssh_file, text):
if update_file(ssh_file, text, config['dry_run']):
print(f"Updated user {user.pw_name} by removing all SSH keys")
elif config["verbose"]:
print(f"No change for user {user.pw_name} with 0 SSH keys")
Expand Down
49 changes: 49 additions & 0 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,52 @@ def test_user_not_owns_file():
"userUnknown",
):
assert not os.path.exists(f"/home/{user}/.ssh/authorized_keys")


def test_sharing_is_caring():
env_test_file = get_env_test_file()
with open(env_test_file, "w") as file:
file.write('LDAP_SEARCH_BASE="ou=people,dc=example,dc=test"\n')
file.write('LDAP_FILTER="(&(memberOf=cn=sysadmin,ou=groups,dc=example,dc=test)(!(nsAccountLock=true)))"\n')
file.write('LDAP_SEARCH_SSH_KEY_ATTR="nsSshPublicKey"\n')
file.write('SSH_USER_OWNS_FILE="1"\n')
file.write('SHARED_ACCOUNTS="user1,user2,user3"\n')
caco_mela.main(env_test_file)

assert os.path.isfile("/home/user1/.ssh/authorized_keys")
assert os.path.isfile("/home/user2/.ssh/authorized_keys")
assert os.path.isfile("/home/user3/.ssh/authorized_keys")
assert os.path.isfile("/home/user5/.ssh/authorized_keys")
assert not os.path.isfile("/home/user4/.ssh/authorized_keys")
assert os.path.isfile("/home/userTest/.ssh/authorized_keys")
assert not os.path.isfile("/home/userUnknown/.ssh/authorized_keys")

for user in ("user1", "user2", "user3"):
file = f"/home/{user}/.ssh/authorized_keys"
assert 0o600 == stat.S_IMODE(os.stat(file).st_mode)
assert getpwnam(user).pw_uid == os.stat(file).st_uid
assert getpwnam(user).pw_gid == os.stat(file).st_gid

with open(file, "r") as f:
as_string = f.read()
with open(file, "r") as f:
as_list = filter_lines_in_file(f.readlines())
assert "This file is managed by Caco mela" in as_string
as_list_2 = []
for line in as_list:
if not line.startswith("#") and len(line.lstrip()) > 0:
as_list_2.append(line)
assert ["ssh-ed25519 AAAAAAAi9s0dvjvjewjio0wevjwejvwejvowiwvesd foobarbaz\n", "ssh-ed25519 AAAAmviuewjuivjrvenuvlejnreiuvwejievwojviovewiofoobar\n", "ssh-ed25519 AAAACvSDMI62OVMImv2eMVMS5DIOV346EMVWIEO something\n", "ssh-ed25519 AAAAArei90vw3jb49r8hvb738uewjvuierverv foobarbaz\n"] == as_list_2

for user in ("user5",):
file = f"/home/{user}/.ssh/authorized_keys"
with open(file, "r") as f:
as_string = f.read()
with open(file, "r") as f:
as_list = filter_lines_in_file(f.readlines())
assert "This file is managed by Caco mela" in as_string
as_list_2 = []
for line in as_list:
if not line.startswith("#") and len(line.lstrip()) > 0:
as_list_2.append(line)
assert ["ssh-ed25519 AAAAArei90vw3jb49r8hvb738uewjvuierverv foobarbaz\n"] == as_list_2

0 comments on commit 7f3dd1b

Please sign in to comment.