diff --git a/lib/snmpquery.py b/lib/snmpquery.py index 6886fff..9538558 100644 --- a/lib/snmpquery.py +++ b/lib/snmpquery.py @@ -1,11 +1,30 @@ import logging from asyncsnmplib.client import Snmp, SnmpV1, SnmpV3 -from asyncsnmplib.exceptions import SnmpNoConnection, SnmpNoAuthParams -from asyncsnmplib.mib.utils import on_result_base +from asyncsnmplib.exceptions import SnmpException +from asyncsnmplib.exceptions import SnmpNoAuthParams +from asyncsnmplib.exceptions import SnmpNoConnection +from asyncsnmplib.mib.utils import on_result from asyncsnmplib.v3.auth import AUTH_PROTO from asyncsnmplib.v3.encr import PRIV_PROTO from libprobe.asset import Asset -from libprobe.exceptions import CheckException + + +class InvalidCredentialsException(SnmpException): + message = 'Invalid SNMP v3 credentials.' + + +class InvalidClientConfigException(SnmpException): + message = 'Invalid SNMP v3 client configuration.' + + +class InvalidSnmpVersionException(SnmpException): + message = 'Invalid SNMP version.' + + +class ParseResultException(SnmpException): + def __init__(self, message: str): + super().__init__(message) + self.message = message def snmpv3_credentials(asset_config: dict): @@ -66,15 +85,13 @@ async def snmpquery( address = asset.name version = asset_config.get('version', '2c') - community = asset_config.get('community', 'public') - if not isinstance(community, str): - try: - community = community['secret'] - assert isinstance(community, str) - except Exception: - raise CheckException('missing credentials') if version == '2c': + community = asset_config.get('community', 'public') + if isinstance(community, dict): + community = community.get('secret') + if not isinstance(community, str): + raise TypeError('SNMP community must be a string.') cl = Snmp( host=address, community=community, @@ -83,49 +100,53 @@ async def snmpquery( try: cred = snmpv3_credentials(asset_config) except Exception as e: - logging.warning(f'invalid SNMP v3 credentials {asset}: {e}') - raise Exception('invalid SNMP v3 credentials') + logging.warning(f'invalid snmpv3 credentials {asset}: {e}') + raise InvalidCredentialsException try: cl = SnmpV3( host=address, **cred, ) except Exception as e: - logging.warning(f'invalid SNMP v3 client config {asset}: {e}') - raise Exception('invalid SNMP v3 credentials') + logging.warning(f'invalid snmpv3 client config {asset}: {e}') + raise InvalidClientConfigException elif version == '1': + community = asset_config.get('community', 'public') + if isinstance(community, dict): + community = community.get('secret') + if not isinstance(community, str): + raise TypeError('SNMP community must be a string.') cl = SnmpV1( host=address, community=community, ) else: - logging.warning(f'unsupported SNMP version {asset}: {version}') - raise CheckException('unsupported SNMP version') + logging.warning(f'unsupported snmp version {asset}: {version}') + raise InvalidSnmpVersionException try: await cl.connect() except SnmpNoConnection: - raise CheckException('failed to connect') + raise except SnmpNoAuthParams: - raise CheckException('failed to authenticate') + logging.warning('unable to connect: failed to set auth params') + raise else: results = {} try: for oid in queries: result = await cl.walk(oid) try: - name, result = on_result_base(oid, result) + name, result = on_result(oid, result) except Exception as e: msg = str(e) or type(e).__name__ - raise CheckException( - f'parse result error: {msg}') + raise ParseResultException( + f'Failed to parse result. Exception: {msg}' + ) else: results[name] = result - except CheckException: + except Exception: raise - except Exception as e: - msg = str(e) or type(e).__name__ - raise CheckException(msg) else: return results finally: diff --git a/requirements.txt b/requirements.txt index 4653bf5..2416908 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -asyncsnmplib==0.1.10 +asyncsnmplib==0.1.11 libprobe==0.2.33