Skip to content

Commit

Permalink
Merge pull request #3 from xarblu/dev
Browse files Browse the repository at this point in the history
v2.0
  • Loading branch information
xarblu authored Jun 13, 2023
2 parents 027da43 + 9b606d5 commit c399d9e
Showing 1 changed file with 107 additions and 48 deletions.
155 changes: 107 additions & 48 deletions netdev-automount
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#!/usr/bin/env python3

import sys,os,re,subprocess,tomllib
import sys
import os
import re
import subprocess
import tomllib
from time import sleep
import concurrent.futures

'''
Automatically (un)mount network filesystems
Expand All @@ -24,7 +30,8 @@ Hosts are as configured in fstab.
# hosts = [ 'host_1', 'host_2', 'host_3' ]
'''

# returns a list of tuples containing (host, mountpoint)
# parse /etc/fstab for netmounts
# @return - list of tuples containing (host, mountpoint)
def parse_fstab() -> list[tuple[str,str]]:
shares = list[tuple[str,str]]()
with open('/etc/fstab') as fstab:
Expand All @@ -37,63 +44,114 @@ def parse_fstab() -> list[tuple[str,str]]:
if not re.match(r'.*?:.*?(?:\s+.*?){5}', line):
continue
host = re.sub(r'\s*(.*?):.*?\s+.*', r'\1', line.rstrip())
mountpoint = re.sub(r'\s*.*?:(.*?)\s+.*', r'\1', line.rstrip())
mountpoint = re.sub(r'\s*.*?\s+(.*?)\s+.*', r'\1', line.rstrip())
shares.append((host, mountpoint))
return shares

# returns True if host is reachable
def reachable(host: str) -> bool:
# resolve host outide of ping because ping's resolver is slow
getent = subprocess.run(['getent', 'hosts', host], capture_output=True, text=True)
if getent.returncode != 0:
return False
ip = getent.stdout.split(' ')[0]
ping = subprocess.run(['ping', '-c1', ip], capture_output=True)
if ping.returncode == 0:
return True
# check if host is reachable
# retry 'tries' times waiting 3s between tries
# @param host - host to check
# @param tries - try check n times (default 1)
# @return - True if reachable, else False
def reachable(host: str, tries: int = 1) -> bool:
for t in range(tries):
if t > 0:
sleep(3)
try:
# resolve host outide of ping because ping's resolver is slow
getent = subprocess.run(['getent', 'hosts', host], capture_output=True, text=True, timeout=5)
if getent.returncode != 0:
continue
ip = getent.stdout.split(' ')[0]
ping = subprocess.run(['ping', '-c1', ip], capture_output=True, timeout=5)
if ping.returncode == 0:
return True
else:
continue
except subprocess.TimeoutExpired:
continue
else:
return False

# returns True if mountpoint already has a FS mounted
# check if mountpoint is mounted
# assume timeout means unreachable zombie netmount
# @param mountpoint - path to a local mountpoint
# @return - True if mountpoint mounted or timeout, else False
def mounted(mountpoint: str) -> bool:
p = subprocess.run(['mountpoint', mountpoint], capture_output=True)
if p.returncode == 0:
try:
p = subprocess.run(['mountpoint', mountpoint], capture_output=True, timeout=5)
return p.returncode == 0
except subprocess.TimeoutExpired:
return True
else:
return False

# mount <mountpoints> print error on fail
def mount(mountpoints: list[str]):
for mountpoint in mountpoints:
if not mounted(mountpoint):
p = subprocess.run(['mount', mountpoint], capture_output=True)
if p.returncode == 0:
print(f'Successfully mounted {mountpoint}')
else:
print(f'Failed mounting {mountpoint}')
# mount <netmounts>
# @param netmount - tuple of (host, mount)
# @param check_reachable - None disables check, int retries check n times (default disable)
def mount(netmount: tuple[str,str], check_reachable: None|int) -> None:
host,mount = netmount
if mounted(mount):
print(f'{mount} already mounted')
return
if check_reachable and not reachable(host, check_reachable):
print(f'{host} unreachable - not mounting')
return
try:
p = subprocess.run(['mount', mount], capture_output=True, timeout=5)
if p.returncode == 0:
print(f'Successfully mounted {mount} from {host}')
else:
print(f'{mountpoint} already mounted')

# unmount <mountpoints> print error on fail
def umount(mountpoints: list[str]):
for mountpoint in mountpoints:
if mounted(mountpoint):
p = subprocess.run(['umount', '-lf', mountpoint], capture_output=True)
print(f'Failed mounting {mount} from {host} - mount failed')
except subprocess.TimeoutExpired:
print(f'Failed mounting {mount} from {host} - timeout reached')

# unmount <netmounts>
# @param netmount - tuple of (host, mount)
# @param check_reachable - None disables check, int retries check n times (default disable)
def unmount(netmount: tuple[str,str], check_reachable: None|int) -> None:
host,mount = netmount
if not mounted(mount):
print(f'{mount} already unmounted')
return
if check_reachable and reachable(host, check_reachable):
print(f'{host} reachable - not unmounting')
return
# try to unmount clean first
for args in [(), ('-f'), ('-l', '-f')]:
try:
p = subprocess.run(['umount', *args, mount], capture_output=True, timeout=5)
if p.returncode == 0:
print(f'Successfully unmounted {mountpoint}')
print(f'Successfully unmounted {mount} from {host} with args {args}')
return
else:
print(f'Failed unmounting {mountpoint}')
else:
print(f'{mountpoint} not mounted')

# returns True if ran from a valid NetworkManager/dispatcher.d
print(f'Failed unmounting {mount} from {host} with args {args} - umount failed')
except subprocess.TimeoutExpired:
print(f'Failed unmounting {mount} from {host} with args {args} - timeout reached')

# start parallel mountjobs
# @param action - one of mount or unmount
# @param netmounts - list of (host, mountpoint) tuples
# @param check_reachable - None disables check, int retries check n times (default disable)
def mounter(action: str, netmounts: list[tuple[str,str]], check_reachable: None|int = None) -> None:
if action not in ['mount', 'unmount']:
raise ValueError("action should be one of [mount, unmount]")
mountjobs = concurrent.futures.ThreadPoolExecutor()
for netmount in netmounts:
if action == 'mount':
mountjobs.submit(mount, netmount, check_reachable)
elif action == 'unmount':
mountjobs.submit(unmount, netmount, check_reachable)
mountjobs.shutdown(wait=True)

# check if NetworkManager-dispatcher
# @return - True if ran from a valid NetworkManager/dispatcher.d, else False
def nm_dispatcher() -> bool:
if re.match(r'/(?:etc|usr/lib)/NetworkManager/dispatcher.d/.*', os.path.abspath(__file__)):
return True
else:
return False

# returns list of hosts associated with NetworkManager <connection>
# parse configured hosts from /etc/nm-netdev-automount.tomle
# @return - list of hosts associated with NetworkManager <connection>
def nm_dispatcher_hosts(connection: str) -> list[str] | None:
config_path = '/etc/nm-netdev-automount.toml'
with open(config_path, 'rb') as f:
Expand All @@ -119,19 +177,20 @@ def nm_dispatcher_events():
else:
exit()

# mount based on dispatched event and reachable status
# unmount based only on dispatched event
# act based on dispatched event
if sys.argv[2] in ['up', 'vpn-up']:
mount([ x[1] for x in parse_fstab() if x[0] in hosts and reachable(x[0]) ])
# mount valid hosts and allow 5 retries for checking reachability
mounter('mount', [x for x in parse_fstab() if x[0] in hosts], check_reachable=5)
elif sys.argv[2] in ['pre-down', 'down', 'vpn-pre-down', 'vpn-down']:
umount([ x[1] for x in parse_fstab() if x[0] in hosts])
# unmount valid hosts without checking reachability
mounter('unmount', [x for x in parse_fstab() if x[0] in hosts], check_reachable=None)
exit()

# Events when running directly
def direct():
# (un)mount based on reachable status
mount([ x[1] for x in parse_fstab() if reachable(x[0]) ])
umount([ x[1] for x in parse_fstab() if not reachable(x[0]) ])
# (un)mount hosts, only check reachability once
mounter('mount', [x for x in parse_fstab()], check_reachable=1)
mounter('unmount', [x for x in parse_fstab()], check_reachable=1)
exit()

# create default conf if it doesn't exist
Expand Down

0 comments on commit c399d9e

Please sign in to comment.