We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make an R version for RAP
def dedupe_df(df, col_filter, on_col: str) -> DataFrame:
    """Keep exactly one row per ``col_filter`` group: the one ranked first by ``on_col``.

    Rows are partitioned by ``col_filter`` and ordered by ``on_col`` descending;
    only the first row of each partition is returned.

    NOTE(review): ``W`` and ``F`` are presumably ``pyspark.sql.Window`` and
    ``pyspark.sql.functions`` -- confirm against this module's imports.

    Args:
        df: DataFrame to de-duplicate.
        col_filter: Column name(s) passed to ``W.partitionBy`` to define what
            counts as a duplicate.
        on_col: Column used to order each partition, highest value first.

    Returns:
        ``df`` with the same columns, reduced to one row per partition.
    """
    # Helper column name chosen to be unlikely to clash with a caller's own
    # column; a plain "rank" would overwrite and then drop an existing "rank".
    row_number_col = "__dedupe_row_number"

    w = W.partitionBy(col_filter).orderBy(F.desc(on_col))

    # row_number() rather than dense_rank(): dense_rank gives every row tied on
    # `on_col` the same rank, so tied duplicates would all survive the filter.
    # When rows tie on `on_col`, which of them is kept is arbitrary.
    df_dedupe = (
        df.withColumn(row_number_col, F.row_number().over(w))
        .filter(F.col(row_number_col) == 1)
        .drop(row_number_col)
    )
    return df_dedupe
def _make_duplicate_import_df(spark_session):
    """Build a two-row Spark DataFrame sharing date/code/age but not import_date.

    A ``unique_check`` column (date + code + age concatenated) is added so tests
    can assert that the de-duplication key is unique in the output.
    """
    rows = pd.DataFrame(
        {
            'import_date': ['2022-05-24', '2022-05-25'],
            'date': ['2016-01-01', '2016-01-01'],
            'code': ['RWJ', 'RWJ'],
            'age': ['1', '1'],
        }
    )
    input_df = spark_session.createDataFrame(rows)
    return input_df.withColumn(
        'unique_check',
        F.concat(F.col('date'), F.col('code'), F.col('age')),
    )


def test_output_cols_are_unique(spark_session):
    """The de-duplicated output has no repeated date/code/age combination."""
    # create dataset
    input_df = _make_duplicate_import_df(spark_session)

    # test function
    output_df = utils.dedupe_df(input_df, ['date', 'code', 'age'], 'import_date')

    # assertions
    assert pd.Series(output_df.toPandas()['unique_check']).is_unique


def test_output_cols_is_latest_value(spark_session):
    """The surviving row carries the latest import_date of its group."""
    # create dataset
    input_df = _make_duplicate_import_df(spark_session)

    # Calculate latest import date
    latest_value = input_df.agg({'import_date': 'max'}).collect()[0]['max(import_date)']

    # test function
    output_df = utils.dedupe_df(input_df, ['date', 'code', 'age'], 'import_date')

    # assertions
    output_rows = output_df.collect()
    # Both input rows share one key, so exactly one row should survive; checking
    # this first makes the index-0 lookup below meaningful.
    assert len(output_rows) == 1
    assert output_rows[0]['import_date'] == latest_value
Reference: https://github.com/craig-shenton/foundry-de-utilities
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Make an R version for RAP
PySpark code example
Unit Tests
Reference: https://github.com/craig-shenton/foundry-de-utilities
The text was updated successfully, but these errors were encountered: