Commit

Merge pull request #63 from maciejzj/pipeline-duplication-fix
Fix entries duplication
maciejzj authored Dec 26, 2024
2 parents 0294c23 + 5cd92fe commit b3426b9
Showing 9 changed files with 1,501 additions and 953 deletions.
1,785 changes: 1,092 additions & 693 deletions analysis/data_exploration.ipynb

Large diffs are not rendered by default.

21 changes: 20 additions & 1 deletion it_jobs_meta/common/utils.py
@@ -1,9 +1,11 @@
"""Utility tools shared across the application."""

+import functools
import logging
import sys
+import time
from pathlib import Path
-from typing import Any
+from typing import Any, Callable, ParamSpec, TypeVar

import yaml

@@ -31,3 +33,20 @@ def setup_logging(*args: Path, log_level: int = logging.INFO):
def load_yaml_as_dict(path: Path) -> dict[str, Any]:
with open(path, 'r', encoding='UTF-8') as yaml_file:
return yaml.safe_load(yaml_file)


+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def throttle(seconds: float) -> Callable[[Callable[P, R]], Callable[P, R]]:
+    def decorator(func: Callable[P, R]) -> Callable[P, R]:
+        @functools.wraps(func)
+        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+            ret = func(*args, **kwargs)
+            time.sleep(seconds)
+            return ret
+
+        return wrapper
+
+    return decorator
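
As a quick illustration, here is a minimal usage sketch of the new throttle decorator (the decorated function and the 0.5 s rate are made up; only throttle itself comes from the code above):

import time

from it_jobs_meta.common.utils import throttle


@throttle(0.5)
def fetch_page(page_no: int) -> str:
    # Stand-in for a real network request.
    return f'payload for page {page_no}'


start = time.time()
for i in range(3):
    fetch_page(i)
# The wrapper sleeps after each call returns, so three calls take
# roughly 3 * 0.5 s however fast fetch_page itself is.
print(f'elapsed: {time.time() - start:.1f} s')

Sleeping after the call rather than before means the first call is never delayed, which suits a scraper that only needs to space out successive requests.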
23 changes: 14 additions & 9 deletions it_jobs_meta/dashboard/dashboard_components.py
@@ -243,10 +243,12 @@ def make_fig(cls, postings_df: pd.DataFrame) -> go.Figure:
class SalariesMap(GraphFigure):
TITLE = 'Mean salary by location (PLN)'
N_MOST_FREQ = 15
+    POLAND_LAT, POLAND_LON = 52.0, 19.0
+    PROJECTION_SCALE = 10

@classmethod
def make_fig(cls, postings_df) -> go.Figure:
-        postings_df = postings_df.explode('city')
+        postings_df = postings_df.explode('city').dropna(subset=['city'])
postings_df[['city', 'lat', 'lon']] = postings_df['city'].transform(
lambda city: pd.Series([city[0], city[1], city[2]])
)
@@ -265,19 +267,26 @@ def make_fig(cls, postings_df) -> go.Figure:

fig = px.scatter_geo(
cities_salaries,
-            scope='europe',
lat='lat',
lon='lon',
size='job_counts',
color='salary_mean',
title=cls.TITLE,
-            fitbounds='locations',
labels={
'salary_mean': 'Mean salary',
'job_counts': 'Number of jobs',
},
+            hover_data={'city': True, 'lat': False, 'lon': False},
)

+        fig.update_layout(
+            geo=dict(
+                scope='europe',
+                center={'lat': cls.POLAND_LAT, 'lon': cls.POLAND_LON},
+                projection_scale=cls.PROJECTION_SCALE,
+            )
+        )

fig = center_title(fig)
return fig
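
The centering change above can be reproduced standalone; the two sample cities below are invented for illustration and are not the dashboard's data:

import pandas as pd
import plotly.express as px

cities = pd.DataFrame({
    'city': ['Warszawa', 'Kraków'],
    'lat': [52.23, 50.06],
    'lon': [21.01, 19.94],
    'job_counts': [120, 80],
    'salary_mean': [18000, 16500],
})

fig = px.scatter_geo(
    cities,
    lat='lat',
    lon='lon',
    size='job_counts',
    color='salary_mean',
    hover_data={'city': True, 'lat': False, 'lon': False},
)
# Pinning the center and zoom keeps the viewport stable, unlike
# fitbounds, which reframes the map around whatever points remain
# after filtering.
fig.update_layout(
    geo=dict(
        scope='europe',
        center={'lat': 52.0, 'lon': 19.0},
        projection_scale=10,
    )
)
fig.show()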

Expand Down Expand Up @@ -357,9 +366,7 @@ def make_fig(
tech_most_freq = get_rows_with_n_most_frequent_vals_in_col(
postings_df, 'technology', cls.N_MOST_FREQ_TECH
)
-        limited = tech_most_freq[
-            tech_most_freq['salary_mean'] < cls.MAX_SALARY
-        ]
+        limited = tech_most_freq[tech_most_freq['salary_mean'] < cls.MAX_SALARY]
limited = limited[
limited['seniority'].isin(('Junior', 'Mid', 'Senior'))
]
@@ -405,9 +412,7 @@ def make_fig(cls, postings_df) -> go.Figure:
tech_most_freq = get_rows_with_n_most_frequent_vals_in_col(
postings_df, 'technology', cls.N_MOST_FREQ_TECH
)
-        limited = tech_most_freq[
-            tech_most_freq['salary_mean'] < cls.MAX_SALARY
-        ]
+        limited = tech_most_freq[tech_most_freq['salary_mean'] < cls.MAX_SALARY]
b2b_df = limited[limited['contract_type'] == 'B2B']
perm_df = limited[limited['contract_type'] == 'Permanent']

110 changes: 63 additions & 47 deletions it_jobs_meta/data_pipeline/data_etl.py
@@ -48,16 +48,20 @@ class EtlTransformationEngine(Generic[ProcessDataType], ABC):
interfaces for methods necessary to build the processing pipeline.
"""

-    COLS_TO_DROP = [
-        'renewed',
-        'logo',
-        'regions',
-        'flavors',
-        'topInSearch',
-        'highlighted',
-        'onlineInterviewAvailable',
-        'referralBonus',
-        'referralBonusCurrency',
+    # Columns that are unique per posting and can be used to identify
+    # duplicates.
+    COLS_UNIQUE_IDENTITY = ['name', 'posted', 'title']
+
+    # Columns to keep for further processing; the rest are dropped.
+    COLS_TO_KEEP = [
+        'id',
+        'name',
+        'location',
+        'posted',
+        'title',
+        'technology',
+        'category',
+        'seniority',
+        'salary',
]

# Look at the ETL pipeline implementation to see the predefined order of
@@ -66,11 +70,13 @@ class EtlTransformationEngine(Generic[ProcessDataType], ABC):
'technology',
]

-    # In any column replace these strings
+    # Replace strings in columns specified by dict keys.
VALS_TO_REPLACE = {
-        'node.js': 'node',
-        'angular': 'javascript',
-        'react': 'javascript',
+        'technology': {
+            'node.js': 'node',
+            'angular': 'javascript',
+            'react': 'javascript',
+        }
}

# Apply transformation like 'businessAnalyst' -> 'business Analyst'
@@ -87,25 +93,34 @@ class EtlTransformationEngine(Generic[ProcessDataType], ABC):
# Names that require specific mappings instead of normal capitalizations.
# Input strings should be transformed to lower before applying these cases.
CAPITALIZE_SPECIAL_NAMES = {
-        '.net': '.Net',
-        'aws': 'AWS',
-        'ios': 'iOS',
-        'javascript': 'JavaScript',
-        'php': 'PHP',
-        'sql': 'SQL',
-        'b2b': 'B2B',
+        'technology': {
+            '.net': '.Net',
+            'aws': 'AWS',
+            'ios': 'iOS',
+            'javascript': 'JavaScript',
+            'php': 'PHP',
+            'sql': 'SQL',
+        },
+        'contract_type': {
+            'b2b': 'B2B',
+            'B2b': 'B2B',
+        },
}

-    # Limit locations to the given countries.
-    COUNTRY_FILTERS = ['Polska']
+    # Limit locations to the given countries; use ISO 3166-1 alpha-2 codes.
+    COUNTRY_FILTERS = ['pl']

+    # Minimum mean salary based on the minimum wage in Poland (we assume
+    # that lower salaries are mistakes).
+    SALARY_CURRENCY = 'PLN'
+    MIN_MEAN_SALARY = 3500

@abstractmethod
-    def drop_unwanted(self, data: ProcessDataType) -> ProcessDataType:
-        """Drop unwanted columns in the COLS_TO_DROP."""
+    def select_required(self, data: ProcessDataType) -> ProcessDataType:
+        """Select only the required columns specified in COLS_TO_KEEP."""

@abstractmethod
def drop_duplicates(self, data: ProcessDataType) -> ProcessDataType:
"""Drop duplicated rows in the dataset."""
"""Drop duplicated rows in the dataset based on COLS_UNIQUE_IDENTITY."""

@abstractmethod
def unify_to_lower(self, data: ProcessDataType) -> ProcessDataType:
@@ -202,7 +217,7 @@ def extract(
return self._extraction_engine.extract(input_)

def transform(self, data: ProcessDataType) -> ProcessDataType:
-        data = self._transformation_engine.drop_unwanted(data)
+        data = self._transformation_engine.select_required(data)
data = self._transformation_engine.drop_duplicates(data)
data = self._transformation_engine.extract_remote(data)
data = self._transformation_engine.extract_locations(data)
@@ -235,11 +250,8 @@ def extract(self, input_: str) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
data = NoFluffJObsPostingsData.from_json_str(input_)
self.validate_nofluffjobs_data(data)
-        metadata_df = pd.DataFrame(
-            dataclasses.asdict(data.metadata), index=[0]
-        )
+        metadata_df = pd.DataFrame(dataclasses.asdict(data.metadata), index=[0])
data_df = pd.DataFrame(data.raw_data['postings'])
-        data_df = data_df.set_index('id')
return metadata_df, data_df

@staticmethod
@@ -251,9 +263,7 @@ def validate_nofluffjobs_data(data: NoFluffJObsPostingsData):
f'"nofluffjobs", got: {data.metadata.source_name}'
)
try:
-            assert data.raw_data['totalCount'] == len(
-                data.raw_data['postings']
-            )
+            assert data.raw_data['totalCount'] == len(data.raw_data['postings'])
except KeyError as error:
raise ValueError(
'Data extractor got correct date format type and '
@@ -267,19 +277,23 @@ def __init__(self):
country_filter=EtlTransformationEngine.COUNTRY_FILTERS
)

-    def drop_unwanted(self, data: pd.DataFrame) -> pd.DataFrame:
-        return data.drop(columns=EtlTransformationEngine.COLS_TO_DROP)
+    def select_required(self, data: pd.DataFrame) -> pd.DataFrame:
+        return data[self.COLS_TO_KEEP]

def drop_duplicates(self, data: pd.DataFrame) -> pd.DataFrame:
-        return data[~data.index.duplicated(keep='first')]
+        return data.drop_duplicates(subset=self.COLS_UNIQUE_IDENTITY)
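
This is the crux of the duplication fix: rows are now deduplicated on the identity columns instead of the 'id' index (note the set_index('id') removal in the extractor above). A toy illustration with made-up rows:

import pandas as pd

postings = pd.DataFrame({
    'id': [101, 202, 303],
    'name': ['Acme', 'Acme', 'Beta'],
    'posted': ['2024-12-01', '2024-12-01', '2024-12-02'],
    'title': ['Python Dev', 'Python Dev', 'Data Engineer'],
})

# The first two rows describe the same posting under different ids,
# so deduplicating on the identity columns drops the second one.
deduped = postings.drop_duplicates(subset=['name', 'posted', 'title'])
print(deduped['id'].tolist())  # [101, 303]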

def unify_to_lower(self, data: pd.DataFrame) -> pd.DataFrame:
-        for col in EtlTransformationEngine.COLS_TO_LOWER:
+        for col in self.COLS_TO_LOWER:
data[col] = data[col].str.lower()
return data

def replace_values(self, data: pd.DataFrame) -> pd.DataFrame:
-        return data.replace(to_replace=EtlTransformationEngine.VALS_TO_REPLACE)
+        for col in self.VALS_TO_REPLACE:
+            data[col] = data[col].replace(
+                to_replace=self.VALS_TO_REPLACE[col]
+            )
+        return data

def split_on_capitals(self, data: pd.DataFrame) -> pd.DataFrame:
for col in EtlTransformationEngine.COLS_TO_SPLIT_ON_CAPITAL_LETTERS:
@@ -297,7 +311,9 @@ def to_capitalized(self, data: pd.DataFrame) -> pd.DataFrame:
specials = EtlTransformationEngine.CAPITALIZE_SPECIAL_NAMES
for col in EtlTransformationEngine.COLS_TO_CAPITALIZE:
data[col] = data[col][data[col].notna()].transform(
-                lambda s: specials[s] if s in specials else s.capitalize()
+                lambda s: (
+                    specials[col][s] if s in specials[col] else s.capitalize()
+                )
)
return data

@@ -309,10 +325,11 @@ def extract_remote(self, data: pd.DataFrame) -> pd.DataFrame:

def extract_locations(self, data: pd.DataFrame) -> pd.DataFrame:
data['city'] = data['location'].transform(
-            lambda location_dict: [
-                self._geolocator(loc['city'])
-                for loc in location_dict['places']
-                if 'city' in loc
+            lambda loc_dict: [
+                city
+                for location in loc_dict['places']
+                if 'city' in location
+                and (city := self._geolocator(location['city'])) is not None
]
)
return data
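
The rewritten comprehension uses an assignment expression to test and capture the geolocation result in one pass, so cities the geolocator cannot resolve are dropped instead of ending up as None entries. The same pattern in isolation, with a made-up lookup table standing in for the geolocator:

known = {'warszawa': ('Warszawa', 52.23, 21.01)}


def lookup(name: str) -> tuple[str, float, float] | None:
    # Stand-in for the cached Nominatim lookup.
    return known.get(name.lower())


places = [{'city': 'Warszawa'}, {'city': 'Atlantis'}, {'street': 'Main'}]
cities = [
    city
    for place in places
    if 'city' in place and (city := lookup(place['city'])) is not None
]
print(cities)  # [('Warszawa', 52.23, 21.01)]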
@@ -334,9 +351,10 @@ def extract_salaries(self, data: pd.DataFrame) -> pd.DataFrame:

data = data[
data['salary'].transform(
-                lambda salary_dict: salary_dict['currency'] == 'PLN'
+                lambda salary_dict: salary_dict['currency'] == self.SALARY_CURRENCY
)
]
+        data = data[data['salary_mean'] > self.MIN_MEAN_SALARY]
return data

def unify_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -354,7 +372,6 @@ class PandasEtlMongodbLoadingEngine(EtlLoadingEngine[pd.DataFrame]):
'title',
'technology',
'category',
-        'url',
'remote',
'contract_type',
'salary_min',
@@ -404,7 +421,6 @@ class PandasEtlSqlLoadingEngine(EtlLoadingEngine[pd.DataFrame]):
'title',
'technology',
'category',
-        'url',
'remote',
]

52 changes: 19 additions & 33 deletions it_jobs_meta/data_pipeline/geolocator.py
@@ -4,59 +4,45 @@
 from typing import Sequence

 from geopy.geocoders import Nominatim
+from retry import retry
+
+from it_jobs_meta.common.utils import throttle


 class Geolocator:
     def __init__(self, country_filter: Sequence[str] | None = None):
         """Create geolocator instance.
         :param country_filter: Tuple of country names that the geolocation
-        should be limited to.
+        should be limited to (use ISO 3166-1 alpha-2 codes).
         """
         self._geolocator = Nominatim(user_agent='it-jobs-meta')
         self._country_filter = country_filter

     @functools.cache
-    def __call__(
-        self, city_name: str
-    ) -> tuple[str, float, float] | tuple[None, None, None]:
-        """For given city name get it's location.
-        :param city_name: Name of the city to geolocate, can be in native
-        language or in English, different name variants will be unified on
-        return.
-        :return: Tuple with location as (unified_city_name, latitude,
-        longitude), if geolocation fails or country is not in
-        "contry_filters" will return Nones".
-        """
-        return self.get_universal_city_name_lat_lon(city_name)
-
-    def get_universal_city_name_lat_lon(
-        self, city_name: str
-    ) -> tuple[str, float, float] | tuple[None, None, None]:
-        """Same as __call__."""
-
-        location = self._geolocator.geocode(city_name)
-
-        if location is None:
-            return Geolocator._make_none_location()
-
-        city_name, country_name = Geolocator._address_str_to_city_country_name(
-            location.address
-        )
-
-        filter_ = self._country_filter
-        if filter_ is not None and country_name not in filter_:
-            return Geolocator._make_none_location()
-
-        return city_name, location.latitude, location.longitude
-
-    @staticmethod
-    def _address_str_to_city_country_name(address: str) -> tuple[str, str]:
-        split_loc = address.split(',')
-        city_name, country_name = split_loc[0].strip(), split_loc[-1].strip()
-        return city_name, country_name
-
-    @staticmethod
-    def _make_none_location() -> tuple[None, None, None]:
-        return None, None, None
+    @retry(TimeoutError, tries=3, delay=10)
+    @throttle(0.1)
+    def __call__(self, city_name: str) -> tuple[str, float, float] | None:
+        """Call to get_universal_city_name_lat_lon method."""
+        return self.get_universal_city_name_lat_lon(city_name)
+
+    def get_universal_city_name_lat_lon(
+        self, city_name: str
+    ) -> tuple[str, float, float] | None:
+        """For a given city name, get its location.
+        :param city_name: Name of the city to geolocate, can be in native
+        language or in English, different name variants will be unified on
+        return.
+        :return: Tuple with location as (unified_city_name, latitude,
+        longitude) or None if location failed.
+        """
+        location = self._geolocator.geocode(
+            city_name, country_codes=self._country_filter
+        )
+
+        if location is None:
+            return None
+
+        city_name = location.address.split(',')[0]
+        return (city_name, location.latitude, location.longitude)
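
Note the decorator order: functools.cache is outermost, so retry and throttle run only on cache misses; repeated lookups of the same city skip both the 0.1 s pause and the Nominatim request entirely. A sketch of the resulting behaviour (the city name is just an example):

geolocator = Geolocator(country_filter=['pl'])

# First lookup hits Nominatim: throttled by 0.1 s and retried up to
# 3 times on TimeoutError.
first = geolocator('Warszawa')

# Second lookup is served from functools.cache: no request, no sleep.
second = geolocator('Warszawa')

assert first == second

Were the order reversed (cache innermost), every call would still pay the throttle delay even on cache hits.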
