Skip to content

Comments

RE command and plan stubs for plan-based remote management of the que…#49

Open
burkeds wants to merge 3 commits intobluesky:mainfrom
burkeds:remote_control
Open

RE command and plan stubs for plan-based remote management of the que…#49
burkeds wants to merge 3 commits intobluesky:mainfrom
burkeds:remote_control

Conversation

@burkeds
Copy link

@burkeds burkeds commented Apr 3, 2025

This is an exposure of the API via bluesky plan_stubs. This was motivated by the use-case of operando catalysis as part of the ROCK-IT project. Catalysis reactions necessitate independent control of scanning and sample environment. The need to be able to pause and change scans without interrupting sample environment control necessitates at least two RunEngines. By exposing the API to the RunEngine via plan_stubs, we are able to remotely manage a secondary queue from a primary RunEngine.

A new command "remote_queue" is registered to the RunEngine which calls a coroutine to interact with an REManagerAPI object.

Accompanying this is a library of plan_stubs which call methods and attributes of REManagerAPI. Here is an example of its use:

Note: Due to an apparent bug in the main code base, the async versions of REManagerAPI can't be called in this way. See issue #48.

Note: Issue #48 has been resolved. The example has been edited to demonstrate how to instantiate the asynchronous REManagerAPI.

from bluesky import RunEngine
from bluesky.plans import count
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.run_engine.re_command import remote_queue_coroutine, REMOTE_QUEUE_COMMAND
import bluesky_queueserver_api.run_engine.plan_stubs as rq

def my_plan(run_manager):
    print("Running plan")
    yield from count([])
    
    yield from rq.console_monitor_enable(run_manager)
    
    print("Is the console monitor enabled?")
    enabled = yield from rq.console_monitor_enabled(run_manager)
    print(enabled)
    
    user = yield from rq.user(run_manager)
    print("Current User", user)
    
    status = yield from rq.status(run_manager)
    worker_environment_exists = status['worker_environment_exists']
    worker_environment_state = status['worker_environment_state']
    
    if worker_environment_exists is True and worker_environment_state == 'open':
        yield from rq.environment_close(run_manager)
        yield from rq.wait_for_idle(run_manager)
    
    yield from rq.environment_open(run_manager)
    yield from rq.wait_for_idle(run_manager)
    
    item1 = BPlan('scan', ['detectors'], 'motor', 0, 1, 4)
    item2 = BPlan('count', ['detectors'], num=10)
    
    yield from rq.item_add(run_manager, item1)
    yield from rq.item_add(run_manager, item2)
    
    queue = yield from rq.queue_get(run_manager)
    print("queue", queue)
    
    print("Starting queue")
    yield from rq.queue_start(run_manager)
    yield from rq.wait_for_idle(run_manager)
    
    queue = yield from rq.queue_get(run_manager)
    print("queue", queue)
    
    print("Closing environment")
    yield from rq.environment_close(run_manager)
    yield from rq.wait_for_idle(run_manager)
    
    text = yield from rq.console_monitor_text(run_manager)
    print("Console monitor text")
    print(text)
    
    yield from rq.console_monitor_disable(run_manager)
    
    print("Done")

async def create_rm():
    thread_id = threading.current_thread().ident
    print(f"Background thread ID: {thread_id}")

    RM = REManagerAPI(
        zmq_control_addr="tcp://localhost:60615",
        zmq_info_addr="tcp://localhost:60625"
    )

    return RM
    
if __name__ == "__main__":
    RM = REManagerAPI(zmq_control_addr="tcp://localhost:60615", 
                      zmq_info_addr="tcp://localhost:60625")
    # RM.console_monitor.enable()
    
    RE = RunEngine()
    RE.register_command(REMOTE_QUEUE_COMMAND, remote_queue_coroutine)

    # How to to use async version of REManagerAPI
    # loop = RE.loop
    # f = asyncio.run_coroutine_threadsafe(create_rm(), loop)
    # RM = f.result(timeout=None)

    # Execute the plan using the RunEngine
    RE(my_plan(RM))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this dunder init file on the top level.

@mrakitin mrakitin requested a review from tacaswell September 15, 2025 15:37
@mrakitin
Copy link
Member

@sligara7, please have a look.

Copy link

@sligara7 sligara7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new run_engine features are pretty cool. I did not see any errors in code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants