Skip to content

Commit

Permalink
Fix up openvpn management script
Browse files Browse the repository at this point in the history
  • Loading branch information
HippocampusGirl committed Oct 25, 2024
1 parent 0f3c270 commit b01f035
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 12 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
# ignore direnv
.direnv

machines/laptop/openvpn_management.py
10 changes: 5 additions & 5 deletions machines/laptop/networking.nix
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
sops = {
secrets."charite/openvpn/config" = { sopsFile = ./secrets.yaml; };
secrets."charite/openvpn/secrets" = { sopsFile = ./secrets.yaml; };
secrets."charite/openvpn/management-script" = { sopsFile = ./secrets.yaml; mode = "0500"; };
};
systemd.services.openvpn-charite-management = {
enable = true;
Expand All @@ -32,8 +31,9 @@

serviceConfig = {
Type = "simple";
ExecStart = ''
${pkgs.python3}/bin/python ${config.sops.secrets."charite/openvpn/management-script".path} \
Environment = "DISPLAY=:0";
ExecStart = let python = pkgs.python3.withPackages (ps: with ps; [ tkinter ]); in ''
${python}/bin/python ${./openvpn_management.py} \
${config.sops.secrets."charite/openvpn/secrets".path}
'';
Restart = "always";
Expand All @@ -53,12 +53,12 @@
{
description = "GlobalProtect/OpenConnect instance '${portal}'";

enable = false;
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
restartTriggers = [ config.environment.etc."vpnc/post-connect.d/update-systemd-resolved".source ];

path = [ pkgs.openconnect pkgs.gp-saml-gui pkgs.sudo ];
path = [ pkgs.gp-saml-gui pkgs.openconnect pkgs.sudo ];

serviceConfig = {
Type = "simple";
Expand Down
91 changes: 91 additions & 0 deletions machines/laptop/openvpn_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import logging
import re
from argparse import ArgumentParser
from asyncio import open_unix_connection, run
from typing import Mapping, Protocol
from tkinter.simpledialog import askstring

logging.basicConfig(level="INFO")
logger = logging.getLogger("openvpn_management")

argument_parser = ArgumentParser()
argument_parser.add_argument("secrets")
arguments = argument_parser.parse_args()

with open(arguments.secrets, "r") as file_handle:
username, password = map(str.strip, file_handle.read().splitlines())


def totp() -> str:
while (
totp := askstring(title="Charité", prompt="Please confirm with second factor")
) is None:
pass
return totp


class Handler(Protocol):
def __call__(self, **kwargs: str) -> None: ...


def log_failed_authentication(**kwargs: str) -> None:
label = kwargs["label"]
logger.error(f"Failed to authenticate with '{label}'")


def log_auth_token(**_: str) -> None:
logger.info("Received auth-token from server")


def log(**kwargs: str) -> None:
level = kwargs["level"]
message = kwargs["message"].strip()
{
"FATAL": logger.critical,
"ERROR": logger.error,
"SUCCESS": logger.info,
"INFO": logger.info,
}[level](message)


# Adapted from https://github.com/larsks/openvpn-askpass/blob/b04ed01a6352ca7b300f5792a75e4402d1cbbe22/openvpn_askpass/askpass.py#L46
async def main() -> None:
reader, writer = await open_unix_connection("/run/openvpn-charite-management.sock")

def send(command: str) -> None:
writer.write(f"{command}\n".encode())

def send_username_password(**kwargs: str) -> None:
label = kwargs["label"]

send(f'username "{label}" "{username}"')
send(f'password "{label}" "{password}{totp()}"')

commands: Mapping[str, Handler] = {
r">PASSWORD:Need '(?P<label>[^']*)' username/password": send_username_password,
r">PASSWORD:Auth-Token:.+": log_auth_token,
r">PASSWORD:Verification Failed: '(?P<label>[^']*)'": log_failed_authentication,
r">?(?P<level>(SUCCESS|FATAL|ERROR|INFO)):(?P<message>.+)": log,
}

try:
async for encoded_line in reader:
line = encoded_line.decode().strip()
for pattern, handler in commands.items():
if match := re.match(pattern, line):
handler(**match.groupdict())
break
else:
logger.info(line)
finally:
try:
send("quit")
except OSError:
pass


if __name__ == "__main__":
try:
run(main())
except (ConnectionRefusedError, KeyboardInterrupt):
pass
Loading

0 comments on commit b01f035

Please sign in to comment.