From 3b8ab8e80a3f37b975c9e8ab4aee8dc708b35d9d Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 30 Aug 2023 16:36:16 -0400 Subject: [PATCH] GH-37364: [C++][GPU] Add CUDA impl of Device Event/Stream (#37365) ### What changes are included in this PR? Adding `CudaDevice::SyncEvent` and `CudaDevice::Stream` implementations which provide more idiomatic handling of Events and Streams. ### Are these changes tested? Simple SyncEvent test added. More stream tests still being added. * Closes: #37364 Authored-by: Matt Topol Signed-off-by: Matt Topol --- cpp/src/arrow/c/bridge_test.cc | 4 +- cpp/src/arrow/device.h | 46 +++++++++++-- cpp/src/arrow/gpu/cuda_context.cc | 106 ++++++++++++++++++++++++++++-- cpp/src/arrow/gpu/cuda_context.h | 97 +++++++++++++++++++++++++++ cpp/src/arrow/gpu/cuda_test.cc | 59 +++++++++++++++++ 5 files changed, 299 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index bd0e498a9f332..2aedccc965c45 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -1222,7 +1222,9 @@ class MyDevice : public Device { virtual ~MySyncEvent() = default; Status Wait() override { return Status::OK(); } - Status Record(const Device::Stream&) override { return Status::OK(); } + Status Record(const Device::Stream&, const unsigned int) override { + return Status::OK(); + } }; protected: diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index 55037ac418808..066ca7e32a4fe 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -109,7 +110,11 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// should be trivially constructible from it's device-specific counterparts. class ARROW_EXPORT Stream { public: - virtual const void* get_raw() const { return NULLPTR; } + using release_fn_t = std::function; + + virtual ~Stream() = default; + + virtual const void* get_raw() const { return stream_.get(); } /// \brief Make the stream wait on the provided event. /// @@ -117,15 +122,42 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// event is completed without blocking the CPU. virtual Status WaitEvent(const SyncEvent&) = 0; + /// \brief Blocks the current thread until a stream's remaining tasks are completed + virtual Status Synchronize() const = 0; + protected: - Stream() = default; - virtual ~Stream() = default; + explicit Stream(void* stream, release_fn_t release_stream) + : stream_{stream, release_stream} {} + + std::unique_ptr stream_; }; + virtual Result> MakeStream() { return NULLPTR; } + + /// \brief Create a new device stream + /// + /// This should create the appropriate stream type for the device, + /// derived from Device::Stream to allow for stream ordered events + /// and memory allocations. + virtual Result> MakeStream(unsigned int flags) { + return NULLPTR; + } + + /// @brief Wrap an existing device stream alongside a release function + /// + /// @param device_stream a pointer to the stream to wrap + /// @param release_fn a function to call during destruction, `nullptr` or + /// a no-op function can be passed to indicate ownership is maintained + /// externally + virtual Result> WrapStream(void* device_stream, + Stream::release_fn_t release_fn) { + return NULLPTR; + } + /// \brief EXPERIMENTAL: An object that provides event/stream sync primitives class ARROW_EXPORT SyncEvent { public: - using release_fn_t = void (*)(void*); + using release_fn_t = std::function; virtual ~SyncEvent() = default; @@ -134,9 +166,11 @@ class ARROW_EXPORT Device : public std::enable_shared_from_this, /// @brief Block until sync event is completed. virtual Status Wait() = 0; + inline Status Record(const Stream& st) { return Record(st, 0); } + /// @brief Record the wrapped event on the stream so it triggers /// the event when the stream gets to that point in its queue. - virtual Status Record(const Stream&) = 0; + virtual Status Record(const Stream&, const unsigned int flags) = 0; protected: /// If creating this with a passed in event, the caller must ensure @@ -225,7 +259,7 @@ class ARROW_EXPORT MemoryManager : public std::enable_shared_from_this> WrapDeviceSyncEvent( diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index 3e1af26cac39b..fb8935319ed1c 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -26,11 +26,10 @@ #include #include -#include - #include "arrow/gpu/cuda_internal.h" #include "arrow/gpu/cuda_memory.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" namespace arrow { @@ -273,6 +272,35 @@ bool IsCudaDevice(const Device& device) { return device.type_name() == kCudaDeviceTypeName; } +Result> CudaDevice::MakeStream(unsigned int flags) { + ARROW_ASSIGN_OR_RAISE(auto context, GetContext()); + ContextSaver set_temporary(reinterpret_cast(context.get()->handle())); + + CUstream stream; + CU_RETURN_NOT_OK("cuStreamCreate", cuStreamCreate(&stream, flags)); + return std::shared_ptr( + new CudaDevice::Stream(context, new CUstream(stream), [](void* st) { + auto typed_stream = reinterpret_cast(st); + // DCHECK_OK still evaluates its argument in release mode + // but in debug mode it'll also throw if it fails + DCHECK_OK( + internal::StatusFromCuda(cuStreamDestroy(*typed_stream), "cuStreamDestroy")); + delete typed_stream; + })); +} + +Result> CudaDevice::WrapStream( + void* stream, Device::Stream::release_fn_t release_fn) { + if (!release_fn) { + release_fn = [](void*) {}; + } + + auto cu_stream = reinterpret_cast(stream); + ARROW_ASSIGN_OR_RAISE(auto context, GetContext()); + return std::shared_ptr( + new CudaDevice::Stream(context, cu_stream, release_fn)); +} + Result> AsCudaDevice(const std::shared_ptr& device) { if (IsCudaDevice(*device)) { return checked_pointer_cast(device); @@ -281,6 +309,48 @@ Result> AsCudaDevice(const std::shared_ptr& } } +Status CudaDevice::Stream::WaitEvent(const Device::SyncEvent& event) { + auto cuda_event = + checked_cast(&event); + if (!cuda_event) { + return Status::Invalid("CudaDevice::Stream cannot Wait on non-cuda event"); + } + + auto cu_event = cuda_event->value(); + if (!cu_event) { + return Status::Invalid("Cuda Stream cannot wait on null event"); + } + + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + CU_RETURN_NOT_OK("cuStreamWaitEvent", + cuStreamWaitEvent(value(), cu_event, CU_EVENT_WAIT_DEFAULT)); + return Status::OK(); +} + +Status CudaDevice::Stream::Synchronize() const { + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + CU_RETURN_NOT_OK("cuStreamSynchronize", cuStreamSynchronize(value())); + return Status::OK(); +} + +Status CudaDevice::SyncEvent::Wait() { + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + CU_RETURN_NOT_OK("cuEventSynchronize", cuEventSynchronize(value())); + return Status::OK(); +} + +Status CudaDevice::SyncEvent::Record(const Device::Stream& st, const unsigned int flags) { + auto cuda_stream = checked_cast(&st); + if (!cuda_stream) { + return Status::Invalid("CudaDevice::Event cannot record on non-cuda stream"); + } + + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + CU_RETURN_NOT_OK("cuEventRecordWithFlags", + cuEventRecordWithFlags(value(), cuda_stream->value(), flags)); + return Status::OK(); +} + // ---------------------------------------------------------------------- // CudaMemoryManager implementation @@ -293,11 +363,35 @@ std::shared_ptr CudaMemoryManager::cuda_device() const { return checked_pointer_cast(device_); } +Result> CudaMemoryManager::MakeDeviceSyncEvent() { + ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext()); + ContextSaver set_temporary(reinterpret_cast(context.get()->handle())); + + // TODO: event creation flags + CUevent ev; + CU_RETURN_NOT_OK("cuEventCreate", cuEventCreate(&ev, CU_EVENT_DEFAULT)); + + return std::shared_ptr( + new CudaDevice::SyncEvent(context, new CUevent(ev), [](void* ev) { + auto typed_event = reinterpret_cast(ev); + // DCHECK_OK still evaluates its argument in release mode + // but in debug mode it'll also throw if it fails + DCHECK_OK( + internal::StatusFromCuda(cuEventDestroy(*typed_event), "cuEventDestroy")); + delete typed_event; + })); +} + Result> CudaMemoryManager::WrapDeviceSyncEvent( void* sync_event, Device::SyncEvent::release_fn_t release_sync_event) { - return nullptr; - // auto ev = reinterpret_cast(sync_event); - // return std::make_shared(ev); + if (!release_sync_event) { + release_sync_event = [](void*) {}; + } + + auto ev = reinterpret_cast(sync_event); + ARROW_ASSIGN_OR_RAISE(auto context, cuda_device()->GetContext()); + return std::shared_ptr( + new CudaDevice::SyncEvent(context, ev, release_sync_event)); } Result> CudaMemoryManager::GetBufferReader( @@ -440,7 +534,7 @@ class CudaDeviceManager::Impl { Status AllocateHost(int device_number, int64_t nbytes, uint8_t** out) { RETURN_NOT_OK(CheckDeviceNum(device_number)); ARROW_ASSIGN_OR_RAISE(auto ctx, GetContext(device_number)); - ContextSaver set_temporary((CUcontext)(ctx.get()->handle())); + ContextSaver set_temporary(reinterpret_cast(ctx.get()->handle())); CU_RETURN_NOT_OK("cuMemHostAlloc", cuMemHostAlloc(reinterpret_cast(out), static_cast(nbytes), CU_MEMHOSTALLOC_PORTABLE)); diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 79a2ec9f97581..e4d8482855f6b 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -21,6 +21,8 @@ #include #include +#include + #include "arrow/device.h" #include "arrow/result.h" #include "arrow/util/visibility.h" @@ -140,6 +142,90 @@ class ARROW_EXPORT CudaDevice : public Device { /// \param[in] size The buffer size in bytes Result> AllocateHostBuffer(int64_t size); + /// \brief EXPERIMENTAL: Wrapper for CUstreams + /// + /// Does not *own* the CUstream object which must be separately constructed + /// and freed using cuStreamCreate and cuStreamDestroy (or equivalent). + /// Default construction will use the cuda default stream, and does not allow + /// construction from literal 0 or nullptr. + class ARROW_EXPORT Stream : public Device::Stream { + public: + ~Stream() = default; + + [[nodiscard]] inline CUstream value() const noexcept { + if (!stream_) { + return CUstream{}; + } + return *reinterpret_cast(stream_.get()); + } + operator CUstream() const noexcept { return value(); } + + const void* get_raw() const noexcept override { return stream_.get(); } + Status WaitEvent(const Device::SyncEvent&) override; + Status Synchronize() const override; + + protected: + friend class CudaDevice; + + explicit Stream(std::shared_ptr ctx, CUstream* st, + Device::Stream::release_fn_t release_fn) + : Device::Stream(reinterpret_cast(st), release_fn), + context_{std::move(ctx)} {} + + // disable construction from literal 0 + explicit Stream(std::shared_ptr, int, + Device::Stream::release_fn_t) = delete; // Prevent cast from 0 + explicit Stream(std::shared_ptr, std::nullptr_t, + Device::Stream::release_fn_t) = delete; // Prevent cast from nullptr + + private: + std::shared_ptr context_; + }; + + Result> MakeStream() override { return MakeStream(0); } + + /// \brief Create a CUstream wrapper in the current context + Result> MakeStream(unsigned int flags) override; + + /// @brief Wrap a pointer to an existing stream + /// + /// @param device_stream passed in stream (should be a CUstream*) + /// @param release_fn destructor to free the stream. `nullptr` may be passed + /// to indicate there is no destruction/freeing necessary. + Result> WrapStream( + void* device_stream, Stream::release_fn_t release_fn) override; + + class ARROW_EXPORT SyncEvent : public Device::SyncEvent { + public: + [[nodiscard]] CUevent value() const { + if (sync_event_) { + return *static_cast(sync_event_.get()); + } + return CUevent{}; + } + operator CUevent() const noexcept { return value(); } + + /// @brief Block until the sync event is marked completed + Status Wait() override; + + /// @brief Record the wrapped event on the stream + /// + /// Once the stream completes the tasks previously added to it, + /// it will trigger the event. + Status Record(const Device::Stream&, const unsigned int) override; + + protected: + friend class CudaMemoryManager; + + explicit SyncEvent(std::shared_ptr ctx, CUevent* ev, + Device::SyncEvent::release_fn_t release_ev) + : Device::SyncEvent(reinterpret_cast(ev), release_ev), + context_{std::move(ctx)} {} + + private: + std::shared_ptr context_; + }; + protected: struct Impl; @@ -179,6 +265,17 @@ class ARROW_EXPORT CudaMemoryManager : public MemoryManager { /// having to cast the `device()` result. std::shared_ptr cuda_device() const; + /// \brief Creates a wrapped CUevent. + /// + /// Will call cuEventCreate and it will call cuEventDestroy internally + /// when the event is destructed. + Result> MakeDeviceSyncEvent() override; + + /// \brief Wraps an existing event into a sync event. + /// + /// @param sync_event the event to wrap, must be a CUevent* + /// @param release_sync_event a function to call during destruction, `nullptr` or + /// a no-op function can be passed to indicate ownership is maintained externally Result> WrapDeviceSyncEvent( void* sync_event, Device::SyncEvent::release_fn_t release_sync_event) override; diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index 6d392213e231f..c39dbe28e808a 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -39,9 +39,11 @@ namespace arrow { using internal::checked_cast; +using internal::checked_pointer_cast; namespace cuda { +using internal::ContextSaver; using internal::StatusFromCuda; #define ASSERT_CUDA_OK(expr) ASSERT_OK(::arrow::cuda::internal::StatusFromCuda((expr))) @@ -213,6 +215,63 @@ TEST_F(TestCudaDevice, Copy) { } } +TEST_F(TestCudaDevice, CreateSyncEvent) { + ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent()); + ASSERT_TRUE(ev); + auto cuda_ev = checked_pointer_cast(ev); + ASSERT_CUDA_OK(cuEventQuery(*cuda_ev)); +} + +TEST_F(TestCudaDevice, WrapDeviceSyncEvent) { + // need a context to call cuEventCreate + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + + CUevent event; + ASSERT_CUDA_OK(cuEventCreate(&event, CU_EVENT_DEFAULT)); + ASSERT_CUDA_OK(cuEventQuery(event)); + + { + // wrap event with no-op destructor + ASSERT_OK_AND_ASSIGN(auto ev, mm_->WrapDeviceSyncEvent(&event, [](void*) {})); + ASSERT_TRUE(ev); + // verify it's the same event we passed in + ASSERT_EQ(ev->get_raw(), &event); + auto cuda_ev = checked_pointer_cast(ev); + ASSERT_CUDA_OK(cuEventQuery(*cuda_ev)); + } + + // verify that the event is still valid on the device when the shared_ptr + // goes away since we didn't give it ownership. + ASSERT_CUDA_OK(cuEventQuery(event)); + ASSERT_CUDA_OK(cuEventDestroy(event)); +} + +TEST_F(TestCudaDevice, DefaultStream) { + ASSERT_OK_AND_ASSIGN(auto stream, device_->MakeStream()); + ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent()); + + ASSERT_OK(ev->Record(*stream)); + ASSERT_OK(stream->WaitEvent(*ev)); + ASSERT_OK(ev->Wait()); + ASSERT_OK(stream->Synchronize()); +} + +TEST_F(TestCudaDevice, ExplicitStream) { + // need a context to call cuEventCreate + ContextSaver set_temporary(reinterpret_cast(context_.get()->handle())); + + CUstream cu_stream = CU_STREAM_PER_THREAD; + { + ASSERT_OK_AND_ASSIGN(auto stream, device_->WrapStream(&cu_stream, nullptr)); + ASSERT_OK_AND_ASSIGN(auto ev, mm_->MakeDeviceSyncEvent()); + + ASSERT_OK(ev->Record(*stream)); + ASSERT_OK(stream->WaitEvent(*ev)); + ASSERT_OK(ev->Wait()); + ASSERT_OK(stream->Synchronize()); + } +} + // ------------------------------------------------------------------------ // Test CudaContext