From 7a080cb7e8a7e60ae3d582c3f56ee5dce80cbc88 Mon Sep 17 00:00:00 2001 From: aydogdub Date: Fri, 21 Nov 2025 18:15:29 +0100 Subject: [PATCH 1/3] add GPU stream pipeline passes and necessary helpers --- .../targets/gpu_helpers/copy_strategies.py | 554 ++++++++++++++++++ dace/codegen/targets/gpu_helpers/gpu_utils.py | 27 + dace/transformation/helpers.py | 31 + .../connect_gpu_streams_to_kernels.py | 70 +++ .../connect_gpu_streams_to_tasklets.py | 80 +++ .../gpu_stream_scheduling.py | 249 ++++++++ .../gpu_stream_topology_simplification.py | 273 +++++++++ .../insert_gpu_copy_tasklet.py | 166 ++++++ .../insert_gpu_stream_sync_tasklets.py | 288 +++++++++ .../insert_gpu_streams_to_sdfgs.py | 154 +++++ 10 files changed, 1892 insertions(+) create mode 100644 dace/codegen/targets/gpu_helpers/copy_strategies.py create mode 100644 dace/codegen/targets/gpu_helpers/gpu_utils.py create mode 100644 dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py create mode 100644 dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py create mode 100644 dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py create mode 100644 dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py create mode 100644 dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py create mode 100644 dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py create mode 100644 dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py diff --git a/dace/codegen/targets/gpu_helpers/copy_strategies.py b/dace/codegen/targets/gpu_helpers/copy_strategies.py new file mode 100644 index 0000000000..1b11f5bb2b --- /dev/null +++ b/dace/codegen/targets/gpu_helpers/copy_strategies.py @@ -0,0 +1,554 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional, Tuple, Union + +from dace import SDFG, SDFGState, data, dtypes, subsets +from dace import memlet as mm +from dace import symbolic +from dace.codegen import common +from dace.codegen.targets import cpp +from dace.codegen.targets.cpp import sym2cpp +from dace.codegen.targets.gpu_helpers.gpu_utils import generate_sync_debug_call +from dace.config import Config +from dace.dtypes import StorageType +from dace.frontend import operations +from dace.sdfg import nodes, scope_contains_scope +from dace.sdfg.graph import MultiConnectorEdge +from dace.transformation import helpers + + +class CopyContext: + """ + Encapsulates inputs required for copy operations and exposes helper + methods to derive additional information. This keeps copy strategies + lightweight by letting them focus only on the relevant logic. 
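+
+    Example (illustrative sketch; assumes `state`, `src`, `dst` and `edge` already
+    exist and that both endpoints were assigned GPU stream 0):
+
+        ctx = CopyContext(sdfg, state, src, dst, edge, {src: 0, dst: 0})
+        strategy = OutOfKernelCopyStrategy()
+        if strategy.applicable(ctx):
+            code = strategy.generate_copy(ctx)  # backend copy call as a C++ string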
+ """ + + def __init__(self, sdfg: SDFG, state: SDFGState, src_node: nodes.Node, dst_node: nodes.Node, + edge: MultiConnectorEdge[mm.Memlet], gpustream_assignments: Dict[nodes.Node, Union[int, str]]): + + # Store the basic context as attributes + self.sdfg = sdfg + self.state = state + self.src_node = src_node + self.dst_node = dst_node + self.edge = edge + self.gpustream_assignments = gpustream_assignments + + memlet = edge.data + + self.copy_shape = memlet.subset.size_exact() + if isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode): + copy_shape, src_strides, dst_strides, src_expr, dst_expr = self.get_accessnode_to_accessnode_copy_info() + else: + copy_shape = memlet.subset.size_exact() + src_strides = dst_strides = src_expr = dst_expr = None + + self.copy_shape = copy_shape + self.src_strides = src_strides + self.dst_strides = dst_strides + self.src_expr = src_expr + self.dst_expr = dst_expr + + def get_storage_type(self, node: nodes.Node): + """ + Return the storage type associated with a given SDFG node. + + Tasklets are assumed to use register storage, while AccessNodes + return the storage type from their data descriptor. Raises + NotImplementedError for unsupported node types. + """ + if isinstance(node, nodes.Tasklet): + storage_type = StorageType.Register + + elif isinstance(node, nodes.AccessNode): + storage_type = node.desc(self.sdfg).storage + + else: + raise NotImplementedError(f"Unsupported node type {type(node)} for storage type retrieval; " + "expected AccessNode or Tasklet. Please extend this method accordingly.") + + return storage_type + + def get_assigned_gpustream(self) -> str: + """ + Return the GPU stream expression assigned to both source and destination nodes. + + Ensures that both nodes have a matching stream ID, then constructs the + variable name from the configured prefix and stream ID. Raises ValueError + if assignments are missing or inconsistent. + + Example: + If the configured prefix is 'gpu_stream' and the assigned stream ID is 0, + this method returns 'gpu_stream0'. + """ + src_stream = self.gpustream_assignments.get(self.src_node) + dst_stream = self.gpustream_assignments.get(self.dst_node) + + # 1. Catch unsupported cases + if src_stream is None or dst_stream is None: + raise ValueError("GPU stream assignment missing for source or destination node.") + + if src_stream != dst_stream: + raise ValueError(f"Mismatch in assigned GPU streams: src_node has '{src_stream}', " + f"dst_node has '{dst_stream}'. They must be the same.") + + # 2. Generate GPU stream expression + gpustream = src_stream + gpustream_var_name_prefix = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')[1] + gpustream_expr = f"{gpustream_var_name_prefix}{gpustream}" + + return gpustream_expr + + def get_memory_location(self) -> Tuple[str, str]: + """ + Determine whether the source and destination nodes reside in device or host memory. + + Uses the storage type of each node to classify it as either 'Device' + (GPU global memory) or 'Host' (all other storage types). + Used for GPU related copies outside the kernel (e.g. to construct + cudaMemcpyHostToDevice for example). + + Returns + ------- + Tuple[str, str] + (src_location, dst_location) where each is either 'Device' or 'Host'. 
+ """ + src_storage = self.get_storage_type(self.src_node) + dst_storage = self.get_storage_type(self.dst_node) + src_location = 'Device' if src_storage == dtypes.StorageType.GPU_Global else 'Host' + dst_location = 'Device' if dst_storage == dtypes.StorageType.GPU_Global else 'Host' + + return src_location, dst_location + + def get_ctype(self) -> Any: + """ + Determine the C data type (ctype) of the source or destination node. + + The ctype is resolved from the data descriptor of the first node + (source or destination) that is an AccessNode (assumed to be the same + if both are AccessNodes). + + Returns + ------- + Any + The C type string (e.g., "float*", "int32") associated with the node. + + Raises + ------ + NotImplementedError + If neither the source nor the destination node is an AccessNode. + """ + sdfg = self.sdfg + src_node, dst_node = self.src_node, self.dst_node + + if isinstance(src_node, nodes.AccessNode): + return src_node.desc(sdfg).ctype + + if isinstance(dst_node, nodes.AccessNode): + return dst_node.desc(sdfg).ctype + + raise NotImplementedError( + f"Cannot determine ctype: neither src nor dst node is an AccessNode. " + f"Got src_node type: {type(src_node).__name__}, dst_node type: {type(dst_node).__name__}. " + "Please extend this case or fix the issue.") + + def get_accessnode_to_accessnode_copy_info(self): + """ + Compute copy shape, absolute strides, and pointer expressions for a copy + between two AccessNodes. Tries to mimic + cpp.memlet_copy_to_absolute_strides without requiring a dispatcher. + + Returns + ------- + (copy_shape, src_strides, dst_strides, src_expr, dst_expr) + + Raises + ------ + TypeError + If either endpoint is not an AccessNode. + NotImplementedError + If a descriptor is not Scalar or Array. + """ + + # ---------------------------- helpers ---------------------------- + def _collapse_strides(strides, subset): + """Remove size-1 dims; keep tile strides; default to [1] if none remain.""" + n = len(subset) + collapsed = [st for st, sz in zip(strides, subset.size()) if sz != 1] + collapsed.extend(strides[n:]) # include tiles + if len(collapsed) == 0: + return [1] + return collapsed + + def _ptr_name(desc, name): + if desc.transient and desc.lifetime in (dtypes.AllocationLifetime.Persistent, + dtypes.AllocationLifetime.External): + return f'__state->__{sdfg.cfg_id}_{name}' + return name + + def _expr_for(desc, name, subset): + ptr = _ptr_name(desc, name) + + if isinstance(desc, data.Scalar): + # GPU scalar special-case + if desc.storage in dtypes.GPU_MEMORY_STORAGES_EXPERIMENTAL_CUDACODEGEN: + parent = state.sdfg.parent_nsdfg_node + if parent is not None and name in parent.in_connectors: + return f"&{ptr}" + return ptr + # CPU (or other) scalars + return f"&{ptr}" + + if isinstance(desc, data.Array): + offset = cpp.cpp_offset_expr(desc, subset) + return f"{ptr} + {offset}" if offset != "0" else ptr + + raise NotImplementedError( + f"Expected {name} to be either data.Scalar or data.Array, but got {type(desc).__name__}.") + + # ---------------------------- Get copy info ---------------------------- + # Get needed information + src_node, dst_node = self.src_node, self.dst_node + sdfg, edge, state = self.sdfg, self.edge, self.state + memlet, copy_shape = self.edge.data, self.copy_shape + + # Guard - only applicable if src and dst are AccessNodes + if not (isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode)): + raise TypeError( + f"get_accessnode_to_accessnode_copy_info requires both source and destination " + f"to be 
AccessNode instances, but got {type(src_node).__name__} and {type(dst_node).__name__}.") + + # Get node descriptors + src_nodedesc = src_node.desc(sdfg) + dst_nodedesc = dst_node.desc(sdfg) + + # Resolve subsets (fallback to full range) + src_subset = memlet.get_src_subset(edge, state) + dst_subset = memlet.get_dst_subset(edge, state) + + if src_subset is None: + src_subset = subsets.Range.from_array(src_nodedesc) + + if dst_subset is None: + dst_subset = subsets.Range.from_array(dst_nodedesc) + + # Get strides + src_strides = src_subset.absolute_strides(src_nodedesc.strides) + dst_strides = dst_subset.absolute_strides(dst_nodedesc.strides) + + # Try to convert to a degenerate/strided ND copy first + result = cpp.ndcopy_to_strided_copy( + copy_shape, + src_nodedesc.shape, + src_strides, + dst_nodedesc.shape, + dst_strides, + memlet.subset, + src_subset, + dst_subset, + ) + + if result is not None: + copy_shape, src_strides, dst_strides = result + else: + src_strides = _collapse_strides(src_strides, src_subset) + dst_strides = _collapse_strides(dst_strides, dst_subset) + copy_shape = [s for s in copy_shape if s != 1] or [1] + + # Extend copy shape to the largest among the data dimensions, + # and extend other array with the appropriate strides + if len(dst_strides) != len(copy_shape) or len(src_strides) != len(copy_shape): + if memlet.data == src_node.data: + copy_shape, dst_strides = cpp.reshape_strides(src_subset, src_strides, dst_strides, copy_shape) + elif memlet.data == dst_node.data: + copy_shape, src_strides = cpp.reshape_strides(dst_subset, dst_strides, src_strides, copy_shape) + + # Build final expressions + src_expr = _expr_for(src_nodedesc, src_node.data, src_subset) + dst_expr = _expr_for(dst_nodedesc, dst_node.data, dst_subset) + + return copy_shape, src_strides, dst_strides, src_expr, dst_expr + + +class CopyStrategy(ABC): + """Abstract base class for memory copy strategies.""" + + @abstractmethod + def applicable(self, copy_context: CopyContext) -> bool: + """ + Return True if this strategy can handle the given memory copy. + """ + raise NotImplementedError('Abstract class') + + @abstractmethod + def generate_copy(self, copy_context: CopyContext) -> str: + """ + Generates and returns the copy code for the supported pattern. + """ + raise NotImplementedError('Abstract class') + + +class OutOfKernelCopyStrategy(CopyStrategy): + """ + Copy strategy for memory transfers that occur outside of kernel execution. + + This pattern often occurs when generating host-to-device copies for kernel inputs + (since kernels cannot access host memory directly), and device-to-host copies + to retrieve results for further processing. + """ + + def applicable(self, copy_context: CopyContext) -> bool: + """ + Determines whether the data movement is a host<->device memory copy. + + This function returns True if: + - We are not currently generating kernel code + - The copy occurs between two AccessNodes + - The data descriptors of source and destination are not views. + - The storage types of either src or dst is CPU_Pinned or GPU_Device + - We do not have a CPU-to-CPU copy + """ + # Retrieve needed information + state = copy_context.state + src_node, dst_node = copy_context.src_node, copy_context.dst_node + + # 1. 
Ensure copy is not occurring within a kernel
+        scope_dict = state.scope_dict()
+        deeper_node = dst_node if scope_contains_scope(scope_dict, src_node, dst_node) else src_node
+
+        parent_map_tuple = helpers.get_parent_map(state, deeper_node)
+        while parent_map_tuple is not None:
+            parent_map, parent_state = parent_map_tuple
+            if parent_map.map.schedule in dtypes.GPU_SCHEDULES_EXPERIMENTAL_CUDACODEGEN:
+                return False
+            else:
+                parent_map_tuple = helpers.get_parent_map(parent_state, parent_map)
+
+        # 2. Check whether copy is between two AccessNodes
+        if not (isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode)):
+            return False
+
+        # 3. The data descriptors of source and destination are not views
+        if isinstance(src_node.desc(state), data.View) or isinstance(dst_node.desc(state), data.View):
+            return False
+
+        # 4. Check that the StorageType of either src or dst is CPU_Pinned or GPU_Global
+        src_storage = copy_context.get_storage_type(src_node)
+        dst_storage = copy_context.get_storage_type(dst_node)
+        if not (src_storage in (StorageType.GPU_Global, StorageType.CPU_Pinned)
+                or dst_storage in (StorageType.GPU_Global, StorageType.CPU_Pinned)):
+            return False
+
+        # 5. Check that this is not a CPU to CPU copy
+        cpu_storage_types = [StorageType.CPU_Heap, StorageType.CPU_ThreadLocal, StorageType.CPU_Pinned]
+        if src_storage in cpu_storage_types and dst_storage in cpu_storage_types:
+            return False
+
+        return True
+
+    def generate_copy(self, copy_context: CopyContext) -> str:
+        """Generate a host-device copy using the GPU backend's memory operations."""
+
+        # Guard
+        memlet = copy_context.edge.data
+        if memlet.wcr is not None:
+            src_location, dst_location = copy_context.get_memory_location()
+            raise NotImplementedError(f'Accumulate {src_location} to {dst_location} not implemented')
+
+        # Based on the copy dimension, call the appropriate helper function
+        num_dims = len(copy_context.copy_shape)
+        if num_dims == 1:
+            copy_call = self._generate_1d_copy(copy_context)
+
+        elif num_dims == 2:
+            copy_call = self._generate_2d_copy(copy_context)
+
+        else:
+            # Sanity check
+            assert num_dims > 2, f"Expected copy shape with more than 2 dimensions, but got {num_dims}."
+            copy_call = self._generate_nd_copy(copy_context)
+
+        return copy_call
+
+    def _generate_1d_copy(self, copy_context: CopyContext) -> str:
+        """
+        Generates a 1D memory copy between host and device using the GPU backend.
+
+        Uses {backend}MemcpyAsync for contiguous memory. For strided memory,
+        {backend}Memcpy2DAsync is leveraged to efficiently handle the stride along one dimension.
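+
+        Example (illustrative, CUDA backend): a contiguous copy of N floats assigned
+        to stream 0 would emit roughly
+
+            DACE_GPU_CHECK(cudaMemcpyAsync(dst, src, N * sizeof(float),
+                                           cudaMemcpyHostToDevice, gpu_stream0));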
+ """ + # ----------- Retrieve relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + + src_location, dst_location = copy_context.get_memory_location() + is_contiguous_copy = (src_strides[-1] == 1) and (dst_strides[-1] == 1) + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + + # ----------------- Generate backend call -------------------- + + if is_contiguous_copy: + # Memory is linear: can use {backend}MemcpyAsync + copysize = ' * '.join(sym2cpp(copy_shape)) + copysize += f' * sizeof({ctype})' + kind = f'{backend}Memcpy{src_location}To{dst_location}' + call = f'DACE_GPU_CHECK({backend}MemcpyAsync({dst_expr}, {src_expr}, {copysize}, {kind}, {gpustream}));\n' + + else: + # Memory is strided: use {backend}Memcpy2DAsync with dpitch/spitch + # This allows copying a strided 1D region + dpitch = f'{sym2cpp(dst_strides[0])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[0])} * sizeof({ctype})' + width = f'sizeof({ctype})' + height = sym2cpp(copy_shape[0]) + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync({dst_expr}, {dpitch}, {src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Potentially snychronization required if syncdebug is set to true in configurations + call = call + generate_sync_debug_call() + return call + + def _generate_2d_copy(self, copy_context: CopyContext) -> None: + """ + Generates a 2D memory copy using {backend}Memcpy2DAsync. + + Three main cases are handled: + - Copy between row-major stored arrays with contiguous rows. + - Copy between column-major stored arrays with contiguous columns. + - A special case where a 2D copy can still be represented. + + Raises: + NotImplementedError: Raised if the source and destination strides do not match any of the handled patterns. + Such cases indicate an unsupported 2D copy and should be examined separately. + They can be implemented if valid, or a more descriptive error should be raised if the path should not occur. + + Note: + {backend}Memcpy2DAsync supports strided copies along only one dimension (row or column), + but not both simultaneously. + """ + + # ----------- Extract relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + src_location, dst_location = copy_context.get_memory_location() + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + + # ----------------- Generate backend call if supported -------------------- + + # Case: Row-major layout, rows are not strided. + if (src_strides[1] == 1) and (dst_strides[1] == 1): + dpitch = f'{sym2cpp(dst_strides[0])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[0])} * sizeof({ctype})' + width = f'{sym2cpp(copy_shape[1])} * sizeof({ctype})' + height = f'{sym2cpp(copy_shape[0])}' + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync({dst_expr}, {dpitch}, {src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Case: Column-major layout, no columns are strided. 
+        elif (src_strides[0] == 1) and (dst_strides[0] == 1):
+            dpitch = f'{sym2cpp(dst_strides[1])} * sizeof({ctype})'
+            spitch = f'{sym2cpp(src_strides[1])} * sizeof({ctype})'
+            width = f'{sym2cpp(copy_shape[0])} * sizeof({ctype})'
+            height = f'{sym2cpp(copy_shape[1])}'
+            kind = f'{backend}Memcpy{src_location}To{dst_location}'
+
+            call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync({dst_expr}, {dpitch}, {src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n'
+
+        # Special case
+        elif (src_strides[0] / src_strides[1] == copy_shape[1] and dst_strides[0] / dst_strides[1] == copy_shape[1]):
+            # Consider as an example this copy: A[0:I, 0:J, K] -> B[0:I, 0:J] with
+            # copy shape [I, J], src_strides [J*K, K] and dst_strides [J, 1]. This can be
+            # represented with a {backend}Memcpy2DAsync call!
+
+            dpitch = f'{sym2cpp(dst_strides[1])} * sizeof({ctype})'
+            spitch = f'{sym2cpp(src_strides[1])} * sizeof({ctype})'
+            width = f'sizeof({ctype})'
+            height = sym2cpp(copy_shape[0] * copy_shape[1])
+            kind = f'{backend}Memcpy{src_location}To{dst_location}'
+
+            call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync({dst_expr}, {dpitch}, {src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n'
+
+        else:
+            raise NotImplementedError(
+                f"Unsupported 2D memory copy: shape={copy_shape}, src_strides={src_strides}, dst_strides={dst_strides}. "
+                "Please implement this case if it is valid, or raise a more descriptive error if this path should not be taken."
+            )
+
+        # Synchronization may be required if syncdebug is set to true in the configuration
+        call = call + generate_sync_debug_call()
+        return call
+
+    def _generate_nd_copy(self, copy_context: CopyContext) -> str:
+        """
+        Generates GPU code for copying N-dimensional arrays using 2D memory copies.
+
+        Uses {backend}Memcpy2DAsync for the last two dimensions, with nested loops
+        for any outer dimensions. Expects the copy to be contiguous and between
+        row-major storage locations.
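+
+        Example (illustrative): a 3D copy of shape [I, J, K] emits roughly
+
+            for (int __copyidx0 = 0; __copyidx0 < I; ++__copyidx0) {
+                // 2D copy of one J x K slice, offset by __copyidx0 * stride
+                DACE_GPU_CHECK(cudaMemcpy2DAsync(...));
+            }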
+ """ + # ----------- Extract relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + + src_location, dst_location = copy_context.get_memory_location() + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + num_dims = len(copy_shape) + + # ----------- Guard for unsupported Pattern -------------- + if not (src_strides[-1] == 1) and (dst_strides[-1] == 1): + src_node, dst_node = copy_context.src_node, copy_context.dst_node + src_storage = copy_context.get_storage_type(src_node) + dst_storage = copy_context.get_storage_type(dst_node) + raise NotImplementedError( + "N-dimensional GPU memory copies, that are strided or contain column-major arrays, are currently not supported.\n" + f" Source node: {src_node} (storage: {src_storage})\n" + f" Destination node: {copy_context.dst_node} (storage: {dst_storage})\n" + f" Source strides: {src_strides}\n" + f" Destination strides: {dst_strides}\n" + f" copy shape: {copy_shape}\n") + + # ----------------- Generate and write backend call(s) -------------------- + + call = "" + # Write for-loop headers + for dim in range(num_dims - 2): + call += f"for (int __copyidx{dim} = 0; __copyidx{dim} < {copy_shape[dim]}; ++__copyidx{dim}) {{\n" + + # Write Memcopy2DAsync + offset_src = ' + '.join(f'(__copyidx{d} * ({sym2cpp(s)}))' for d, s in enumerate(src_strides[:-2])) + offset_dst = ' + '.join(f'(__copyidx{d} * ({sym2cpp(s)}))' for d, s in enumerate(dst_strides[:-2])) + + src = f'{src_expr} + {offset_src}' + dst = f'{dst_expr} + {offset_dst}' + + dpitch = f'{sym2cpp(dst_strides[-2])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[-2])} * sizeof({ctype})' + width = f'{sym2cpp(copy_shape[-1])} * sizeof({ctype})' + height = sym2cpp(copy_shape[-2]) + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + # Generate call and write it + call += f'DACE_GPU_CHECK({backend}Memcpy2DAsync({dst}, {dpitch}, {src}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Potentially snychronization required if syncdebug is set to true in configurations + call += generate_sync_debug_call() + + # Write for-loop footers + for dim in range(num_dims - 2): + call += "\n}" + + # Return the code + return call + diff --git a/dace/codegen/targets/gpu_helpers/gpu_utils.py b/dace/codegen/targets/gpu_helpers/gpu_utils.py new file mode 100644 index 0000000000..e4c4c1fc38 --- /dev/null +++ b/dace/codegen/targets/gpu_helpers/gpu_utils.py @@ -0,0 +1,27 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from dace import Config +from dace.codegen import common + + +def generate_sync_debug_call() -> str: + """ + Generate backend sync and error-check calls as a string if + synchronous debugging is enabled. + + Parameters + ---------- + backend : str + Backend API prefix (e.g., 'cuda'). + + Returns + ------- + str + The generated debug call code, or an empty string if debugging is disabled. 
+ """ + backend: str = common.get_gpu_backend() + sync_call: str = "" + if Config.get_bool('compiler', 'cuda', 'syncdebug'): + sync_call = (f"DACE_GPU_CHECK({backend}GetLastError());\n" + f"DACE_GPU_CHECK({backend}DeviceSynchronize());\n") + + return sync_call diff --git a/dace/transformation/helpers.py b/dace/transformation/helpers.py index 13842fd162..f5607f952b 100644 --- a/dace/transformation/helpers.py +++ b/dace/transformation/helpers.py @@ -1550,6 +1550,37 @@ def get_parent_map(state: SDFGState, node: Optional[nodes.Node] = None) -> Optio cursdfg = cursdfg.parent_sdfg return None +def is_within_schedule_types(state: SDFGState, node: nodes.Node, schedules: Set[dtypes.ScheduleType]) -> bool: + """ + Checks if the given node is enclosed within a Map whose schedule type + matches any in the `schedules` set. + + Parameters + ---------- + state : SDFGState + The State where the node resides + node : nodes.Node + The node to check. + schedules : set[dtypes.ScheduleType] + A set of schedule types to match (e.g., {dtypes.ScheduleType.GPU_Device}). + + Returns + ---------- + bool + True if the node is enclosed by a Map with a schedule type in `schedules`, False otherwise. + """ + current = node + + while current is not None: + if isinstance(current, nodes.MapEntry): + if current.map.schedule in schedules: + return True + + parent = get_parent_map(state, current) + if parent is None: + return False + current, state = parent + def redirect_edge(state: SDFGState, edge: graph.MultiConnectorEdge[Memlet], diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py new file mode 100644 index 0000000000..851f18e108 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py @@ -0,0 +1,70 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, Set, Type, Union + +import dace +from dace import dtypes, properties, SDFG +from dace.codegen import common +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs + + +@properties.make_properties +@transformation.explicit_cf_compatible +class ConnectGPUStreamsToKernels(ppl.Pass): + """ + This Pass attaches GPU streams to kernels (i.e., dtypes.ScheduleType.GPU_Device scheduled maps). + + Adds GPU stream AccessNodes and connects them to kernel entry and exit nodes, + indicating which GPU stream each kernel is assigned to. These assignments are e.g. + used when launching the kernels. 
+ """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + # Retrieve the GPU stream array name and the prefix for individual stream variables + stream_array_name, stream_var_name_prefix = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',') + + # Retrieve GPU stream assignments for nodes + stream_assignments: Dict[nodes.Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + + # Link kernels to their assigned GPU streams + for sub_sdfg in sdfg.all_sdfgs_recursive(): + + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Not a kernel entry - continue + if not (isinstance(node, nodes.MapEntry) and node.map.schedule == dtypes.ScheduleType.GPU_Device): + continue + + # Stream connector name and the used GPU Stream for the kernel + assigned_gpustream = stream_assignments[node] + gpu_stream_var_name = f"{stream_var_name_prefix}{assigned_gpustream}" + accessed_gpu_stream = f"{stream_array_name}[{assigned_gpustream}]" + + # Assign the GPU stream to the kernel entry + kernel_entry = node + kernel_entry.add_in_connector(gpu_stream_var_name, dtypes.gpuStream_t) + stream_array_in = state.add_access(stream_array_name) + state.add_edge(stream_array_in, None, kernel_entry, gpu_stream_var_name, + dace.Memlet(accessed_gpu_stream)) + + # Assign the GPU stream to the kernel exit + kernel_exit = state.exit_node(kernel_entry) + kernel_exit.add_out_connector(gpu_stream_var_name, dtypes.gpuStream_t) + stream_array_out = state.add_access(stream_array_name) + state.add_edge(kernel_exit, gpu_stream_var_name, stream_array_out, None, + dace.Memlet(accessed_gpu_stream)) + + return {} \ No newline at end of file diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py new file mode 100644 index 0000000000..9877f2d563 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py @@ -0,0 +1,80 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, Set, Type, Union + +import dace +from dace import dtypes, properties, SDFG +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels + +# Placeholder for the GPU stream variable used in tasklet code +STREAM_PLACEHOLDER = "__dace_current_stream" + + +@properties.make_properties +@transformation.explicit_cf_compatible +class ConnectGPUStreamsToTasklets(ppl.Pass): + """ + This pass ensures that tasklets which require access to their assigned GPU stream + are provided with it explicitly. + + Such tasklets typically originate from expanded LibraryNodes targeting GPUs. + These nodes may reference the special placeholder variable `__dace_current_stream`, + which is expected to be defined during unparsing in `cpp.py`. 
+ + To avoid relying on this "hidden" mechanism, the pass rewrites tasklets to use + the GPU stream AccessNode directly. + + Note that this pass is similar to `ConnectGPUStreamsToKernels`. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + # Retrieve the GPU stream's array name + stream_array_name = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')[0] + + # Retrieve GPU stream assignments for nodes + stream_assignments: Dict[nodes.Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + + # Find all tasklets which use the GPU stream variable (STREAM_PLACEHOLDER) in the code + # and provide them the needed GPU stream explicitly + for sub_sdfg in sdfg.all_sdfgs_recursive(): + + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Not a tasklet - continue + if not isinstance(node, nodes.Tasklet): + continue + + # Tasklet does not need use its assigned GPU stream - continue + if not STREAM_PLACEHOLDER in node.code.as_string: + continue + + # Stream connector name and the used GPU Stream for the kernel + assigned_gpustream = stream_assignments[node] + gpu_stream_conn = STREAM_PLACEHOLDER + accessed_gpu_stream = f"{stream_array_name}[{assigned_gpustream}]" + + # Provide the GPU stream explicitly to the tasklet + stream_array_in = state.add_access(stream_array_name) + stream_array_out = state.add_access(stream_array_name) + + node.add_in_connector(gpu_stream_conn, dtypes.gpuStream_t) + node.add_out_connector(gpu_stream_conn, dtypes.gpuStream_t, force=True) + + state.add_edge(stream_array_in, None, node, gpu_stream_conn, dace.Memlet(accessed_gpu_stream)) + state.add_edge(node, gpu_stream_conn, stream_array_out, None, dace.Memlet(accessed_gpu_stream)) + + return {} \ No newline at end of file diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py new file mode 100644 index 0000000000..0151d790b8 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py @@ -0,0 +1,249 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Dict, List, Set, Type, Union + +import dace +from dace import SDFG, SDFGState, properties +from dace.config import Config +from dace.sdfg import nodes +from dace.sdfg.graph import Graph, NodeT +from dace.transformation import pass_pipeline as ppl, transformation + +# Placeholder for the GPU stream variable used in tasklet code +STREAM_PLACEHOLDER = "__dace_current_stream" + + +@properties.make_properties +@transformation.explicit_cf_compatible +class NaiveGPUStreamScheduler(ppl.Pass): + """ + Assigns GPU streams to nodes and stores the assignments in a dictionary. + This can be useful for enabling asynchronous and parallel GPU computation using GPU streams. + + Strategy Overview: + ------------------ + - GPU stream assignment is based on weakly connected components (WCCs) within each state. + - Nodes in the same WCC are assigned to the same stream. + - For top-level states (not within nested SDFGs), each new WCC starts on a new stream (starting from 0). 
+ - In nested SDFGs: + * Stream assignment is inherited from the parent component, + * All internal components share the parent's stream. + - GPU stream IDs wrap around according to the `max_concurrent_streams` configuration. + + Example: + -------- + A state with the following independent chains: + K1 → K2 + K3 → K4 → K5 + K6 + + would be scheduled as: + K1, K2 → stream 0 + K3, K4, K5 → stream 1 + K6 → stream 2 + + (assuming no limit on the number of concurrent streams) + + Note: + ----- + These refer to **backend GPU streams** (e.g., CUDA or HIP), not DaCe symbolic streams. + """ + + def __init__(self): + # Maximum number of concurrent streams allowed (from config). + # Cached locally for frequent reuse. + self._max_concurrent_streams = int(Config.get('compiler', 'cuda', 'max_concurrent_streams')) + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Nothing + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, _) -> Dict[nodes.Node, int]: + """ + Assigns GPU streams to nodes within the given SDFG. + + Parameters + ---------- + sdfg : SDFG + The top-level SDFG to process. + pipeline_results : Dict + Unused. + + Returns + ------- + Dict[nodes.Node, int] + A dictionary mapping each node to its assigned GPU stream. + """ + stream_assignments: Dict[nodes.Node, int] = dict() + for state in sdfg.states(): + self._assign_gpu_streams_in_state(sdfg, False, state, stream_assignments, 0) + + return stream_assignments + + def _assign_gpu_streams_in_state(self, sdfg: SDFG, in_nested_sdfg: bool, state: SDFGState, + stream_assignments: Dict[nodes.Node, int], gpu_stream: int) -> None: + """ + Assigns GPU streams to nodes in a single state. + + If inside a nested SDFG, components inherit the parent's stream. + Otherwise, each connected component gets a different stream. + Nested SDFGs are processed recursively. + + Parameters + ---------- + sdfg : SDFG + The SDFG containing the state. + in_nested_sdfg : bool + True if the state is in a nested SDFG. + state : SDFGState + The state to process. + stream_assignments : Dict[nodes.Node, int] + Mapping of nodes to assigned GPU streams (updated in-place). + gpu_stream : int + The current GPU stream ID. + + Returns + ------- + None + """ + components = self._get_weakly_connected_nodes(state) + + for component in components: + + if not self._requires_gpu_stream(state, component): + continue + + nodes_assigned_before = len(stream_assignments) + + for node in component: + stream_assignments[node] = gpu_stream + if isinstance(node, nodes.NestedSDFG): + for nested_state in node.sdfg.states(): + self._assign_gpu_streams_in_state(node.sdfg, True, nested_state, stream_assignments, gpu_stream) + + # Move to the next stream if we have assigned streams to any node in this component + # (careful: if nested, states are in same component) + if not in_nested_sdfg and len(stream_assignments) > nodes_assigned_before: + gpu_stream = self._next_stream(gpu_stream) + + def _get_weakly_connected_nodes(self, graph: Graph) -> List[Set[NodeT]]: + """ + Returns all weakly connected components in the given directed graph. + + A weakly connected component is a maximal group of nodes such that each pair + of nodes is connected by a path when ignoring edge directions. + + Parameters + ---------- + graph: Graph + A directed graph instance. 
+ + Returns + ------- + List[Set[Node_T]] + + A list containing sets of nodes, with each set corresponding to a weakly + connected component. + """ + visited: Set[NodeT] = set() + components: List[Set[NodeT]] = [] + + for node in graph.nodes(): + if node in visited: + continue + + # Start a new weakly connected component + component: Set[NodeT] = set() + stack = [node] + + while stack: + current = stack.pop() + if current in visited: + continue + + visited.add(current) + component.add(current) + + for neighbor in graph.neighbors(current): + if neighbor not in visited: + stack.append(neighbor) + + components.append(component) + + return components + + def _next_stream(self, gpu_stream: int) -> int: + """ + Compute the next CUDA stream index according to the concurrency configuration. + + Behavior depends on the configured max_concurrent_streams value: + - If 0: unlimited streams allowed, so increment the stream index by one. + - If -1: default setting, always return stream 0 (no concurrency). + - Otherwise: cycle through stream indices from 0 up to max_concurrent_streams - 1. + + Parameters + ---------- + gpu_stream : int + The current CUDA stream index. + + Returns + ------- + int + The next CUDA stream index based on the concurrency policy. + """ + if self._max_concurrent_streams == 0: + return gpu_stream + 1 + elif self._max_concurrent_streams == -1: + return 0 + else: + return (gpu_stream + 1) % self._max_concurrent_streams + + def _requires_gpu_stream(self, state: SDFGState, component: Set[NodeT]) -> bool: + """ + Check whether a connected component in an SDFG state should be assigned + a GPU stream. + + A component requires a GPU stream if it contains at least one of: + - An AccessNode with GPU global memory storage, + - A MapEntry scheduled on a GPU device, + - A Tasklet whose code includes the stream placeholder. + + Parameters + ---------- + state : SDFGState + The state containing the component. + component : Set[NodeT] + The set of nodes that form the connected component. + + Returns + ------- + bool + True if the component requires a GPU stream, False otherwise. + """ + + def gpu_relevant(node, parent) -> bool: + if (isinstance(node, nodes.AccessNode) and node.desc(parent).storage == dace.dtypes.StorageType.GPU_Global): + return True + + elif (isinstance(node, nodes.MapEntry) and node.map.schedule == dace.dtypes.ScheduleType.GPU_Device): + return True + + elif (isinstance(node, nodes.Tasklet) and STREAM_PLACEHOLDER in node.code.as_string): + return True + + return False + + for node in component: + if isinstance(node, nodes.NestedSDFG): + if any(gpu_relevant(node, parent) for node, parent in node.sdfg.all_nodes_recursive()): + return True + + else: + if gpu_relevant(node, state): + return True + + return False \ No newline at end of file diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py new file mode 100644 index 0000000000..7af22aa6c6 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py @@ -0,0 +1,273 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. 
+from typing import Any, Dict, List, Set, Tuple, Type, Union +import copy + +import dace +from dace import SDFG, SDFGState, dtypes, properties +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets +from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets +from dace.transformation.passes.gpu_specialization.insert_gpu_copy_tasklet import InsertGPUCopyTasklets + +@properties.make_properties +@transformation.explicit_cf_compatible +class GPUStreamTopologySimplification(ppl.Pass): + """ + Simplifies an SDFG after GPU stream nodes have been added. + + This pass is optional; the SDFG works without it, but it cleans up + the topology by merging adjacent or redundant GPU stream AccessNodes. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + depending_passes = { + NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets, + InsertGPUStreamSyncTasklets, InsertGPUCopyTasklets + } + + return depending_passes + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + """ + Simplify the SDFG topology by merging adjacent GPU stream nodes. + """ + self._merge_close_gpustream_nodes(sdfg) + + self._merge_gpustreams_special_case(sdfg) + return {} + + def _merge_close_gpustream_nodes(self, sdfg: SDFG) -> None: + """ + Merge "close" GPU stream AccessNodes in the SDFG. + + This function looks for a predecessor GPU stream AccessNode that can be merged + with any successor GPU stream AccessNodes of its grand-predecessors. + + Example: + + Consider two GPU copy tasklets connected via distinct GPU stream AccessNodes: + the corresponding subgraph looks like this: + + -> Sink GPU Source GPU -> + ¦ ¦ + Tasklet ------> Data AccessNode -----> Tasklet + + This function would merge the sink and source node to simplify the SDFG. + """ + for sub_sdfg in sdfg.all_sdfgs_recursive(): + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Skip AccessNodes + if isinstance(node, nodes.AccessNode): + continue + + # Find GPU stream AccessNode predecessors with no incoming edges + # (i.e. 
source GPU stream AccessNodes)
+                    node_predecessors = state.predecessors(node)
+                    preceding_gpustream_sources = [
+                        pre for pre in node_predecessors if isinstance(pre, nodes.AccessNode)
+                        and pre.desc(state).dtype == dtypes.gpuStream_t and state.in_degree(pre) == 0
+                    ]
+
+                    # Skip if there are no preceding GPU stream sources
+                    if len(preceding_gpustream_sources) == 0:
+                        continue
+
+                    # If multiple GPU stream sources exist, merge them; otherwise, use the single source
+                    if len(preceding_gpustream_sources) > 1:
+                        combined_stream_node = preceding_gpustream_sources.pop()
+                        for preceding_gpu_stream in preceding_gpustream_sources:
+                            # Note: there are no incoming edges
+                            for out_edge in state.out_edges(preceding_gpu_stream):
+                                _, src_conn, dst, dst_conn, data = out_edge
+                                state.add_edge(combined_stream_node, src_conn, dst, dst_conn, data)
+                                state.remove_edge(out_edge)
+                            state.remove_node(preceding_gpu_stream)
+
+                    else:
+                        combined_stream_node = preceding_gpustream_sources.pop()
+
+                    # Merge grand-predecessors' successor sink GPU streams with the predecessor source GPU stream
+                    node_grand_predecessors = [
+                        grand_pred for pred in node_predecessors for grand_pred in state.predecessors(pred)
+                    ]
+                    node_gp_successors_streams = [
+                        succ_of_gp for gp in node_grand_predecessors for succ_of_gp in state.successors(gp)
+                        if isinstance(succ_of_gp, nodes.AccessNode)
+                        and succ_of_gp.desc(state).dtype == dtypes.gpuStream_t and state.out_degree(succ_of_gp) == 0
+                    ]
+
+                    # Remove duplicates
+                    node_gp_successors_streams = list(set(node_gp_successors_streams))
+
+                    for gp_succ_stream in node_gp_successors_streams:
+                        for edge in state.in_edges(gp_succ_stream):
+                            src, src_conn, _, dst_conn, data = edge
+                            state.add_edge(src, src_conn, combined_stream_node, dst_conn, data)
+                            state.remove_edge(edge)
+                        # Note: the grand-predecessor's successor GPU stream is a sink node and has no
+                        # outgoing edges
+                        state.remove_node(gp_succ_stream)
+
+    def _merge_gpustreams_special_case(self, sdfg: SDFG) -> None:
+        """
+        Special-case simplification of GPU stream AccessNodes.
+
+        This pass detects the following pattern:
+        - A GPU stream AccessNode `X` has a predecessor and a successor (i.e. at least one of each).
+        - Between the predecessor and successor lie one or more tasklets.
+        - These tasklets use their own distinct GPU stream AccessNodes (not `X`),
+          which are connected only to the tasklet itself.
+
+        To simplify the topology, redundant streams are merged:
+        - A single unified input GPU stream connects to the predecessor and replaces (merges)
+          the per-tasklet input streams.
+        - A single unified output GPU stream connects to the successor and replaces (merges)
+          the per-tasklet output streams.
+
+        The simplification is easier to understand visually than in words.
+        Inspect the intermediate SDFGs produced by the minimal example below
+        to see the effect of the stream merging.
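+
+        Schematically (illustrative), with K1/K2 as kernel maps, T as a copy tasklet
+        between them, X the passthrough stream node, and in/out the per-tasklet
+        stream nodes:
+
+            before:  K1-exit -> X -> K2-entry,   in -> T -> out
+            after:   K1-exit -> unified_in -> T -> unified_out -> K2-entry
+
+        X and the per-tasklet stream nodes are removed.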
+        Example
+        -------
+        @dace.program
+        def example(A: dace.uint32[128], B: dace.uint32[128],
+                    C: dace.uint32[128], D: dace.uint32[128]):
+            for i in dace.map[0:128:1]:
+                B[i] = A[i]
+            for i in dace.map[0:128:1]:
+                D[i] = C[i]
+
+        sdfg = example.to_sdfg()
+        sdfg.apply_gpu_transformations()
+        """
+        # Get the name of the GPU stream array
+        gpustream_array_name = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')[0]
+
+        #------------------------- Preprocess: Gather Information ----------------------------
+
+        # For each GPU stream AccessNode that has both a predecessor and a successor:
+        # determine which per-tasklet source and sink GPU stream nodes lie between its
+        # predecessor and its successor
+        merge_source_gpustream: Dict[Tuple[nodes.AccessNode, SDFGState], List[nodes.AccessNode]] = dict()
+        merge_sink_gpustream: Dict[Tuple[nodes.AccessNode, SDFGState], List[nodes.AccessNode]] = dict()
+
+        for node, state in sdfg.all_nodes_recursive():
+
+            # Skip non-tasklets
+            if not isinstance(node, nodes.Tasklet):
+                continue
+
+            # The tasklets of interest have exactly one preceding source GPU stream node and
+            # one following sink GPU stream node; if not, we skip
+            node_predecessors = state.predecessors(node)
+            node_successors = state.successors(node)
+            downstream_gpustream_sinks = [
+                succ for succ in node_successors if isinstance(succ, nodes.AccessNode)
+                and succ.desc(state).dtype == dtypes.gpuStream_t and state.out_degree(succ) == 0
+            ]
+            upstream_gpustream_sources = [
+                pre for pre in node_predecessors if isinstance(pre, nodes.AccessNode)
+                and pre.desc(state).dtype == dtypes.gpuStream_t and state.in_degree(pre) == 0
+            ]
+
+            # Skip cases that are not considered
+            if not (len(upstream_gpustream_sources) == len(downstream_gpustream_sinks)
+                    and len(upstream_gpustream_sources) == 1):
+                continue
+
+            # Look for a potential predecessor of a "passthrough" GPU stream AccessNode,
+            # which would also be a grand-predecessor of the current node (= the tasklet)
+            candidate_predecessor = []
+            for pred in node_predecessors:
+                for grand_pred in state.predecessors(pred):
+
+                    # The current node's grand-predecessor is a candidate predecessor of a
+                    # "passthrough" GPU stream AccessNode
+                    candidate = grand_pred
+
+                    # A passthrough GPU stream node can only have MapExits and Tasklets as candidate predecessors
+                    if not (isinstance(candidate, nodes.MapExit) and candidate.map.schedule
+                            == dtypes.ScheduleType.GPU_Device or isinstance(candidate, nodes.Tasklet)):
+                        continue
+
+                    has_passthrough_gpustream = any(
+                        (isinstance(succ, nodes.AccessNode) and succ.desc(state).dtype == dtypes.gpuStream_t) and (
+                            state.in_degree(succ) > 0 and state.out_degree(succ) > 0)
+                        for succ in state.successors(candidate))
+
+                    if has_passthrough_gpustream:
+                        candidate_predecessor.append(candidate)
+
+            # No "close" passthrough GPU stream node exists if no candidate predecessor exists
+            if len(candidate_predecessor) == 0:
+                continue
+
+            # Niche case, more than one "close" passthrough GPU node exists: out of scope.
+            # Ignore this case (note: this pass only makes the graph visually nicer, so
+            # skipping has no effect on correctness)
+            if len(candidate_predecessor) > 1:
+                continue
+
+            # Get the candidate predecessor's passthrough GPU stream
+            candidate_predecessor = candidate_predecessor[0]
+            passthrough_gpu_node = [
+                succ for succ in state.successors(candidate_predecessor)
+                if isinstance(succ, nodes.AccessNode) and succ.desc(state).dtype == dtypes.gpuStream_t
+            ][0]
+
+            # Collect and store the GPU stream merging information
+            pre_gpustream: nodes.AccessNode = upstream_gpustream_sources[0]  # 
Note: Len is 1 + succ_gpustream: nodes.AccessNode = downstream_gpustream_sinks[0] # Note: Len is 1 + if (passthrough_gpu_node, state) in merge_source_gpustream: + merge_source_gpustream[(passthrough_gpu_node, state)].append(pre_gpustream) + merge_sink_gpustream[(passthrough_gpu_node, state)].append(succ_gpustream) + else: + merge_source_gpustream[(passthrough_gpu_node, state)] = [pre_gpustream] + merge_sink_gpustream[(passthrough_gpu_node, state)] = [succ_gpustream] + + #------------------------- Merge the GPU Stream AccessNodes ---------------------------- + for passthrough_gpu_node, state in merge_sink_gpustream.keys(): + + # Add new AccessNodes which merge the other loose streams + unified_in_stream = state.add_access(gpustream_array_name) + unified_out_stream = state.add_access(gpustream_array_name) + + for in_edge in state.in_edges(passthrough_gpu_node): + src, src_conn, _, dst_conn, memlet = in_edge + state.add_edge(src, src_conn, unified_in_stream, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(in_edge) + + for out_edge in state.out_edges(passthrough_gpu_node): + _, src_conn, dst, dst_conn, memlet = out_edge + state.add_edge(unified_out_stream, src_conn, dst, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(out_edge) + + for source_stream in merge_source_gpustream[passthrough_gpu_node, state]: + for out_edge in state.out_edges(source_stream): + _, src_conn, dst, dst_conn, memlet = out_edge + state.add_edge(unified_in_stream, src_conn, dst, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(out_edge) + state.remove_node(source_stream) + + for sink_stream in merge_sink_gpustream[passthrough_gpu_node, state]: + for in_edge in state.in_edges(sink_stream): + src, src_conn, _, dst_conn, memlet = in_edge + state.add_edge(src, src_conn, unified_out_stream, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(in_edge) + state.remove_node(sink_stream) + + state.remove_node(passthrough_gpu_node) \ No newline at end of file diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py b/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py new file mode 100644 index 0000000000..cea8fc1f43 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py @@ -0,0 +1,166 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. 
+from typing import Any, Dict, List, Set, Tuple, Type, Union +import copy + +import dace +from dace import SDFG, SDFGState, dtypes, properties +from dace import memlet as mm +from dace.codegen.targets.gpu_helpers.copy_strategies import CopyContext, OutOfKernelCopyStrategy +from dace.config import Config +from dace.sdfg import nodes, scope_contains_scope +from dace.sdfg.graph import MultiConnectorEdge +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets +from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets + + +@properties.make_properties +@transformation.explicit_cf_compatible +class InsertGPUCopyTasklets(ppl.Pass): + """ + This pass inserts explicit copy tasklets for data transfers that need to be handled + by the GPU and occur outside a kernel (for example, copying data from host memory + to the GPU before executing a kernel). + + It identifies such copy locations and inserts the corresponding tasklets. For each + memlet path describing a copy, the first edge is duplicated: one edge goes from the original + source to the tasklet, and the other from the tasklet to the original destination, while + the original edge is removed. + + This is experimental and could later serve as inspiration for making all copies explicit. + Considerations for future work include allowing tasklets to access array addresses + from connectors and describing in memlets how data will be moved, since currently + tasklets only support value inputs. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + depending_passes = { + NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets, + InsertGPUStreamSyncTasklets + } + return depending_passes + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Tasklets | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]) -> Dict: + """ + Inserts out-of-kernel GPU copy tasklets into the SDFG based on GPU stream scheduling. + Out-of-kernel copies are copies which are handled by the GPU and occur out of a kernel + function. + + Parameters + ---------- + sdfg : SDFG + The SDFG to transform by adding out-of-kernel GPU copy tasklets. + pipeline_results : Dict[str, Any] + Results from previous transformation passes, including GPU stream assignments. + + Returns + ------- + dict + Currently returns an empty dictionary. 
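+
+        Example (illustrative): for a host-to-device copy assigned to stream 0 on a
+        CUDA backend, the inserted tasklet carries a C++ body roughly like
+
+            DACE_GPU_CHECK(cudaMemcpyAsync(dst, src, size, cudaMemcpyHostToDevice, gpu_stream0));
+
+        and is wired between the original source and destination of the memlet path.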
+ """ + # Prepare GPU stream + gpustream_assignments: Dict[nodes.Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + gpustream_array_name, gpustream_var_name_prefix = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',') + + # Initialize the strategy for copies that occur outside of kernel execution + out_of_kernel_copy = OutOfKernelCopyStrategy() + + # Get all data copies to process the out of kernel copies + copy_worklist = self.find_all_data_copies(sdfg) + + for copy_sdfg, state, src_node, dst_node, edge in copy_worklist: + + copy_context = CopyContext(copy_sdfg, state, src_node, dst_node, edge, gpustream_assignments) + + # Only insert copy tasklets for GPU related copies occuring out of the + # kernel (i.e. a GPU_device scheduled map) + if not out_of_kernel_copy.applicable(copy_context): + continue + + # Generatae the copy call + code = out_of_kernel_copy.generate_copy(copy_context) + + # Prepare GPU ustream connectors and the stream to be accessed from the + # GPU stream array + gpustream_id = gpustream_assignments[dst_node] + gpustream_var_name = f"{gpustream_var_name_prefix}{gpustream_id}" + accessed_gpustream = f"{gpustream_array_name}[{gpustream_id}]" + + # Create the tasklet and add GPU stream related connectors + tasklet = state.add_tasklet("gpu_copy", {}, {}, code, language=dtypes.Language.CPP) + tasklet.add_in_connector(gpustream_var_name, dtypes.gpuStream_t, True) + tasklet.add_out_connector(gpustream_var_name, dtypes.gpuStream_t, True) + + # Add incoming and outgoing GPU stream accessNodes to the tasklet + in_gpustream = state.add_access(gpustream_array_name) + out_gpustream = state.add_access(gpustream_array_name) + state.add_edge(in_gpustream, None, tasklet, gpustream_var_name, dace.Memlet(accessed_gpustream)) + state.add_edge(tasklet, gpustream_var_name, out_gpustream, None, dace.Memlet(accessed_gpustream)) + + # Put the tasklet in between the edge + dst_node_pred, dst_node_conn, _, dst_conn, memlet = edge + state.add_edge(dst_node_pred, dst_node_conn, tasklet, None, copy.deepcopy(memlet)) + state.add_edge(tasklet, None, dst_node, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(edge) + + return {} + + def find_all_data_copies( + self, sdfg: SDFG) -> List[Tuple[SDFG, SDFGState, nodes.Node, nodes.Node, MultiConnectorEdge[mm.Memlet]]]: + """ + Finds and returns all data copies in the SDFG as tuples containing the SDFG, state, source node, + destination node, and the first memlet edge of in the memlet path between source and destination node. + + Parameters + ---------- + sdfg : SDFG + The SDFG to analyze for potential data copies. 
+
+        Returns
+        -------
+        List[Tuple[SDFG, SDFGState, nodes.Node, nodes.Node, MultiConnectorEdge[mm.Memlet]]]
+            A list of tuples representing the data copy, each containing:
+            - The SDFG containing the copy
+            - The state in which the copy occurs
+            - The source node of the copy
+            - The destination node of the copy
+            - The first memlet edge representing the data movement
+        """
+        copy_worklist: List[Tuple[SDFG, SDFGState, nodes.Node, nodes.Node, MultiConnectorEdge[mm.Memlet]]] = []
+        visited_edges: Set[MultiConnectorEdge[mm.Memlet]] = set()
+
+        for sub_sdfg in sdfg.all_sdfgs_recursive():
+            for state in sub_sdfg.states():
+                for edge in state.edges():
+
+                    # Skip edges that were already processed
+                    if edge in visited_edges:
+                        continue
+
+                    # Get the memlet path and mark all edges in the path as visited
+                    memlet_path = state.memlet_path(edge)
+                    visited_edges.update(set(memlet_path))
+
+                    # Get source and destination nodes
+                    first_edge = memlet_path[0]
+                    last_edge = memlet_path[-1]
+                    src_node = first_edge.src
+                    dst_node = last_edge.dst
+
+                    # Skip empty memlets
+                    if first_edge.data.subset is None:
+                        continue
+
+                    # Add copy to the worklist
+                    copy_worklist.append((sub_sdfg, state, src_node, dst_node, first_edge))
+
+        return copy_worklist
\ No newline at end of file
diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py
new file mode 100644
index 0000000000..62f3484a08
--- /dev/null
+++ b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py
@@ -0,0 +1,288 @@
+# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved.
+from typing import Any, Dict, List, Set, Tuple, Type, Union
+import copy
+
+import dace
+from dace import dtypes, properties, SDFG, SDFGState
+from dace.codegen import common
+from dace.config import Config
+from dace.sdfg import nodes
+from dace.transformation import pass_pipeline as ppl, transformation
+from dace.transformation.helpers import is_within_schedule_types
+from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler
+from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs
+from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels
+from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets
+
+
+STREAM_PLACEHOLDER = "__dace_current_stream"
+
+
+@properties.make_properties
+@transformation.explicit_cf_compatible
+class InsertGPUStreamSyncTasklets(ppl.Pass):
+    """
+    Inserts GPU stream synchronization tasklets in an SDFG where needed.
+
+    This pass uses a heuristic approach to find locations matching specific patterns
+    that require synchronization. Additional locations can be added easily if new
+    cases are discovered.
+    """
+
+    def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]:
+        return {NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets}
+
+    def modifies(self) -> ppl.Modifies:
+        return ppl.Modifies.Tasklets | ppl.Modifies.Memlets
+
+    def should_reapply(self, modified: ppl.Modifies) -> bool:
+        return False
+
+    def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]):
+        """
+        Inserts GPU stream synchronization tasklets at required locations: after certain
+        nodes, and at the end of a state for the GPU streams used in that state.
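+
+        For example, if a state uses GPU streams 0 and 1 (assuming the CUDA backend
+        and the default 'gpu_stream' connector prefix), the tasklet inserted at the
+        end of that state contains:
+
+            DACE_GPU_CHECK(cudaStreamSynchronize(gpu_stream0));
+            DACE_GPU_CHECK(cudaStreamSynchronize(gpu_stream1));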
+        """
+        stream_assignments: Dict[nodes.Node, int] = pipeline_results['NaiveGPUStreamScheduler']
+
+        # Get sync locations
+        sync_state, sync_node = self._identify_sync_locations(sdfg, stream_assignments)
+
+        # Synchronize after a node when required
+        self._insert_gpu_stream_sync_after_node(sdfg, sync_node, stream_assignments)
+
+        # Synchronize all used streams at the end of a state
+        self._insert_gpu_stream_sync_at_state_end(sdfg, sync_state, stream_assignments)
+        return {}
+
+    def _identify_sync_locations(
+            self, sdfg: SDFG,
+            stream_assignments: Dict[nodes.Node, int]) -> Tuple[Dict[SDFGState, Set[int]], Dict[nodes.Node, SDFGState]]:
+        """
+        Heuristically identifies GPU stream synchronization points in an SDFG.
+
+        Parameters
+        ----------
+        sdfg : SDFG
+            The SDFG to analyze.
+        stream_assignments : Dict[nodes.Node, int]
+            Mapping of nodes to their assigned GPU stream ids.
+
+        Returns
+        -------
+        Tuple[Dict[SDFGState, Set[int]], Dict[nodes.Node, SDFGState]]
+            - **sync_state**: Maps each state to the set of stream IDs that should be
+              synchronized at the end of the state.
+            - **sync_node**: The keys of this dictionary are nodes after which synchronization
+              is needed, and their corresponding value is the state they belong to.
+        """
+
+        # ------------------ Helper predicates -----------------------------
+
+        def is_gpu_global_accessnode(node, state):
+            return isinstance(node, nodes.AccessNode) and node.desc(
+                state.parent).storage == dtypes.StorageType.GPU_Global
+
+        def is_nongpu_accessnode(node, state):
+            return isinstance(node, nodes.AccessNode) and node.desc(
+                state.parent).storage not in dtypes.GPU_MEMORY_STORAGES_EXPERIMENTAL_CUDACODEGEN
+
+        def is_kernel_exit(node):
+            return isinstance(node, nodes.ExitNode) and node.schedule == dtypes.ScheduleType.GPU_Device
+
+        def is_sink_node(node, state):
+            return state.out_degree(node) == 0
+
+        def edge_within_kernel(state, src, dst):
+            gpu_schedules = dtypes.GPU_SCHEDULES_EXPERIMENTAL_CUDACODEGEN
+            src_in_kernel = is_within_schedule_types(state, src, gpu_schedules)
+            dst_in_kernel = is_within_schedule_types(state, dst, gpu_schedules)
+            return src_in_kernel and dst_in_kernel
+
+        def is_tasklet_with_stream_use(src):
+            return isinstance(src, nodes.Tasklet) and STREAM_PLACEHOLDER in src.code.as_string
+
+        # ------------------ Sync detection logic -----------------------------
+
+        sync_state: Dict[SDFGState, Set[int]] = {}
+        sync_node: Dict[nodes.Node, SDFGState] = {}
+
+        for edge, state in sdfg.all_edges_recursive():
+            src, dst = edge.src, edge.dst
+
+            # Ensure state is initialized in sync_state
+            if state not in sync_state:
+                sync_state[state] = set()
+
+            # --- Heuristics for when to sync ---
+            if (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) and is_sink_node(dst, state)
+                    and not edge_within_kernel(state, src, dst)):
+                sync_state[state].add(stream_assignments[dst])
+
+            elif (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) and not is_sink_node(dst, state)
+                  and not edge_within_kernel(state, src, dst)):
+                sync_node[dst] = state
+                sync_state[state].add(stream_assignments[dst])
+
+            elif (is_nongpu_accessnode(src, state) and is_gpu_global_accessnode(dst, state)
+                  and not edge_within_kernel(state, src, dst)):
+                sync_state[state].add(stream_assignments[dst])
+
+            elif (is_kernel_exit(src) and is_gpu_global_accessnode(dst, state) and not is_sink_node(dst, state)):
+                sync_state[state].add(stream_assignments[src])
+
+            elif (is_kernel_exit(src) and 
is_gpu_global_accessnode(dst, state) and is_sink_node(dst, state)):
+                sync_state[state].add(stream_assignments[dst])
+
+            elif is_tasklet_with_stream_use(src):
+                sync_state[state].add(stream_assignments[src])
+
+            else:
+                continue
+
+            # Check that state is indeed a SDFGState when added to the dictionary, to be on the safe side
+            if not isinstance(state, SDFGState):
+                raise NotImplementedError(f"Unexpected parent type '{type(state).__name__}' for edge '{edge}'. "
+                                          "Expected 'SDFGState'. Please handle this case explicitly.")
+
+        # Remove states with no syncs
+        sync_state = {state: streams for state, streams in sync_state.items() if len(streams) > 0}
+
+        return sync_state, sync_node
+
+    def _insert_gpu_stream_sync_at_state_end(self, sdfg: SDFG, sync_state: Dict[SDFGState, Set[int]],
+                                             stream_assignments: Dict[nodes.Node, int]) -> None:
+        """
+        Inserts GPU stream synchronization tasklets at the end of SDFG states.
+
+        For each state that requires synchronization, this method:
+
+        1. Generates a tasklet that synchronizes all assigned GPU streams using
+           the appropriate backend (e.g., CUDA).
+        2. Ensures all other operations in the state complete before synchronization
+           by connecting all sink nodes to the tasklet.
+        3. Guarantees that only a single GPU stream AccessNode connects to the sync
+           tasklet, creating one if needed.
+
+        Parameters
+        ----------
+        sdfg : SDFG
+            The top level SDFG.
+        sync_state : Dict[SDFGState, Set[int]]
+            Mapping of states to sets of stream IDs that require synchronization at the end of the state.
+        stream_assignments : Dict[nodes.Node, int]
+            Mapping of nodes to their assigned GPU stream IDs.
+        """
+        # Prepare GPU stream info and backend
+        stream_array_name, stream_var_name_prefix = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')
+        backend: str = common.get_gpu_backend()
+
+        for state, streams in sync_state.items():
+
+            #----------------- Generate GPU stream synchronization Tasklet -----------------
+
+            # Build synchronization calls for all streams used in this state
+            sync_code_lines = []
+            for stream in streams:
+                gpu_stream_var_name = f"{stream_var_name_prefix}{stream}"
+                sync_call = f"DACE_GPU_CHECK({backend}StreamSynchronize({gpu_stream_var_name}));"
+                sync_code_lines.append(sync_call)
+            sync_code = "\n".join(sync_code_lines)
+
+            # Create the tasklet (a single tasklet synchronizes all streams used in this state)
+            tasklet = state.add_tasklet(name="gpu_streams_synchronization",
+                                        inputs=set(),
+                                        outputs=set(),
+                                        code=sync_code,
+                                        language=dtypes.Language.CPP)
+
+            # ----------------- Connect sink nodes to the synchronization tasklet -----------------
+
+            # 1. Separate GPU stream sink nodes and other sink nodes
+            stream_sink_nodes: List[nodes.AccessNode] = []
+            non_stream_sink_nodes: List[nodes.Node] = []
+            for sink_node in state.sink_nodes():
+                if isinstance(sink_node, nodes.AccessNode) and sink_node.desc(state).dtype == dtypes.gpuStream_t:
+                    stream_sink_nodes.append(sink_node)
+
+                elif sink_node != tasklet:
+                    non_stream_sink_nodes.append(sink_node)
+
+            # 2. Connect non-stream sink nodes to the sync tasklet
+            for sink_node in non_stream_sink_nodes:
+                state.add_edge(sink_node, None, tasklet, None, dace.Memlet())
+
+            # 3. 
Connect a single GPU stream sink node (create or merge if needed)
+            if len(stream_sink_nodes) == 0:
+                combined_stream_node = state.add_access(stream_array_name)
+
+            else:
+                combined_stream_node = stream_sink_nodes.pop()
+                for stream_node in stream_sink_nodes:
+                    for edge in state.in_edges(stream_node):
+                        state.add_edge(edge.src, edge.src_conn, combined_stream_node, edge.dst_conn, edge.data)
+                        state.remove_edge(edge)
+                    state.remove_node(stream_node)
+
+            # Connect back to output stream node
+            output_stream_node = state.add_access(combined_stream_node.data)
+            for stream in streams:
+                accessed_gpu_stream = f"{stream_array_name}[{stream}]"
+                conn = f"{stream_var_name_prefix}{stream}"  # Note: Same as "gpu_stream_var_name" from tasklet
+
+                tasklet.add_in_connector(conn, dtypes.gpuStream_t)
+                tasklet.add_out_connector(conn, dtypes.gpuStream_t, force=True)
+                state.add_edge(combined_stream_node, None, tasklet, conn, dace.Memlet(accessed_gpu_stream))
+                state.add_edge(tasklet, conn, output_stream_node, None, dace.Memlet(accessed_gpu_stream))
+
+    def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.Node, SDFGState], 
+                                           stream_assignments: Dict[nodes.Node, int]) -> None:
+        """
+        Insert a GPU stream synchronization tasklet immediately after specified nodes.
+
+        Parameters
+        ----------
+        sdfg : SDFG
+            The top level SDFG.
+        sync_node : Dict[nodes.Node, SDFGState]
+            Mapping of nodes to their parent state. After each such node, a GPU stream
+            synchronization should occur.
+        stream_assignments : Dict[nodes.Node, int]
+            Mapping of nodes to their assigned GPU stream IDs.
+        """
+        # Prepare GPU stream info and backend
+        stream_array_name, stream_var_name_prefix = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')
+        backend: str = common.get_gpu_backend()
+
+        for node, state in sync_node.items():
+
+            #----------------- Generate GPU stream synchronization Tasklet -----------------
+
+            # Get assigned GPU stream 
+            stream = stream_assignments.get(node, "nullptr")
+            if stream == "nullptr":
+                raise NotImplementedError("Using the default 'nullptr' gpu stream is not supported yet.")
+
+            # Create the tasklet
+            stream_var_name = f"{stream_var_name_prefix}{stream}"
+            sync_call = f"DACE_GPU_CHECK({backend}StreamSynchronize({stream_var_name}));\n"
+            tasklet = state.add_tasklet( name=f"gpu_stream_{stream}_synchronization",
+                                         inputs=set(), outputs=set(),
+                                         code=sync_call, language=dtypes.Language.CPP)
+
+
+            #----------------- Place tasklet between node and successors, link GPU streams ----------------
+
+            # 1. Put the tasklet between the node and its successors
+            for succ in state.successors(node):
+                state.add_edge(tasklet, None, succ, None, dace.Memlet())
+            state.add_edge(node, None, tasklet, None, dace.Memlet())
+
+            # 2. 
Connect tasklet to GPU stream AccessNodes + in_stream = state.add_access(stream_array_name) + out_stream = state.add_access(stream_array_name) + accessed_stream = f"{stream_array_name}[{stream}]" + state.add_edge(in_stream, None, tasklet, stream_var_name, dace.Memlet(accessed_stream)) + state.add_edge(tasklet, stream_var_name, out_stream, None, dace.Memlet(accessed_stream)) + tasklet.add_in_connector(stream_var_name, dtypes.gpuStream_t, force=True) + tasklet.add_out_connector(stream_var_name, dtypes.gpuStream_t, force=True) \ No newline at end of file diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py b/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py new file mode 100644 index 0000000000..1896ec382c --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py @@ -0,0 +1,154 @@ +# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, Set, Type, Union + +import dace +from dace import SDFG, dtypes, properties +from dace.config import Config +from dace.sdfg import is_devicelevel_gpu +from dace.sdfg.nodes import AccessNode, MapEntry, MapExit, Node, Tasklet +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler + +STREAM_PLACEHOLDER = "__dace_current_stream" + + +@properties.make_properties +@transformation.explicit_cf_compatible +class InsertGPUStreamsToSDFGs(ppl.Pass): + """ + Inserts a GPU stream array into the top-level SDFG and propagates it to all + nested SDFGs that require it, including intermediate SDFGs along the hierarchy. + + This pass guarantees that every relevant SDFG has the array defined, avoiding + duplication and allowing subsequent passes in the GPU stream pipeline to rely + on its presence without redefining it. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + """ + Ensure that a GPU stream array is available in all SDFGs that require it. + + The pass creates the array once at the top-level SDFG and propagates it + down the hierarchy by inserting matching arrays in child SDFGs and wiring + them through nested SDFG connectors. This way, all SDFGs share a consistent + reference to the same GPU stream array. 
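+
+        As an illustration (assuming the default array name 'gpu_streams' and two
+        assigned streams), the top-level SDFG receives a transient 'gpu_streams' of
+        shape (2,) and dtype gpuStream_t, and each nested SDFG that needs it gets a
+        matching array wired in through an in-connector named 'gpu_streams' on its
+        parent nested SDFG node.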
+        """
+
+        # Extract stream array name and number of streams to allocate
+        stream_array_name = Config.get('compiler', 'cuda', 'gpu_stream_name').split(',')[0]
+        stream_assignments: Dict[Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler']
+        num_assigned_streams = max(stream_assignments.values(), default=0) + 1
+
+        # Add the GPU stream array at the top level
+        sdfg.add_transient(stream_array_name, (num_assigned_streams, ),
+                           dtype=dace.dtypes.gpuStream_t,
+                           storage=dace.dtypes.StorageType.Register)
+
+        # Ensure GPU stream array is defined where required
+        for child_sdfg in self.find_child_sdfgs_requiring_gpu_stream(sdfg):
+
+            # Skip if this child already has the array (inserted higher up in the hierarchy)
+            if stream_array_name in child_sdfg.arrays:
+                continue
+
+            # Add the array to the child SDFG
+            inner_sdfg = child_sdfg
+            inner_sdfg.add_array(stream_array_name, (num_assigned_streams, ),
+                                 dtype=dace.dtypes.gpuStream_t,
+                                 storage=dace.dtypes.StorageType.Register)
+
+            # Walk up the hierarchy until the array is found, inserting it into each parent
+            outer_sdfg = inner_sdfg.parent_sdfg
+            while stream_array_name not in outer_sdfg.arrays:
+
+                # Insert array in parent SDFG
+                outer_sdfg.add_array(stream_array_name, (num_assigned_streams, ),
+                                     dtype=dace.dtypes.gpuStream_t,
+                                     storage=dace.dtypes.StorageType.Register)
+
+                # Connect parent SDFG array to nested SDFG node
+                inner_nsdfg_node = inner_sdfg.parent_nsdfg_node
+                inner_parent_state = inner_sdfg.parent
+                inner_nsdfg_node.add_in_connector(stream_array_name, dtypes.gpuStream_t)
+                inp_gpu_stream: AccessNode = inner_parent_state.add_access(stream_array_name)
+                inner_parent_state.add_edge(inp_gpu_stream, None, inner_nsdfg_node, stream_array_name,
+                                            dace.Memlet(stream_array_name))
+
+                # Continue climbing up the hierarchy
+                inner_sdfg = outer_sdfg
+                outer_sdfg = outer_sdfg.parent_sdfg
+
+            # Ensure final connection from the first parent that had the array down to this SDFG
+            inner_nsdfg_node = inner_sdfg.parent_nsdfg_node
+            inner_parent_state = inner_sdfg.parent
+            inner_nsdfg_node.add_in_connector(stream_array_name, dtypes.gpuStream_t)
+            inp_gpu_stream: AccessNode = inner_parent_state.add_access(stream_array_name)
+            inner_parent_state.add_edge(inp_gpu_stream, None, inner_nsdfg_node, stream_array_name,
+                                        dace.Memlet(f"{stream_array_name}[0:{num_assigned_streams}]"))
+
+        return {}
+
+    def find_child_sdfgs_requiring_gpu_stream(self, sdfg) -> Set[SDFG]:
+        """
+        Identify all child SDFGs that require a GPU stream array in their
+        array descriptor store. A child SDFG requires a GPU stream if:
+
+        - It launches GPU kernels (MapEntry/MapExit with GPU_Device schedule).
+        - It contains special Tasklets (e.g., from library node expansion) that
+          use the GPU stream they are assigned to in the code.
+        - It accesses GPU global memory outside device-level GPU scopes, which
+          implies memory copies or kernel data feeds.
+
+        Parameters
+        ----------
+        sdfg : SDFG
+            The root SDFG to inspect.
+
+        Returns
+        -------
+        Set[SDFG]
+            The set of child SDFGs that need a GPU stream array in their array descriptor
+            store.
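+
+        For instance, a child SDFG whose state contains a map scheduled as
+        GPU_Device qualifies under the first criterion, while a purely host-side
+        child SDFG is not returned.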
+        """
+        requiring_gpu_stream = set()
+        for child_sdfg in sdfg.all_sdfgs_recursive():
+
+            # Skip the root SDFG itself
+            if child_sdfg is sdfg:
+                continue
+
+            for state in child_sdfg.states():
+                for node in state.nodes():
+
+                    # Case 1: Kernel launch nodes
+                    if isinstance(node, (MapEntry, MapExit)) and node.map.schedule == dtypes.ScheduleType.GPU_Device:
+                        requiring_gpu_stream.add(child_sdfg)
+                        break
+
+                    # Case 2: Tasklets that use GPU stream in their code
+                    if isinstance(node, Tasklet) and STREAM_PLACEHOLDER in node.code.as_string:
+                        requiring_gpu_stream.add(child_sdfg)
+                        break
+
+                    # Case 3: Accessing GPU global memory outside device-level scopes
+                    if (isinstance(node, AccessNode) and node.desc(state).storage == dtypes.StorageType.GPU_Global
+                            and not is_devicelevel_gpu(state.sdfg, state, node)):
+                        requiring_gpu_stream.add(child_sdfg)
+                        break
+
+                # Stop scanning this SDFG once a reason is found
+                if child_sdfg in requiring_gpu_stream:
+                    break
+
+        return requiring_gpu_stream
\ No newline at end of file
From eb10eacf55ba0e1539b5d21775228e826feb8976 Mon Sep 17 00:00:00 2001
From: aydogdub
Date: Fri, 19 Dec 2025 14:54:22 +0100
Subject: [PATCH 2/3] added tests and adjustments

---
 .../targets/gpu_helpers/copy_strategies.py    |   1 -
 dace/config_schema.yml                        |  11 ++
 dace/dtypes.py                                |  14 ++
 dace/sdfg/state.py                            |   7 +
 dace/transformation/helpers.py                |   1 +
 .../connect_gpu_streams_to_kernels.py         |   2 +-
 .../connect_gpu_streams_to_tasklets.py        |   2 +-
 .../gpu_stream_scheduling.py                  |   2 +-
 .../gpu_stream_topology_simplification.py     |   3 +-
 .../insert_gpu_copy_tasklet.py                |   2 +-
 .../insert_gpu_stream_sync_tasklets.py        |  26 ++--
 .../insert_gpu_streams_to_sdfgs.py            |   2 +-
 .../gpu_specialization/gpu_stream_test.py     | 134 ++++++++++++++++++
 13 files changed, 188 insertions(+), 19 deletions(-)
 create mode 100644 tests/passes/gpu_specialization/gpu_stream_test.py

diff --git a/dace/codegen/targets/gpu_helpers/copy_strategies.py b/dace/codegen/targets/gpu_helpers/copy_strategies.py
index 1b11f5bb2b..27a5b2c53b 100644
--- a/dace/codegen/targets/gpu_helpers/copy_strategies.py
+++ b/dace/codegen/targets/gpu_helpers/copy_strategies.py
@@ -551,4 +551,3 @@ def _generate_nd_copy(self, copy_context: CopyContext) -> None:
 
         # Return the code
         return call
-
diff --git a/dace/config_schema.yml b/dace/config_schema.yml
index 812e24329e..ab77a7d7ff 100644
--- a/dace/config_schema.yml
+++ b/dace/config_schema.yml
@@ -458,6 +458,17 @@ required:
         will raise an exception if such a Memlet is encountered. This allows the user to have
         full control over all Maps in the SDFG.
 
+    # New configs, needed for new CUDACodeGen
+    gpu_stream_name:
+        type: str
+        title: Name for the GPU stream object
+        description: >
+            GPU streams allow GPU operations, such as kernel execution or memory transfers, to run asynchronously
+            and in parallel. This field specifies the naming convention for the gpu stream array and its connectors
+            in the SDFG. For example: 'gpu_streams,gpu_stream' means 'gpu_streams' is the array containing the
+            stream objects, and 'gpu_stream0' (prefix derived from the second name + stream id) is used as a
+            connector for gpu_streams[0]. 
+        default: gpu_streams,gpu_stream
 
 #############################################
 # General FPGA flags
diff --git a/dace/dtypes.py b/dace/dtypes.py
index faadc84a50..ef049af343 100644
--- a/dace/dtypes.py
+++ b/dace/dtypes.py
@@ -87,6 +87,18 @@ class ScheduleType(aenum.AutoNumberEnum):
     ScheduleType.GPU_Persistent,
 ]
 
+# A subset of GPU schedule types for the new GPU backend
+GPU_SCHEDULES_EXPERIMENTAL_CUDACODEGEN = [
+    ScheduleType.GPU_Device,
+    ScheduleType.GPU_ThreadBlock,
+]
+
+# A subset of on-GPU storage types for the new GPU backend
+GPU_MEMORY_STORAGES_EXPERIMENTAL_CUDACODEGEN = [
+    StorageType.GPU_Global,
+    StorageType.GPU_Shared,
+]
+
 # A subset of CPU schedule types
 CPU_SCHEDULES = [
     ScheduleType.CPU_Multicore,
@@ -1266,6 +1278,7 @@ def isconstant(var):
 complex128 = typeclass(numpy.complex128)
 string = stringtype()
 MPI_Request = opaque('MPI_Request')
+gpuStream_t = opaque('gpuStream_t')
 
 
 @undefined_safe_enum
@@ -1286,6 +1299,7 @@ class Typeclasses(aenum.AutoNumberEnum):
     float64 = float64
     complex64 = complex64
     complex128 = complex128
+    gpuStream_t = gpuStream_t
 
 
 _bool = bool
diff --git a/dace/sdfg/state.py b/dace/sdfg/state.py
index d558053d3d..93b925b2c5 100644
--- a/dace/sdfg/state.py
+++ b/dace/sdfg/state.py
@@ -405,6 +405,13 @@ def memlet_path(self, edge: MultiConnectorEdge[mm.Memlet]) -> List[MultiConnecto
         if (edge.src_conn is None and edge.dst_conn is None and edge.data.is_empty()):
             return result
 
+        # For GPU stream (i.e. cudaStream, hipStream) management we can have dynamic out connectors, e.g.
+        # (GPU_Device-scheduled) MapExit: stream -> None: AccessNode, where the AccessNode accesses a stream array.
+        # Memlets are used here, but it's not about seeing how data flows.
+        if (isinstance(edge.src, nd.MapExit) and edge.src.map.schedule == dtypes.ScheduleType.GPU_Device
+                and isinstance(edge.dst, nd.AccessNode) and edge.dst.desc(state).dtype == dtypes.gpuStream_t):
+            return result
+
         # Prepend incoming edges until reaching the source node
         curedge = edge
         visited = set()
diff --git a/dace/transformation/helpers.py b/dace/transformation/helpers.py
index f5607f952b..7756cbf1fe 100644
--- a/dace/transformation/helpers.py
+++ b/dace/transformation/helpers.py
@@ -1550,6 +1550,7 @@ def get_parent_map(state: SDFGState, node: Optional[nodes.Node] = None) -> Optio
             cursdfg = cursdfg.parent_sdfg
     return None
 
+
 def is_within_schedule_types(state: SDFGState, node: nodes.Node, schedules: Set[dtypes.ScheduleType]) -> bool:
     """
     Checks if the given node is enclosed within a Map whose schedule type
diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py
index 851f18e108..225dba00e4 100644
--- a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py
+++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py
@@ -67,4 +67,4 @@ def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]):
                 state.add_edge(kernel_exit, gpu_stream_var_name, stream_array_out, None,
                                dace.Memlet(accessed_gpu_stream))
 
-    return {}
\ No newline at end of file
+    return {}
diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py
index 9877f2d563..58d9ff70ff 100644
--- a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py
+++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py
@@ -77,4 +77,4 @@ 
def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): state.add_edge(stream_array_in, None, node, gpu_stream_conn, dace.Memlet(accessed_gpu_stream)) state.add_edge(node, gpu_stream_conn, stream_array_out, None, dace.Memlet(accessed_gpu_stream)) - return {} \ No newline at end of file + return {} diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py index 0151d790b8..0ad3c2e7c0 100644 --- a/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py +++ b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py @@ -246,4 +246,4 @@ def gpu_relevant(node, parent) -> bool: if gpu_relevant(node, state): return True - return False \ No newline at end of file + return False diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py index 7af22aa6c6..7e1a62b29c 100644 --- a/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py +++ b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py @@ -14,6 +14,7 @@ from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets from dace.transformation.passes.gpu_specialization.insert_gpu_copy_tasklet import InsertGPUCopyTasklets + @properties.make_properties @transformation.explicit_cf_compatible class GPUStreamTopologySimplification(ppl.Pass): @@ -270,4 +271,4 @@ def example(A: dace.uint32[128], B: dace.uint32[128], state.remove_edge(in_edge) state.remove_node(sink_stream) - state.remove_node(passthrough_gpu_node) \ No newline at end of file + state.remove_node(passthrough_gpu_node) diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py b/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py index cea8fc1f43..162aa6143f 100644 --- a/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_copy_tasklet.py @@ -163,4 +163,4 @@ def find_all_data_copies( # Add copy to the worklist copy_worklist.append((sub_sdfg, state, src_node, dst_node, first_edge)) - return copy_worklist \ No newline at end of file + return copy_worklist diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py index 62f3484a08..2d2c1137de 100644 --- a/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py @@ -14,7 +14,6 @@ from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets - STREAM_PLACEHOLDER = "__dace_current_stream" @@ -30,7 +29,9 @@ class InsertGPUStreamSyncTasklets(ppl.Pass): """ def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: - return {NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets} + return { + NaiveGPUStreamScheduler, InsertGPUStreamsToSDFGs, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets + } def modifies(self) -> ppl.Modifies: return ppl.Modifies.Tasklets | ppl.Modifies.Memlets @@ -119,8 +120,8 
@@ def is_tasklet_with_stream_use(src): and not edge_within_kernel(state, src, dst)): sync_state[state].add(stream_assignments[dst]) - elif (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) and not is_sink_node(dst, state) - and not edge_within_kernel(state, src, dst)): + elif (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) + and not is_sink_node(dst, state) and not edge_within_kernel(state, src, dst)): sync_node[dst] = state sync_state[state].add(stream_assignments[dst]) @@ -236,7 +237,7 @@ def _insert_gpu_stream_sync_at_state_end(self, sdfg: SDFG, sync_state: Dict[SDFG state.add_edge(combined_stream_node, None, tasklet, conn, dace.Memlet(accessed_gpu_stream)) state.add_edge(tasklet, conn, output_stream_node, None, dace.Memlet(accessed_gpu_stream)) - def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.Node, SDFGState], + def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.Node, SDFGState], stream_assignments: Dict[nodes.Node, int]) -> None: """ Insert a GPU stream synchronization tasklet immediately after specified nodes. @@ -258,7 +259,7 @@ def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.N #----------------- Generate GPU stream synchronization Tasklet ----------------- - # Get assigned GPU stream + # Get assigned GPU stream stream = stream_assignments.get(node, "nullptr") if stream == "nullptr": raise NotImplementedError("Using the default 'nullptr' gpu stream is not supported yet.") @@ -266,10 +267,11 @@ def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.N # Create the tasklet stream_var_name = f"{stream_var_name_prefix}{stream}" sync_call = f"DACE_GPU_CHECK({backend}StreamSynchronize({stream_var_name}));\n" - tasklet = state.add_tasklet( name=f"gpu_stream_{stream}_synchronization", - inputs=set(), outputs=set(), - code=sync_call, language=dtypes.Language.CPP) - + tasklet = state.add_tasklet(name=f"gpu_stream_{stream}_synchronization", + inputs=set(), + outputs=set(), + code=sync_call, + language=dtypes.Language.CPP) #----------------- Place tasklet between node and successors, link GPU streams ---------------- @@ -277,7 +279,7 @@ def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.N for succ in state.successors(node): state.add_edge(tasklet, None, succ, None, dace.Memlet()) state.add_edge(node, None, tasklet, None, dace.Memlet()) - + # 2. 
Connect tasklet to GPU stream AccessNodes
             in_stream = state.add_access(stream_array_name)
             out_stream = state.add_access(stream_array_name)
@@ -285,4 +287,4 @@ def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.N
             state.add_edge(in_stream, None, tasklet, stream_var_name, dace.Memlet(accessed_stream))
             state.add_edge(tasklet, stream_var_name, out_stream, None, dace.Memlet(accessed_stream))
             tasklet.add_in_connector(stream_var_name, dtypes.gpuStream_t, force=True)
-            tasklet.add_out_connector(stream_var_name, dtypes.gpuStream_t, force=True)
\ No newline at end of file
+            tasklet.add_out_connector(stream_var_name, dtypes.gpuStream_t, force=True)
diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py b/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py
index 1896ec382c..f45caa5dd0 100644
--- a/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py
+++ b/dace/transformation/passes/gpu_specialization/insert_gpu_streams_to_sdfgs.py
@@ -151,4 +151,4 @@ def find_child_sdfgs_requiring_gpu_stream(self, sdfg) -> Set[SDFG]:
             if child_sdfg in requiring_gpu_stream:
                 break
 
-        return requiring_gpu_stream
\ No newline at end of file
+        return requiring_gpu_stream
diff --git a/tests/passes/gpu_specialization/gpu_stream_test.py b/tests/passes/gpu_specialization/gpu_stream_test.py
new file mode 100644
index 0000000000..3bdd1f2aab
--- /dev/null
+++ b/tests/passes/gpu_specialization/gpu_stream_test.py
@@ -0,0 +1,134 @@
+# Copyright 2019-2025 ETH Zurich and the DaCe authors. All rights reserved.
+import pytest
+
+import dace
+from dace.codegen import common
+from dace.transformation.pass_pipeline import Pipeline
+from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler
+from dace.transformation.passes.gpu_specialization.insert_gpu_streams_to_sdfgs import InsertGPUStreamsToSDFGs
+from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels
+from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets
+from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets
+from dace.transformation.passes.gpu_specialization.insert_gpu_copy_tasklet import InsertGPUCopyTasklets
+from dace.transformation.passes.gpu_specialization.gpu_stream_topology_simplification import GPUStreamTopologySimplification
+
+gpu_stream_pipeline = Pipeline([
+    NaiveGPUStreamScheduler(),
+    InsertGPUStreamsToSDFGs(),
+    ConnectGPUStreamsToKernels(),
+    ConnectGPUStreamsToTasklets(),
+    InsertGPUStreamSyncTasklets(),
+    InsertGPUCopyTasklets(),
+    GPUStreamTopologySimplification(),
+])
+
+backend = common.get_gpu_backend()
+
+@pytest.mark.gpu
+def test_basic():
+    """
+    A simple memory copy program.
+
+    Since the SDFG has a single connected component, exactly one GPU stream is used
+    and must be synchronized at the end of the state. For each synchronized stream,
+    the pipeline introduces a memlet from the synchronization tasklet to a GPU stream
+    AccessNode. Therefore, it is sufficient to verify there is only one sink node with one incoming
+    edge, verify its dtype, and check for the presence of a preceding synchronization tasklet. 
+    """
+    @dace.program
+    def simple_copy(
+        A: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global,
+        B: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global
+    ):
+        for i in dace.map[0:128:1] @ dace.dtypes.ScheduleType.GPU_Device:
+            B[i] = A[i]
+
+    sdfg = simple_copy.to_sdfg()
+    gpu_stream_pipeline.apply_pass(sdfg, {})
+
+    state = sdfg.states()[0]
+    sink_nodes = state.sink_nodes()
+    node = sink_nodes[0]
+    assert (
+        len(sink_nodes) == 1
+        and len(state.in_edges(node)) == 1
+        and isinstance(node, dace.nodes.AccessNode)
+        and node.desc(state).dtype == dace.dtypes.gpuStream_t
+    ),(
+        "Only one sink node should exist, which is a GPU stream AccessNode, and it should have one incoming edge."
+    )
+
+    assert any(
+        isinstance(pre, dace.nodes.Tasklet)
+        and f"{backend}StreamSynchronize(" in pre.code.as_string
+        for pre in state.predecessors(node)
+    ), (
+        "At the end of each state any used stream must be synchronized."
+    )
+
+@pytest.mark.gpu
+def test_extended():
+    """
+    A program that performs two independent memory copies.
+
+    The input arrays reside in host memory, and `apply_gpu_transformations()` is applied to
+    the program. As a result, the data is first copied to GPU global memory, after
+    which the two copies are executed on the GPU. Since these copies form two
+    independent connected components in the resulting SDFG, the naive GPU stream
+    scheduler assigns them to different GPU streams.
+
+    This test verifies that exactly two GPU streams are used, that both streams are
+    synchronized at the end of the state, and that the corresponding asynchronous
+    memory copy tasklets are correctly associated with their assigned streams.
+    """
+    @dace.program
+    def independent_copies(A : dace.uint32[128], B: dace.uint32[128], C : dace.uint32[128], D: dace.uint32[128]):
+        for i in dace.map[0:128:1]:
+            B[i] = A[i]
+        for i in dace.map[0:128:1]:
+            D[i] = C[i]
+
+    sdfg = independent_copies.to_sdfg()
+
+    # Transform such that the program can run on GPU and apply the GPU stream pipeline
+    sdfg.apply_gpu_transformations()
+    gpu_stream_pipeline.apply_pass(sdfg, {})
+
+    # Test 1: Two GPU streams were used since we use the naive stream scheduler
+    state = sdfg.states()[0]
+    sink_nodes = state.sink_nodes()
+    node = sink_nodes[0]
+    assert (
+        len(sink_nodes) == 1
+        and len(state.in_edges(node)) == 2
+        and isinstance(node, dace.nodes.AccessNode)
+        and node.desc(state).dtype == dace.dtypes.gpuStream_t
+    ),(
+        "Only one sink node should exist, which is a GPU stream AccessNode, and it "
+        "should have two incoming edges as the original graph consisted of two connected components."
+    )
+
+    # Test 2: We synchronize at the end of the state
+    assert any(
+        isinstance(pre, dace.nodes.Tasklet)
+        and f"{backend}StreamSynchronize(" in pre.code.as_string
+        for pre in state.predecessors(node)
+    ), (
+        "At the end of each state any used stream must be synchronized."
+    )
+
+    # Test 3: Check that we have memory copy tasklets (as we perform two "Main Memory -> GPU Global"
+    # memory copies and two "GPU Global -> Main Memory" memory copies by applying the gpu transformation)
+    # and that they use the name of the in connector of the GPU stream in the copy call
+    memcopy_tasklets = [n for n in state.nodes() if isinstance(n, dace.nodes.Tasklet) and f"{backend}MemcpyAsync(" in n.code.as_string]
+    for tasklet in memcopy_tasklets:
+        assert len(tasklet.in_connectors) == 1, (
+            "Memcpy tasklets must have exactly one input connector "
+            "corresponding to the GPU stream." 
+        )
+
+        in_connector = next(iter(tasklet.in_connectors))
+
+        assert in_connector in tasklet.code.as_string, (
+            "Memcpy tasklets must reference their GPU stream input connector in the memcpy call."
+        )
From 69fbb731e2ece60d731f73734ccc242000fec2c8 Mon Sep 17 00:00:00 2001
From: aydogdub
Date: Fri, 19 Dec 2025 14:59:25 +0100
Subject: [PATCH 3/3] run pre-commit

---
 .../gpu_specialization/gpu_stream_test.py     | 74 +++++++------------
 1 file changed, 28 insertions(+), 46 deletions(-)

diff --git a/tests/passes/gpu_specialization/gpu_stream_test.py b/tests/passes/gpu_specialization/gpu_stream_test.py
index 3bdd1f2aab..07d1facdf9 100644
--- a/tests/passes/gpu_specialization/gpu_stream_test.py
+++ b/tests/passes/gpu_specialization/gpu_stream_test.py
@@ -24,6 +24,7 @@
 backend = common.get_gpu_backend()
 
+
 @pytest.mark.gpu
 def test_basic():
     """
@@ -31,15 +32,14 @@ def test_basic():
     A simple memory copy program.
 
     Since the SDFG has a single connected component, exactly one GPU stream is used
     and must be synchronized at the end of the state. For each synchronized stream,
-    the pipeline introduces a memlet from the synchronization tasklet to a GPU stream
-    AccessNode. Therefore, it is sufficient to verify there is only one sink node with one incoming
+    the pipeline introduces a memlet from the synchronization tasklet to a GPU stream 
+    AccessNode. Therefore, it is sufficient to verify there is only one sink node with one incoming 
     edge, verify its dtype, and check for the presence of a preceding synchronization tasklet.
     """
+
     @dace.program
-    def simple_copy(
-        A: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global,
-        B: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global
-    ):
+    def simple_copy(A: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global,
+                    B: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global):
         for i in dace.map[0:128:1] @ dace.dtypes.ScheduleType.GPU_Device:
             B[i] = A[i]
 
@@ -50,21 +50,13 @@ def simple_copy(
     sink_nodes = state.sink_nodes()
     node = sink_nodes[0]
     assert (
-        len(sink_nodes) == 1
-        and len(state.in_edges(node)) == 1
-        and isinstance(node, dace.nodes.AccessNode)
+        len(sink_nodes) == 1 and len(state.in_edges(node)) == 1 and isinstance(node, dace.nodes.AccessNode)
         and node.desc(state).dtype == dace.dtypes.gpuStream_t
-    ),(
-        "Only one sink node should exist, which is a GPU stream AccessNode, and it should have one incoming edge."
-    )
-
-    assert any(
-        isinstance(pre, dace.nodes.Tasklet)
-        and f"{backend}StreamSynchronize(" in pre.code.as_string
-        for pre in state.predecessors(node)
-    ), (
-        "At the end of each state any used stream must be synchronized."
-    )
+    ), ("Only one sink node should exist, which is a GPU stream AccessNode, and it should have one incoming edge.")
+
+    assert any(isinstance(pre, dace.nodes.Tasklet) and f"{backend}StreamSynchronize(" in pre.code.as_string
+               for pre in state.predecessors(node)), ("At the end of each state any used stream must be synchronized.")
+
 
 @pytest.mark.gpu
 def test_extended():
@@ -81,8 +73,9 @@ def test_extended():
     synchronized at the end of the state, and that the corresponding asynchronous
     memory copy tasklets are correctly associated with their assigned streams. 
""" + @dace.program - def independent_copies(A : dace.uint32[128], B: dace.uint32[128], C : dace.uint32[128], D: dace.uint32[128]): + def independent_copies(A: dace.uint32[128], B: dace.uint32[128], C: dace.uint32[128], D: dace.uint32[128]): for i in dace.map[0:128:1]: B[i] = A[i] for i in dace.map[0:128:1]: @@ -98,37 +91,26 @@ def independent_copies(A : dace.uint32[128], B: dace.uint32[128], C : dace.uint3 state = sdfg.states()[0] sink_nodes = state.sink_nodes() node = sink_nodes[0] - assert ( - len(sink_nodes) == 1 - and len(state.in_edges(node)) == 2 - and isinstance(node, dace.nodes.AccessNode) - and node.desc(state).dtype == dace.dtypes.gpuStream_t - ),( - "Only one sink node with should exist, which is a GPU stream AccessNode and it " - "should have two ingoing edges as original graph consisted of two connected components." - ) + assert (len(sink_nodes) == 1 and len(state.in_edges(node)) == 2 and isinstance(node, dace.nodes.AccessNode) + and node.desc(state).dtype == dace.dtypes.gpuStream_t), ( + "Only one sink node with should exist, which is a GPU stream AccessNode and it " + "should have two ingoing edges as original graph consisted of two connected components.") # Test 2: We synchronize at the end of the state - assert ( - isinstance(pre, dace.nodes.Tasklet) - and f"{backend}StreamSynchronize(" in pre.code.as_string - for pre in state.predecessors(node) - ), ( - "At then end of each state any used stream must be synchronized." - ) - - # Test 3: Check that we have memory copy tasklets (as we perform two "Main Memory -> GPU GLobal" + assert (isinstance(pre, dace.nodes.Tasklet) and f"{backend}StreamSynchronize(" in pre.code.as_string + for pre in state.predecessors(node)), ("At then end of each state any used stream must be synchronized.") + + # Test 3: Check that we have memory copy tasklets (as we perform two "Main Memory -> GPU GLobal" # memory copies and two "GPU Global -> Main Memory" memory copies by applying the gpu tranformation) # and that they use the name of the in connector of the GPU stream in the copy call - memcopy_tasklets = [n for n in state.nodes() if isinstance(n, dace.nodes.Tasklet) and f"{backend}MemcpyAsync(" in n.code.as_string] + memcopy_tasklets = [ + n for n in state.nodes() if isinstance(n, dace.nodes.Tasklet) and f"{backend}MemcpyAsync(" in n.code.as_string + ] for tasklet in memcopy_tasklets: - assert len(tasklet.in_connectors) == 1, ( - "Memcpy tasklets must have exactly one input connector " - "corresponding to the GPU stream." - ) + assert len(tasklet.in_connectors) == 1, ("Memcpy tasklets must have exactly one input connector " + "corresponding to the GPU stream.") in_connector = next(iter(tasklet.in_connectors)) assert in_connector in tasklet.code.as_string, ( - "Memcpy tasklets must reference their GPU stream input connector in the memcpy call." - ) + "Memcpy tasklets must reference their GPU stream input connector in the memcpy call.")