Skip to content

Feature: Device messages#1832

Open
bechols97 wants to merge 46 commits intodevelopfrom
feature/bechols97/device_messages
Open

Feature: Device messages#1832
bechols97 wants to merge 46 commits intodevelopfrom
feature/bechols97/device_messages

Conversation

@bechols97
Copy link
Copy Markdown

@bechols97 bechols97 commented Apr 28, 2025

Summary

This PR adds a feature for device messages (i.e. store function arguments on device to be handled by a host callback at a later time). The original idea for the feature is to have better error handling on the device with handling any output on with a host callback. This moves the code from Camp to RAJA due dependency on atomic operations.

Design review

For the design, there are some open questions. (regarding these open questions please see the Design notes below)

  1. Currently, this design supports a MPSC model with the expectation that the device side will produce messages while a single host thread consumes messages.
  2. Additionally, before consuming messages, the current implementation forces the stream to synchronize.
  3. If there are more messages than the buffer size, then those messages are lost. This is to avoid waits on the device side. However, would it be beneficial to have a configurable option to use a circular buffer?

Design notes

Based on discussion from a meeting:

  • As suggested in the meeting, there should be policies for the message queue to support different use cases and allows the message queue to be extended if there are any future use cases.
  • Based on current use cases, the default policy of the message queue should support a MPSC model, wait all should synchronize the stream, and not storing additional messages when the buffer is full.
  • The message_handler class that stores the callback should be move-only to prevent accidental copies to a lambda with supporting a view-like queue to copy to device kernels (as mentioned below by @MrBurmark).

Additional design notes

Based on further discussions from a meeting:

  • Ability to get range of messages (so that user can sort)
  • Ability to unsubscribe a callback
  • Ability to bind args for a callback, such as source location or strings.
    • If moving to C++20, std::bind_front can be used. However, if needed, this can be added in a future PR
  • Store number of missed messages
    • Posting messages returns a bool. Users can use this to get number of missed messages.
  • Remove duplicate messages
    • This would likely be a separate queue policy, which can be added in a future PR
  • Move interface to something similar to the Reducer interface, where kernel/forall loop gives correct resource.

  - This moves message container to RAJA to avoid dependency
    issues with required atomic operation. Currently, testing and
    waiting for messages will block until the stream is synchronized.
@MrBurmark
Copy link
Copy Markdown
Member

I think the design would benefit from a view like object that can be used in device kernels.

@artv3
Copy link
Copy Markdown
Member

artv3 commented May 11, 2025

@bechols97 , can you add an example in the examples folder? I have a use case where this may be handy. In my application if a thread gets a negative value, I want to take note and output it at the end of the kernel by the root rank. Currently I am using printf and every thread that encounters the negative value spews information onto the screen. To double check, would this be a use case?

@bechols97
Copy link
Copy Markdown
Author

Hi @artv3, yes that could be a use case for this. I will add an example of something similar to the examples folder.

bechols97 added 4 commits May 12, 2025 09:04
  - This allows the message queue to be passed to RAJA kernels
  - This also allows the message queue to be allocated with pinned
    memory when needed
  - Currently, example requires XNACK with HIP. (message queue should be using
    pinned memory so need to look into this)
@rcarson3
Copy link
Copy Markdown
Member

@bechols97 so one of the libraries I maintain has a failure macro that on the host side throw an error with some useful error messages associated with it which can provide useful context for users / developers for why something failed. Do you think we could emulate something like with this framework if we could provide the absolute max size of the char array / string literal that we'd like and then at the error site have that passed into your message class?

@MrBurmark
Copy link
Copy Markdown
Member

I think it would be possible to add a fixed capacity string-like object that could be passed through this interface.

@bechols97
Copy link
Copy Markdown
Author

@rcarson3 Yes, the original intention behind this idea is to be used as a device side error handler (though left to be more generic in case there are other use cases). As @MrBurmark mentioned, it would be possible to use a fixed-string object with the message handler. For string literals, you would want some type of fixed-string object / char array to store the string that is later passed to a host side callback.

In addition to the fixed-string object, any type that is trivially destructible should work as well.

@bechols97 bechols97 requested a review from a team June 3, 2025 20:10
@MrBurmark
Copy link
Copy Markdown
Member

MrBurmark commented Jun 5, 2025

The current state looks good if you're trying to handle a single kind of error.

RAJA::resources::Host res{};

auto logger = RAJA::message_handler<void(int*, int, int)>(num_messages, res, 
  [](int* ptr, int idx, int value) {
    std::cout << "\n pointer " << ptr << " a[" << idx << "] = " << value << "\n";
  }
);

auto cpu_msg_queue = logger.get_queue<RAJA::mpsc_queue>();
RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    cpu_msg_queue.try_post_message(a, i, a[i]); 
  }
});

logger.wait_all();

@MrBurmark
Copy link
Copy Markdown
Member

There are use cases where we might want to handle multiple kinds of errors each with different data in the same loop. Does anyone else have such a use case? What do you think of a slightly more general interface that looks more like this?

By moving the signature to the queues we don't know the message sizes upfront. So I'm not sure if it makes sense to do the sizing upfront or later when we know what kinds of messages are possible.

RAJA::resources::Host res{};

auto logger = RAJA::message_handler(res, num_bytes);

auto queue1 = logger.get_queue<RAJA::mpsc_queue, void(int*, int, int)>(num_messages, host, 
  [](int idx, int* a, int val_a) {
    std::cout << "\n Oh no! a{" << a << "}[" << idx << "] = " << val_a<< "\n";
  }
);

auto queue2 = logger.get_queue<RAJA::mpsc_queue, void(int*, int, int)>(num_messages, host, 
  [](int idx, int* a, int val_a, double* b, double val_b) {
    std::cout << "\n Inconceivable! a{" << a << "}[" << idx << "] = " << val_a <<
                             " and  b{" << b << "}[" << idx << "] = " << val_b << "\n";
  }
);

RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    queue1.try_post_message(i, a, a[i]); 
  }
  if (a[i] == 0 && b[i] < 0) { 
    queue2.try_post_message(i, a, a[i], b, b[i]); 
  }
});

logger.wait_all();

@MrBurmark
Copy link
Copy Markdown
Member

Another use case that I'm considering is having a long lived logger with an allocation. Then I can enqueue multiple types of messages in that while keeping the gpu running and check for messages occasionally to avoid extra synchronizes.
I am not sure this is feasible however as most of our logging use cases involve catching an error and stopping. If we did continue running we would likely encounter a hard error like a seg fault later.

@bechols97
Copy link
Copy Markdown
Author

bechols97 commented Jun 5, 2025

Being able to support multiple error/logging messages within the same loop is definitely a use case that we would want to support. This is something that the library I help maintain uses.

There are a couple of concerns with moving the callback to be a parameter of the get_queue function:

  • The callback is currently stored as a std::function in message_handler. Moving the callback to the get_queue the callback could still technically be stored in message_handler, but this would likely require additional virtual functions, which could increase the overhead on the host side. This seems reasonable since for the most part since the main use cases are for debugging/error handling where the callbacks aren't expected to be called often.
  • If the callback is stored in msg_queue, then queue1 and queue2 now have ownership of the std::function. This would then be copied to the RAJA kernel and likely run into compilation issues the device execution spaces.

Just to show another option with the current interface: (Please note this example is not entirely the same and requires some additional types to be created; however, one could create a type similar to std::inplace_vactor<.., 5> and std::variant to store up to 5 arguments with better type safety):

union msg_arg {
  int i;
  double d;
};

enum msg_type
{
  MSG_INT,
  MSG_DLB
};

RAJA::resources::Host res{};

auto logger = RAJA::message_handler<void(msg_type, int, void*, msg_arg)>(res, num_msg, 
  [] (msg_type type, int idx, void* ptr, msg_arg val) {
    if (type == msg_type::MSG_INT) {
      std::cout << "\n Oh no! a{" <<ptr << "}[" << idx << "] = " << val.i << "\n";
    } else if (type == msg_type::MSG_DBL) {
      std::cout << "\n Inconceivable!  b{" << ptr << "}[" << idx << "] = " << val.d << "\n";
    }
});
auto queue = logger.get_queue<RAJA::mpsc_queue>();

RAJA::forall<RAJA::seq_exec>(host, RAJA::RangeSegment(0, N), [=] (int i) { 
  if (a[i] < 0) { 
    queue.try_post_message(msg_type::MSG_INT, i, a, a[i]); 
  }
  if (a[i] == 0 && b[i] < 0) { 
    queue.try_post_message(msg_type::MSG_DBL, i, b, b[i]); 
  }
});

logger.wait_all()

@rhornung67 rhornung67 marked this pull request as ready for review September 30, 2025 16:56
@artv3
Copy link
Copy Markdown
Member

artv3 commented Nov 5, 2025

@bechols97 , can we bring this up to date?

  - Also, fixes examples to not use stream 2 memory in stream 1
  - This allows messages to be sorted, filtered, etc.
…LLNL/RAJA into feature/bechols97/device_messages

std::cout << "\n Running RAJA omp_parallel_for_static_exec (default chunksize) vector addition...\n";

RAJA::forall<RAJA::omp_parallel_for_static_exec< >>(host, RAJA::RangeSegment(0, N),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you may have extra < > in the policy?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, it has a default chunk size that you can modify:

template<int ChunkSize = default_chunk_size>
using omp_parallel_for_static_exec =
omp_parallel_exec<omp_for_schedule_exec<omp::Static<ChunkSize>>>;

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunk size doesn't really matter in this example, So, if you prefer having an explicit value in the < > to be more clear, then I don't mind updating this.

Comment on lines +220 to +231
#if defined(RAJA_ENABLE_CUDA)
RAJA::resources::Cuda res_gpu1;
RAJA::resources::Cuda res_gpu2;
using EXEC_POLICY = RAJA::cuda_exec_async<GPU_BLOCK_SIZE>;
#elif defined(RAJA_ENABLE_HIP)
RAJA::resources::Hip res_gpu1;
RAJA::resources::Hip res_gpu2;
using EXEC_POLICY = RAJA::hip_exec_async<GPU_BLOCK_SIZE>;
#elif defined(RAJA_ENABLE_SYCL)
RAJA::resources::Sycl res_gpu1;
RAJA::resources::Sycl res_gpu2;
using EXEC_POLICY = RAJA::sycl_exec<GPU_BLOCK_SIZE>;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can simplify this by templating the resource on policy type. See:

auto res = RAJA::resources::get_default_resource<policy>();

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the purpose of this example, the hope was to show that this feature is able to work with non-default resources as well as work with multiple GPU resources. However, there are two examples showing the multiple non-default resources. If preferred, I can update the first example to only use the default stream (since most use cases will likely be using RAJA's default stream)?

@bechols97 bechols97 requested a review from a team March 31, 2026 15:59
};

template<typename Fn>
struct get_signature;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move these into a generic function_signature_helper.hpp header. See https://github.com/llnl/RAJA/pull/1949/changes#diff-72533564b1cbd49c320bfd7981489ca5e5a08143353955c70c3ef535e38fc4ccR56. Arturo recently added similar methods for deducing the index of template parameters. These types of metaprogramming utilities are broadly useful and should live in a more visible header

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to move both to a generic header, maybe in internal. I think it's good to consolidate stuff like this so we don't end up re-implementing the methods when we need it. For example we have type_trait helper headers like https://github.com/llnl/RAJA/blob/develop/include/RAJA/pattern/kernel/TypeTraits.hpp

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a preferred location for these types of metaprogramming utitlies? Would https://github.com/llnl/RAJA/tree/develop/include/RAJA/util be a good place for the more generic header?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so yes, see for example https://github.com/llnl/RAJA/blob/develop/include/RAJA/util/EnableIf.hpp is already there. I would try to move Arturo's helpers from github.com/llnl/RAJA/pull/1949/changes#diff-72533564b1cbd49c320bfd7981489ca5e5a08143353955c70c3ef535e38fc4ccR56 there as well, and name it FunctionSignatureUtil.hpp or something

~message_bus() { reset(); }

// Copy ctor/operator
message_bus(const message_bus&) = delete;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might also need message_bus (message_bus) = delete;

/// Currently, this forces a synchronize prior to calling
/// the callback function or testing if there are any messages.
///
class message_manager
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this case for the new classes @llnl/raja-core ? kernel is all the MessageManager naming convention for classes

template<typename Callable>
void subscribe(msg_id id, Callable&& c)
{
auto callback = RAJA::msg_callback {std::forward<Callable>(c)};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be better to use an explicit constructor here

{
auto& fn_list = m_callback_map.at(id);
auto it = std::find_if(fn_list.begin(), fn_list.end(), [](const auto& fn) {
return typeid(Callable).hash_code() == fn->hash();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is how we want to be hashing values. For one, I think it's possible to have hashing collisions here, to find_if could just match the first of several possible matches. I think it might be better to hash together the msg_id with std::type_index, and just make m_callback_map a std::unordered_map<HashCode, vector<msg_callback_t>>

size_type new_sz = old_sz + msg_sz;
local_sz = old_sz; // offset to start of message
// Checks if fits in queue
if (new_sz <= m_container->m_capacity)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we ever resize the m_container? this could be a race condition if so

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mpsc_queue and spsc_queue are view-like containers to the owning version of message_bus. All resizing type of operations on message_bus will end up forcing the resource in message_bus to synchronize prior to resizing along with ignoring any messages that are currently stored.

#include "RAJA/util/resource.hpp"

/*
* Vector Addition Example
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rename it to "RAJA::messages example" and update the description

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants