Triggering Argo Workflow pipelines #819

Open
DiamondJoseph opened this issue Feb 13, 2025 · 9 comments

@DiamondJoseph (Contributor) commented Feb 13, 2025

I've been talking a lot with @olliesilvester today about how this is going to be accomplished, so I wanted to get some words down.

Referencing and sometimes citing https://diamondlightsource.github.io/workflows/docs/

Ollie is working on enabling small in-process callbacks that are required for some plans. Because of the subscribe plan stub, this can be done from within a plan without needing to touch the RunEngine, but we need a way to make sure the RunEngine unsubscribes again using the subscription id; otherwise we risk accumulating a growing, overlapping set of callbacks with every run.
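
As a rough illustration (not an agreed design), the wrapper below holds a subscription for the duration of a plan and always unsubscribes with the returned token, even if the plan fails; bluesky's own bluesky.preprocessors.subs_wrapper already provides equivalent behaviour:

from bluesky import plan_stubs as bps
from bluesky import preprocessors as bpp
from bluesky.utils import make_decorator


def subscription_wrapper(plan, callback):
    # Subscribe `callback` for the duration of `plan`, then always unsubscribe
    # using the returned token, so callbacks do not accumulate across runs.
    tokens = []

    def _subscribe():
        tokens.append((yield from bps.subscribe("all", callback)))

    def _unsubscribe():
        for token in tokens:
            yield from bps.unsubscribe(token)

    def _inner():
        yield from _subscribe()
        return (yield from plan)

    return (yield from bpp.finalize_wrapper(_inner(), _unsubscribe()))


subscription_decorator = make_decorator(subscription_wrapper)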

Because these callbacks run within the same Python process as blueapi, they are limited by the resources given to that process, which should remain minimal. When callbacks grow to require more resources, speed, or repeatability, they should be extracted into the workflow system.

We therefore need to consider how to trigger those workflows. Blueapi already has one persistent callback that sends documents to the message bus for the client and other services; the following assumes that this callback remains in place, either to send messages to a bus or to insert documents into a document store.

In order to enable a complete experiment I believe we need the following (a rough sketch of the implied API surface follows this list):

  • An API for workflows to consume data from [the message bus|a document store]
  • An API for workflows to put data into [the message bus|a document store|the blueapi instance that spawned them]
  • An API for plans to receive data that results from workflow runs and use them to adapt their future behaviour
  • A way of triggering workflows: we should be able to decorate a plan and define one or many workflows that should be executed for [the plan|each run within the plan]
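
Purely as a hypothetical sketch of the API surface implied by these bullets (none of the names or signatures below exist today):

from typing import Any, Protocol, TypeVar
from uuid import UUID

T = TypeVar("T")


class WorkflowDataAPI(Protocol):
    # Hypothetical surface only: not an existing blueapi or workflows API.

    def next_position_from_workflow(self, result_type: type[T], workflow_id: str, run_id: UUID):
        """Plan stub: wait until the named workflow publishes its next result
        for this run, then return it as a `result_type`."""
        ...

    def send(self, result: Any) -> None:
        """Called from inside a workflow: push a result back to
        [the message bus | a document store | the blueapi instance]."""
        ...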

A simple plan that triggers an external workflow

from bluesky.plan_stubs import open_run, close_run
from bluesky.protocols import Readable


def my_plan(detectors: list[Readable], workflow_param: int):
    @run_workflow(workflow_id="my_workflow", params={"my_param": workflow_param})
    def inner_plan():
        run_id = yield from open_run()
        for _ in range(3):
            yield from OurAPI.next_position_from_workflow(PositionType, "my_workflow", run_id)
        yield from close_run()

    yield from inner_plan()

A simple workflow, for which there is a copier template (or similar) to build a container and register it with the workflow engine under the name "my_workflow", such that the only thing required to create a new workflow is defining my_analysis, my_per_point_analysis, or both (i.e. everything lives within this single Python file):

from uuid import UUID

import click


def my_analysis(foo: int, data) -> PositionType:
    ...


def my_per_point_analysis(foo: int, data) -> PositionType:
    ...


@click.command()
@click.argument("my_param", type=int)
@click.argument("run_id", type=click.UUID)
def main(my_param: int, run_id: UUID) -> None:
    run = tiledAPI.run_from(run_id)
    OurAPI.send(my_analysis(my_param, run.get_data()))
    while run.has_more():
        OurAPI.send(my_per_point_analysis(my_param, run.next_position()))

Acceptance Criteria

  • All of the sub-issues on this issue have been resolved

@DiamondJoseph (Contributor, Author)

I have assumed in this that OurAPI extracts authentication information, and information about how to send data back into the plan, from either its environment (ENV) or from additional parameters not shown.

@DiamondJoseph (Contributor, Author)

@run_workflow is something like a Msg pre-processor that, when it sees a StartDocument, inserts the run_id into a message to the workflow engine.
As we have made an authenticated request to blueapi, blueapi can then, on behalf of the user, obtain a token that is authorized for running the workflow and accessing the data with that run_id.
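
As a rough sketch only, @run_workflow could be a subscription held for the duration of the plan that fires on each StartDocument; workflow_client.submit below is a hypothetical stand-in for the real workflow-engine client:

from bluesky import preprocessors as bpp
from bluesky.utils import make_decorator


def run_workflow_wrapper(plan, workflow_id: str, params: dict):
    # Subscribe a small callback for the duration of the plan; every time a
    # StartDocument is emitted, forward its uid (the run_id) plus the static
    # parameters to the workflow engine.
    def _on_document(name: str, doc: dict) -> None:
        if name == "start":
            # Hypothetical client; blueapi would attach the on-behalf-of token here.
            workflow_client.submit(workflow_id, params=params, run_id=doc["uid"])

    return (yield from bpp.subs_wrapper(plan, _on_document))


run_workflow = make_decorator(run_workflow_wrapper)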

@olliesilvester commented Feb 13, 2025

Ollie is working on enabling small in-process callbacks that are required for some plans. Because of the subscribe plan stub, this can be done from within a plan without needing to touch the RunEngine, but we need a way to make sure the RunEngine unsubscribes again using the subscription id; otherwise we risk accumulating a growing, overlapping set of callbacks with every run.

This shouldn't be a problem, since subscribe returns the uid for that subscription, so the decorator handling the subscriptions can keep track of everything.

Also, for interest, MX's main uses for callbacks are:

  • Creating nexus files based off experimental parameters
  • Depositing information to ISPyB
  • Triggering zocalo processing

In Hyperion, we run these in one external process and use bluesky.callbacks.zmq to send messages across from our plan-running process. For our non-Hyperion bluesky beamlines, we are using blueapi, so we can't easily run callbacks in external processes. However, it's not a blocker for now: running these callbacks internally using the subscribe plan stub will be fine, and we can switch to the Argo way once it's ready.
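
For context, a minimal sketch of this bluesky.callbacks.zmq pattern; the addresses and ports are illustrative, a bluesky.callbacks.zmq.Proxy(5577, 5578) is assumed to be running between the two processes, and my_external_callback is a placeholder:

# In the plan-running process: publish all documents to the 0MQ proxy.
from bluesky.callbacks.zmq import Publisher

RE.subscribe(Publisher("localhost:5577"))

# In a separate process: receive documents and run the heavy callbacks there.
from bluesky.callbacks.zmq import RemoteDispatcher

dispatcher = RemoteDispatcher("localhost:5578")
dispatcher.subscribe(my_external_callback)  # e.g. nexus writing, ISPyB, zocalo
dispatcher.start()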

Tagging @DominicOram for interest too

@DiamondJoseph (Contributor, Author)

Any processing that cannot be run in the workflow engine (proprietary analysis?) can still make use of the APIs for consuming/sending data, but a way of passing authn information in will be required [probably as a small additional note on the Copier template?].

How will workflows be organised? A single repository that is loaded into the workflow engine in an ArgoCD-like way?

@DiamondJoseph (Contributor, Author)

So the MX use cases right now are fire-and-forget style callbacks, without any further introspection required by the plans, or with any introspection loaded from e.g. ISPyB?

@olliesilvester commented Feb 13, 2025

So the MX use cases right now are fire-and-forget style callbacks, without any further introspection required by the plans, or with any introspection loaded from e.g. ISPyB?

For the external callbacks, yes (as far as I can see).

We do have a few internal callbacks in Hyperion which we introspect during the plan, but these are doing very small things and Argo wouldn't need to know about them.

@DiamondJoseph (Contributor, Author) commented Feb 13, 2025

sequenceDiagram
actor Alice
participant Keycloak
participant Blueapi
participant Tiled
participant Argo Workflows

    Alice <<->> Keycloak: Log in to blueapi & cache <run_token>
    Note left of Keycloak: scopes=["data:write"]
    Note left of Blueapi: @run_workflow("my_processing")<br/> def my_plan()
    Alice ->>+ Blueapi: run my_plan with <run_token>
    Blueapi <<->> Keycloak: on-behalf-of Alice request <process_token><br> with <run_token>
    Note right of Keycloak: scopes=["data:write", "data:read", "analysis:my_processing"]
    Blueapi ->> Argo Workflows: run my_processing with <process_token>
    create participant my_processing
    Argo Workflows ->> my_processing: Creates

    loop until plan finishes
        Blueapi ->> Tiled: Insert docs callback
        Tiled ->>+ my_processing: Tiled API get data
        my_processing ->>- Blueapi: New workflow API inform decisions
    end

    Blueapi ->> Argo Workflows: Finished
    destroy my_processing
    Argo Workflows ->> my_processing: Finished

    Blueapi ->>- Alice: Finished

  • The auth token is embedded into the processing container when it is created
  • When it makes calls to the document store, it's authenticated as the user that made the request.
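
For the on-behalf-of step in the diagram, a hedged sketch of the token exchange blueapi might perform, assuming Keycloak's OAuth 2.0 token exchange support; the endpoint, client id, secret, and scopes below are placeholders:

import requests

TOKEN_URL = "https://keycloak.example.com/realms/my-realm/protocol/openid-connect/token"


def exchange_for_process_token(run_token: str) -> str:
    # Exchange Alice's <run_token> for a <process_token> scoped for the workflow.
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
            "client_id": "blueapi",
            "client_secret": "<blueapi-client-secret>",
            "subject_token": run_token,
            "subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
            "scope": "data:write data:read analysis:my_processing",
        },
    )
    response.raise_for_status()
    return response.json()["access_token"]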

@DominicOram (Contributor)

An additional use-case of note is that some of our callbacks rely on other callbacks, which is what we use emit for in the DocumentRouter. Specifically, we need to write to ISPyB and then send the ISPyB key to zocalo, so we chain the ispyb and zocalo callbacks. It would be nice if this chain could also be multi-process: you can envisage wanting one service that is allowed ISPyB write credentials and another that is allowed zocalo triggering credentials. Having them multi-process isn't a hard requirement, but allowing them to be chained still is.
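
A minimal sketch of this chaining, using event_model's DocumentRouter emit hook; write_to_ispyb and trigger_zocalo are hypothetical helpers:

from event_model import DocumentRouter


class IspybCallback(DocumentRouter):
    # Writes to ISPyB, then forwards the document (augmented with the ISPyB key)
    # downstream via the emit hook passed to __init__.
    def start(self, doc):
        ispyb_key = write_to_ispyb(doc)  # hypothetical helper
        self.emit("start", {**doc, "ispyb_key": ispyb_key})


class ZocaloCallback(DocumentRouter):
    def start(self, doc):
        trigger_zocalo(doc["ispyb_key"])  # hypothetical helper


# Chain them: IspybCallback sees documents first and emits to ZocaloCallback.
# Each could equally run in its own process, holding only its own credentials.
zocalo_callback = ZocaloCallback()
ispyb_callback = IspybCallback(emit=zocalo_callback)  # emit is held weakly, so keep a reference
RE.subscribe(ispyb_callback)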

So the MX use cases right now are fire-and-forget style callbacks, without any further introspection required by the plans, or with any introspection loaded from e.g. ISPyB?

Yes, they are fire-and-forget but this has caused issues:

  • We have a zocalo callback for triggering analysis and then a zocalo ophyd device for reading the results. This is really messy and it would be nicer to have one entity responsible for both reading and writing.
  • We had an issue where we needed to know the callback had finished before moving on to the next task in the plan (I can't remember the exact issue but can dig it out if interested). We got round this by changing the structure of the plan, but I can see a general need for it.

When we first started Hyperion we did have callbacks that we interrogated, but this didn't really seem to fit the bluesky model, where there's no obvious way of getting information back from callbacks unless you just carry around a reference to the instance everywhere. I think it would be nice to find a cleaner way to do this (basically the "New workflow API inform decisions" in your diagram), but it will need wider discussion with the collaboration.

@callumforrester (Contributor)

This all looks good from my point of view, but I don't think we should get too far into the design without trying something simple first: trigger a post-processing job at the end of a scan, with no feedback into the plan, etc.
