fix: download wheels in parallel to avoid deadlock (#752)
This parallelizes the downloading and building of wheels (each one now runs on its own
spawned task, rather than only concurrently on a single task) to avoid a deadlock issue we encountered.
baszalmstra committed Feb 1, 2024
1 parent 80f5590 commit 5e88db8
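The diff below boils down to one pattern: the package database is shared through an `Arc`, each wheel download/build is spawned onto its own tokio task, and `buffer_unordered(20)` keeps up to twenty of them in flight, so a single slow or blocking wheel build can no longer wedge the one task driving the stream. A minimal, self-contained sketch of that pattern (not the pixi code itself; the `PackageDb` stub, its `get_wheel` signature, and the package names are invented, and the `tokio` and `futures` crates are assumed):

```rust
use std::sync::Arc;

use futures::{stream, StreamExt};

// Invented stand-in for rip::index::PackageDb, just to show the shape of the pattern.
struct PackageDb;

impl PackageDb {
    async fn get_wheel(&self, name: &str) -> String {
        // A real implementation would download or build the wheel here.
        format!("{name}-1.0-py3-none-any.whl")
    }
}

#[tokio::main]
async fn main() {
    // A cheaply clonable handle lets every spawned task use the same database.
    let package_db = Arc::new(PackageDb);
    let packages = ["numpy", "requests", "rich"];

    let wheels: Vec<String> = stream::iter(packages)
        .map(|name| {
            let package_db = package_db.clone();
            async move {
                // Run the download on its own task so blocking work inside it
                // cannot stall the task that polls the stream.
                tokio::spawn(async move { package_db.get_wheel(name).await })
                    .await
                    .expect("download task panicked")
            }
        })
        // Keep up to 20 downloads in flight and yield each as soon as it finishes.
        .buffer_unordered(20)
        .collect()
        .await;

    println!("downloaded: {wheels:?}");
}
```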
Showing 2 changed files with 63 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/environment.rs
@@ -225,7 +225,7 @@ pub async fn update_prefix_pypi(
name: &str,
prefix: &Prefix,
platform: Platform,
package_db: &PackageDb,
package_db: Arc<PackageDb>,
conda_records: &[RepoDataRecord],
pypi_records: &[(PypiPackageData, PypiPackageEnvironmentData)],
status: &PythonStatus,
@@ -388,7 +388,7 @@ impl<'p> LockFileDerivedData<'p> {
environment.name().as_str(),
&prefix,
platform,
&package_db,
package_db,
&repodata_records,
&pypi_records,
&python_status,
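A note on the two environment.rs changes above: `tokio::spawn` only accepts `'static` futures, so a borrowed `&PackageDb` cannot be moved into a spawned task, which is why the database is now passed around as an `Arc<PackageDb>` and cloned per task. A small sketch of that constraint (the empty `PackageDb` here is a stand-in, not the `rip` type, and the `tokio` crate is assumed):

```rust
use std::sync::Arc;

// Stand-in for rip::index::PackageDb, only to show the ownership shape.
struct PackageDb;

// This does NOT compile: the spawned future must be 'static, but it borrows `db`.
//
//     async fn with_reference(db: &PackageDb) {
//         tokio::spawn(async move { let _ = db; }).await.unwrap();
//     }

// Cloning an Arc gives the task its own owning handle, so this compiles.
async fn with_arc(db: Arc<PackageDb>) {
    let db = db.clone();
    tokio::spawn(async move {
        let _db: &PackageDb = &*db; // use the shared database from inside the task
    })
    .await
    .expect("task panicked");
}

#[tokio::main]
async fn main() {
    with_arc(Arc::new(PackageDb)).await;
}
```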
90 changes: 61 additions & 29 deletions src/install_pypi.rs
@@ -12,13 +12,15 @@ use crate::consts::PROJECT_MANIFEST;
use crate::project::manifest::SystemRequirements;
use crate::pypi_marker_env::determine_marker_environment;
use crate::pypi_tags::{is_python_record, project_platform_tags};
use pep508_rs::MarkerEnvironment;
use rattler_conda_types::{Platform, RepoDataRecord};
use rattler_lock::{PypiPackageData, PypiPackageEnvironmentData};
use rip::artifacts::wheel::{InstallPaths, UnpackWheelOptions};
use rip::artifacts::Wheel;
use rip::index::PackageDb;
use rip::python_env::{
find_distributions_in_venv, uninstall_distribution, Distribution, PythonLocation, WheelTag,
WheelTags,
};
use rip::resolve::{ResolveOptions, SDistResolution};
use rip::types::{
@@ -28,6 +30,7 @@ use rip::wheel_builder::WheelBuilder;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinError;

@@ -40,7 +43,7 @@ type CombinedPypiPackageData = (PypiPackageData, PypiPackageEnvironmentData);
// TODO: refactor arguments in struct
#[allow(clippy::too_many_arguments)]
pub async fn update_python_distributions(
package_db: &PackageDb,
package_db: Arc<PackageDb>,
prefix: &Prefix,
conda_package: &[RepoDataRecord],
python_packages: &[CombinedPypiPackageData],
@@ -91,31 +94,32 @@ pub async fn update_python_distributions(
.ok_or_else(|| miette::miette!("could not resolve pypi dependencies because no python interpreter is added to the dependencies of the project.\nMake sure to add a python interpreter to the [dependencies] section of the {PROJECT_MANIFEST}, or run:\n\n\tpixi add python"))?;

// Determine the environment markers
let marker_environment = determine_marker_environment(platform, python_record.as_ref())?;
let marker_environment = Arc::new(determine_marker_environment(
platform,
python_record.as_ref(),
)?);

// Determine the compatible tags
let compatible_tags =
project_platform_tags(platform, system_requirements, python_record.as_ref());

let wheel_builder = WheelBuilder::new(
package_db,
&marker_environment,
Some(&compatible_tags),
&ResolveOptions {
sdist_resolution,
python_location: PythonLocation::Custom(python_location),
..Default::default()
},
HashMap::default(),
)
.into_diagnostic()
.context("error in construction of WheelBuilder for `pypi-dependencies` installation")?;
let compatible_tags = Arc::new(project_platform_tags(
platform,
system_requirements,
python_record.as_ref(),
));

// Define the resolve options for local wheel building
let resolve_options = Arc::new(ResolveOptions {
sdist_resolution,
python_location: PythonLocation::Custom(python_location),
..Default::default()
});

// Start downloading the python packages that we want in the background.
let (package_stream, package_stream_pb) = stream_python_artifacts(
package_db,
marker_environment,
compatible_tags,
resolve_options,
python_distributions_to_install.clone(),
Some(&wheel_builder),
);

// Remove python packages that need to be removed
@@ -229,12 +233,14 @@ async fn install_python_distributions(

/// Creates a stream which downloads the specified python packages. The stream will download the
/// packages in parallel and yield them as soon as they become available.
fn stream_python_artifacts<'a>(
package_db: &'a PackageDb,
packages_to_download: Vec<&'a CombinedPypiPackageData>,
wheel_builder: Option<&'a WheelBuilder<'a, 'a>>,
fn stream_python_artifacts(
package_db: Arc<PackageDb>,
marker_environment: Arc<MarkerEnvironment>,
compatible_tags: Arc<WheelTags>,
resolve_options: Arc<ResolveOptions>,
packages_to_download: Vec<&CombinedPypiPackageData>,
) -> (
impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + 'a,
impl Stream<Item = miette::Result<(Option<String>, HashSet<Extra>, Wheel)>> + '_,
Option<ProgressBar>,
) {
if packages_to_download.is_empty() {
@@ -261,6 +267,11 @@ fn stream_python_artifacts(
.map(move |(pkg_data, pkg_env_data)| {
let pb = stream_pb.clone();
let message_formatter = message_formatter.clone();
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();

async move {
// Determine the filename from the
let filename = pkg_data
@@ -293,9 +304,31 @@
yanked: Default::default(),
};

// TODO: Maybe we should have a cache of wheels separate from the package_db. Since a
// wheel can just be identified by its hash or url.
let wheel: Wheel = package_db.get_wheel(&artifact_info, wheel_builder).await?;
let wheel = tokio::spawn({
let marker_environment = marker_environment.clone();
let compatible_tags = compatible_tags.clone();
let resolve_options = resolve_options.clone();
let package_db = package_db.clone();
async move {
let wheel_builder = WheelBuilder::new(
&package_db,
&marker_environment,
Some(&compatible_tags),
&resolve_options,
HashMap::default(),
)
.into_diagnostic()
.context("error in construction of WheelBuilder for `pypi-dependencies` installation")?;

// TODO: Maybe we should have a cache of wheels separate from the package_db. Since a
// wheel can just be identified by its hash or url.
package_db.get_wheel(&artifact_info, Some(&wheel_builder)).await
}
})
.await.unwrap_or_else(|e| match e.try_into_panic() {
Ok(panic) => std::panic::resume_unwind(panic),
Err(_) => Err(miette::miette!("operation was cancelled"))
})?;

// Update the progress bar
pb_task.finish().await;
@@ -322,8 +355,7 @@ fn stream_python_artifacts(
))
}
})
// TODO: put this back on 20 when there is not deadlock anymore.
.buffer_unordered(1)
.buffer_unordered(20)
.right_stream();

(download_stream, Some(pb))
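One more detail from the hunk above: awaiting the spawned task yields a `JoinError` if the task panicked or was cancelled, and the diff re-raises the panic on the awaiting side via `try_into_panic`/`resume_unwind` rather than swallowing it. A stand-alone sketch of that handling (the `download_wheel` function and its `String` error type are invented for illustration; the real code returns `miette::Result<Wheel>`):

```rust
use tokio::task::JoinError;

async fn download_wheel(name: &str) -> Result<String, String> {
    let name = name.to_string();
    tokio::spawn(async move {
        // Pretend to download or build the wheel; real work could panic in here.
        Ok::<_, String>(format!("{name}-1.0-py3-none-any.whl"))
    })
    .await
    .unwrap_or_else(|e: JoinError| match e.try_into_panic() {
        // Re-raise the original panic payload so the failure is not swallowed.
        Ok(panic) => std::panic::resume_unwind(panic),
        // The only other cause of a JoinError is that the task was cancelled.
        Err(_) => Err("operation was cancelled".to_string()),
    })
}

#[tokio::main]
async fn main() {
    match download_wheel("rich").await {
        Ok(wheel) => println!("got {wheel}"),
        Err(err) => eprintln!("failed: {err}"),
    }
}
```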
