Skip to content

Commit

Permalink
(python) Initially implement merge
Browse files Browse the repository at this point in the history
Iterate overall all python paths and if same folder has same name multiple times,
then merge the content and put to <job_dir>/site-packages

Solves problem with imports for some dependencies.

Default layout (/windmill/cache/):

dep==x.y.z
└── X
   └── A
dep-ext==x.y.z
└── X
   └── B

In this case python would be confused with finding B module.

This function will convert it to (/<job_id>):

site-packages
└── X
   ├── A
   └── B

This way python has no problems with finding correct module
  • Loading branch information
pyranota committed Nov 25, 2024
1 parent dd50815 commit ccfc708
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 3 deletions.
2 changes: 2 additions & 0 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ pub struct PythonAnnotations {
pub no_uv: bool,
pub no_uv_install: bool,
pub no_uv_compile: bool,

pub no_postinstall: bool,
}

#[annotations("//")]
Expand Down
111 changes: 108 additions & 3 deletions backend/windmill-worker/src/python_executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, fs, process::Stdio};
use std::{
collections::{HashMap, HashSet},
fs,
path::Path,
process::Stdio,
};

use itertools::Itertools;
use regex::Regex;
Expand All @@ -16,7 +21,7 @@ use windmill_common::{
error::{self, Error},
jobs::{QueuedJob, PREPROCESSOR_FAKE_ENTRYPOINT},
utils::calculate_hash,
worker::{write_file, WORKER_CONFIG},
worker::{write_file, PythonAnnotations, WORKER_CONFIG},
DB,
};

Expand Down Expand Up @@ -359,6 +364,100 @@ pub async fn uv_pip_compile(
Ok(lockfile)
}

fn copy_dir_recursively(src: &Path, dst: &Path) -> windmill_common::error::Result<()> {
if !dst.exists() {
fs::create_dir_all(dst)?;
}

for entry in fs::read_dir(src)? {
let entry = entry?;
let src_path = entry.path();
let dst_path = dst.join(entry.file_name());

if src_path.is_dir() {
copy_dir_recursively(&src_path, &dst_path)?;
} else {
fs::copy(&src_path, &dst_path)?;
}
}

Ok(())
}

/**
Iterate overall all python paths and if same folder has same name multiple times,
then merge the content and put to <job_dir>/site-packages
Solves problem with imports for some dependencies.
Default layout (/windmill/cache/):
dep==x.y.z
└── X
└── A
dep-ext==x.y.z
└── X
└── B
In this case python would be confused with finding B module.
This function will convert it to (/<job_id>):
site-packages
└── X
├── A
└── B
This way python has no problems with finding correct module
*/
#[tracing::instrument(level = "trace", skip_all)]
async fn postinstall(
additional_python_paths: &mut Vec<String>,
job_dir: &str,
) -> windmill_common::error::Result<()> {
// (PackageName, Vec<GlobalPath>)
let mut lookup_table: HashMap<String, Vec<String>> = HashMap::new();

for path in additional_python_paths.iter() {
for entry in fs::read_dir(&path).unwrap() {
let entry = entry.unwrap();

// TODO: Iterate only over windmill/cache
// Ignore all files, we only need directories.
// We can not merge files
if entry.file_type().unwrap().is_dir() {
let name = entry.file_name().to_str().unwrap().to_owned();

if !name.contains("dist-info") && name != "bin" {
if let Some(existing_paths) = lookup_table.get_mut(&name) {
println!("Found existing name: {:?}\n in {}", entry.file_name(), path);
existing_paths.push(path.to_owned())
} else {
lookup_table.insert(name, vec![path.to_owned()]);
}
}
// .or_else(|| tracing::error!("Can't convert name"));
}
}
}

let mut paths_to_remove: HashSet<String> = HashSet::new();
for existing_paths in lookup_table.values() {
for path in existing_paths {
copy_dir_recursively(
Path::new(path),
Path::new(&format!("{}/site-packages", job_dir)),
)?;

paths_to_remove.insert(path.to_owned());
}
}

additional_python_paths.retain(|e| !paths_to_remove.contains(e));
additional_python_paths.insert(0, format!("{}/site-packages", job_dir));
Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
pub async fn handle_python_job(
requirements_o: Option<String>,
Expand All @@ -378,7 +477,7 @@ pub async fn handle_python_job(
occupancy_metrics: &mut OccupancyMetrics,
) -> windmill_common::error::Result<Box<RawValue>> {
let script_path = crate::common::use_flow_root_path(job.script_path());
let additional_python_paths = handle_python_deps(
let mut additional_python_paths = handle_python_deps(
job_dir,
requirements_o,
inner_content,
Expand All @@ -394,6 +493,12 @@ pub async fn handle_python_job(
)
.await?;

if !PythonAnnotations::parse(inner_content).no_postinstall {
if let Err(e) = postinstall(&mut additional_python_paths, job_dir).await {
tracing::error!("Postinstall stage has been failed. Reason: {e}");
}
}

append_logs(
&job.id,
&job.workspace_id,
Expand Down

0 comments on commit ccfc708

Please sign in to comment.