diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..2d31711 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length: 88 diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..202ffc7 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,30 @@ +--- +name: Bug report +about: Create a report to help us improve. +title: '' +labels: '' +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +Steps to reproduce the behaviour: +1. Go to '...' +2. Click on '....' +3. Scroll down to '....' +4. See error + +**Expected behaviour** +A clear and concise description of what you expected to happen. + +**Environment** +Help us understand where the code is running. + - OS/Distribution: [e.g. Linux/Ubuntu/Debian] + - Python version [e.g. Python 3.10] + - Mastodon version [e.g. Mastodon 4.10] + +**Additional context** +Add any other context about the problem here that could help us find and fix the bug. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..24dc3b5 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project. +title: '' +labels: '' +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. E.g. "I am frustrated when [...]" + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. diff --git a/src/fediblockhole/__init__.py b/src/fediblockhole/__init__.py index c97816f..a786251 100755 --- a/src/fediblockhole/__init__.py +++ b/src/fediblockhole/__init__.py @@ -1,45 +1,57 @@ """A tool for managing federated Mastodon blocklists """ + from __future__ import annotations + import argparse -import toml import csv -import requests import json -import time import os.path import sys +import time import urllib.request as urlr +from importlib.metadata import version -from .blocklists import Blocklist, BlockAuditList, parse_blocklist -from .const import DomainBlock, BlockSeverity, BlockAudit +import requests +import toml -from importlib.metadata import version -__version__ = version('fediblockhole') +from .blocklists import BlockAuditList, Blocklist, parse_blocklist +from .const import BlockAudit, BlockSeverity, DomainBlock + +__version__ = version("fediblockhole") import logging -logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(levelname)s %(message)s') -log = logging.getLogger('fediblockhole') + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger("fediblockhole") # Max size of a URL-fetched blocklist -URL_BLOCKLIST_MAXSIZE = 1024 ** 3 +URL_BLOCKLIST_MAXSIZE = 1024**3 # Wait at most this long for a remote server to respond REQUEST_TIMEOUT = 30 # Time to wait between instance API calls to we don't melt them # The default Mastodon rate limit is 300 calls per 5 minutes -API_CALL_DELAY = 5 * 60 / 300 # 300 calls per 5 minutes +API_CALL_DELAY = 5 * 60 / 300 # 300 calls per 5 minutes # We always import the domain and the severity -IMPORT_FIELDS = ['domain', 'severity'] +IMPORT_FIELDS = ["domain", "severity"] # Allowlists always import these fields -ALLOWLIST_IMPORT_FIELDS = ['domain', 'severity', 'public_comment', 'private_comment', 'reject_media', 'reject_reports', 'obfuscate'] +ALLOWLIST_IMPORT_FIELDS = [ + "domain", + "severity", + "public_comment", + "private_comment", + "reject_media", + "reject_reports", + "obfuscate", +] # We always export the domain and the severity -EXPORT_FIELDS = ['domain', 'severity'] +EXPORT_FIELDS = ["domain", "severity"] + def sync_blocklists(conf: argparse.Namespace): """Sync instance blocklists from remote sources. @@ -62,16 +74,36 @@ def sync_blocklists(conf: argparse.Namespace): blocklists = [] # Fetch blocklists from URLs if not conf.no_fetch_url: - blocklists.extend(fetch_from_urls(conf.blocklist_url_sources, - import_fields, conf.save_intermediate, conf.savedir, export_fields)) + blocklists.extend( + fetch_from_urls( + conf.blocklist_url_sources, + import_fields, + conf.save_intermediate, + conf.savedir, + export_fields, + ) + ) # Fetch blocklists from remote instances if not conf.no_fetch_instance: - blocklists.extend(fetch_from_instances(conf.blocklist_instance_sources, - import_fields, conf.save_intermediate, conf.savedir, export_fields)) + blocklists.extend( + fetch_from_instances( + conf.blocklist_instance_sources, + import_fields, + conf.save_intermediate, + conf.savedir, + export_fields, + ) + ) # Merge blocklists into an update dict - merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type, conf.blocklist_auditfile) + merged = merge_blocklists( + blocklists, + conf.mergeplan, + conf.merge_threshold, + conf.merge_threshold_type, + conf.blocklist_auditfile, + ) # Remove items listed in allowlists, if any allowlists = fetch_allowlists(conf) @@ -86,15 +118,26 @@ def sync_blocklists(conf: argparse.Namespace): if not conf.no_push_instance: log.info("Pushing domain blocks to instances...") for dest in conf.blocklist_instance_destinations: - target = dest['domain'] - token = dest['token'] - scheme = dest.get('scheme', 'https') - max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence')) - push_blocklist(token, target, merged, conf.dryrun, import_fields, max_followed_severity, scheme, conf.override_private_comment) + target = dest["domain"] + token = dest["token"] + scheme = dest.get("scheme", "https") + max_followed_severity = BlockSeverity( + dest.get("max_followed_severity", "silence") + ) + push_blocklist( + token, + target, + merged, + conf.dryrun, + import_fields, + max_followed_severity, + scheme, + conf.override_private_comment, + ) + def apply_allowlists(merged: Blocklist, conf: argparse.Namespace, allowlists: dict): - """Apply allowlists - """ + """Apply allowlists""" # Apply allows specified on the commandline for domain in conf.allow_domains: log.info(f"'{domain}' allowed by commandline, removing any blocks...") @@ -113,18 +156,27 @@ def apply_allowlists(merged: Blocklist, conf: argparse.Namespace, allowlists: di return merged + def fetch_allowlists(conf: argparse.Namespace) -> Blocklist: - """ - """ + """ """ if conf.allowlist_url_sources: - allowlists = fetch_from_urls(conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS, conf.save_intermediate, conf.savedir) + allowlists = fetch_from_urls( + conf.allowlist_url_sources, + ALLOWLIST_IMPORT_FIELDS, + conf.save_intermediate, + conf.savedir, + ) return allowlists return Blocklist() -def fetch_from_urls(url_sources: dict, - import_fields: list=IMPORT_FIELDS, - save_intermediate: bool=False, - savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict: + +def fetch_from_urls( + url_sources: dict, + import_fields: list = IMPORT_FIELDS, + save_intermediate: bool = False, + savedir: str = None, + export_fields: list = EXPORT_FIELDS, +) -> dict: """Fetch blocklists from URL sources @param blocklists: A dict of existing blocklists, keyed by source @param url_sources: A dict of configuration info for url sources @@ -133,28 +185,32 @@ def fetch_from_urls(url_sources: dict, log.info("Fetching domain blocks from URLs...") blocklists = [] for item in url_sources: - url = item['url'] + url = item["url"] # If import fields are provided, they override the global ones passed in - source_import_fields = item.get('import_fields', None) + source_import_fields = item.get("import_fields", None) if source_import_fields: # Ensure we always use the default fields import_fields = IMPORT_FIELDS.extend(source_import_fields) - max_severity = item.get('max_severity', 'suspend') - listformat = item.get('format', 'csv') + max_severity = item.get("max_severity", "suspend") + listformat = item.get("format", "csv") with urlr.urlopen(url) as fp: - rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8') + rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode("utf-8") bl = parse_blocklist(rawdata, url, listformat, import_fields, max_severity) blocklists.append(bl) if save_intermediate: save_intermediate_blocklist(bl, savedir, export_fields) - + return blocklists -def fetch_from_instances(sources: dict, - import_fields: list=IMPORT_FIELDS, - save_intermediate: bool=False, - savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict: + +def fetch_from_instances( + sources: dict, + import_fields: list = IMPORT_FIELDS, + save_intermediate: bool = False, + savedir: str = None, + export_fields: list = EXPORT_FIELDS, +) -> dict: """Fetch blocklists from other instances @param blocklists: A dict of existing blocklists, keyed by source @param url_sources: A dict of configuration info for url sources @@ -163,14 +219,14 @@ def fetch_from_instances(sources: dict, log.info("Fetching domain blocks from instances...") blocklists = [] for item in sources: - domain = item['domain'] - admin = item.get('admin', False) - token = item.get('token', None) - scheme = item.get('scheme', 'https') + domain = item["domain"] + admin = item.get("admin", False) + token = item.get("token", None) + scheme = item.get("scheme", "https") # itemsrc = f"{scheme}://{domain}/api" # If import fields are provided, they override the global ones passed in - source_import_fields = item.get('import_fields', None) + source_import_fields = item.get("import_fields", None) if source_import_fields: # Ensure we always use the default fields import_fields = IMPORT_FIELDS.extend(source_import_fields) @@ -181,10 +237,14 @@ def fetch_from_instances(sources: dict, save_intermediate_blocklist(bl, savedir, export_fields) return blocklists -def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', - threshold: int=0, - threshold_type: str='count', - save_block_audit_file: str=None) -> Blocklist: + +def merge_blocklists( + blocklists: list[Blocklist], + mergeplan: str = "max", + threshold: int = 0, + threshold_type: str = "count", + save_block_audit_file: str = None, +) -> Blocklist: """Merge fetched remote blocklists into a bulk update @param blocklists: A dict of lists of DomainBlocks, keyed by source. Each value is a list of DomainBlocks @@ -201,8 +261,8 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', count_of_mentions / number_of_blocklists. @param returns: A dict of DomainBlocks keyed by domain """ - merged = Blocklist('fediblockhole.merge_blocklists') - audit = BlockAuditList('fediblockhole.merge_blocklists') + merged = Blocklist("fediblockhole.merge_blocklists") + audit = BlockAuditList("fediblockhole.merge_blocklists") num_blocklists = len(blocklists) @@ -211,25 +271,29 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', for bl in blocklists: for block in bl.values(): - if '*' in block.domain: + if "*" in block.domain: log.debug(f"Domain '{block.domain}' is obfuscated. Skipping it.") continue elif block.domain in domain_blocks: domain_blocks[block.domain].append(block) else: - domain_blocks[block.domain] = [block,] + domain_blocks[block.domain] = [ + block, + ] # Only merge items if `threshold` is met or exceeded for domain in domain_blocks: domain_matches_count = len(domain_blocks[domain]) domain_matches_percent = domain_matches_count / num_blocklists * 100 - if threshold_type == 'count': + if threshold_type == "count": domain_threshold_level = domain_matches_count - elif threshold_type == 'pct': + elif threshold_type == "pct": domain_threshold_level = domain_matches_percent # log.debug(f"domain threshold level: {domain_threshold_level}") else: - raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'") + raise ValueError( + f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'" # noqa + ) log.debug(f"Checking if {domain_threshold_level} >= {threshold} for {domain}") if domain_threshold_level >= threshold: @@ -243,10 +307,10 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', merged.blocks[block.domain] = block if save_block_audit_file: - blockdata:BlockAudit = { - 'domain': domain, - 'count': domain_matches_count, - 'percent': domain_matches_percent, + blockdata: BlockAudit = { + "domain": domain, + "count": domain_matches_count, + "percent": domain_matches_percent, } audit.blocks[domain] = blockdata @@ -256,9 +320,12 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', return merged -def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict: + +def apply_mergeplan( + oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str = "max" +) -> dict: """Use a mergeplan to decide how to merge two overlapping block definitions - + @param oldblock: The existing block definition. @param newblock: The new block definition we want to merge in. @param mergeplan: How to merge. Choices are 'max', the default, and 'min'. @@ -267,46 +334,48 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str blockdata = oldblock._asdict() # Merge comments - keylist = ['public_comment', 'private_comment'] + keylist = ["public_comment", "private_comment"] for key in keylist: try: oldcomment = getattr(oldblock, key) newcomment = getattr(newblock, key) blockdata[key] = merge_comments(oldcomment, newcomment) except KeyError: - log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...") + log.debug( + f"Key '{key}' missing from block definition so cannot compare. Continuing..." # noqa + ) continue - + # How do we override an earlier block definition? - if mergeplan in ['max', None]: + if mergeplan in ["max", None]: # Use the highest block level found (the default) # log.debug(f"Using 'max' mergeplan.") if newblock.severity > oldblock.severity: # log.debug(f"New block severity is higher. Using that.") - blockdata['severity'] = newblock.severity - + blockdata["severity"] = newblock.severity + # For 'reject_media', 'reject_reports', and 'obfuscate' if # the value is set and is True for the domain in # any blocklist then the value is set to True. - for key in ['reject_media', 'reject_reports', 'obfuscate']: + for key in ["reject_media", "reject_reports", "obfuscate"]: newval = getattr(newblock, key) - if newval == True: + if newval is True: blockdata[key] = True - elif mergeplan in ['min']: + elif mergeplan in ["min"]: # Use the lowest block level found - log.debug(f"Using 'min' mergeplan.") + log.debug("Using 'min' mergeplan.") if newblock.severity < oldblock.severity: - blockdata['severity'] = newblock.severity + blockdata["severity"] = newblock.severity # For 'reject_media', 'reject_reports', and 'obfuscate' if # the value is set and is False for the domain in # any blocklist then the value is set to False. - for key in ['reject_media', 'reject_reports', 'obfuscate']: + for key in ["reject_media", "reject_reports", "obfuscate"]: newval = getattr(newblock, key) - if newval == False: + if newval is False: blockdata[key] = False else: @@ -316,23 +385,24 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str return DomainBlock(**blockdata) -def merge_comments(oldcomment:str, newcomment:str) -> str: - """ Merge two comments + +def merge_comments(oldcomment: str, newcomment: str) -> str: + """Merge two comments @param oldcomment: The original comment we're merging into @param newcomment: The new commment we want to merge in @returns: a new str of the merged comment """ # Don't merge if both comments are None or '' - if oldcomment in ['', None] and newcomment in ['', None]: - return '' + if oldcomment in ["", None] and newcomment in ["", None]: + return "" # If both comments are the same, or new comment is empty, don't merge - if oldcomment == newcomment or newcomment in ['', None]: + if oldcomment == newcomment or newcomment in ["", None]: return oldcomment # If old comment is empty, just return the new one - if oldcomment in ['', None]: + if oldcomment in ["", None]: return newcomment # We want to skip duplicate fragments so we don't end up @@ -343,14 +413,14 @@ def merge_comments(oldcomment:str, newcomment:str) -> str: # This means "boring, lack of moderation, nazis, scrapers" merging # with "lack of moderation, scrapers" should result in # "boring, lack of moderation, nazis, scrapers" - old_tokens = oldcomment.split(', ') - new_tokens = newcomment.split(', ') - + old_tokens = oldcomment.split(", ") + new_tokens = newcomment.split(", ") + # Remove any empty string tokens that we get - while '' in old_tokens: - old_tokens.remove('') - while '' in new_tokens: - new_tokens.remove('') + while "" in old_tokens: + old_tokens.remove("") + while "" in new_tokens: + new_tokens.remove("") # Remove duplicate tokens for token in old_tokens: @@ -362,21 +432,25 @@ def merge_comments(oldcomment:str, newcomment:str) -> str: tokenset.extend(new_tokens) # Return the merged string - return ', '.join(tokenset) + return ", ".join(tokenset) -def requests_headers(token: str=None): + +def requests_headers(token: str = None): """Set common headers for requests""" - headers = { - 'User-Agent': f"FediBlockHole/{__version__}" - } + headers = {"User-Agent": f"FediBlockHole/{__version__}"} if token: - headers['Authorization'] = f"Bearer {token}" + headers["Authorization"] = f"Bearer {token}" return headers -def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False, - import_fields: list=['domain', 'severity'], - scheme: str='https') -> list[DomainBlock]: + +def fetch_instance_blocklist( + host: str, + token: str = None, + admin: bool = False, + import_fields: list = ["domain", "severity"], + scheme: str = "https", +) -> list[DomainBlock]: """Fetch existing block list from server @param host: The remote host to connect to. @@ -389,10 +463,10 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False, if admin: api_path = "/api/v1/admin/domain_blocks" - parse_format = 'json' + parse_format = "json" else: api_path = "/api/v1/instance/domain_blocks" - parse_format = 'mastodon_api_public' + parse_format = "mastodon_api_public" headers = requests_headers(token) @@ -410,47 +484,52 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False, # so we parse them and append them to the fetched list # of JSON data we need to parse. - blockdata.extend(json.loads(response.content.decode('utf-8'))) + blockdata.extend(json.loads(response.content.decode("utf-8"))) # Parse the link header to find the next url to fetch # This is a weird and janky way of doing pagination but # hey nothing we can do about it we just have to deal - link = response.headers.get('Link', None) + link = response.headers.get("Link", None) if link is None: break - pagination = link.split(', ') + pagination = link.split(", ") if len(pagination) != 2: link = None break else: next = pagination[0] # prev = pagination[1] - - urlstring, rel = next.split('; ') - url = urlstring.strip('<').rstrip('>') + + urlstring, rel = next.split("; ") + url = urlstring.strip("<").rstrip(">") blocklist = parse_blocklist(blockdata, url, parse_format, import_fields) return blocklist -def delete_block(token: str, host: str, id: int, scheme: str='https'): + +def delete_block(token: str, host: str, id: int, scheme: str = "https"): """Remove a domain block""" log.debug(f"Removing domain block {id} at {host}...") api_path = "/api/v1/admin/domain_blocks/" url = f"{scheme}://{host}{api_path}{id}" - response = requests.delete(url, - headers=requests_headers(token), - timeout=REQUEST_TIMEOUT + response = requests.delete( + url, headers=requests_headers(token), timeout=REQUEST_TIMEOUT ) if response.status_code != 200: if response.status_code == 404: log.warning(f"No such domain block: {id}") return - raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") + raise ValueError( + f"Something went wrong: {response.status_code}: {response.content}" + ) + -def fetch_instance_follows(token: str, host: str, domain: str, scheme: str='https') -> int: +def fetch_instance_follows( + token: str, host: str, domain: str, scheme: str = "https" +) -> int: """Fetch the followers of the target domain at the instance @param token: the Bearer authentication token for OAuth access @@ -461,37 +540,42 @@ def fetch_instance_follows(token: str, host: str, domain: str, scheme: str='http api_path = "/api/v1/admin/measures" url = f"{scheme}://{host}{api_path}" - key = 'instance_follows' + key = "instance_follows" # This data structure only allows us to request a single domain # at a time, which limits the load on the remote instance of each call data = { - 'keys': [ - key - ], - key: { 'domain': domain }, + "keys": [key], + key: {"domain": domain}, } # The Mastodon API only accepts JSON formatted POST data for measures - response = requests.post(url, - headers=requests_headers(token), - json=data, - timeout=REQUEST_TIMEOUT + response = requests.post( + url, headers=requests_headers(token), json=data, timeout=REQUEST_TIMEOUT ) if response.status_code != 200: if response.status_code == 403: - log.error(f"Cannot fetch follow information for {domain} from {host}: {response.content}") + log.error( + f"Cannot fetch follow information for {domain} from {host}: {response.content}" # noqa + ) - raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") + raise ValueError( + f"Something went wrong: {response.status_code}: {response.content}" + ) # Get the total returned - follows = int(response.json()[0]['total']) + follows = int(response.json()[0]["total"]) return follows -def check_followed_severity(host: str, token: str, domain: str, + +def check_followed_severity( + host: str, + token: str, + domain: str, severity: BlockSeverity, - max_followed_severity: BlockSeverity=BlockSeverity('silence'), - scheme: str='https'): + max_followed_severity: BlockSeverity = BlockSeverity("silence"), + scheme: str = "https", +): """Check an instance to see if it has followers of a to-be-blocked instance""" log.debug("Checking followed severity...") @@ -507,63 +591,77 @@ def check_followed_severity(host: str, token: str, domain: str, if follows > 0: log.debug(f"Instance {host} has {follows} followers of accounts at {domain}.") if severity > max_followed_severity: - log.warning(f"Instance {host} has {follows} followers of accounts at {domain}. Limiting block severity to {max_followed_severity}.") + log.warning( + f"Instance {host} has {follows} followers of accounts at {domain}. " + f"Limiting block severity to {max_followed_severity}." + ) return max_followed_severity return severity + def is_change_needed(oldblock: dict, newblock: dict, import_fields: list): change_needed = oldblock.compare_fields(newblock, import_fields) return change_needed -def update_known_block(token: str, host: str, block: DomainBlock, scheme: str='https'): + +def update_known_block( + token: str, host: str, block: DomainBlock, scheme: str = "https" +): """Update an existing domain block with information in blockdict""" api_path = "/api/v1/admin/domain_blocks/" id = block.id blockdata = block._asdict() - del blockdata['id'] + del blockdata["id"] url = f"{scheme}://{host}{api_path}{id}" - response = requests.put(url, - headers=requests_headers(token), - json=blockdata, - timeout=REQUEST_TIMEOUT + response = requests.put( + url, headers=requests_headers(token), json=blockdata, timeout=REQUEST_TIMEOUT ) if response.status_code != 200: - raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") + raise ValueError( + f"Something went wrong: {response.status_code}: {response.content}" + ) -def add_block(token: str, host: str, blockdata: DomainBlock, scheme: str='https'): - """Block a domain on Mastodon host - """ + +def add_block(token: str, host: str, blockdata: DomainBlock, scheme: str = "https"): + """Block a domain on Mastodon host""" log.debug(f"Adding block entry for {blockdata.domain} at {host}...") api_path = "/api/v1/admin/domain_blocks" url = f"{scheme}://{host}{api_path}" - response = requests.post(url, + response = requests.post( + url, headers=requests_headers(token), json=blockdata._asdict(), - timeout=REQUEST_TIMEOUT + timeout=REQUEST_TIMEOUT, ) if response.status_code == 422: # A stricter block already exists. Probably for the base domain. err = json.loads(response.content) - log.warning(err['error']) + log.warning(err["error"]) elif response.status_code != 200: - - raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") - -def push_blocklist(token: str, host: str, blocklist: list[DomainBlock], - dryrun: bool=False, - import_fields: list=['domain', 'severity'], - max_followed_severity:BlockSeverity=BlockSeverity('silence'), - scheme: str='https', - override_private_comment: str=None - ): + + raise ValueError( + f"Something went wrong: {response.status_code}: {response.content}" + ) + + +def push_blocklist( + token: str, + host: str, + blocklist: list[DomainBlock], + dryrun: bool = False, + import_fields: list = ["domain", "severity"], + max_followed_severity: BlockSeverity = BlockSeverity("silence"), + scheme: str = "https", + override_private_comment: str = None, +): """Push a blocklist to a remote instance. - + Updates existing entries if they exist, creates new blocks if they don't. @param token: The Bearer token for OAUTH API authentication @@ -574,8 +672,8 @@ def push_blocklist(token: str, host: str, blocklist: list[DomainBlock], log.info(f"Pushing blocklist to host {host} ...") # Fetch the existing blocklist from the instance # Force use of the admin API, and add 'id' to the list of fields - if 'id' not in import_fields: - import_fields.append('id') + if "id" not in import_fields: + import_fields.append("id") serverblocks = fetch_instance_blocklist(host, token, True, import_fields, scheme) # # Convert serverblocks to a dictionary keyed by domain name @@ -585,28 +683,44 @@ def push_blocklist(token: str, host: str, blocklist: list[DomainBlock], log.debug(f"Processing block: {newblock}") if newblock.domain in serverblocks: - log.debug(f"Block already exists for {newblock.domain}, checking for differences...") + log.debug( + f"Block already exists for {newblock.domain}, " + f"checking for differences..." + ) oldblock = serverblocks[newblock.domain] change_needed = is_change_needed(oldblock, newblock, import_fields) # Is the severity changing? - if 'severity' in change_needed: + if "severity" in change_needed: log.debug("Severity change requested, checking...") if newblock.severity > oldblock.severity: # Confirm if we really want to change the severity - # If we still have followers of the remote domain, we may not - # want to go all the way to full suspend, depending on the configuration - newseverity = check_followed_severity(host, token, oldblock.domain, newblock.severity, max_followed_severity, scheme) + # If we still have followers of the remote domain, + # we may not want to go all the way to full suspend, + # depending on the configuration + newseverity = check_followed_severity( + host, + token, + oldblock.domain, + newblock.severity, + max_followed_severity, + scheme, + ) if newseverity != oldblock.severity: newblock.severity = newseverity else: - log.info("Keeping severity of block the same to avoid disrupting followers.") - change_needed.remove('severity') + log.info( + "Keeping severity of block the same to avoid disrupting followers." # noqa + ) + change_needed.remove("severity") if change_needed: - log.info(f"Change detected. Need to update {change_needed} for domain block for {oldblock.domain}") + log.info( + f"Change detected. Need to update {change_needed} " + f"for domain block for {oldblock.domain}" + ) log.info(f"Old block definition: {oldblock}") log.info(f"Pushing new block definition: {newblock}") blockdata = oldblock.copy() @@ -635,7 +749,14 @@ def push_blocklist(token: str, host: str, blocklist: list[DomainBlock], log.debug(f"Block as dict: {newblock._asdict()}") # Make sure the new block doesn't clobber a domain with followers - newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity, scheme) + newblock.severity = check_followed_severity( + host, + token, + newblock.domain, + newblock.severity, + max_followed_severity, + scheme, + ) if not dryrun: add_block(token, host, newblock, scheme) # add a pause here so we don't melt the instance @@ -643,32 +764,34 @@ def push_blocklist(token: str, host: str, blocklist: list[DomainBlock], else: log.info("Dry run selected. Not adding block.") + def load_config(configfile: str): """Augment commandline arguments with config file parameters - + Config file is expected to be in TOML format """ conf = toml.load(configfile) return conf -def save_intermediate_blocklist(blocklist: Blocklist, filedir: str, - export_fields: list=['domain','severity']): - """Save a local copy of a blocklist we've downloaded - """ + +def save_intermediate_blocklist( + blocklist: Blocklist, filedir: str, export_fields: list = ["domain", "severity"] +): + """Save a local copy of a blocklist we've downloaded""" # Invent a filename based on the remote source # If the source was a URL, convert it to something less messy # If the source was a remote domain, just use the name of the domain source = blocklist.origin log.debug(f"Saving intermediate blocklist from {source}") - source = source.replace('/','-') + source = source.replace("/", "-") filename = f"{source}.csv" filepath = os.path.join(filedir, filename) save_blocklist_to_file(blocklist, filepath, export_fields) + def save_blocklist_to_file( - blocklist: Blocklist, - filepath: str, - export_fields: list=['domain','severity']): + blocklist: Blocklist, filepath: str, export_fields: list = ["domain", "severity"] +): """Save a blocklist we've downloaded from a remote source @param blocklist: A dictionary of block definitions, keyed by domain @@ -683,25 +806,25 @@ def save_blocklist_to_file( except AttributeError: log.error("Attribute error!") import pdb + pdb.set_trace() log.debug(f"export fields: {export_fields}") with open(filepath, "w") as fp: - writer = csv.DictWriter(fp, export_fields, extrasaction='ignore') + writer = csv.DictWriter(fp, export_fields, extrasaction="ignore") writer.writeheader() for key, value in sorted_list: writer.writerow(value) -def save_domain_block_audit_to_file( - blocklist: BlockAuditList, - filepath: str): + +def save_domain_block_audit_to_file(blocklist: BlockAuditList, filepath: str): """Save an audit log of domains blocked @param blocklist: A dictionary of block definitions, keyed by domain @param filepath: The path to the file the list should be saved in. """ - export_fields = ['domain', 'count', 'percent'] + export_fields = ["domain", "count", "percent"] try: sorted_list = sorted(blocklist.blocks.items()) @@ -711,19 +834,21 @@ def save_domain_block_audit_to_file( except AttributeError: log.error("Attribute error!") import pdb + pdb.set_trace() log.debug("exporting audit file") with open(filepath, "w") as fp: - writer = csv.DictWriter(fp, export_fields, extrasaction='ignore') + writer = csv.DictWriter(fp, export_fields, extrasaction="ignore") writer.writeheader() for key, value in sorted_list: writer.writerow(value) -def augment_args(args, tomldata: str=None): + +def augment_args(args, tomldata: str = None): """Augment commandline arguments with config file parameters - + If tomldata is provided, uses that data instead of loading from a config file. """ @@ -733,83 +858,165 @@ def augment_args(args, tomldata: str=None): conf = toml.load(args.config) if not args.no_fetch_url: - args.no_fetch_url = conf.get('no_fetch_url', False) + args.no_fetch_url = conf.get("no_fetch_url", False) if not args.no_fetch_instance: - args.no_fetch_instance = conf.get('no_fetch_instance', False) + args.no_fetch_instance = conf.get("no_fetch_instance", False) if not args.no_push_instance: - args.no_push_instance = conf.get('no_push_instance', False) + args.no_push_instance = conf.get("no_push_instance", False) if not args.blocklist_savefile: - args.blocklist_savefile = conf.get('blocklist_savefile', None) + args.blocklist_savefile = conf.get("blocklist_savefile", None) if not args.save_intermediate: - args.save_intermediate = conf.get('save_intermediate', False) + args.save_intermediate = conf.get("save_intermediate", False) if not args.override_private_comment: - args.override_private_comment = conf.get('override_private_comment', None) - + args.override_private_comment = conf.get("override_private_comment", None) + if not args.savedir: - args.savedir = conf.get('savedir', '/tmp') + args.savedir = conf.get("savedir", "/tmp") if not args.blocklist_auditfile: - args.blocklist_auditfile = conf.get('blocklist_auditfile', None) + args.blocklist_auditfile = conf.get("blocklist_auditfile", None) if not args.export_fields: - args.export_fields = conf.get('export_fields', []) + args.export_fields = conf.get("export_fields", []) if not args.import_fields: - args.import_fields = conf.get('import_fields', []) + args.import_fields = conf.get("import_fields", []) if not args.mergeplan: - args.mergeplan = conf.get('mergeplan', 'max') + args.mergeplan = conf.get("mergeplan", "max") if not args.merge_threshold: - args.merge_threshold = conf.get('merge_threshold', 0) + args.merge_threshold = conf.get("merge_threshold", 0) if not args.merge_threshold_type: - args.merge_threshold_type = conf.get('merge_threshold_type', 'count') + args.merge_threshold_type = conf.get("merge_threshold_type", "count") - args.blocklist_url_sources = conf.get('blocklist_url_sources', []) - args.blocklist_instance_sources = conf.get('blocklist_instance_sources', []) - args.allowlist_url_sources = conf.get('allowlist_url_sources', []) - args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations', []) + args.blocklist_url_sources = conf.get("blocklist_url_sources", []) + args.blocklist_instance_sources = conf.get("blocklist_instance_sources", []) + args.allowlist_url_sources = conf.get("allowlist_url_sources", []) + args.blocklist_instance_destinations = conf.get( + "blocklist_instance_destinations", [] + ) return args + def setup_argparse(): - """Setup the commandline arguments - """ + """Setup the commandline arguments""" ap = argparse.ArgumentParser( description="Bulk blocklist tool", epilog=f"Part of FediBlockHole v{__version__}", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - ap.add_argument('-c', '--config', default='/etc/default/fediblockhole.conf.toml', help="Config file") - ap.add_argument('-V', '--version', action='store_true', help="Show version and exit.") - - ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.") - ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.") - ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.") - ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.") - ap.add_argument('-b', '--block-audit-file', dest="blocklist_auditfile", help="Save blocklist auditfile to this location.") - ap.add_argument('--merge-threshold', type=int, help="Merge threshold value") - ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.") - ap.add_argument('--override-private-comment', dest='override_private_comment', help="Override private_comment with this string for new blocks when pushing blocklists.") - - ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.") - ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.") - ap.add_argument('-A', '--allow', dest="allow_domains", action='append', default=[], help="Override any blocks to allow this domain.") - - ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.") - ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.") - ap.add_argument('--no-push-instance', dest='no_push_instance', action='store_true', help="Don't push to instances, even if configured.") - - ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.") - ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.") + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + ap.add_argument( + "-c", + "--config", + default="/etc/default/fediblockhole.conf.toml", + help="Config file", + ) + ap.add_argument( + "-V", "--version", action="store_true", help="Show version and exit." + ) + + ap.add_argument( + "-o", + "--outfile", + dest="blocklist_savefile", + help="Save merged blocklist to a local file.", + ) + ap.add_argument( + "-S", + "--save-intermediate", + dest="save_intermediate", + action="store_true", + help="Save intermediate blocklists we fetch to local files.", + ) + ap.add_argument( + "-D", + "--savedir", + dest="savedir", + help="Directory path to save intermediate lists.", + ) + ap.add_argument("-m", "--mergeplan", choices=["min", "max"], help="Set mergeplan.") + ap.add_argument( + "-b", + "--block-audit-file", + dest="blocklist_auditfile", + help="Save blocklist auditfile to this location.", + ) + ap.add_argument("--merge-threshold", type=int, help="Merge threshold value") + ap.add_argument( + "--merge-threshold-type", + choices=["count", "pct"], + help="Type of merge threshold to use.", + ) + ap.add_argument( + "--override-private-comment", + dest="override_private_comment", + help="Override private_comment with this string for new blocks when pushing blocklists.", # noqa + ) + + ap.add_argument( + "-I", + "--import-field", + dest="import_fields", + action="append", + help="Extra blocklist fields to import.", + ) + ap.add_argument( + "-E", + "--export-field", + dest="export_fields", + action="append", + help="Extra blocklist fields to export.", + ) + ap.add_argument( + "-A", + "--allow", + dest="allow_domains", + action="append", + default=[], + help="Override any blocks to allow this domain.", + ) + + ap.add_argument( + "--no-fetch-url", + dest="no_fetch_url", + action="store_true", + help="Don't fetch from URLs, even if configured.", + ) + ap.add_argument( + "--no-fetch-instance", + dest="no_fetch_instance", + action="store_true", + help="Don't fetch from instances, even if configured.", + ) + ap.add_argument( + "--no-push-instance", + dest="no_push_instance", + action="store_true", + help="Don't push to instances, even if configured.", + ) + + ap.add_argument( + "--loglevel", + choices=["debug", "info", "warning", "error", "critical"], + help="Set log output level.", + ) + ap.add_argument( + "--dryrun", + action="store_true", + help="Don't actually push updates, just show what would happen.", + ) return ap + def main(): ap = setup_argparse() diff --git a/src/fediblockhole/blocklists.py b/src/fediblockhole/blocklists.py index 30781b9..c3fc62f 100644 --- a/src/fediblockhole/blocklists.py +++ b/src/fediblockhole/blocklists.py @@ -1,22 +1,26 @@ """Parse various blocklist data formats """ + from __future__ import annotations + import csv import json -from typing import Iterable +import logging from dataclasses import dataclass, field +from typing import Iterable -from .const import DomainBlock, BlockSeverity, BlockAudit +from .const import BlockAudit, BlockSeverity, DomainBlock + +log = logging.getLogger("fediblockhole") -import logging -log = logging.getLogger('fediblockhole') @dataclass class Blocklist: - """ A Blocklist object + """A Blocklist object A Blocklist is a list of DomainBlocks from an origin """ + origin: str = None blocks: dict[str, DomainBlock] = field(default_factory=dict) @@ -38,12 +42,14 @@ def items(self): def values(self): return self.blocks.values() -@dataclass + +@dataclass class BlockAuditList: - """ A BlockAuditlist object + """A BlockAuditlist object A BlockAuditlist is a list of BlockAudits from an origin """ + origin: str = None blocks: dict[str, BlockAudit] = field(default_factory=dict) @@ -65,14 +71,19 @@ def items(self): def values(self): return self.blocks.values() + class BlocklistParser(object): """ Base class for parsing blocklists """ + do_preparse = False - def __init__(self, import_fields: list=['domain', 'severity'], - max_severity: str='suspend'): + def __init__( + self, + import_fields: list = ["domain", "severity"], + max_severity: str = "suspend", + ): """Create a Parser @param import_fields: an optional list of fields to limit the parser to. @@ -82,11 +93,10 @@ def __init__(self, import_fields: list=['domain', 'severity'], self.max_severity = BlockSeverity(max_severity) def preparse(self, blockdata) -> Iterable: - """Some raw datatypes need to be converted into an iterable - """ + """Some raw datatypes need to be converted into an iterable""" raise NotImplementedError - def parse_blocklist(self, blockdata, origin:str=None) -> Blocklist: + def parse_blocklist(self, blockdata, origin: str = None) -> Blocklist: """Parse an iterable of blocklist items @param blocklist: An Iterable of blocklist items @returns: A dict of DomainBlocks, keyed by domain @@ -99,7 +109,7 @@ def parse_blocklist(self, blockdata, origin:str=None) -> Blocklist: block = self.parse_item(blockitem) parsed_list.blocks[block.domain] = block return parsed_list - + def parse_item(self, blockitem) -> DomainBlock: """Parse an individual block item @@ -108,13 +118,15 @@ def parse_item(self, blockitem) -> DomainBlock: """ raise NotImplementedError + class BlocklistParserJSON(BlocklistParser): """Parse a JSON formatted blocklist""" + do_preparse = True def preparse(self, blockdata) -> Iterable: """Parse the blockdata as JSON if needed""" - if type(blockdata) == type(''): + if type(blockdata) is type(""): return json.loads(blockdata) return blockdata @@ -126,51 +138,53 @@ def parse_item(self, blockitem: dict) -> DomainBlock: del blockitem[key] # Convert dict to NamedTuple with the double-star operator - # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments + # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments # noqa block = DomainBlock(**blockitem) if block.severity > self.max_severity: block.severity = self.max_severity return block + class BlocklistParserMastodonAPIPublic(BlocklistParserJSON): """The public blocklist API is slightly different to the admin one""" - + def parse_item(self, blockitem: dict) -> DomainBlock: # Remove fields we don't want to import origitem = blockitem.copy() for key in origitem: # The Mastodon public API uses the 'public' field # to mean 'public_comment' because what even is consistency? - if key == 'comment': - key = 'public_comment' - blockitem['public_comment'] = blockitem['comment'] - del blockitem['comment'] + if key == "comment": + key = "public_comment" + blockitem["public_comment"] = blockitem["comment"] + del blockitem["comment"] if key not in self.import_fields: del blockitem[key] # Convert dict to NamedTuple with the double-star operator - # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments + # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments # noqa block = DomainBlock(**blockitem) if block.severity > self.max_severity: block.severity = self.max_severity return block + class BlocklistParserCSV(BlocklistParser): - """ Parse CSV formatted blocklists + """Parse CSV formatted blocklists The parser expects the CSV data to include a header with the field names. """ + do_preparse = True def preparse(self, blockdata) -> Iterable: - """Use a csv.DictReader to create an iterable from the blockdata - """ - return csv.DictReader(blockdata.split('\n')) + """Use a csv.DictReader to create an iterable from the blockdata""" + return csv.DictReader(blockdata.split("\n")) def parse_item(self, blockitem: dict) -> DomainBlock: # Coerce booleans from string to Python bool # FIXME: Is this still necessary with the DomainBlock object? - for boolkey in ['reject_media', 'reject_reports', 'obfuscate']: + for boolkey in ["reject_media", "reject_reports", "obfuscate"]: if boolkey in blockitem: blockitem[boolkey] = str2bool(blockitem[boolkey]) @@ -182,71 +196,73 @@ def parse_item(self, blockitem: dict) -> DomainBlock: del blockitem[key] # Convert dict to DomainBlock with the double-star operator - # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments + # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments # noqa block = DomainBlock(**blockitem) if block.severity > self.max_severity: block.severity = self.max_severity return block + class BlocklistParserMastodonCSV(BlocklistParserCSV): - """ Parse Mastodon CSV formatted blocklists + """Parse Mastodon CSV formatted blocklists The Mastodon v4.1.x domain block CSV export prefixes its field names with a '#' character becauseā€¦ reasons? """ + do_preparse = True def parse_item(self, blockitem: dict) -> DomainBlock: - """Build a new blockitem dict with new un-#ed keys - """ + """Build a new blockitem dict with new un-#ed keys""" newdict = {} for key in blockitem: - newkey = key.lstrip('#') + newkey = key.lstrip("#") newdict[newkey] = blockitem[key] return super().parse_item(newdict) + class RapidBlockParserCSV(BlocklistParserCSV): - """ Parse RapidBlock CSV blocklists + """Parse RapidBlock CSV blocklists RapidBlock CSV blocklists are just a newline separated list of domains. """ + def preparse(self, blockdata) -> Iterable: - """Prepend a 'domain' field header to the data - """ + """Prepend a 'domain' field header to the data""" log.debug(f"blockdata: {blockdata[:100]}") - blockdata = ''.join(["domain\r\n", blockdata]) + blockdata = "".join(["domain\r\n", blockdata]) + + return csv.DictReader(blockdata.split("\r\n")) - return csv.DictReader(blockdata.split('\r\n')) class RapidBlockParserJSON(BlocklistParserJSON): - """Parse RapidBlock JSON formatted blocklists - """ + """Parse RapidBlock JSON formatted blocklists""" + def preparse(self, blockdata) -> Iterable: rb_dict = json.loads(blockdata) # We want to iterate over all the dictionary items - return rb_dict['blocks'].items() + return rb_dict["blocks"].items() def parse_item(self, blockitem: tuple) -> DomainBlock: - """Parse an individual item in a RapidBlock list - """ + """Parse an individual item in a RapidBlock list""" # Each item is a tuple of: # (domain, {dictionary of attributes}) domain = blockitem[0] # RapidBlock has a binary block level which we map # to 'suspend' if True, and 'noop' if False. - isblocked = blockitem[1]['isBlocked'] + isblocked = blockitem[1]["isBlocked"] if isblocked: - severity = 'suspend' + severity = "suspend" else: - severity = 'noop' - - if 'public_comment' in self.import_fields: - public_comment = blockitem[1]['reason'] + severity = "noop" + + if "public_comment" in self.import_fields: + public_comment = blockitem[1]["reason"] else: - public_comment = '' - + public_comment = "" + # There's a 'tags' field as well, but we can't # do much with that in Mastodon yet @@ -256,36 +272,38 @@ def parse_item(self, blockitem: tuple) -> DomainBlock: return block + def str2bool(boolstring: str) -> bool: - """Helper function to convert boolean strings to actual Python bools - """ + """Helper function to convert boolean strings to actual Python bools""" boolstring = boolstring.lower() - if boolstring in ['true', 't', '1', 'y', 'yes']: + if boolstring in ["true", "t", "1", "y", "yes"]: return True - elif boolstring in ['', 'false', 'f', '0', 'n', 'no']: + elif boolstring in ["", "false", "f", "0", "n", "no"]: return False else: raise ValueError(f"Cannot parse value '{boolstring}' as boolean") + FORMAT_PARSERS = { - 'csv': BlocklistParserCSV, - 'mastodon_csv': BlocklistParserMastodonCSV, - 'json': BlocklistParserJSON, - 'mastodon_api_public': BlocklistParserMastodonAPIPublic, - 'rapidblock.csv': RapidBlockParserCSV, - 'rapidblock.json': RapidBlockParserJSON, + "csv": BlocklistParserCSV, + "mastodon_csv": BlocklistParserMastodonCSV, + "json": BlocklistParserJSON, + "mastodon_api_public": BlocklistParserMastodonAPIPublic, + "rapidblock.csv": RapidBlockParserCSV, + "rapidblock.json": RapidBlockParserJSON, } + # helper function to select the appropriate Parser def parse_blocklist( blockdata, origin, format="csv", - import_fields: list=['domain', 'severity'], - max_severity: str='suspend'): - """Parse a blocklist in the given format - """ + import_fields: list = ["domain", "severity"], + max_severity: str = "suspend", +): + """Parse a blocklist in the given format""" log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...") parser = FORMAT_PARSERS[format](import_fields, max_severity) - return parser.parse_blocklist(blockdata, origin) \ No newline at end of file + return parser.parse_blocklist(blockdata, origin) diff --git a/src/fediblockhole/const.py b/src/fediblockhole/const.py index f741304..7c9a5f8 100644 --- a/src/fediblockhole/const.py +++ b/src/fediblockhole/const.py @@ -1,27 +1,29 @@ """ Constant objects used by FediBlockHole """ + from __future__ import annotations -import enum -from typing import NamedTuple, Optional, TypedDict -from dataclasses import dataclass +import enum import logging -log = logging.getLogger('fediblockhole') + +log = logging.getLogger("fediblockhole") + class SeverityLevel(enum.IntEnum): - """How severe should a block be? Higher is more severe. - """ + """How severe should a block be? Higher is more severe.""" + NONE = enum.auto() SILENCE = enum.auto() SUSPEND = enum.auto() + class BlockSeverity(object): """A representation of a block severity We add some helpful functions rather than using a bare IntEnum """ - def __init__(self, severity:str=None): + def __init__(self, severity: str = None): self._level = self.str2level(severity) @property @@ -32,21 +34,21 @@ def level(self): def level(self, value): if isinstance(value, SeverityLevel): self._level = value - elif type(value) == type(''): + elif type(value) is type(""): self._level = self.str2level(value) else: raise ValueError(f"Invalid level value '{value}'") - def str2level(self, severity:str=None): + def str2level(self, severity: str = None): """Convert a string severity level to an internal enum""" - if severity in [None, '', 'noop']: + if severity in [None, "", "noop"]: return SeverityLevel.NONE - elif severity in ['silence']: + elif severity in ["silence"]: return SeverityLevel.SILENCE - - elif severity in ['suspend']: + + elif severity in ["suspend"]: return SeverityLevel.SUSPEND else: @@ -56,12 +58,11 @@ def __repr__(self): return f"'{str(self)}'" def __str__(self): - """A string version of the severity level - """ + """A string version of the severity level""" levelmap = { - SeverityLevel.NONE: 'noop', - SeverityLevel.SILENCE: 'silence', - SeverityLevel.SUSPEND: 'suspend', + SeverityLevel.NONE: "noop", + SeverityLevel.SILENCE: "silence", + SeverityLevel.SUSPEND: "suspend", } return levelmap[self.level] @@ -84,43 +85,34 @@ def __le__(self, other): def __ge__(self, other): if self._level >= other._level: return True - + + class BlockAudit(object): fields = [ - 'domain', - 'count', - 'percent', + "domain", + "count", + "percent", ] - all_fields = [ - 'domain', - 'count', - 'percent', - 'id' - ] + all_fields = ["domain", "count", "percent", "id"] - def __init__(self, domain:str, - count: int=0, - percent: int=0, - id: int=None): - """Initialize the BlockAudit - """ + def __init__(self, domain: str, count: int = 0, percent: int = 0, id: int = None): + """Initialize the BlockAudit""" self.domain = domain self.count = count self.percent = percent self.id = id def _asdict(self): - """Return a dict version of this object - """ + """Return a dict version of this object""" dictval = { - 'domain': self.domain, - 'count': self.count, - 'percent': self.percent, + "domain": self.domain, + "count": self.count, + "percent": self.percent, } if self.id: - dictval['id'] = self.id + dictval["id"] = self.id return dictval @@ -129,14 +121,12 @@ def __repr__(self): return f"" def copy(self): - """Make a copy of this object and return it - """ + """Make a copy of this object and return it""" retval = BlockAudit(**self._asdict()) return retval def update(self, dict): - """Update my kwargs - """ + """Update my kwargs""" for key in dict: setattr(self, key, dict[key]) @@ -144,8 +134,8 @@ def __iter__(self): """Be iterable""" keys = self.fields - if getattr(self, 'id', False): - keys.append('id') + if getattr(self, "id", False): + keys.append("id") for k in keys: yield k @@ -160,6 +150,7 @@ def __getitem__(self, k, default=None): def get(self, k, default=None): return self.__getitem__(k, default) + # class _DomainBlock(NamedTuple): # domain: str # FIXME: Use an actual Domain object from somewhere? # severity: BlockSeverity = BlockSeverity.SUSPEND @@ -169,39 +160,42 @@ def get(self, k, default=None): # reject_reports: bool = False # obfuscate: bool = False + class DomainBlock(object): fields = [ - 'domain', - 'severity', - 'public_comment', - 'private_comment', - 'reject_media', - 'reject_reports', - 'obfuscate', + "domain", + "severity", + "public_comment", + "private_comment", + "reject_media", + "reject_reports", + "obfuscate", ] all_fields = [ - 'domain', - 'severity', - 'public_comment', - 'private_comment', - 'reject_media', - 'reject_reports', - 'obfuscate', - 'id' + "domain", + "severity", + "public_comment", + "private_comment", + "reject_media", + "reject_reports", + "obfuscate", + "id", ] - def __init__(self, domain:str, - severity: BlockSeverity=BlockSeverity('suspend'), - public_comment: str="", - private_comment: str="", - reject_media: bool=False, - reject_reports: bool=False, - obfuscate: bool=False, - id: int=None): - """Initialize the DomainBlock - """ + def __init__( + self, + domain: str, + severity: BlockSeverity = BlockSeverity("suspend"), + public_comment: str = "", + private_comment: str = "", + reject_media: bool = False, + reject_reports: bool = False, + obfuscate: bool = False, + id: int = None, + ): + """Initialize the DomainBlock""" self.domain = domain self.severity = severity self.public_comment = public_comment @@ -223,23 +217,22 @@ def severity(self, sev): self._severity = BlockSeverity(sev) def _asdict(self): - """Return a dict version of this object - """ + """Return a dict version of this object""" dictval = { - 'domain': self.domain, - 'severity': str(self.severity), - 'public_comment': self.public_comment, - 'private_comment': self.private_comment, - 'reject_media': self.reject_media, - 'reject_reports': self.reject_reports, - 'obfuscate': self.obfuscate, + "domain": self.domain, + "severity": str(self.severity), + "public_comment": self.public_comment, + "private_comment": self.private_comment, + "reject_media": self.reject_media, + "reject_reports": self.reject_reports, + "obfuscate": self.obfuscate, } if self.id: - dictval['id'] = self.id + dictval["id"] = self.id return dictval - def compare_fields(self, other, fields=None)->list: + def compare_fields(self, other, fields=None) -> list: """Compare two DomainBlocks on specific fields. If all the fields are equal, the DomainBlocks are equal. @@ -254,9 +247,6 @@ def compare_fields(self, other, fields=None)->list: diffs = [] # Check if all the fields are equal for field in self.fields: - a = getattr(self, field) - b = getattr(other, field) - # log.debug(f"Comparing field {field}: '{a}' <> '{b}'") if getattr(self, field) != getattr(other, field): diffs.append(field) return diffs @@ -271,14 +261,12 @@ def __repr__(self): return f"" def copy(self): - """Make a copy of this object and return it - """ + """Make a copy of this object and return it""" retval = DomainBlock(**self._asdict()) return retval def update(self, dict): - """Update my kwargs - """ + """Update my kwargs""" for key in dict: setattr(self, key, dict[key]) @@ -286,8 +274,8 @@ def __iter__(self): """Be iterable""" keys = self.fields - if getattr(self, 'id', False): - keys.append('id') + if getattr(self, "id", False): + keys.append("id") for k in keys: yield k @@ -300,4 +288,4 @@ def __getitem__(self, k, default=None): return getattr(self, k, default) def get(self, k, default=None): - return self.__getitem__(k, default) \ No newline at end of file + return self.__getitem__(k, default) diff --git a/tests/conftest.py b/tests/conftest.py index 501ed17..f0de967 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ -import sys import os -sys.path.append(os.path.join(os.path.dirname(__file__), 'helpers')) \ No newline at end of file +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), "helpers")) diff --git a/tests/test_allowlist.py b/tests/test_allowlist.py index ddd53b9..0f471c3 100644 --- a/tests/test_allowlist.py +++ b/tests/test_allowlist.py @@ -1,75 +1,91 @@ """ Test allowlists """ -import pytest +import pytest from util import shim_argparse -from fediblockhole.const import DomainBlock + +from fediblockhole import apply_allowlists from fediblockhole.blocklists import Blocklist -from fediblockhole import fetch_allowlists, apply_allowlists +from fediblockhole.const import DomainBlock + def test_cmdline_allow_removes_domain(): - """Test that -A removes entries from merged - """ - conf = shim_argparse(['-A', 'removeme.org']) + """Test that -A removes entries from merged""" + conf = shim_argparse(["-A", "removeme.org"]) - merged = Blocklist('test_allowlist.merged', { - 'example.org': DomainBlock('example.org'), - 'example2.org': DomainBlock('example2.org'), - 'removeme.org': DomainBlock('removeme.org'), - 'keepblockingme.org': DomainBlock('keepblockingme.org'), - }) + merged = Blocklist( + "test_allowlist.merged", + { + "example.org": DomainBlock("example.org"), + "example2.org": DomainBlock("example2.org"), + "removeme.org": DomainBlock("removeme.org"), + "keepblockingme.org": DomainBlock("keepblockingme.org"), + }, + ) merged = apply_allowlists(merged, conf, {}) with pytest.raises(KeyError): - merged['removeme.org'] + merged["removeme.org"] + def test_allowlist_removes_domain(): - """Test that an item in an allowlist removes entries from merged - """ + """Test that an item in an allowlist removes entries from merged""" conf = shim_argparse() - merged = Blocklist('test_allowlist.merged', { - 'example.org': DomainBlock('example.org'), - 'example2.org': DomainBlock('example2.org'), - 'removeme.org': DomainBlock('removeme.org'), - 'keepblockingme.org': DomainBlock('keepblockingme.org'), - }) + merged = Blocklist( + "test_allowlist.merged", + { + "example.org": DomainBlock("example.org"), + "example2.org": DomainBlock("example2.org"), + "removeme.org": DomainBlock("removeme.org"), + "keepblockingme.org": DomainBlock("keepblockingme.org"), + }, + ) allowlists = [ - Blocklist('test_allowlist', { - 'removeme.org': DomainBlock('removeme.org', 'noop'), - }) + Blocklist( + "test_allowlist", + { + "removeme.org": DomainBlock("removeme.org", "noop"), + }, + ) ] - + merged = apply_allowlists(merged, conf, allowlists) with pytest.raises(KeyError): - merged['removeme.org'] + merged["removeme.org"] + def test_allowlist_removes_tld(): - """Test that an item in an allowlist removes entries from merged - """ + """Test that an item in an allowlist removes entries from merged""" conf = shim_argparse() - merged = Blocklist('test_allowlist.merged', { - '.cf': DomainBlock('.cf'), - 'example.org': DomainBlock('example.org'), - '.tk': DomainBlock('.tk'), - 'keepblockingme.org': DomainBlock('keepblockingme.org'), - }) + merged = Blocklist( + "test_allowlist.merged", + { + ".cf": DomainBlock(".cf"), + "example.org": DomainBlock("example.org"), + ".tk": DomainBlock(".tk"), + "keepblockingme.org": DomainBlock("keepblockingme.org"), + }, + ) allowlists = [ - Blocklist('test_allowlist.list1', { - '.cf': DomainBlock('.cf', 'noop'), - '.tk': DomainBlock('.tk', 'noop'), - }) + Blocklist( + "test_allowlist.list1", + { + ".cf": DomainBlock(".cf", "noop"), + ".tk": DomainBlock(".tk", "noop"), + }, + ) ] - + merged = apply_allowlists(merged, conf, allowlists) with pytest.raises(KeyError): - merged['.cf'] + merged[".cf"] with pytest.raises(KeyError): - merged['.tk'] \ No newline at end of file + merged[".tk"] diff --git a/tests/test_blockseverity.py b/tests/test_blockseverity.py index b0e50f8..1327024 100644 --- a/tests/test_blockseverity.py +++ b/tests/test_blockseverity.py @@ -1,68 +1,73 @@ -from fediblockhole.const import BlockSeverity, SeverityLevel +from fediblockhole.const import BlockSeverity + def test_severity_eq(): - s1 = BlockSeverity('suspend') - s2 = BlockSeverity('suspend') + s1 = BlockSeverity("suspend") + s2 = BlockSeverity("suspend") assert s1 == s2 - s3 = BlockSeverity('silence') - s4 = BlockSeverity('silence') + s3 = BlockSeverity("silence") + s4 = BlockSeverity("silence") assert s3 == s4 - s5 = BlockSeverity('noop') - s6 = BlockSeverity('noop') + s5 = BlockSeverity("noop") + s6 = BlockSeverity("noop") assert s5 == s6 + def test_severity_ne(): - s1 = BlockSeverity('noop') - s2 = BlockSeverity('silence') - s3 = BlockSeverity('suspend') + s1 = BlockSeverity("noop") + s2 = BlockSeverity("silence") + s3 = BlockSeverity("suspend") assert s1 != s2 assert s2 != s3 assert s1 != s3 + def test_severity_lt(): - s1 = BlockSeverity('noop') - s2 = BlockSeverity('silence') - s3 = BlockSeverity('suspend') + s1 = BlockSeverity("noop") + s2 = BlockSeverity("silence") + s3 = BlockSeverity("suspend") assert s1 < s2 assert s2 < s3 assert s1 < s3 + def test_severity_gt(): - s1 = BlockSeverity('noop') - s2 = BlockSeverity('silence') - s3 = BlockSeverity('suspend') + s1 = BlockSeverity("noop") + s2 = BlockSeverity("silence") + s3 = BlockSeverity("suspend") assert s2 > s1 assert s3 > s2 assert s3 > s1 + def test_severity_le(): - s1 = BlockSeverity('noop') - s2 = BlockSeverity('silence') - s2a = BlockSeverity('silence') - s3 = BlockSeverity('suspend') + s1 = BlockSeverity("noop") + s2 = BlockSeverity("silence") + s2a = BlockSeverity("silence") + s3 = BlockSeverity("suspend") assert s1 <= s2 assert s2a <= s2 assert s2 <= s3 assert s1 <= s3 + def test_severity_ge(): - s1 = BlockSeverity('noop') - s2 = BlockSeverity('silence') - s2a = BlockSeverity('silence') - s3 = BlockSeverity('suspend') + s1 = BlockSeverity("noop") + s2 = BlockSeverity("silence") + s2a = BlockSeverity("silence") + s3 = BlockSeverity("suspend") assert s2 >= s1 assert s2a >= s1 assert s3 >= s2 assert s3 >= s1 - diff --git a/tests/test_cmdline.py b/tests/test_cmdline.py index 46b5748..b5bbb6f 100644 --- a/tests/test_cmdline.py +++ b/tests/test_cmdline.py @@ -1,47 +1,51 @@ """Test the commandline defined parameters correctly """ -from util import shim_argparse -from fediblockhole import setup_argparse, augment_args + +from fediblockhole import setup_argparse + def test_cmdline_no_configfile(): - """ Test bare command with no configfile - """ + """Test bare command with no configfile""" ap = setup_argparse() args = ap.parse_args([]) - assert args.config == '/etc/default/fediblockhole.conf.toml' - assert args.mergeplan == None - assert args.blocklist_savefile == None - assert args.save_intermediate == False - assert args.savedir == None - assert args.import_fields == None - assert args.export_fields == None + assert args.config == "/etc/default/fediblockhole.conf.toml" + assert args.mergeplan is None + assert args.blocklist_savefile is None + assert args.save_intermediate is False + assert args.savedir is None + assert args.import_fields is None + assert args.export_fields is None + + assert args.no_fetch_url is False + assert args.no_fetch_instance is False + assert args.no_push_instance is False + assert args.dryrun is False - assert args.no_fetch_url == False - assert args.no_fetch_instance == False - assert args.no_push_instance == False - assert args.dryrun == False + assert args.loglevel is None - assert args.loglevel == None def test_cmdline_mergeplan_min(): - """ Test setting mergeplan min - """ + """Test setting mergeplan min""" ap = setup_argparse() - args = ap.parse_args(['-m', 'min']) + args = ap.parse_args(["-m", "min"]) + + assert args.mergeplan == "min" - assert args.mergeplan == 'min' def test_set_allow_domain(): """Set a single allow domain on commandline""" ap = setup_argparse() - args = ap.parse_args(['-A', 'example.org']) + args = ap.parse_args(["-A", "example.org"]) + + assert args.allow_domains == ["example.org"] - assert args.allow_domains == ['example.org'] def test_set_multiple_allow_domains(): """Set multiple allow domains on commandline""" ap = setup_argparse() - args = ap.parse_args(['-A', 'example.org', '-A', 'example2.org', '-A', 'example3.org']) + args = ap.parse_args( + ["-A", "example.org", "-A", "example2.org", "-A", "example3.org"] + ) - assert args.allow_domains == ['example.org', 'example2.org', 'example3.org'] \ No newline at end of file + assert args.allow_domains == ["example.org", "example2.org", "example3.org"] diff --git a/tests/test_configfile.py b/tests/test_configfile.py index 9e31c9d..9f2410c 100644 --- a/tests/test_configfile.py +++ b/tests/test_configfile.py @@ -1,15 +1,18 @@ """Test the config file is loading parameters correctly """ + from util import shim_argparse -from fediblockhole import setup_argparse, augment_args + +from fediblockhole import augment_args, setup_argparse + def test_parse_tomldata(): tomldata = """ # Test TOML config for FediBlockHole -blocklist_instance_sources = [] +blocklist_instance_sources = [] -blocklist_url_sources = [] +blocklist_url_sources = [] save_intermediate = true @@ -21,42 +24,49 @@ def test_parse_tomldata(): assert args.blocklist_instance_sources == [] assert args.blocklist_url_sources == [] - assert args.save_intermediate == True - assert args.import_fields == ['public_comment'] + assert args.save_intermediate is True + assert args.import_fields == ["public_comment"] + def test_set_mergeplan_max(): tomldata = """mergeplan = 'max' """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'max' + assert args.mergeplan == "max" + def test_set_mergeplan_min(): tomldata = """mergeplan = 'min' """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'min' + assert args.mergeplan == "min" + def test_set_allowlists(): tomldata = """# Comment on config -allowlist_url_sources = [ { url='file:///path/to/allowlist', format='csv'} ] +allowlist_url_sources = [ { url='file:///path/to/allowlist', format='csv'} ] """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'max' - assert args.allowlist_url_sources == [{ - 'url': 'file:///path/to/allowlist', - 'format': 'csv', - }] + assert args.mergeplan == "max" + assert args.allowlist_url_sources == [ + { + "url": "file:///path/to/allowlist", + "format": "csv", + } + ] + def test_set_merge_thresold_default(): tomldata = """ """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'max' - assert args.merge_threshold_type == 'count' + assert args.mergeplan == "max" + assert args.merge_threshold_type == "count" + def test_set_merge_thresold_count(): tomldata = """# Add a merge threshold @@ -65,10 +75,11 @@ def test_set_merge_thresold_count(): """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'max' - assert args.merge_threshold_type == 'count' + assert args.mergeplan == "max" + assert args.merge_threshold_type == "count" assert args.merge_threshold == 2 + def test_set_merge_thresold_pct(): tomldata = """# Add a merge threshold merge_threshold_type = 'pct' @@ -76,6 +87,6 @@ def test_set_merge_thresold_pct(): """ args = shim_argparse([], tomldata) - assert args.mergeplan == 'max' - assert args.merge_threshold_type == 'pct' + assert args.mergeplan == "max" + assert args.merge_threshold_type == "pct" assert args.merge_threshold == 35 diff --git a/tests/test_domainblock.py b/tests/test_domainblock.py index 2db0b51..9eab4b4 100644 --- a/tests/test_domainblock.py +++ b/tests/test_domainblock.py @@ -1,74 +1,88 @@ """Test the DomainBlock structure """ + import pytest -from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel +from fediblockhole.const import BlockSeverity, DomainBlock, SeverityLevel + def test_blocksev_blankstring(): - a = BlockSeverity('') + a = BlockSeverity("") assert a.level == SeverityLevel.NONE + def test_blocksev_string_noop(): - a = BlockSeverity('noop') + a = BlockSeverity("noop") assert a.level == SeverityLevel.NONE + def test_blocksev_none(): a = BlockSeverity(None) assert a.level == SeverityLevel.NONE + def test_empty_domainblock_fails(): with pytest.raises(TypeError): - a = DomainBlock() + a = DomainBlock() # noqa + def test_default_suspend(): - a = DomainBlock('example.org') - assert a.domain == 'example.org' + a = DomainBlock("example.org") + assert a.domain == "example.org" assert a.severity.level == SeverityLevel.SUSPEND + def test_severity_suspend(): - a = DomainBlock('example.org', 'suspend') - assert a.domain == 'example.org' + a = DomainBlock("example.org", "suspend") + assert a.domain == "example.org" assert a.severity.level == SeverityLevel.SUSPEND + def test_severity_silence(): - a = DomainBlock('example.org', 'silence') - assert a.domain == 'example.org' + a = DomainBlock("example.org", "silence") + assert a.domain == "example.org" assert a.severity.level == SeverityLevel.SILENCE + def test_severity_noop_string(): - a = DomainBlock('example.org', 'noop') - assert a.domain == 'example.org' + a = DomainBlock("example.org", "noop") + assert a.domain == "example.org" assert a.severity.level == SeverityLevel.NONE + def test_severity_none(): - a = DomainBlock('example.org', None) - assert a.domain == 'example.org' + a = DomainBlock("example.org", None) + assert a.domain == "example.org" assert a.severity.level == SeverityLevel.NONE + def test_compare_equal_blocks(): - a = DomainBlock('example1.org', 'suspend') - b = DomainBlock('example1.org', 'suspend') + a = DomainBlock("example1.org", "suspend") + b = DomainBlock("example1.org", "suspend") assert a == b + def test_compare_diff_domains(): - a = DomainBlock('example1.org', 'suspend') - b = DomainBlock('example2.org', 'suspend') + a = DomainBlock("example1.org", "suspend") + b = DomainBlock("example2.org", "suspend") assert a != b + def test_compare_diff_sevs(): - a = DomainBlock('example1.org', 'suspend') - b = DomainBlock('example1.org', 'silence') + a = DomainBlock("example1.org", "suspend") + b = DomainBlock("example1.org", "silence") assert a != b + def test_compare_diff_sevs_2(): - a = DomainBlock('example1.org', 'suspend') - b = DomainBlock('example1.org', 'noop') + a = DomainBlock("example1.org", "suspend") + b = DomainBlock("example1.org", "noop") assert a != b diff --git a/tests/test_merge_comments.py b/tests/test_merge_comments.py index a6ae8b1..ec3734f 100644 --- a/tests/test_merge_comments.py +++ b/tests/test_merge_comments.py @@ -1,68 +1,74 @@ """ Test merging of comments """ -import pytest from fediblockhole import merge_comments + def test_merge_blank_comments(): - - oldcomment = '' - newcomment = '' + + oldcomment = "" + newcomment = "" merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == '' + assert merged_comment == "" + def test_merge_None_comments(): - + oldcomment = None newcomment = None merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == '' + assert merged_comment == "" + def test_merge_oldstr_newNone(): - - oldcomment = 'fred, bibble' + + oldcomment = "fred, bibble" newcomment = None merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == 'fred, bibble' + assert merged_comment == "fred, bibble" + def test_merge_oldempty_newcomment(): - - oldcomment = '' - newcomment = 'fred, bibble' + + oldcomment = "" + newcomment = "fred, bibble" merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == 'fred, bibble' + assert merged_comment == "fred, bibble" + def test_merge_oldNone_newcomment(): - + oldcomment = None - newcomment = 'fred, bibble' + newcomment = "fred, bibble" merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == 'fred, bibble' + assert merged_comment == "fred, bibble" + def test_merge_two_different(): - - oldcomment = 'happy, medium, spinning' - newcomment = 'fred, bibble' + + oldcomment = "happy, medium, spinning" + newcomment = "fred, bibble" merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == 'happy, medium, spinning, fred, bibble' + assert merged_comment == "happy, medium, spinning, fred, bibble" + def test_merge_overlaps(): - - oldcomment = 'happy, medium, spinning' - newcomment = 'fred, medium, bibble, spinning' + + oldcomment = "happy, medium, spinning" + newcomment = "fred, medium, bibble, spinning" merged_comment = merge_comments(oldcomment, newcomment) - assert merged_comment == 'happy, medium, spinning, fred, bibble' \ No newline at end of file + assert merged_comment == "happy, medium, spinning, fred, bibble" diff --git a/tests/test_merge_thresholds.py b/tests/test_merge_thresholds.py index 4cde03e..4f6410f 100644 --- a/tests/test_merge_thresholds.py +++ b/tests/test_merge_thresholds.py @@ -1,25 +1,25 @@ """Test merge with thresholds """ +from fediblockhole import merge_blocklists from fediblockhole.blocklists import Blocklist, parse_blocklist -from fediblockhole import merge_blocklists, apply_mergeplan - -from fediblockhole.const import SeverityLevel, DomainBlock +from fediblockhole.const import DomainBlock datafile01 = "data-suspends-01.csv" datafile02 = "data-silences-01.csv" datafile03 = "data-noop-01.csv" import_fields = [ - 'domain', - 'severity', - 'public_comment', - 'private_comment', - 'reject_media', - 'reject_reports', - 'obfuscate' + "domain", + "severity", + "public_comment", + "private_comment", + "reject_media", + "reject_reports", + "obfuscate", ] + def load_test_blocklist_data(datafiles): blocklists = [] @@ -27,127 +27,239 @@ def load_test_blocklist_data(datafiles): for df in datafiles: with open(df) as fp: data = fp.read() - bl = parse_blocklist(data, df, 'csv', import_fields) + bl = parse_blocklist(data, df, "csv", import_fields) blocklists.append(bl) - - return blocklists - -def test_mergeplan_count_2(): - """Only merge a block if present in 2 or more lists - """ - bl_1 = Blocklist('test01', { - 'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True), - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_2 = Blocklist('test2', { - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) + return blocklists - bl_3 = Blocklist('test3', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) - ml = merge_blocklists([bl_1, bl_2, bl_3], 'max', threshold=2) +def test_mergeplan_count_2(): + """Only merge a block if present in 2 or more lists""" + + bl_1 = Blocklist( + "test01", + { + "onemention.example.org": DomainBlock( + "onemention.example.org", "suspend", "", "", True, True, True + ), + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_2 = Blocklist( + "test2", + { + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_3 = Blocklist( + "test3", + { + "threemention.example.org": DomainBlock( # noqa + "threemention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( # noqa + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + ml = merge_blocklists([bl_1, bl_2, bl_3], "max", threshold=2) + + assert "onemention.example.org" not in ml + assert "twomention.example.org" in ml + assert "threemention.example.org" in ml - assert 'onemention.example.org' not in ml - assert 'twomention.example.org' in ml - assert 'threemention.example.org' in ml def test_mergeplan_count_3(): - """Only merge a block if present in 3 or more lists - """ - - bl_1 = Blocklist('test01', { - 'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True), - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) + """Only merge a block if present in 3 or more lists""" + + bl_1 = Blocklist( + "test01", + { + "onemention.example.org": DomainBlock( + "onemention.example.org", "suspend", "", "", True, True, True + ), + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_2 = Blocklist( + "test2", + { + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_3 = Blocklist( + "test3", + { + "threemention.example.org": DomainBlock( # noqa + "threemention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( # noqa + "threemention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + ml = merge_blocklists([bl_1, bl_2, bl_3], "max", threshold=3) + + assert "onemention.example.org" not in ml + assert "twomention.example.org" not in ml + assert "threemention.example.org" in ml - bl_2 = Blocklist('test2', { - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_3 = Blocklist('test3', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - }) - - ml = merge_blocklists([bl_1, bl_2, bl_3], 'max', threshold=3) - - assert 'onemention.example.org' not in ml - assert 'twomention.example.org' not in ml - assert 'threemention.example.org' in ml def test_mergeplan_pct_30(): - """Only merge a block if present in 2 or more lists - """ - - bl_1 = Blocklist('test01', { - 'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True), - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - - }) - - bl_2 = Blocklist('test2', { - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_3 = Blocklist('test3', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_4 = Blocklist('test4', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - ml = merge_blocklists([bl_1, bl_2, bl_3, bl_4], 'max', threshold=30, threshold_type='pct') + """Only merge a block if present in 2 or more lists""" + + bl_1 = Blocklist( + "test01", + { + "onemention.example.org": DomainBlock( + "onemention.example.org", "suspend", "", "", True, True, True + ), + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_2 = Blocklist( + "test2", + { + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_3 = Blocklist( + "test3", + { + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_4 = Blocklist( + "test4", + { + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + ml = merge_blocklists( + [bl_1, bl_2, bl_3, bl_4], "max", threshold=30, threshold_type="pct" + ) + + assert "onemention.example.org" not in ml + assert "twomention.example.org" in ml + assert "threemention.example.org" in ml + assert "fourmention.example.org" in ml - assert 'onemention.example.org' not in ml - assert 'twomention.example.org' in ml - assert 'threemention.example.org' in ml - assert 'fourmention.example.org' in ml def test_mergeplan_pct_55(): - """Only merge a block if present in 2 or more lists - """ - - bl_1 = Blocklist('test01', { - 'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True), - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - - }) - - bl_2 = Blocklist('test2', { - 'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True), - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_3 = Blocklist('test3', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - bl_4 = Blocklist('test4', { - 'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True), - 'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True), - }) - - ml = merge_blocklists([bl_1, bl_2, bl_3, bl_4], 'max', threshold=55, threshold_type='pct') - - assert 'onemention.example.org' not in ml - assert 'twomention.example.org' not in ml - assert 'threemention.example.org' in ml - assert 'fourmention.example.org' in ml \ No newline at end of file + """Only merge a block if present in 2 or more lists""" + + bl_1 = Blocklist( + "test01", + { + "onemention.example.org": DomainBlock( + "onemention.example.org", "suspend", "", "", True, True, True + ), + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_2 = Blocklist( + "test2", + { + "twomention.example.org": DomainBlock( + "twomention.example.org", "suspend", "", "", True, True, True + ), + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_3 = Blocklist( + "test3", + { + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + bl_4 = Blocklist( + "test4", + { + "threemention.example.org": DomainBlock( + "threemention.example.org", "suspend", "", "", True, True, True + ), + "fourmention.example.org": DomainBlock( + "fourmention.example.org", "suspend", "", "", True, True, True + ), + }, + ) + + ml = merge_blocklists( + [bl_1, bl_2, bl_3, bl_4], "max", threshold=55, threshold_type="pct" + ) + + assert "onemention.example.org" not in ml + assert "twomention.example.org" not in ml + assert "threemention.example.org" in ml + assert "fourmention.example.org" in ml diff --git a/tests/test_mergeplan.py b/tests/test_mergeplan.py index 42d2816..4afe32e 100644 --- a/tests/test_mergeplan.py +++ b/tests/test_mergeplan.py @@ -1,25 +1,25 @@ """Various mergeplan tests """ +from fediblockhole import apply_mergeplan, merge_blocklists, merge_comments from fediblockhole.blocklists import parse_blocklist -from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan - -from fediblockhole.const import SeverityLevel, DomainBlock +from fediblockhole.const import DomainBlock, SeverityLevel datafile01 = "data-suspends-01.csv" datafile02 = "data-silences-01.csv" datafile03 = "data-noop-01.csv" import_fields = [ - 'domain', - 'severity', - 'public_comment', - 'private_comment', - 'reject_media', - 'reject_reports', - 'obfuscate' + "domain", + "severity", + "public_comment", + "private_comment", + "reject_media", + "reject_reports", + "obfuscate", ] + def load_test_blocklist_data(datafiles): blocklists = [] @@ -27,30 +27,33 @@ def load_test_blocklist_data(datafiles): for df in datafiles: with open(df) as fp: data = fp.read() - bl = parse_blocklist(data, df, 'csv', import_fields) + bl = parse_blocklist(data, df, "csv", import_fields) blocklists.append(bl) - + return blocklists + def test_mergeplan_max(): """Test 'max' mergeplan""" blocklists = load_test_blocklist_data([datafile01, datafile02]) - bl = merge_blocklists(blocklists, 'max') + bl = merge_blocklists(blocklists, "max") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.SUSPEND + def test_mergeplan_min(): """Test 'max' mergeplan""" blocklists = load_test_blocklist_data([datafile01, datafile02]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.SILENCE + def test_mergeplan_default(): """Default mergeplan is max, so see if it's chosen""" blocklists = load_test_blocklist_data([datafile01, datafile02]) @@ -61,86 +64,96 @@ def test_mergeplan_default(): for key in bl: assert bl[key].severity.level == SeverityLevel.SUSPEND + def test_mergeplan_3_max(): """3 datafiles and mergeplan of 'max'""" blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'max') + bl = merge_blocklists(blocklists, "max") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.SUSPEND - assert bl[key].reject_media == True - assert bl[key].reject_reports == True - assert bl[key].obfuscate == True + assert bl[key].reject_media is True + assert bl[key].reject_reports is True + assert bl[key].obfuscate is True + def test_mergeplan_3_min(): """3 datafiles and mergeplan of 'min'""" blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.NONE - assert bl[key].reject_media == False - assert bl[key].reject_reports == False - assert bl[key].obfuscate == False + assert bl[key].reject_media is False + assert bl[key].reject_reports is False + assert bl[key].obfuscate is False + def test_mergeplan_noop_v_silence_max(): """Mergeplan of max should choose silence over noop""" blocklists = load_test_blocklist_data([datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'max') + bl = merge_blocklists(blocklists, "max") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.SILENCE + def test_mergeplan_noop_v_silence_min(): """Mergeplan of min should choose noop over silence""" blocklists = load_test_blocklist_data([datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 for key in bl: assert bl[key].severity.level == SeverityLevel.NONE + def test_merge_public_comment(): blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 - assert bl['public-comment.example.org'].public_comment == 'This is a public comment' + assert bl["public-comment.example.org"].public_comment == "This is a public comment" + def test_merge_private_comment(): blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 - assert bl['private-comment.example.org'].private_comment == 'This is a private comment' + assert ( + bl["private-comment.example.org"].private_comment == "This is a private comment" + ) + def test_merge_public_comments(): blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 - assert bl['diff-comment.example.org'].public_comment == 'Suspend public comment, Silence public comment, Noop public comment' + assert ( + bl["diff-comment.example.org"].public_comment + == "Suspend public comment, Silence public comment, Noop public comment" + ) + def test_merge_duplicate_comments(): - """The same comment on multiple sources shouldn't get added - """ + """The same comment on multiple sources shouldn't get added""" blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03]) - bl = merge_blocklists(blocklists, 'min') + bl = merge_blocklists(blocklists, "min") assert len(bl) == 13 - # Nope, this breaks. Need to rethink duplicate comment merge. - # assert bl['2diff-comment.example.org'].public_comment == 'Suspend comment 1, Public duplicate' def test_merge_comments_none(): @@ -149,52 +162,58 @@ def test_merge_comments_none(): r = merge_comments(a, b) - assert r == '' + assert r == "" + def test_merge_comments_empty(): - a = '' - b = '' + a = "" + b = "" r = merge_comments(a, b) - assert r == '' + assert r == "" + def test_merge_comments_left(): - a = 'comment to merge' - b = '' + a = "comment to merge" + b = "" r = merge_comments(a, b) - assert r == 'comment to merge' + assert r == "comment to merge" + def test_merge_comments_right(): - a = '' - b = 'comment to merge' + a = "" + b = "comment to merge" r = merge_comments(a, b) - assert r == 'comment to merge' + assert r == "comment to merge" + def test_merge_comments_same(): - a = 'comment to merge' - b = 'comment to merge' + a = "comment to merge" + b = "comment to merge" r = merge_comments(a, b) - assert r == 'comment to merge' + assert r == "comment to merge" + def test_merge_comments_diff(): - a = 'comment A' - b = 'comment B' + a = "comment A" + b = "comment B" r = merge_comments(a, b) - assert r == 'comment A, comment B' + assert r == "comment A, comment B" + def test_merge_comments_dups(): @@ -203,38 +222,41 @@ def test_merge_comments_dups(): r = merge_comments(a, b) - assert r == 'boring, nazis, lack of moderation, flagged, special, spoon, happy, fork' + assert ( + r == "boring, nazis, lack of moderation, flagged, special, spoon, happy, fork" + ) + def test_mergeplan_same_min_bools_false(): - """Test merging with mergeplan 'max' and False values doesn't change them - """ - a = DomainBlock('example.org', 'noop', '', '', False, False, False) - b = DomainBlock('example.org', 'noop', '', '', False, False, False) + """Test merging with mergeplan 'max' and False values doesn't change them""" + a = DomainBlock("example.org", "noop", "", "", False, False, False) + b = DomainBlock("example.org", "noop", "", "", False, False, False) - r = apply_mergeplan(a, b, 'max') + r = apply_mergeplan(a, b, "max") + + assert r.reject_media is False + assert r.reject_reports is False + assert r.obfuscate is False - assert r.reject_media == False - assert r.reject_reports == False - assert r.obfuscate == False def test_mergeplan_same_min_bools_true(): - """Test merging with mergeplan 'max' and True values doesn't change them - """ - a = DomainBlock('example.org', 'noop', '', '', True, False, True) - b = DomainBlock('example.org', 'noop', '', '', True, False, True) + """Test merging with mergeplan 'max' and True values doesn't change them""" + a = DomainBlock("example.org", "noop", "", "", True, False, True) + b = DomainBlock("example.org", "noop", "", "", True, False, True) + + r = apply_mergeplan(a, b, "max") - r = apply_mergeplan(a, b, 'max') + assert r.reject_media is True + assert r.reject_reports is False + assert r.obfuscate is True - assert r.reject_media == True - assert r.reject_reports == False - assert r.obfuscate == True def test_mergeplan_max_bools(): - a = DomainBlock('example.org', 'suspend', '', '', True, True, True) - b = DomainBlock('example.org', 'noop', '', '', False, False, False) + a = DomainBlock("example.org", "suspend", "", "", True, True, True) + b = DomainBlock("example.org", "noop", "", "", False, False, False) - r = apply_mergeplan(a, b, 'max') + r = apply_mergeplan(a, b, "max") - assert r.reject_media == True - assert r.reject_reports == True - assert r.obfuscate == True \ No newline at end of file + assert r.reject_media is True + assert r.reject_reports is True + assert r.obfuscate is True diff --git a/tests/test_parser_csv.py b/tests/test_parser_csv.py index 703fe95..8c55833 100644 --- a/tests/test_parser_csv.py +++ b/tests/test_parser_csv.py @@ -1,7 +1,7 @@ """Tests of the CSV parsing """ -from fediblockhole.blocklists import BlocklistParserCSV, parse_blocklist +from fediblockhole.blocklists import BlocklistParserCSV from fediblockhole.const import SeverityLevel @@ -13,6 +13,7 @@ def test_single_line(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 0 + def test_header_only(): csvdata = "domain,severity,public_comment" origin = "csvfile" @@ -21,6 +22,7 @@ def test_header_only(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 0 + def test_2_blocks(): csvdata = """domain,severity example.org,silence @@ -32,7 +34,8 @@ def test_2_blocks(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 2 - assert 'example.org' in bl + assert "example.org" in bl + def test_4_blocks(): csvdata = """domain,severity,public_comment @@ -47,15 +50,16 @@ def test_4_blocks(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 4 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].severity.level == SeverityLevel.SILENCE + assert bl["example2.org"].severity.level == SeverityLevel.SUSPEND + assert bl["example3.org"].severity.level == SeverityLevel.NONE + assert bl["example4.org"].severity.level == SeverityLevel.SUSPEND - assert bl['example.org'].severity.level == SeverityLevel.SILENCE - assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND - assert bl['example3.org'].severity.level == SeverityLevel.NONE - assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND def test_ignore_comments(): csvdata = """domain,severity,public_comment,private_comment @@ -70,12 +74,12 @@ def test_ignore_comments(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 4 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl - - assert bl['example.org'].public_comment == '' - assert bl['example.org'].private_comment == '' - assert bl['example3.org'].public_comment == '' - assert bl['example4.org'].private_comment == '' \ No newline at end of file + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].public_comment == "" + assert bl["example.org"].private_comment == "" + assert bl["example3.org"].public_comment == "" + assert bl["example4.org"].private_comment == "" diff --git a/tests/test_parser_csv_mastodon.py b/tests/test_parser_csv_mastodon.py index 6e85c71..45f758f 100644 --- a/tests/test_parser_csv_mastodon.py +++ b/tests/test_parser_csv_mastodon.py @@ -13,6 +13,7 @@ def test_single_line(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 0 + def test_header_only(): csvdata = "#domain,#severity,#public_comment" origin = "csvfile" @@ -21,6 +22,7 @@ def test_header_only(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 0 + def test_2_blocks(): csvdata = """domain,severity example.org,silence @@ -32,7 +34,8 @@ def test_2_blocks(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 2 - assert 'example.org' in bl + assert "example.org" in bl + def test_4_blocks(): csvdata = """domain,severity,public_comment @@ -47,15 +50,16 @@ def test_4_blocks(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 4 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].severity.level == SeverityLevel.SILENCE + assert bl["example2.org"].severity.level == SeverityLevel.SUSPEND + assert bl["example3.org"].severity.level == SeverityLevel.NONE + assert bl["example4.org"].severity.level == SeverityLevel.SUSPEND - assert bl['example.org'].severity.level == SeverityLevel.SILENCE - assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND - assert bl['example3.org'].severity.level == SeverityLevel.NONE - assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND def test_ignore_comments(): csvdata = """domain,severity,public_comment,private_comment @@ -70,12 +74,12 @@ def test_ignore_comments(): bl = parser.parse_blocklist(csvdata, origin) assert len(bl) == 4 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl - - assert bl['example.org'].public_comment == '' - assert bl['example.org'].private_comment == '' - assert bl['example3.org'].public_comment == '' - assert bl['example4.org'].private_comment == '' \ No newline at end of file + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].public_comment == "" + assert bl["example.org"].private_comment == "" + assert bl["example3.org"].public_comment == "" + assert bl["example4.org"].private_comment == "" diff --git a/tests/test_parser_json.py b/tests/test_parser_json.py index b2fb0a1..e640643 100644 --- a/tests/test_parser_json.py +++ b/tests/test_parser_json.py @@ -1,45 +1,48 @@ """Tests of the CSV parsing """ -from fediblockhole.blocklists import BlocklistParserJSON, parse_blocklist +from fediblockhole.blocklists import BlocklistParserJSON from fediblockhole.const import SeverityLevel -datafile = 'data-mastodon.json' +datafile = "data-mastodon.json" + def load_data(): with open(datafile) as fp: return fp.read() + def test_json_parser(): data = load_data() parser = BlocklistParserJSON() - bl = parser.parse_blocklist(data, 'test_json') + bl = parser.parse_blocklist(data, "test_json") assert len(bl) == 10 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].severity.level == SeverityLevel.SUSPEND + assert bl["example2.org"].severity.level == SeverityLevel.SILENCE + assert bl["example3.org"].severity.level == SeverityLevel.SUSPEND + assert bl["example4.org"].severity.level == SeverityLevel.NONE - assert bl['example.org'].severity.level == SeverityLevel.SUSPEND - assert bl['example2.org'].severity.level == SeverityLevel.SILENCE - assert bl['example3.org'].severity.level == SeverityLevel.SUSPEND - assert bl['example4.org'].severity.level == SeverityLevel.NONE def test_ignore_comments(): data = load_data() parser = BlocklistParserJSON() - bl = parser.parse_blocklist(data, 'test_json') + bl = parser.parse_blocklist(data, "test_json") assert len(bl) == 10 - assert 'example.org' in bl - assert 'example2.org' in bl - assert 'example3.org' in bl - assert 'example4.org' in bl - - assert bl['example.org'].public_comment == '' - assert bl['example.org'].private_comment == '' - assert bl['example3.org'].public_comment == '' - assert bl['example4.org'].private_comment == '' \ No newline at end of file + assert "example.org" in bl + assert "example2.org" in bl + assert "example3.org" in bl + assert "example4.org" in bl + + assert bl["example.org"].public_comment == "" + assert bl["example.org"].private_comment == "" + assert bl["example3.org"].public_comment == "" + assert bl["example4.org"].private_comment == "" diff --git a/tests/test_parser_rapidblockcsv.py b/tests/test_parser_rapidblockcsv.py index 65d579d..36c3677 100644 --- a/tests/test_parser_rapidblockcsv.py +++ b/tests/test_parser_rapidblockcsv.py @@ -1,23 +1,27 @@ """Tests of the Rapidblock CSV parsing """ -from fediblockhole.blocklists import RapidBlockParserCSV, parse_blocklist -from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel +from fediblockhole.blocklists import RapidBlockParserCSV +from fediblockhole.const import SeverityLevel -csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n""" +csvdata = ( + """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n""" +) parser = RapidBlockParserCSV() + def test_basic_rapidblock(): bl = parser.parse_blocklist(csvdata) assert len(bl) == 4 - assert 'example.org' in bl - assert 'subdomain.example.org' in bl - assert 'anotherdomain.org' in bl - assert 'domain4.org' in bl + assert "example.org" in bl + assert "subdomain.example.org" in bl + assert "anotherdomain.org" in bl + assert "domain4.org" in bl + def test_severity_is_suspend(): bl = parser.parse_blocklist(csvdata) for block in bl.values(): - assert block.severity.level == SeverityLevel.SUSPEND \ No newline at end of file + assert block.severity.level == SeverityLevel.SUSPEND diff --git a/tests/test_parser_rapidblockjson.py b/tests/test_parser_rapidblockjson.py index ad13811..141bfae 100644 --- a/tests/test_parser_rapidblockjson.py +++ b/tests/test_parser_rapidblockjson.py @@ -1,34 +1,43 @@ """Test parsing the RapidBlock JSON format """ -from fediblockhole.blocklists import parse_blocklist +from fediblockhole.blocklists import parse_blocklist from fediblockhole.const import SeverityLevel rapidblockjson = "data-rapidblock.json" + def test_parse_rapidblock_json(): with open(rapidblockjson) as fp: data = fp.read() - bl = parse_blocklist(data, 'pytest', 'rapidblock.json') + bl = parse_blocklist(data, "pytest", "rapidblock.json") - assert '101010.pl' in bl - assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND - assert bl['101010.pl'].public_comment == '' + assert "101010.pl" in bl + assert bl["101010.pl"].severity.level == SeverityLevel.SUSPEND + assert bl["101010.pl"].public_comment == "" + + assert "berserker.town" in bl + assert bl["berserker.town"].severity.level == SeverityLevel.SUSPEND + assert bl["berserker.town"].public_comment == "" + assert bl["berserker.town"].private_comment == "" - assert 'berserker.town' in bl - assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND - assert bl['berserker.town'].public_comment == '' - assert bl['berserker.town'].private_comment == '' def test_parse_with_comments(): with open(rapidblockjson) as fp: data = fp.read() - bl = parse_blocklist(data, 'pytest', 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment']) - - assert '101010.pl' in bl - assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND - assert bl['101010.pl'].public_comment == 'cryptomining javascript, white supremacy' - - assert 'berserker.town' in bl - assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND - assert bl['berserker.town'].public_comment == 'freeze peach' \ No newline at end of file + bl = parse_blocklist( + data, + "pytest", + "rapidblock.json", + ["domain", "severity", "public_comment", "private_comment"], + ) + + assert "101010.pl" in bl + assert bl["101010.pl"].severity.level == SeverityLevel.SUSPEND + assert ( + bl["101010.pl"].public_comment == "cryptomining javascript, white supremacy" + ) + + assert "berserker.town" in bl + assert bl["berserker.town"].severity.level == SeverityLevel.SUSPEND + assert bl["berserker.town"].public_comment == "freeze peach"