Extract then merge #162

Open · wants to merge 8 commits into main

Conversation

@alienzj commented Apr 25, 2024

When processing large-scale sample sets with SemiBin's multi-sample binning mode, data_cov.csv and data_split_cov.csv may require more than 1 TB of memory. This PR extracts the sample-wise contig coverage first and then merges it, which can significantly reduce memory usage.

After testing, I found it was still very slow when processing many (1K+) CSV files, so I updated the code to use Polars to parse the CSV files.
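
Roughly, the idea is captured by the sketch below. The function names are illustrative rather than the PR's actual code, and it assumes the coverage CSVs have an unnamed first column holding contig names prefixed with "<sample>:" (e.g. "sampleA:contig_1"):

import polars as pl

def extract_sample_cov(cov_csv, sample_id):
    # Keep only one sample's rows (contig names prefixed with e.g. "sampleA:"),
    # so the full multi-sample matrix never has to sit in memory at once.
    return (
        pl.scan_csv(cov_csv)
        .rename({"": "contig_name"})
        .filter(pl.col("contig_name").str.starts_with(sample_id))
        .collect()
    )

def merge_sample_cov(cov_csvs, sample_id):
    # Column-wise merge of the per-file slices, aligned on contig_name.
    slices = [extract_sample_cov(p, sample_id) for p in cov_csvs]
    return pl.concat(slices, how="align")

Because each extracted slice only holds one sample's contigs, peak memory should scale with the per-sample slice rather than with the full multi-sample matrix.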

@alienzj (Author) commented Apr 26, 2024

Benchmark data:

sample_id = "sampleA:"

cov_list = [
    "sampleA.sorted.bam_0_data_cov.csv", #  ~1G
    "sampleB.sorted.bam_1_data_cov.csv", #  ~1G
    "sampleC.sorted.bam_2_data_cov.csv", #  ~1G
    "sampleD.sorted.bam_3_data_cov.csv", #  ~1G
    "sampleE.sorted.bam_4_data_cov.csv", #  ~1G
    "sampleF.sorted.bam_5_data_cov.csv", #  ~1G
    "sampleG.sorted.bam_6_data_cov.csv"  #  ~1G
]

@alienzj (Author) commented Apr 26, 2024

Test using pd.read_csv:

%%time
pd_dfs = []

for i in cov_list:
    # Read one coverage table and rename its unnamed index column to contig_name.
    data_cov = pd.read_csv(i, index_col=0, engine="pyarrow")
    data_cov = data_cov.reset_index()
    columns_list = list(data_cov.columns)
    columns_list[0] = 'contig_name'
    data_cov.columns = columns_list

    # Keep only this sample's rows and strip the "sample:" prefix from the index.
    part_data = data_cov[data_cov['contig_name'].str.contains(sample_id, regex=False)]
    part_data = part_data.set_index("contig_name")
    part_data.index.name = None
    part_data.index = [ix.split(":")[1] for ix in part_data.index]

    pd_dfs.append(part_data)

# Merge the per-file slices column-wise and rescale each coverage column
# by its mean rounded up to the nearest hundred.
sample_cov = pd.concat(pd_dfs, axis=1)
sample_cov.index = sample_cov.index.astype(str)

abun_scale = (sample_cov.mean() / 100).apply(np.ceil) * 100
sample_cov = sample_cov.div(abun_scale)

Results:

CPU times: user 49.1 s, sys: 19.4 s, total: 1min 8s
Wall time: 49.3 s

@alienzj (Author) commented Apr 26, 2024

Test using pl.read_csv:

%%time
pl_dfs_read = []

for i in cov_list:
    # Read one coverage table, name the unnamed index column, and keep only this sample's rows.
    data_cov = (
        pl.read_csv(i)
        .rename({"": "contig_name"})
        .filter(pl.col("contig_name").str.contains(sample_id))
    )

    pl_dfs_read.append(data_cov)

# Align the per-file slices on contig_name into one wide table.
contig_cov = pl.concat(pl_dfs_read, how="align")

# Strip the "sample:" prefix from the contig names.
contig_names = [name.split(":")[1] for name in contig_cov["contig_name"]]

contig_cov = contig_cov.drop("contig_name")
headers = ["contig_name"] + list(contig_cov.columns)

# Rescale each coverage column by its mean rounded up to the nearest hundred.
abun_scale = (contig_cov.mean() / 100).map_rows(np.ceil) * 100
divided_columns = [pl.col(col) / abun_scale[0, index] for index, col in enumerate(list(contig_cov.columns))]

result = contig_cov.select(divided_columns)

# Re-attach the cleaned contig names and restore the original column order.
result = result.with_columns(pl.Series("contig_name", contig_names))
result = result.select(headers)

Results:

CPU times: user 19.1 s, sys: 3.31 s, total: 22.4 s
Wall time: 3.49 s

@alienzj (Author) commented Apr 26, 2024

Test using pl.scan_csv:

%%time
pl_dfs_scan = []

for i in cov_list:
    # Lazily scan the coverage table so only this sample's rows are materialised.
    data_cov = (
        pl.scan_csv(i)
        .rename({"": "contig_name"})
        .filter(pl.col("contig_name").str.contains(sample_id))
        .collect()
    )

    pl_dfs_scan.append(data_cov)

# Align the per-file slices on contig_name into one wide table.
contig_cov = pl.concat(pl_dfs_scan, how="align")

# Strip the "sample:" prefix from the contig names.
contig_names = [name.split(":")[1] for name in contig_cov["contig_name"]]

contig_cov = contig_cov.drop("contig_name")
headers = ["contig_name"] + list(contig_cov.columns)

# Rescale each coverage column by its mean rounded up to the nearest hundred.
abun_scale = (contig_cov.mean() / 100).map_rows(np.ceil) * 100
divided_columns = [pl.col(col) / abun_scale[0, index] for index, col in enumerate(list(contig_cov.columns))]

result = contig_cov.select(divided_columns)

# Re-attach the cleaned contig names and restore the original column order.
result = result.with_columns(pl.Series("contig_name", contig_names))
result = result.select(headers)

Results:

CPU times: user 18.6 s, sys: 2.86 s, total: 21.5 s
Wall time: 2.3 s

@alienzj (Author) commented Apr 26, 2024

System information:

OS: AlmaLinux 9.3 (Shamrock Pampas Cat) x86_64
Host: VMware7,1
Kernel: 5.14.0-362.24.2.el9_3.x86_64
Uptime: 8 days, 1 hour, 49 mins
Packages: 2261 (rpm)
Shell: fish 3.3.1
Resolution: 1280x800
Terminal: /dev/pts/0
CPU: AMD EPYC 7763 (100) @ 2.445GHz
GPU: NVIDIA A40
Memory: 27503MiB / 805752MiB

Software information:

Python 3.9.19
pandas 2.2.2
polars 0.20.21

@apcamargo commented Aug 12, 2024

After a quick look at the code, I think there are some potential optimizations that could improve memory usage and performance (though speed may not be the primary concern here).

  • Polars' scan_csv function supports wildcards (e.g., cov_*.csv) or a list of file paths as input. This eliminates the need to append data frames to a list in a loop. However, Polars will generate a single long data frame, which you would need to reshape into a wide format using the pivot function. One caveat is that you would need a suitable column to use as the index for the pivot operation.
  • The Polars streaming API could improve memory usage, as it allows for filtering data without loading the entire dataset into memory.
  • Instead of dropping the contig names, manipulating the coverage values, and then creating a new data frame with the contig names, you can use Polars' selectors to operate directly on the coverage columns while keeping the name column unchanged.
  • Using Polars expressions instead of pure Python (e.g., [i.split(":")[1] for i in contig_cov["contig_name"]]) or NumPy (e.g., (contig_cov.mean() / 100).map_rows(np.ceil) * 100) can reduce the creation of additional objects and improve code efficiency. The first example could be replaced with extract, and the second could potentially be replaced by using Polars' mean expression coupled with over. It might be feasible to convert this entire piece of code to Polars (which is mostly zero-copy, so memory usage should improve). See the sketch after this list.
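
For concreteness, here is a minimal sketch that combines the streaming scan, selector, and expression ideas while keeping the per-file loop and the how="align" concat from the benchmarks above. cov_list and sample_id are the benchmark variables; everything else is illustrative:

import polars as pl
import polars.selectors as cs

frames = [
    pl.scan_csv(path)
    .rename({"": "contig_name"})
    .filter(pl.col("contig_name").str.contains(sample_id))
    .collect(streaming=True)   # stream the scan + filter so the whole file is never in memory at once
    for path in cov_list
]

contig_cov = pl.concat(frames, how="align")

contig_cov = contig_cov.with_columns(
    # Strip the "sample:" prefix with an expression instead of a Python list comprehension.
    pl.col("contig_name").str.extract(r"^[^:]+:(.*)$", 1)
).with_columns(
    # Divide each coverage column by its mean rounded up to the nearest hundred,
    # leaving contig_name untouched.
    [
        pl.col(c) / ((pl.col(c).mean() / 100).ceil() * 100)
        for c in contig_cov.select(cs.numeric()).columns
    ]
)

The glob/pivot idea from the first point is not shown here, since it would also require the per-file coverage columns to share a schema (or a follow-up pivot), as noted above.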

Apologies for the unsolicited advice! I’m particularly interested in this issue and thought I could put the time I spent learning Polars to good use. Please feel free to disregard any of this!

@luispedro (Member) commented:

At first, I was hesitant to add polars as a dependency, but I have also been switching to polars from pandas, so I am less worried about it now.

@apcamargo commented:

I had the same exact experience. I try to avoid extra dependencies as much as possible, but nowadays Polars is one of the few packages that I don't mind depending on. Polars itself also has very minimal dependencies.

@alienzj (Author) commented Sep 9, 2024

Thank you so much for your suggestions, @apcamargo!
I am going to refine the code accordingly.
