
Conversation

@t-kalinowski t-kalinowski (Collaborator) commented Sep 23, 2025

The primary motivation for this function is to speed up store creation using parallelism. It offloads the work of reading, converting to markdown, chunking, and generating embeddings to mirai background workers. This substantially reduces store creation time. For example, creating a store from quarto.org processes about 90% of the site in ~20 seconds before hitting a TPM rate limit of 1 million tokens per minute with OpenAI. Previously it would take around 10 minutes total.

This will require a solution for rate-limit handling, either through smart backoff or support for using the batch API. These can be addressed in follow-up PRs.

We now disable the background workers from retrying the request after a 429. The main thread then tries to generate embeddings with retries, and blocks until the embedding request succeeds. This gracefully leads to backpressure and lets the whole queue be rate-limited without spamming excessive retries.
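The retry split described above can be sketched as follows. This is an illustrative sketch only: `do_embed()`, `worker_embed()`, `main_embed()`, and the `max_retries` argument are hypothetical names, not the actual ragnar internals.

```r
# On a mirai worker: attempt embedding once, with retries disabled.
# On a 429 (or any error), hand the unembedded chunks back to the
# main process instead of retrying in the background.
worker_embed <- function(store, chunks) {
  tryCatch(
    do_embed(store, chunks, max_retries = 0L),
    error = function(e) chunks
  )
}

# On the main thread: retry with backoff, blocking until the request
# succeeds. While this loop waits, no new jobs are launched, so the
# whole queue is throttled to the provider's rate limit.
main_embed <- function(store, chunks, max_tries = 8L) {
  for (i in seq_len(max_tries)) {
    result <- tryCatch(do_embed(store, chunks), error = identity)
    if (!inherits(result, "error")) {
      return(result)
    }
    Sys.sleep(min(2^i, 60)) # exponential backoff, capped at 60s
  }
  stop(result)
}
```

Because only the single main thread retries, the workers stay busy with reading and chunking while embedding requests queue up behind the rate limit.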

@dfalbel dfalbel (Collaborator) left a comment

Nice!

I would abstract the queue implementation from actual ingest code so it's easier to read.

R/ingest.R (outdated)

```r
if (mirai::is_error_value(result)) {
  cond <- attributes(result)
  class(cond) <- cond$condition.class
  stop(cond)
```
Do we want to fully stop when something goes wrong? I think it would be nice to recover, and then offer an easy way to get the paths that didn't work.

@shikokuchuo shikokuchuo (Member) left a comment

I'd consider adding a way for users to use existing mirai daemons that they may have set up (to allow remote daemons). This could be by default or opt-in via an argument.

If you need to make anything conditional, you may use mirai::daemons_set().

An alternative is to do what purrr does, and abdicate responsibility to the user. E.g. offer a parallel = FALSE argument and for the TRUE branch, guard that with a mirai::require_daemons() call.
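The purrr-style approach could look roughly like this. A minimal sketch: the `parallel` argument and the function body are hypothetical, but `mirai::require_daemons()` and `mirai::daemons_set()` are the functions mentioned above.

```r
ragnar_store_ingest <- function(paths, store, parallel = FALSE) {
  if (parallel) {
    # Error early, with an informative message, if the user has not
    # set up daemons themselves (e.g. via mirai::daemons(4), possibly
    # pointing at remote machines).
    mirai::require_daemons()
    # ... dispatch jobs with mirai::mirai() ...
  } else {
    # ... process paths sequentially on the main thread ...
  }
}

# Alternatively, make parallelism conditional on existing daemons:
parallel <- mirai::daemons_set()
```

This abdicates daemon setup and teardown to the user, which keeps the function simple but means it cannot assume anything about worker state.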

@t-kalinowski t-kalinowski (Collaborator, Author) commented:

@codex review

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines +147 to +155
```r
.launch_jobs <- function() {
  while (
    length(.pending) &&
      (length(.finished) + length(.active)) < max_uncollected
  ) {
    mirai_args <- .pending[[1L]]
    .pending <<- .pending[-1L]
    .active[[length(.active) + 1L]] <<-
      inject(mirai::mirai(!!!mirai_args, .compute = .compute))
```


[P0] Reference rlang::inject when launching mirai jobs

The new queue helper calls inject(mirai::mirai(!!!mirai_args, .compute = .compute)), but inject is not defined or imported anywhere in the package. As soon as ragnar_store_ingest() schedules its first job, R will error with could not find function "inject" and the parallel ingestion never starts. Prefixing the call with rlang:: (or importing it) is required for this function to work at all.
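For reference, the fix is to namespace (or import) `inject()` from rlang, which splices the stored argument list into the `mirai()` call:

```r
# rlang::inject() evaluates its argument after expanding !!! splices,
# so the saved list of arguments is passed to mirai::mirai() as if
# written out by hand.
.active[[length(.active) + 1L]] <<-
  rlang::inject(mirai::mirai(!!!mirai_args, .compute = .compute))
```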


Comment on lines +111 to 118
```r
do_ingest_remote_work <- function(path, store, prepare, embed = TRUE) {
  chunks <- prepare(path)
  if (embed) {
    tryCatch(
      chunks <- do_embed(store, chunks),
      error = function(e) NULL
    )
    # cli::cli_progress_update()
  }
```


[P1] Propagate embedding failures from workers

Inside do_ingest_remote_work() the embedding call is wrapped with tryCatch(..., error = function(e) NULL) and any failure is discarded before returning chunks to the main process. When the embedder relies on packages not attached in mirai workers (as in the new test that intentionally omits stats::runif), the error is swallowed and ingestion succeeds silently because the main thread re‑embeds later. This prevents callers from learning that their embed function is misconfigured and causes the added expect_error() test to fail. Consider letting the error bubble up or returning it so that the main loop can stop or handle it explicitly.
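One way to let the main loop see the failure is to return the condition object instead of discarding it. A sketch, with `do_embed()` as in the excerpt above and the collection code simplified:

```r
# On the worker: capture the error object rather than swallowing it.
# `identity` returns the condition, so `chunks` becomes the error value.
if (embed) {
  chunks <- tryCatch(do_embed(store, chunks), error = identity)
}
chunks

# On the main thread, when collecting a finished job:
result <- mirai::collect_mirai(job)
if (inherits(result, "error")) {
  # Stop here, or record the failing path and continue, so callers
  # learn that their embed function is misconfigured.
  stop(result)
}
```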


@t-kalinowski t-kalinowski (Collaborator, Author) commented:

Thanks @shikokuchuo. I’d prefer to keep daemons’ lifetimes tied to the function call, since we configure global state in each worker. That way it stays simple and avoids the potential for leaking state or misconfigured workers.

@t-kalinowski t-kalinowski merged commit 027c4e6 into main Sep 30, 2025
5 checks passed