Replies: 4 comments 5 replies
-
That's a strange error for a local controller. What do the log files in … My first recommendation is to update …
-
OK, so I did this and pushed it to one worker, and it worked. I still don't know why it doesn't work within a targets pipeline. I updated all the packages.

```r
collateNodeCounts <- function() {
  target_names <- tar_objects()
  target_names <- target_names[str_detect(target_names, "summariseTripTuples")]
  batch_size <- 10
  result_summariseTripTuples <- NULL

  # Combine a list of data frames and re-aggregate the trip counts
  process_combine <- function(data_frames) {
    bind_rows(data_frames) %>%
      group_by(node_id1, node_id2, weight_category) %>%
      summarise(vehicle_count = 1,
                trips_n = sum(trips_n, na.rm = TRUE),
                .groups = "drop")
  }

  # Split target names into batches
  target_batches <- split(target_names, ceiling(seq_along(target_names) / batch_size))

  # Iterate over each batch
  for (batch in target_batches) {
    # Load all targets in the current batch
    batch_data <- lapply(batch, function(target_name) {
      file_location <- paste0("_targets/objects/", target_name)
      qs::qread(file_location)
    })
    # Combine all data frames in the current batch
    batch_combined <- process_combine(batch_data)
    # Fold the batch into the running result
    if (is.null(result_summariseTripTuples)) {
      result_summariseTripTuples <- batch_combined
    } else {
      result_summariseTripTuples <- process_combine(list(result_summariseTripTuples, batch_combined))
    }
    print(nrow(result_summariseTripTuples))
    rm(batch_data, batch_combined)
    gc()
  }
  return(result_summariseTripTuples)
}
```
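The batch-then-recombine approach works because summing trip counts is associative: summarising each batch and then summarising the partial results gives the same totals as one big summarise. A toy check of that idiom (hypothetical data; assumes dplyr and tibble are installed):

```r
library(dplyr)

# Hypothetical trip tuples spread across two "batches"
df <- tibble::tibble(
  node_id1 = c(1, 1, 2, 2, 1),
  node_id2 = c(9, 9, 8, 8, 9),
  weight_category = c("a", "a", "b", "b", "a"),
  trips_n = c(10, 5, 2, 3, 1)
)

summarise_trips <- function(d) {
  d %>%
    group_by(node_id1, node_id2, weight_category) %>%
    summarise(trips_n = sum(trips_n, na.rm = TRUE), .groups = "drop")
}

one_pass <- summarise_trips(df)
# Reduce each batch separately, then reduce the partial results:
two_pass <- summarise_trips(bind_rows(summarise_trips(df[1:3, ]),
                                      summarise_trips(df[4:5, ])))
all.equal(one_pass, two_pass)  # TRUE: same totals either way
```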
```r
controller <- crew::crew_controller_local(
  name = "snapping_test",
  workers = 1,
  launch_max = 5L,
  local_log_directory = "/tmp/crew",
  seconds_idle = 3,
  garbage_collection = TRUE
)
controller$start()
controller$push(packages = c("tidyverse", "targets"),
                name = "summariseTripTuples", command = {
  collateNodeCounts()
})
controller$wait(mode = "all")
task <- controller$pop()
controller$terminate()
crew_clean()
task
```

The number of rows slowly increases towards the number of unique combinations of node1, node2, and weight category. This is better than the 371M rows! I got a data frame with the expected number of results:

```
# A tibble: 1 × 12
  name                command result                   seconds  seed algorithm error trace warnings launcher      worker instance
  <chr>               <chr>   <list>                     <dbl> <int> <chr>     <chr> <chr> <chr>    <chr>          <int> <chr>
1 summariseTripTuples NA      <tibble [2,295,481 × 5]>   1557.    NA NA        NA    NA    NA       snapping_test      1 7edd89e3…
```
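As the output above shows, `pop()` returns a one-row tibble whose `result` column is a list column; the computed data frame is its first element. A small fragment for extracting it (not standalone; assumes the `task` object from the run above):

```r
# The `result` list column holds the value the task computed;
# index into it to get the actual data frame back.
result_df <- task$result[[1]]
nrow(result_df)
```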
-
The default behavior of … You can also use file targets for big data cases like this, in which case your … I am not aware that it is possible to do a "partial gather" in …
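For reference, a file target in {targets} looks roughly like this: the target's command writes the data to disk and returns the path, and `format = "file"` tells {targets} to track the file by hash instead of storing the object in `_targets/objects/`. A sketch for a `_targets.R` script (the path and the `summarise_trips()` helper are hypothetical):

```r
library(targets)

list(
  tar_target(
    trip_counts_file,
    {
      out <- "output/trip_counts.qs"     # hypothetical output path
      qs::qsave(summarise_trips(), out)  # hypothetical summarising helper
      out                                # return the path for tracking
    },
    format = "file"
  )
)
```

This keeps the large aggregate out of the default target store, which can help in big-data cases like the one in this thread.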
-
Pipeline ran successfully. Thanks for all the help.
-
My pipeline does the following:
Last error message:

```
{crew} worker 1 launched 5 times in a row without completing any tasks. Either troubleshoot or raise launch_max above 5.
```
The error message suggests that the memory requirements can be altered, but I don't see how. I'm running this on an AWS EC2 c5a.8xlarge with 32 vCPUs and 64 GB RAM.
This is the controller I'm using:

```r
controller <- crew::crew_controller_local(
  name = "snapping",
  workers = 1,
  launch_max = 10L,
  local_log_directory = "/tmp/crew",
  garbage_collection = TRUE
)
```

Any help to overcome this would be appreciated.
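One way to get more signal when a worker keeps dying is to read the per-worker logs that `local_log_directory` writes (a sketch; the `/tmp/crew` path matches the controller above):

```r
# Print the newest crew worker log, which often shows why the worker
# exited (e.g. an out-of-memory kill) before it completed any task.
logs <- list.files("/tmp/crew", full.names = TRUE)
if (length(logs) > 0) {
  latest <- logs[which.max(file.mtime(logs))]
  writeLines(readLines(latest))
}
```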