From 0dd5e6b889ec7cd0c5e8c3584408ea54bd94f2ec Mon Sep 17 00:00:00 2001 From: Maciej Ziaja Date: Fri, 25 Mar 2022 20:02:35 +0100 Subject: [PATCH] Docstrings and small fixes --- README.md | 2 +- it_jobs_meta/__main__.py | 2 + it_jobs_meta/common/cli.py | 19 +++ it_jobs_meta/common/utils.py | 6 + it_jobs_meta/dashboard/dashboard.py | 18 +-- .../dashboard/dashboard_components.py | 17 ++- it_jobs_meta/dashboard/data_provision.py | 26 ++-- it_jobs_meta/dashboard/layout.py | 2 + it_jobs_meta/data_pipeline/data_etl.py | 136 +++++++++++++----- it_jobs_meta/data_pipeline/data_formats.py | 30 ++-- it_jobs_meta/data_pipeline/data_lake.py | 8 +- it_jobs_meta/data_pipeline/data_pipeline.py | 12 ++ it_jobs_meta/data_pipeline/data_validation.py | 2 + it_jobs_meta/data_pipeline/geolocator.py | 27 +++- 14 files changed, 228 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 86773ab..99102bb 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ end ``` Notice that the terms *data lake* and *data warehouse* are used in a rather -loose way in the following descriptions. +loose/naive way in the following descriptions. ## Setup diff --git a/it_jobs_meta/__main__.py b/it_jobs_meta/__main__.py index df9aed7..0ea16e2 100644 --- a/it_jobs_meta/__main__.py +++ b/it_jobs_meta/__main__.py @@ -1,3 +1,5 @@ +"""Run the data pipeline or dashboard using CLI options.""" + from it_jobs_meta.common.cli import CliArgumentParser from it_jobs_meta.common.utils import setup_logging from it_jobs_meta.dashboard.dashboard import ( diff --git a/it_jobs_meta/common/cli.py b/it_jobs_meta/common/cli.py index 9813b6b..e2817f2 100644 --- a/it_jobs_meta/common/cli.py +++ b/it_jobs_meta/common/cli.py @@ -1,3 +1,5 @@ +"""Command line parser for the it-jobs-meta application.""" + import argparse from pathlib import Path from typing import Any @@ -8,6 +10,8 @@ class CliArgumentParser: + """Command line parser for the it-jobs-meta application.""" + PROG = 'it-jobs-meta' DESCRIPTION = ( 'Data pipeline and meta-analysis dashboard for IT job postings' @@ -39,6 +43,11 @@ def args(self) -> dict[str, Any]: return self._args def extract_data_lake(self) -> tuple[DataLakeImpl, Path]: + """Extract data lake setup from the arguments. + + :return: Tuple with the selected data lake implementation type and + the config path. + """ match self.args: case {'redis': Path(), 's3_bucket': None}: return DataLakeImpl.REDIS, self.args['redis'] @@ -51,6 +60,11 @@ def extract_data_lake(self) -> tuple[DataLakeImpl, Path]: ) def extract_etl_loader(self) -> tuple[EtlLoaderImpl, Path]: + """Get the ETL loader setup from the arguments. + + :return: Tuple with the selected etl loader implementation type and + the config path. + """ match self.args: case {'mongodb': Path(), 'sql': None}: return EtlLoaderImpl.MONGODB, self.args['mongodb'] @@ -63,6 +77,11 @@ def extract_etl_loader(self) -> tuple[EtlLoaderImpl, Path]: ) def extract_data_provider(self) -> tuple[DashboardProviderImpl, Path]: + """Get the dashboard data provider setup from the arguments. + + :return: Tuple with the selected data provider implementation type and + the config path. 
+ """ match self.args: case {'mongodb': Path()}: return DashboardProviderImpl.MONGODB, self.args['mongodb'] diff --git a/it_jobs_meta/common/utils.py b/it_jobs_meta/common/utils.py index 6b0a501..4287b35 100644 --- a/it_jobs_meta/common/utils.py +++ b/it_jobs_meta/common/utils.py @@ -1,3 +1,5 @@ +"""Utility tools shared across the application.""" + import logging import sys from pathlib import Path @@ -7,6 +9,10 @@ def setup_logging(*args: Path): + """Enable logging to stdout and the given files. + + :param *args: Paths to log output files. + """ log_file_handlers = [] for log_path in args: log_path.parent.mkdir(exist_ok=True, parents=True) diff --git a/it_jobs_meta/dashboard/dashboard.py b/it_jobs_meta/dashboard/dashboard.py index 26bc407..dfa1cae 100644 --- a/it_jobs_meta/dashboard/dashboard.py +++ b/it_jobs_meta/dashboard/dashboard.py @@ -1,3 +1,5 @@ +"""Dashboard server for job postings data visualization.""" + import logging from datetime import timedelta from pathlib import Path @@ -14,7 +16,6 @@ from it_jobs_meta.dashboard.data_provision import ( DashboardDataProviderFactory, DashboardProviderImpl, - GatheredData, ) from it_jobs_meta.dashboard.layout import DynamicContent, make_layout @@ -66,11 +67,11 @@ def cache(self) -> AppCache: def render_layout(self) -> DashComponent: logging.info('Rendering dashboard') logging.info('Attempting to retrieve data') - data = self._data_provider_factory.make().gather_data() + metadata_df, data_df = self._data_provider_factory.make().gather_data() logging.info('Data retrieval succeeded') logging.info('Making layout') - dynamic_content = self.make_dynamic_content(data) + dynamic_content = self.make_dynamic_content(metadata_df, data_df) layout = make_layout(dynamic_content) logging.info('Making layout succeeded') logging.info('Rendering dashboard succeeded') @@ -98,17 +99,18 @@ def run(self, with_wsgi=False): raise @staticmethod - def make_dynamic_content(data: GatheredData) -> DynamicContent: - obtained_datetime = pd.to_datetime( - data.metadata['obtained_datetime'][0] - ) - graphs = GraphRegistry.make(data.postings) + def make_dynamic_content( + metadata_df: pd.DataFrame, data_df: pd.DataFrame + ) -> DynamicContent: + obtained_datetime = pd.to_datetime(metadata_df['obtained_datetime'][0]) + graphs = GraphRegistry.make(data_df) return DynamicContent( obtained_datetime=obtained_datetime, graphs=graphs ) def main(): + """Run the demo dashboard with short cache timout (for development).""" setup_logging() data_warehouse_config_path = Path('config/mongodb_config.yml') data_provider_factory = DashboardDataProviderFactory( diff --git a/it_jobs_meta/dashboard/dashboard_components.py b/it_jobs_meta/dashboard/dashboard_components.py index d9b4fc1..d20620e 100644 --- a/it_jobs_meta/dashboard/dashboard_components.py +++ b/it_jobs_meta/dashboard/dashboard_components.py @@ -1,3 +1,5 @@ +"""Data dashboard components and graphs.""" + from abc import ABC, abstractmethod from enum import Enum, auto from typing import Any @@ -22,7 +24,14 @@ def get_rows_with_n_most_frequent_vals_in_col( def sort_by_seniority(df: pd.DataFrame) -> pd.DataFrame: - SENIORITY_ORDER = {'Trainee': 0, 'Junior': 1, 'Mid': 2, 'Senior': 3} + """Sorts rows according to the seniority---least to most experienced.""" + SENIORITY_ORDER = { + 'Trainee': 0, + 'Junior': 1, + 'Mid': 2, + 'Senior': 3, + 'Expert': 4, + } sorted = df.sort_values('seniority', key=lambda x: x.map(SENIORITY_ORDER)) return sorted @@ -65,18 +74,22 @@ class GraphFigure(ABC): @classmethod @abstractmethod def make_fig(cls, 
postings_df: pd.DataFrame) -> go.Figure: - pass + """Make the figure using the given data frame.""" class GraphRegistry: + """Registry for automatic gathering and creation of graph figures.""" + _graph_makers: dict[Graph, GraphFigure] = {} @classmethod def register(cls, key: Graph): + """Add given graph implementation to the registry.""" return lambda graph_figure: cls._register_inner(key, graph_figure) @classmethod def make(cls, postings_df: pd.DataFrame) -> dict[Graph, dcc.Graph]: + """Make all registered graphs using the given data and get them.""" graphs: dict[Graph, go.Figure] = {} for graph_key in cls._graph_makers: graphs[graph_key] = dcc.Graph( diff --git a/it_jobs_meta/dashboard/data_provision.py b/it_jobs_meta/dashboard/data_provision.py index fc7d6e1..f3c69bc 100644 --- a/it_jobs_meta/dashboard/data_provision.py +++ b/it_jobs_meta/dashboard/data_provision.py @@ -1,5 +1,6 @@ +"""Data provision and data source for the data dashboard.""" + from abc import ABC, abstractmethod -from dataclasses import dataclass from enum import Enum, auto from pathlib import Path @@ -9,16 +10,14 @@ from it_jobs_meta.common.utils import load_yaml_as_dict -@dataclass -class GatheredData: - metadata: pd.DataFrame - postings: pd.DataFrame - - class DashboardDataProvider(ABC): @abstractmethod - def gather_data(self) -> GatheredData: - pass + def gather_data(self) -> tuple[pd.DataFrame, pd.DataFrame]: + """Gather data for the dashboard. + + :return: Tuple with metadata and data dataframes as (metadata_df, + data_df) + """ class MongodbDashboardDataProvider(DashboardDataProvider): @@ -41,14 +40,19 @@ def from_config_file( ) -> 'MongodbDashboardDataProvider': return cls(**load_yaml_as_dict(config_file_path)) - def gather_data(self) -> GatheredData: + def gather_data(self) -> tuple[pd.DataFrame, pd.DataFrame]: + """Gather data for the dashboard. + + :return: Tuple with metadata and data dataframes as (metadata_df, + data_df) + """ metadata_df = pd.json_normalize(self._db['metadata'].find()) postings_df = pd.json_normalize(self._db['postings'].find()) if metadata_df.empty or postings_df.empty: raise RuntimeError( 'Data gather for the dashboard resulted in empty datasets' ) - return GatheredData(metadata=metadata_df, postings=postings_df) + return metadata_df, postings_df class DashboardProviderImpl(Enum): diff --git a/it_jobs_meta/dashboard/layout.py b/it_jobs_meta/dashboard/layout.py index d208a5c..b2c703a 100644 --- a/it_jobs_meta/dashboard/layout.py +++ b/it_jobs_meta/dashboard/layout.py @@ -1,3 +1,5 @@ +"""Dashboard layout and components stitching.""" + from dataclasses import dataclass from datetime import datetime diff --git a/it_jobs_meta/data_pipeline/data_etl.py b/it_jobs_meta/data_pipeline/data_etl.py index c4cbba9..b54ced6 100644 --- a/it_jobs_meta/data_pipeline/data_etl.py +++ b/it_jobs_meta/data_pipeline/data_etl.py @@ -1,3 +1,5 @@ +"""Data Extraction, Transformations, and Loading for the job postings data.""" + import dataclasses import re from abc import ABC, abstractmethod @@ -14,17 +16,39 @@ from it_jobs_meta.data_pipeline.data_validation import Schemas from it_jobs_meta.data_pipeline.geolocator import Geolocator -DataType = TypeVar('DataType') +# Data type used internally by the data transformation pipeline. +ProcessDataType = TypeVar('ProcessDataType') +# Data type accepted as the input to the data pipeline by the data extraction +# engine. PipelineInputType = TypeVar('PipelineInputType') +# E.g. 
Data pipeline input data type is JSON string, and internal processing
+# data type is Pandas Data Frame.
+
+
+class EtlExtractionEngine(Generic[PipelineInputType, ProcessDataType], ABC):
+    """Extraction engine for the ETL pipeline.
 
+    Should accept input in the PipelineInputType form and provide the
+    pipeline with data in the ProcessDataType form.
+    """
 
-class EtlExtractionEngine(Generic[PipelineInputType, DataType], ABC):
     @abstractmethod
-    def extract(self, input_: PipelineInputType) -> tuple[DataType, DataType]:
-        pass
+    def extract(
+        self, input_: PipelineInputType
+    ) -> tuple[ProcessDataType, ProcessDataType]:
+        """Extract data from the input into the pipeline-compatible form.
+
+        :return: Tuple with metadata and data as (metadata, data).
+        """
+
 
+class EtlTransformationEngine(Generic[ProcessDataType], ABC):
+    """Template of ETL operations on the given processing data type.
+
+    Includes constants with values to drop, replace, or transform, and
+    interfaces for methods necessary to build the processing pipeline.
+    """
 
-class EtlTransformationEngine(Generic[DataType], ABC):
     COLS_TO_DROP = [
         'renewed',
         'logo',
@@ -43,10 +67,13 @@ class EtlTransformationEngine(Generic[DataType], ABC):
         'react': 'javascript',
     }
 
+    # Title case text is like "Sample Text".
     COLS_TO_TITLE_CASE = ['category']
 
+    # Capitalized text is like "Sample text".
     COLS_TO_CAPITALIZE = ['technology', 'contract_type']
 
+    # Names that require specific mappings instead of normal capitalization.
     CAPITALIZE_SPECIAL_NAMES = {
         '.net': '.Net',
         'aws': 'AWS',
@@ -57,60 +84,91 @@ class EtlTransformationEngine(Generic[DataType], ABC):
         'b2b': 'B2B',
     }
 
+    # Limit locations to the given countries.
+    COUNTRY_FILTERS = ['Polska']
+
     @abstractmethod
-    def drop_unwanted(self, data: DataType) -> DataType:
-        pass
+    def drop_unwanted(self, data: ProcessDataType) -> ProcessDataType:
+        """Drop the unwanted columns listed in COLS_TO_DROP."""
 
     @abstractmethod
-    def drop_duplicates(self, data: DataType) -> DataType:
-        pass
+    def drop_duplicates(self, data: ProcessDataType) -> ProcessDataType:
+        """Drop duplicated rows in the dataset."""
 
     @abstractmethod
-    def replace_values(self, data: DataType) -> DataType:
-        pass
+    def replace_values(self, data: ProcessDataType) -> ProcessDataType:
+        """Replace values according to the defined replacement mappings."""
 
     @abstractmethod
-    def to_title_case(self, data: DataType) -> DataType:
-        pass
+    def to_title_case(self, data: ProcessDataType) -> ProcessDataType:
+        """Transform columns in COLS_TO_TITLE_CASE to title case.
+
+        Title case text is like "Sample Text".
+        """
 
     @abstractmethod
-    def to_capitalized(self, data: DataType) -> DataType:
-        pass
+    def to_capitalized(self, data: ProcessDataType) -> ProcessDataType:
+        """Capitalize columns in COLS_TO_CAPITALIZE.
+
+        Capitalized text is like "Sample text".
+        """
 
     @abstractmethod
-    def extract_remote(self, data: DataType) -> DataType:
-        pass
+    def extract_remote(self, data: ProcessDataType) -> ProcessDataType:
+        """Extract remote work option and place it in the "remote" column."""
 
     @abstractmethod
-    def extract_locations(self, data: DataType) -> DataType:
+    def extract_locations(self, data: ProcessDataType) -> ProcessDataType:
+        """Extract work location as cities and place them in the "city" column.
+
+        Should ensure consistent naming and coordinates for given locations.
+        The values in the "city" column should be gathered in a tuple of
+        (city_name, latitude, longitude). The results should be limited to the
+        countries in COUNTRY_FILTERS.
+        """
         pass
 
     @abstractmethod
-    def extract_contract_type(self, data: DataType) -> DataType:
-        pass
+    def extract_contract_type(self, data: ProcessDataType) -> ProcessDataType:
+        """Extract contract type and place it in the "contract_type" column."""
 
     @abstractmethod
-    def extract_salaries(self, data: DataType) -> DataType:
+    def extract_salaries(self, data: ProcessDataType) -> ProcessDataType:
+        """Extract salaries to columns: "salary_min", "salary_max", "salary_mean"."""
         pass
 
     @abstractmethod
-    def unify_missing_values(self, data: DataType) -> DataType:
+    def unify_missing_values(self, data: ProcessDataType) -> ProcessDataType:
+        """Unify missing values (NaNs, empty, etc.) into Nones."""
        pass
 
 
-class EtlLoadingEngine(Generic[DataType], ABC):
+class EtlLoadingEngine(Generic[ProcessDataType], ABC):
+    """Loader for placing processing results in databases."""
+
     @abstractmethod
-    def load_tables_to_warehouse(self, metadata: DataType, data: DataType):
-        pass
+    def load_tables_to_warehouse(
+        self, metadata: ProcessDataType, data: ProcessDataType
+    ):
+        """Load processed data into a database."""
 
 
-class EtlPipeline(Generic[DataType, PipelineInputType]):
+class EtlPipeline(Generic[ProcessDataType, PipelineInputType]):
+    """ETL pipeline coordinating extraction, transformations, and loading."""
+
     def __init__(
         self,
-        extraction_engine: EtlExtractionEngine[PipelineInputType, DataType],
-        transformation_engine: EtlTransformationEngine[DataType],
-        loading_engine: EtlLoadingEngine[DataType],
+        extraction_engine: EtlExtractionEngine[
+            PipelineInputType, ProcessDataType
+        ],
+        transformation_engine: EtlTransformationEngine[ProcessDataType],
+        loading_engine: EtlLoadingEngine[ProcessDataType],
     ):
+        """Data pipeline runner for ETL jobs.
+
+        Notice that the extraction, transformation, and loading engines must
+        operate on the same ProcessDataType to form a valid pipeline.
+        """
 
         self._extraction_engine = extraction_engine
         self._transformation_engine = transformation_engine
@@ -121,10 +179,12 @@ def run(self, input_: PipelineInputType):
         data = self.transform(data)
         self.load(metadata, data)
 
-    def extract(self, input_: PipelineInputType) -> tuple[DataType, DataType]:
+    def extract(
+        self, input_: PipelineInputType
+    ) -> tuple[ProcessDataType, ProcessDataType]:
         return self._extraction_engine.extract(input_)
 
-    def transform(self, data: DataType) -> DataType:
+    def transform(self, data: ProcessDataType) -> ProcessDataType:
         data = self._transformation_engine.drop_unwanted(data)
         data = self._transformation_engine.drop_duplicates(data)
         data = self._transformation_engine.extract_remote(data)
@@ -137,7 +197,7 @@ def transform(self, data: DataType) -> DataType:
         data = self._transformation_engine.unify_missing_values(data)
         return data
 
-    def load(self, metadata: DataType, data: DataType):
+    def load(self, metadata: ProcessDataType, data: ProcessDataType):
         self._loading_engine.load_tables_to_warehouse(metadata, data)
 
 
@@ -146,6 +206,14 @@ def __init__(self):
         pass
 
     def extract(self, input_: str) -> tuple[pd.DataFrame, pd.DataFrame]:
+        """Extract job postings data from a JSON string into dataframes.
+
+        The input JSON should have keys:
+            'metadata': JSON dump of 'PostingsMetadata' with keys:
+                'source_name': Name of the data source.
+                'obtained_datetime': Timestamp in format 'YYYY-MM-DD HH:MM:SS'.
+            'raw_data': Raw data in format of a JSON string.
+ """ data = NoFluffJObsPostingsData.from_json_str(input_) self.validate_nofluffjobs_data(data) metadata_df = pd.DataFrame( @@ -176,7 +244,9 @@ def validate_nofluffjobs_data(data: NoFluffJObsPostingsData): class PandasEtlTransformationEngine(EtlTransformationEngine[pd.DataFrame]): def __init__(self): - self._geolocator = Geolocator(country_filter=('Polska')) + self._geolocator = Geolocator( + country_filter=EtlTransformationEngine.COUNTRY_FILTERS + ) def drop_unwanted(self, data: pd.DataFrame) -> pd.DataFrame: return data.drop(columns=EtlTransformationEngine.COLS_TO_DROP) diff --git a/it_jobs_meta/data_pipeline/data_formats.py b/it_jobs_meta/data_pipeline/data_formats.py index 43ee58d..ec5da42 100644 --- a/it_jobs_meta/data_pipeline/data_formats.py +++ b/it_jobs_meta/data_pipeline/data_formats.py @@ -18,13 +18,13 @@ class PostingsData(ABC): @classmethod @abstractmethod def from_json_str(cls, json_str) -> 'PostingsData': - """Make the data structure from json string. + """Make the data structure from JSON string. - The input json should have keys: + The input JSON should have keys: 'metadata': Json dump of 'PostingsMetadata' with keys: 'source_name': Name of the data source. 'obtained_datetime': Timestamp with fmt 'YYYY-MM-DD HH:MM:SS'. - 'raw_data': Raw data in format of a json string. + 'raw_data': Raw data in format of a JSON string. """ @property @@ -43,13 +43,13 @@ def make_key_for_data(self) -> str: @abstractmethod def make_json_str_from_data(self) -> str: - """Get the data in form of json string. + """Get the data in form of JSON string. - The returned json should have keys: - 'metadata': Json dump of 'PostingsMetadata' with keys: + The returned JSON should have keys: + 'metadata': JSON dump of 'PostingsMetadata' with keys: 'source_name': Name of the data source. 'obtained_datetime': Timestamp in format 'YYYY-MM-DD HH:MM:SS'. - 'data': Raw data in format of a json string. + 'raw_data': Raw data in format of a JSON string. """ @@ -61,13 +61,13 @@ def __init__(self, metadata: PostingsMetadata, raw_data: Any): @classmethod def from_json_str(cls, json_str: str) -> 'NoFluffJObsPostingsData': - """Make the data structure from json string. + """Make the data structure from JSON string. - The input json should have keys: - 'metadata': Json dump of 'PostingsMetadata' with keys: + The input JSON should have keys: + 'metadata': JSON dump of 'PostingsMetadata' with keys: 'source_name': Name of the data source. 'obtained_datetime': Timestamp with fmt 'YYYY-MM-DD HH:MM:SS'. - 'raw_data': Raw data in format of a json string. + 'raw_data': Raw data in format of a JSON string. """ data_dict = json.loads(json_str) source_name = data_dict['metadata']['source_name'] @@ -97,13 +97,13 @@ def make_key_for_data(self) -> str: return f'{timestamp}_{source_name}' def make_json_str_from_data(self) -> str: - """Get the data in form of json string. + """Get the data in form of JSON string. - The returned json should have keys: - 'metadata': Json dump of 'PostingsMetadata' with keys: + The returned JSON should have keys: + 'metadata': JSON dump of 'PostingsMetadata' with keys: 'source_name': Name of the data source. 'obtained_datetime': Timestamp with fmt 'YYYY-MM-DD HH:MM:SS'. - 'raw_data': Raw scraped data in format of a json string. For + 'raw_data': Raw scraped data in format of a JSON string. For No Fluff Jobs scraped data the structure is assumed to be: 'postings': List of postings. 'totalCount': The length of the list of postings. 
diff --git a/it_jobs_meta/data_pipeline/data_lake.py b/it_jobs_meta/data_pipeline/data_lake.py
index a348df4..7b93c88 100644
--- a/it_jobs_meta/data_pipeline/data_lake.py
+++ b/it_jobs_meta/data_pipeline/data_lake.py
@@ -15,11 +15,11 @@ class DataLake(ABC):
     @abstractmethod
     def set_data(self, key: str, data: str):
-        """Store data under key. Data is assumed to be a json string."""
+        """Store data under key. Data is assumed to be a JSON string."""
 
     @abstractmethod
     def get_data(self, key: str) -> str:
-        """Get data stored under key. Data is assumed ot be a json string."""
+        """Get data stored under key. Data is assumed to be a JSON string."""
 
 
 class RedisDataLake(DataLake):
@@ -38,11 +38,9 @@ def from_config_file(cls, config_path: Path) -> 'RedisDataLake':
         return cls(**load_yaml_as_dict(config_path))
 
     def set_data(self, key: str, data: str):
-        """Store data under key. Data is assumed to be json string."""
         self._db.set(key, data)
 
     def get_data(self, key: str) -> str:
-        """Get data stored under key. Data is assumed ot be json string."""
         data = self._db.get(key)
         if data is None:
             raise KeyError(f'No data stored in db under key: {key}')
@@ -50,6 +48,8 @@ def get_data(self, key: str) -> str:
 
 
 class S3DataLake(DataLake):
+    """AWS S3 data lake key-value object storage."""
+
     def __init__(self, bucket_name: str):
         self._s3 = boto3.resource('s3')
         self._bucket = self._s3.Bucket(bucket_name)
diff --git a/it_jobs_meta/data_pipeline/data_pipeline.py b/it_jobs_meta/data_pipeline/data_pipeline.py
index d862445..26a4b1b 100644
--- a/it_jobs_meta/data_pipeline/data_pipeline.py
+++ b/it_jobs_meta/data_pipeline/data_pipeline.py
@@ -1,3 +1,5 @@
+"""Full data pipeline for job postings data from No Fluff Jobs."""
+
 import datetime as dt
 import logging
 from pathlib import Path
@@ -19,6 +21,11 @@
 
 
 class DataPipeline:
+    """Full data pipeline for job postings data from No Fluff Jobs.
+
+    Includes data scraping, ingestion, data lake storage, and the ETL job.
+    """
+
     def __init__(
         self,
         data_lake_factory: DataLakeFactory,
@@ -78,6 +85,11 @@ def run(self):
 
 
 def main():
+    """Demo main function for ad-hoc tests.
+
+    Read postings data from a test JSON file and feed it to the ETL pipeline.
+    """
+
     test_json_file_path = Path(
         'it_jobs_meta/data_pipeline/test/1640874783_nofluffjobs.json'
     )
diff --git a/it_jobs_meta/data_pipeline/data_validation.py b/it_jobs_meta/data_pipeline/data_validation.py
index a972567..b34475a 100644
--- a/it_jobs_meta/data_pipeline/data_validation.py
+++ b/it_jobs_meta/data_pipeline/data_validation.py
@@ -1,3 +1,5 @@
+"""Data validation schemas for postings data."""
+
 from dataclasses import dataclass
 from datetime import datetime
 
diff --git a/it_jobs_meta/data_pipeline/geolocator.py b/it_jobs_meta/data_pipeline/geolocator.py
index 325dd4d..c32a0dc 100644
--- a/it_jobs_meta/data_pipeline/geolocator.py
+++ b/it_jobs_meta/data_pipeline/geolocator.py
@@ -1,3 +1,5 @@
+"""Geolocation services."""
+
 import functools
 
 from geopy.geocoders import Nominatim
@@ -5,6 +7,11 @@ class Geolocator:
     def __init__(self, country_filter: tuple[str, ...] | None = None):
+        """Create geolocator instance.
+
+        :param country_filter: Tuple of country names that the geolocation
+            should be limited to.
+        """
         self._geolocator = Nominatim(user_agent='it-jobs-meta')
         self._country_filter = country_filter
 
@@ -12,33 +19,43 @@ def __call__(
         self, city_name: str
     ) -> tuple[str, float, float] | tuple[None, None, None]:
+        """For a given city name, get its location.
+
+        :param city_name: Name of the city to geolocate; can be in the native
+            language or in English. Different name variants are unified in the
+            returned value.
+        :return: Tuple with the location as (unified_city_name, latitude,
+            longitude); if geolocation fails or the country is not in
+            "country_filter", returns (None, None, None).
+        """
         return self.get_universal_city_name_lat_lon(city_name)
 
     def get_universal_city_name_lat_lon(
         self, city_name: str
     ) -> tuple[str, float, float] | tuple[None, None, None]:
+        """Same as __call__."""
         location = self._geolocator.geocode(city_name)
         if location is None:
-            return Geolocator.make_none_location()
+            return Geolocator._make_none_location()
 
-        city_name, country_name = Geolocator.address_str_to_city_country_name(
+        city_name, country_name = Geolocator._address_str_to_city_country_name(
             location.address
         )
 
         filter_ = self._country_filter
         if filter_ is not None and country_name not in filter_:
-            return Geolocator.make_none_location()
+            return Geolocator._make_none_location()
 
         return city_name, location.latitude, location.longitude
 
     @staticmethod
-    def address_str_to_city_country_name(address: str) -> tuple[str, str]:
+    def _address_str_to_city_country_name(address: str) -> tuple[str, str]:
         split_loc = address.split(',')
         city_name, country_name = split_loc[0].strip(), split_loc[-1].strip()
         return city_name, country_name
 
     @staticmethod
-    def make_none_location() -> tuple[None, None, None]:
+    def _make_none_location() -> tuple[None, None, None]:
         return None, None, None
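
A minimal usage sketch of the `Geolocator` touched by this patch, for quick manual verification; it assumes the `geopy` dependency is installed and the Nominatim service is reachable, and the city name below is only an illustrative value:

```python
from it_jobs_meta.data_pipeline.geolocator import Geolocator

# Restrict results to Poland, mirroring the COUNTRY_FILTERS constant
# introduced in the ETL transformation engine.
geolocator = Geolocator(country_filter=('Polska',))

# __call__ resolves a city name to (unified_city_name, latitude, longitude),
# or (None, None, None) when geocoding fails or the country is filtered out.
city, lat, lon = geolocator('Warszawa')
```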