
Enable Multithreading on msgpack Chunking in BulkImportWriter #142

Merged
3 commits merged into master from feature/multi_threaded_chunking on Dec 6, 2024

Conversation

@DavidLandup0 (Contributor) commented Dec 5, 2024

Similar to how multithreading is already used for the msgpack temp file uploads, this PR enables multithreading in the chunking step itself, using the same number of max workers as the uploads.
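
Conceptually, the change fans the per-chunk msgpack serialization out over a thread pool, the same way the temp file uploads already do. Below is a minimal sketch of the idea, not the actual BulkImportWriter code; the chunk splitting and the write_chunk_to_msgpack helper are hypothetical stand-ins:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd


def write_chunk_to_msgpack(chunk: pd.DataFrame) -> str:
    # Hypothetical stand-in for the per-chunk msgpack serialization that
    # BulkImportWriter performs; would return the path of the temp file written.
    ...


def chunk_dataframe_parallel(df: pd.DataFrame, chunk_size: int, max_workers: int) -> list:
    # Split the frame into row-wise chunks, then serialize them concurrently,
    # reusing the same max_workers count as the uploads.
    chunks = [df.iloc[i : i + chunk_size] for i in range(0, len(df), chunk_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves chunk order, so the upload order stays stable.
        return list(executor.map(write_chunk_to_msgpack, chunks))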

Benchmark Script

import pytd
import os
import numpy as np
import pandas as pd

API_KEY = "xxxxxxxxxxxxxxxx"
ENDPOINT = "xxxxxxxxxxxxx"

os.environ["TD_PRESTO_API"] = "xxxxxxxxxxxx"
os.environ["TD_API_KEY"] = API_KEY
os.environ["TD_API_SERVER"] = ENDPOINT

num_users = 100_000_000

# create a dataframe with 100M rows (num_users) and a few numeric columns
users = np.random.randint(1, num_users, num_users)
recency = np.random.randint(0, 365, num_users)
frequency = np.random.randint(1, 10, num_users)
monetary_value = np.random.randint(1, 1000, num_users)
df = pd.DataFrame({"user": users, "recency": recency, "frequency": frequency, "monetary_value": monetary_value})

client = pytd.Client(database="some_db", retry_post_requests=True, endpoint=ENDPOINT)
table = client.api_client.table("some_db", "some_table")
client.create_database_if_not_exists("some_db")

client.load_table_from_dataframe(
    df,
    "some_db.some_table",
    writer="bulk_import",
    if_exists="overwrite",
    fmt="msgpack",
    keep_list=True,
    max_workers=64
)

Script Results

Progress Bar - 100M rows

The script works nicely with the show_progress flag from #141, since it makes the individual steps easy to observe.
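
For reference, the progress bars below come from the show_progress flag added in #141. Assuming that flag is exposed as a keyword argument on load_table_from_dataframe (the exact plumbing may differ), the benchmark call would look like this:

client.load_table_from_dataframe(
    df,
    "some_db.some_table",
    writer="bulk_import",
    if_exists="overwrite",
    fmt="msgpack",
    keep_list=True,
    max_workers=64,
    show_progress=True,  # assumed kwarg name, per the show_progress flag from #141
)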

On main + feature/show_progress_bulk_import:

python script.py
...
Chunking into msgpack: 100%|██████████| 200/200 [11:38<00:00,  3.49s/it]
...

On feature/multi_threaded_chunking + feature/show_progress_bulk_import:

python script.py
...
Chunking into msgpack: 100%|██████████| 200/200 [00:59<00:00,  3.38it/s]
...

No Progress Bar - 1M rows

Without the progress bar, the variable duration of the "performing a bulk import job" step makes the direct effect of this PR harder to observe: one can only measure the end-to-end time holistically, so the confounding variable (the import time itself) is not accounted for.

On main:

time python script.py
# add...

On feature/multi_threaded_chunking:

time python script.py
# add...
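
As an alternative to timing the whole script, the load call can be timed directly from Python. This is a minimal sketch that reuses the client and df already defined in the benchmark script; note it still measures end-to-end time, so the bulk import step is not factored out:

import time

start = time.perf_counter()
client.load_table_from_dataframe(
    df,
    "some_db.some_table",
    writer="bulk_import",
    if_exists="overwrite",
    fmt="msgpack",
    keep_list=True,
    max_workers=64,
)
# End-to-end duration, including the bulk import job itself.
print(f"load_table_from_dataframe took {time.perf_counter() - start:.1f}s")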

On my local environment:

  • Decreased chunking time from ~55min to ~5min for a 500M row dataframe
  • Similarly, decreased the chunking time from ~10min to ~1min for a 100M row dataframe
  • No effect on small datasets (such as 10k rows), since they fit into a single chunk by default

P.S. The resulting table in the database appears to have no side effects. I'll double-check tomorrow morning with fresh eyes that the results are exactly as they should be.

@DavidLandup0 requested a review from chezou on December 5, 2024, 08:34
@chezou (Member) commented Dec 5, 2024

@DavidLandup0 Thanks for the additional optimization! It's amazing

@tung-vu-td (Contributor) left a comment

LGTM

@DavidLandup0 merged commit 7357c4e into master on Dec 6, 2024
15 checks passed