diff --git a/dace/codegen/dispatcher.py b/dace/codegen/dispatcher.py index ff15f110bb..421225c7a4 100644 --- a/dace/codegen/dispatcher.py +++ b/dace/codegen/dispatcher.py @@ -98,7 +98,8 @@ def add(self, name: str, dtype: DefinedType, ctype: str, ancestor: int = 0, allo for _, scope, can_access_parent in reversed(self._scopes): if name in scope: err_str = "Shadowing variable {} from type {} to {}".format(name, scope[name], dtype) - if (allow_shadowing or config.Config.get_bool("compiler", "allow_shadowing")): + if (allow_shadowing or config.Config.get_bool("compiler", "allow_shadowing") + or dtype == DefinedType.GPUStream): if not allow_shadowing: print("WARNING: " + err_str) else: diff --git a/dace/codegen/targets/cpp.py b/dace/codegen/targets/cpp.py index 3b6e4efb8f..7acf73afc3 100644 --- a/dace/codegen/targets/cpp.py +++ b/dace/codegen/targets/cpp.py @@ -218,14 +218,17 @@ def memlet_copy_to_absolute_strides(dispatcher: 'TargetDispatcher', def is_cuda_codegen_in_device(framecode) -> bool: """ - Check the state of the CUDA code generator, whether it is inside device code. + Check the state of the (Experimental) CUDA code generator, whether it is inside device code. """ from dace.codegen.targets.cuda import CUDACodeGen + + cudaClass = CUDACodeGen + if framecode is None: cuda_codegen_in_device = False else: for codegen in framecode.targets: - if isinstance(codegen, CUDACodeGen): + if isinstance(codegen, cudaClass): cuda_codegen_in_device = codegen._in_device_code break else: @@ -248,11 +251,9 @@ def ptr(name: str, desc: data.Data, sdfg: SDFG = None, framecode: 'DaCeCodeGener root = name.split('.')[0] if root in sdfg.arrays and isinstance(sdfg.arrays[root], data.Structure): name = name.replace('.', '->') - # Special case: If memory is persistent and defined in this SDFG, add state # struct to name if (desc.transient and desc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External)): - if desc.storage == dtypes.StorageType.CPU_ThreadLocal: # Use unambiguous name for thread-local arrays return f'__{sdfg.cfg_id}_{name}' elif not is_cuda_codegen_in_device(framecode): # GPU kernels cannot access state @@ -807,9 +808,12 @@ def unparse_cr(sdfg, wcr_ast, dtype): def connected_to_gpu_memory(node: nodes.Node, state: SDFGState, sdfg: SDFG): for e in state.all_edges(node): path = state.memlet_path(e) - if ((isinstance(path[0].src, nodes.AccessNode) - and path[0].src.desc(sdfg).storage is dtypes.StorageType.GPU_Global)): + if (((isinstance(path[0].src, nodes.AccessNode) + and path[0].src.desc(sdfg).storage is dtypes.StorageType.GPU_Global)) + or ((isinstance(path[-1].dst, nodes.AccessNode) + and path[-1].dst.desc(sdfg).storage is dtypes.StorageType.GPU_Global))): return True + return False diff --git a/dace/codegen/targets/cpu.py b/dace/codegen/targets/cpu.py index 23e230f9d0..acd9f0a646 100644 --- a/dace/codegen/targets/cpu.py +++ b/dace/codegen/targets/cpu.py @@ -502,6 +502,14 @@ def allocate_array(self, return elif (nodedesc.storage == dtypes.StorageType.Register): + # The assignment necessary to unify the explicit streams and streams declared through + # the state of the SDFG. 
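        # (Illustrative note, not part of the patch itself: for a stream array named, say,
        #  "gpu_streams", the line emitted below is roughly
        #      gpuStream_t* gpu_streams = __state->gpu_context->streams;
        #  i.e. the "allocation" only aliases the backend streams that the GPU context already
        #  owns, which is why the matching deallocation merely resets the pointer to nullptr.)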
+ if nodedesc.dtype == dtypes.gpuStream_t: + ctype = dtypes.gpuStream_t.ctype + allocation_stream.write(f"{ctype}* {name} = __state->gpu_context->streams;") + define_var(name, DefinedType.Pointer, ctype) + return + ctypedef = dtypes.pointer(nodedesc.dtype).ctype if nodedesc.start_offset != 0: raise NotImplementedError('Start offset unsupported for registers') @@ -577,6 +585,9 @@ def deallocate_array(self, sdfg: SDFG, cfg: ControlFlowRegion, dfg: StateSubgrap if isinstance(nodedesc, (data.Scalar, data.View, data.Stream, data.Reference)): return + elif nodedesc.dtype == dtypes.gpuStream_t: + callsite_stream.write(f"{alloc_name} = nullptr;") + return elif (nodedesc.storage == dtypes.StorageType.CPU_Heap or (nodedesc.storage == dtypes.StorageType.Register and (symbolic.issymbolic(arrsize, sdfg.constants) or @@ -994,6 +1005,11 @@ def process_out_memlets(self, dst_edge = dfg.memlet_path(edge)[-1] dst_node = dst_edge.dst + if isinstance(dst_node, nodes.AccessNode) and dst_node.desc(state).dtype == dtypes.gpuStream_t: + # Special case: GPU Streams do not represent data flow - they assing GPU Streams to kernels/tasks + # Thus, nothing needs to be written and out memlets of this kind should be ignored. + continue + # Target is neither a data nor a tasklet node if isinstance(node, nodes.AccessNode) and (not isinstance(dst_node, nodes.AccessNode) and not isinstance(dst_node, nodes.CodeNode)): @@ -1035,8 +1051,7 @@ def process_out_memlets(self, # Tasklet -> array with a memlet. Writing to array is emitted only if the memlet is not empty if isinstance(node, nodes.CodeNode) and not edge.data.is_empty(): if not uconn: - raise SyntaxError("Cannot copy memlet without a local connector: {} to {}".format( - str(edge.src), str(edge.dst))) + return conntype = node.out_connectors[uconn] is_scalar = not isinstance(conntype, dtypes.pointer) @@ -1254,7 +1269,6 @@ def memlet_definition(self, # Dynamic WCR memlets start uninitialized result += "{} {};".format(memlet_type, local_name) defined = DefinedType.Scalar - else: if not memlet.dynamic: if is_scalar: @@ -1289,8 +1303,12 @@ def memlet_definition(self, memlet_type = ctypedef result += "{} &{} = {};".format(memlet_type, local_name, expr) defined = DefinedType.Stream - else: - raise TypeError("Unknown variable type: {}".format(var_type)) + + # Set Defined Type for GPU Stream connectors + # Shadowing for stream variable needs to be allowed + if memlet_type == 'gpuStream_t': + var_type = DefinedType.GPUStream + defined = DefinedType.GPUStream if defined is not None: self._dispatcher.defined_vars.add(local_name, defined, memlet_type, allow_shadowing=allow_shadowing) diff --git a/dace/config_schema.yml b/dace/config_schema.yml index 2b05d45232..4732d75198 100644 --- a/dace/config_schema.yml +++ b/dace/config_schema.yml @@ -268,7 +268,7 @@ required: type: str title: Arguments description: Compiler argument flags - default: '-fPIC -Wall -Wextra -O3 -march=native -ffast-math -Wno-unused-parameter -Wno-unused-label' + default: '-fopenmp -fPIC -Wall -Wextra -O3 -march=native -ffast-math -Wno-unused-parameter -Wno-unused-label' default_Windows: '/O2 /fp:fast /arch:AVX2 /D_USRDLL /D_WINDLL /D__restrict__=__restrict' libs: @@ -326,7 +326,7 @@ required: Additional CUDA architectures (separated by commas) to compile GPU code for, excluding the current architecture on the compiling machine. 
- default: '60' + default: '86' hip_arch: type: str diff --git a/dace/dtypes.py b/dace/dtypes.py index c2835d85a4..190d078d0f 100644 --- a/dace/dtypes.py +++ b/dace/dtypes.py @@ -71,6 +71,7 @@ class ScheduleType(AutoNumberEnum): GPU_ThreadBlock = () #: Thread-block code GPU_ThreadBlock_Dynamic = () #: Allows rescheduling work within a block GPU_Persistent = () + GPU_Warp = () Snitch = () Snitch_Multicore = () @@ -84,6 +85,11 @@ class ScheduleType(AutoNumberEnum): ScheduleType.GPU_Persistent, ] +# A subset of GPU schedule types for ExperimentalCUDACodeGen +EXPERIMENTAL_GPU_SCHEDULES = [ + ScheduleType.GPU_Warp, +] + # A subset of CPU schedule types CPU_SCHEDULES = [ ScheduleType.CPU_Multicore, @@ -95,6 +101,8 @@ class ScheduleType(AutoNumberEnum): StorageType.GPU_Shared, ] +GPU_KERNEL_ACCESSIBLE_STORAGES = [StorageType.GPU_Global, StorageType.GPU_Shared, StorageType.Register] + @undefined_safe_enum class ReductionType(AutoNumberEnum): @@ -192,7 +200,8 @@ class TilingType(AutoNumberEnum): ScheduleType.GPU_ThreadBlock: StorageType.Register, ScheduleType.GPU_ThreadBlock_Dynamic: StorageType.Register, ScheduleType.SVE_Map: StorageType.CPU_Heap, - ScheduleType.Snitch: StorageType.Snitch_TCDM + ScheduleType.Snitch: StorageType.Snitch_TCDM, + ScheduleType.GPU_Warp: StorageType.Register, } # Maps from ScheduleType to default ScheduleType for sub-scopes @@ -207,9 +216,10 @@ class TilingType(AutoNumberEnum): ScheduleType.GPU_Device: ScheduleType.GPU_ThreadBlock, ScheduleType.GPU_ThreadBlock: ScheduleType.Sequential, ScheduleType.GPU_ThreadBlock_Dynamic: ScheduleType.Sequential, + ScheduleType.GPU_Warp: ScheduleType.Sequential, ScheduleType.SVE_Map: ScheduleType.Sequential, ScheduleType.Snitch: ScheduleType.Snitch, - ScheduleType.Snitch_Multicore: ScheduleType.Snitch_Multicore + ScheduleType.Snitch_Multicore: ScheduleType.Snitch_Multicore, } # Maps from StorageType to a preferred ScheduleType for helping determine schedules. @@ -1240,6 +1250,7 @@ class string(_DaCeArray, npt.NDArray[numpy.str_]): ... class vector(_DaCeArray, npt.NDArray[numpy.void]): ... class MPI_Request(_DaCeArray, npt.NDArray[numpy.void]): ... class float32sr(_DaCeArray, npt.NDArray[numpy.float32]): ... + class gpuStream_t(_DaCeArray, npt.NDArray[numpy.void]): ... # yapf: enable else: # Runtime definitions @@ -1260,6 +1271,7 @@ class float32sr(_DaCeArray, npt.NDArray[numpy.float32]): ... complex128 = typeclass(numpy.complex128) string = stringtype() MPI_Request = opaque('MPI_Request') + gpuStream_t = opaque('gpuStream_t') float32sr = Float32sr() @@ -1281,6 +1293,7 @@ class Typeclasses(AutoNumberEnum): float64 = float64 complex64 = complex64 complex128 = complex128 + gpuStream_t = gpuStream_t _bool = bool @@ -1508,6 +1521,7 @@ def can_access(schedule: ScheduleType, storage: StorageType): ScheduleType.GPU_Persistent, ScheduleType.GPU_ThreadBlock, ScheduleType.GPU_ThreadBlock_Dynamic, + ScheduleType.GPU_Warp, ]: return storage in [StorageType.GPU_Global, StorageType.GPU_Shared, StorageType.CPU_Pinned] elif schedule in [ScheduleType.Default, ScheduleType.CPU_Multicore, ScheduleType.CPU_Persistent]: diff --git a/dace/sdfg/state.py b/dace/sdfg/state.py index 97ddc91e58..c9240c3ef3 100644 --- a/dace/sdfg/state.py +++ b/dace/sdfg/state.py @@ -406,6 +406,13 @@ def memlet_path(self, edge: MultiConnectorEdge[mm.Memlet]) -> List[MultiConnecto if (edge.src_conn is None and edge.dst_conn is None and edge.data.is_empty()): return result + # For the explicit (new) gpu stream handling we can have dynamic out connectors, e.g. 
+ # KernelExit: stream -> None: AccessNode, where AccessNode accesses a Stream array + # Memlets are used but its not about seing how data flows + if (isinstance(edge.src, nd.MapExit) and edge.src.map.schedule == dtypes.ScheduleType.GPU_Device + and isinstance(edge.dst, nd.AccessNode) and edge.dst.desc(state).dtype == dtypes.gpuStream_t): + return result + # Prepend incoming edges until reaching the source node curedge = edge visited = set() diff --git a/dace/sdfg/validation.py b/dace/sdfg/validation.py index eec95abf0e..e7e5c82f59 100644 --- a/dace/sdfg/validation.py +++ b/dace/sdfg/validation.py @@ -858,9 +858,14 @@ def validate_state(state: 'dace.sdfg.SDFGState', for oe in state.out_edges(dst_node)}): pass else: - raise InvalidSDFGEdgeError( - f"Memlet creates an invalid path (sink node {dst_node}" - " should be a data node)", sdfg, state_id, eid) + if isinstance(dst_node, nd.Tasklet) and len(dst_node.in_connectors) == 0 and len( + dst_node.out_connectors) == 0: + # Tasklets with no input or output connector -> sync tasklet -> OK + pass + else: + raise InvalidSDFGEdgeError( + f"Memlet creates an invalid path (sink node {dst_node}" + " should be a data node)", sdfg, state_id, eid) # If scope(dst) is disjoint from scope(src), it's an illegal memlet else: raise InvalidSDFGEdgeError("Illegal memlet between disjoint scopes", sdfg, state_id, eid) diff --git a/dace/transformation/helpers.py b/dace/transformation/helpers.py index bb4bd85d3f..8c36310a02 100644 --- a/dace/transformation/helpers.py +++ b/dace/transformation/helpers.py @@ -1959,3 +1959,33 @@ def _is_pointer(obj) -> bool: def _is_structure_view(obj) -> bool: """Check if object is a StructureView.""" return isinstance(obj, data.StructureView) + + +def is_within_schedule_types(state: SDFGState, node: nodes.Node, schedules: Set[dtypes.ScheduleType]) -> bool: + """ + Checks if the given node is enclosed within a Map whose schedule type + matches any in the `schedules` set. + Parameters + ---------- + state : SDFGState + The State where the node resides + node : nodes.Node + The node to check. + schedules : set[dtypes.ScheduleType] + A set of schedule types to match (e.g., {dtypes.ScheduleType.GPU_Device}). + Returns + ---------- + bool + True if the node is enclosed by a Map with a schedule type in `schedules`, False otherwise. + """ + current = node + + while current is not None: + if isinstance(current, nodes.MapEntry): + if current.map.schedule in schedules: + return True + + parent = get_parent_map(state, current) + if parent is None: + return False + current, state = parent diff --git a/dace/transformation/passes/gpu_specialization/__init__.py b/dace/transformation/passes/gpu_specialization/__init__.py new file mode 100644 index 0000000000..1469adb5ea --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/__init__.py @@ -0,0 +1 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py new file mode 100644 index 0000000000..9ab6b5160b --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_kernels.py @@ -0,0 +1,69 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. 
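For reference, a brief usage sketch of the `is_within_schedule_types` helper added to `dace/transformation/helpers.py` above (a minimal sketch; the toy program and printed output are illustrative, not part of this change):

    # Hedged sketch: find tasklets that end up inside GPU-scheduled maps.
    import dace
    from dace import dtypes
    from dace.sdfg import nodes
    from dace.transformation.helpers import is_within_schedule_types

    @dace.program
    def prog(A: dace.float64[64]):
        for i in dace.map[0:64]:
            A[i] = A[i] + 1

    sdfg = prog.to_sdfg()
    sdfg.apply_gpu_transformations()

    gpu_schedules = set(dtypes.GPU_SCHEDULES) | set(dtypes.EXPERIMENTAL_GPU_SCHEDULES)
    for state in sdfg.states():
        for node in state.nodes():
            if isinstance(node, nodes.Tasklet):
                print(node, is_within_schedule_types(state, node, gpu_schedules))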
+from typing import Any, Dict, Set, Type, Union + +import dace +from dace import dtypes, properties, SDFG +from dace.codegen import common +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams, get_gpu_stream_array_name, get_gpu_stream_connector_name + + +@properties.make_properties +@transformation.explicit_cf_compatible +class ConnectGPUStreamsToKernels(ppl.Pass): + """ + This Pass attaches GPU streams to kernels (i.e., dtypes.ScheduleType.GPU_Device scheduled maps). + + Adds GPU stream AccessNodes and connects them to kernel entry and exit nodes, + indicating which GPU stream each kernel is assigned to. These assignments are e.g. + used when launching the kernels. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler, InsertGPUStreams} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + # Retrieve the GPU stream array name and the prefix for individual stream variables + stream_array_name = get_gpu_stream_array_name() + stream_var_name_prefix = get_gpu_stream_connector_name() + + # Retrieve GPU stream assignments for nodes + stream_assignments: Dict[nodes.Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + + # Link kernels to their assigned GPU streams + for sub_sdfg in sdfg.all_sdfgs_recursive(): + + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Not a kernel entry - continue + if not (isinstance(node, nodes.MapEntry) and node.map.schedule == dtypes.ScheduleType.GPU_Device): + continue + + # Stream connector name and the used GPU Stream for the kernel + assigned_gpustream = stream_assignments[node] + gpu_stream_var_name = f"{stream_var_name_prefix}{assigned_gpustream}" + accessed_gpu_stream = f"{stream_array_name}[{assigned_gpustream}]" + + # Assign the GPU stream to the kernel entry + kernel_entry = node + kernel_entry.add_in_connector(gpu_stream_var_name, dtypes.gpuStream_t) + stream_array_in = state.add_access(stream_array_name) + state.add_edge(stream_array_in, None, kernel_entry, gpu_stream_var_name, + dace.Memlet(accessed_gpu_stream)) + + # Assign the GPU stream to the kernel exit + kernel_exit = state.exit_node(kernel_entry) + stream_array_out = state.add_access(stream_array_name) + state.add_edge(kernel_exit, None, stream_array_out, None, dace.Memlet(None)) + + return {} diff --git a/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py new file mode 100644 index 0000000000..5a86321673 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/connect_gpu_streams_to_tasklets.py @@ -0,0 +1,79 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. 
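As a rough usage sketch for the pass defined above (the exact pipeline composition is an assumption; only the declared pass dependencies are prescribed by this change):

    # Hedged sketch: schedule GPU streams, materialize them, and attach them to kernels.
    from dace.transformation import pass_pipeline as ppl
    from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler
    from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams
    from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels

    # `sdfg` is assumed to be GPU-transformed already (e.g. via sdfg.apply_gpu_transformations()).
    pipeline = ppl.Pipeline([NaiveGPUStreamScheduler(), InsertGPUStreams(), ConnectGPUStreamsToKernels()])
    results = pipeline.apply_pass(sdfg, {})
    stream_assignments = results.get('NaiveGPUStreamScheduler', {})  # Dict[node, assigned stream index]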
+from typing import Any, Dict, Set, Type, Union + +import dace +from dace import dtypes, properties, SDFG +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import get_default_gpu_stream_name +from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams, get_gpu_stream_array_name, get_dace_runtime_gpu_stream_name, get_gpu_stream_connector_name +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels + +# Placeholder for the GPU stream variable used in tasklet code + + +@properties.make_properties +@transformation.explicit_cf_compatible +class ConnectGPUStreamsToTasklets(ppl.Pass): + """ + This pass ensures that tasklets which require access to their assigned GPU stream + are provided with it explicitly. + + Such tasklets typically originate from expanded LibraryNodes targeting GPUs. + These nodes may reference the special placeholder variable `__dace_current_stream`, + which is expected to be defined during unparsing in `cpp.py`. + + To avoid relying on this "hidden" mechanism, the pass rewrites tasklets to use + the GPU stream AccessNode directly. + + Note that this pass is similar to `ConnectGPUStreamsToKernels`. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler, InsertGPUStreams, ConnectGPUStreamsToKernels} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + # Retrieve the GPU stream's array name + stream_array_name = get_gpu_stream_array_name() + + # Retrieve GPU stream assignments for nodes + stream_assignments: Dict[nodes.Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + + # Find all tasklets which use the GPU stream variable (get_dace_runtime_gpu_stream_name()) in the code + # and provide them the needed GPU stream explicitly + for sub_sdfg in sdfg.all_sdfgs_recursive(): + + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Not a tasklet - continue + if not isinstance(node, nodes.Tasklet): + continue + + # Tasklet does not need use its assigned GPU stream - continue + if not get_dace_runtime_gpu_stream_name() in node.code.as_string: + continue + + # Stream connector name and the used GPU Stream for the kernel + assigned_gpustream = stream_assignments[node] + gpu_stream_conn = get_default_gpu_stream_name() + accessed_gpu_stream = f"{stream_array_name}[{assigned_gpustream}]" + + # Provide the GPU stream explicitly to the tasklet + stream_array_in = state.add_access(stream_array_name) + stream_array_out = state.add_access(stream_array_name) + + node.add_in_connector(gpu_stream_conn, dtypes.gpuStream_t) + + state.add_edge(stream_array_in, None, node, gpu_stream_conn, dace.Memlet(accessed_gpu_stream)) + state.add_edge(node, None, stream_array_out, None, dace.Memlet(None)) + + return {} diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py new file mode 100644 index 0000000000..bd9764ea32 --- /dev/null +++ 
b/dace/transformation/passes/gpu_specialization/gpu_stream_scheduling.py @@ -0,0 +1,249 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Dict, List, Set, Type, Union + +import dace +from dace import SDFG, SDFGState, properties +from dace.config import Config +from dace.sdfg import nodes +from dace.sdfg.graph import Graph, NodeT +from dace.transformation import pass_pipeline as ppl, transformation + +# Placeholder for the GPU stream variable used in tasklet code +STREAM_PLACEHOLDER = "__dace_current_stream" + + +@properties.make_properties +@transformation.explicit_cf_compatible +class NaiveGPUStreamScheduler(ppl.Pass): + """ + Assigns GPU streams to nodes and stores the assignments in a dictionary. + This can be useful for enabling asynchronous and parallel GPU computation using GPU streams. + + Strategy Overview: + ------------------ + - GPU stream assignment is based on weakly connected components (WCCs) within each state. + - Nodes in the same WCC are assigned to the same stream. + - For top-level states (not within nested SDFGs), each new WCC starts on a new stream (starting from 0). + - In nested SDFGs: + * Stream assignment is inherited from the parent component, + * All internal components share the parent's stream. + - GPU stream IDs wrap around according to the `max_concurrent_streams` configuration. + + Example: + -------- + A state with the following independent chains: + K1 → K2 + K3 → K4 → K5 + K6 + + would be scheduled as: + K1, K2 → stream 0 + K3, K4, K5 → stream 1 + K6 → stream 2 + + (assuming no limit on the number of concurrent streams) + + Note: + ----- + These refer to **backend GPU streams** (e.g., CUDA or HIP), not DaCe symbolic streams. + """ + + def __init__(self): + # Maximum number of concurrent streams allowed (from config). + # Cached locally for frequent reuse. + self._max_concurrent_streams = int(Config.get('compiler', 'cuda', 'max_concurrent_streams')) + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Nothing + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, _) -> Dict[nodes.Node, int]: + """ + Assigns GPU streams to nodes within the given SDFG. + + Parameters + ---------- + sdfg : SDFG + The top-level SDFG to process. + pipeline_results : Dict + Unused. + + Returns + ------- + Dict[nodes.Node, int] + A dictionary mapping each node to its assigned GPU stream. + """ + stream_assignments: Dict[nodes.Node, int] = dict() + for state in sdfg.states(): + self._assign_gpu_streams_in_state(sdfg, False, state, stream_assignments, 0) + + return stream_assignments + + def _assign_gpu_streams_in_state(self, sdfg: SDFG, in_nested_sdfg: bool, state: SDFGState, + stream_assignments: Dict[nodes.Node, int], gpu_stream: int) -> None: + """ + Assigns GPU streams to nodes in a single state. + + If inside a nested SDFG, components inherit the parent's stream. + Otherwise, each connected component gets a different stream. + Nested SDFGs are processed recursively. + + Parameters + ---------- + sdfg : SDFG + The SDFG containing the state. + in_nested_sdfg : bool + True if the state is in a nested SDFG. + state : SDFGState + The state to process. + stream_assignments : Dict[nodes.Node, int] + Mapping of nodes to assigned GPU streams (updated in-place). + gpu_stream : int + The current GPU stream ID. 
+ + Returns + ------- + None + """ + components = self._get_weakly_connected_nodes(state) + + for component in components: + + if not self._requires_gpu_stream(state, component): + continue + + nodes_assigned_before = len(stream_assignments) + + for node in component: + stream_assignments[node] = gpu_stream + if isinstance(node, nodes.NestedSDFG): + for nested_state in node.sdfg.states(): + self._assign_gpu_streams_in_state(node.sdfg, True, nested_state, stream_assignments, gpu_stream) + + # Move to the next stream if we have assigned streams to any node in this component + # (careful: if nested, states are in same component) + if not in_nested_sdfg and len(stream_assignments) > nodes_assigned_before: + gpu_stream = self._next_stream(gpu_stream) + + def _get_weakly_connected_nodes(self, graph: Graph) -> List[Set[NodeT]]: + """ + Returns all weakly connected components in the given directed graph. + + A weakly connected component is a maximal group of nodes such that each pair + of nodes is connected by a path when ignoring edge directions. + + Parameters + ---------- + graph: Graph + A directed graph instance. + + Returns + ------- + List[Set[Node_T]] + + A list containing sets of nodes, with each set corresponding to a weakly + connected component. + """ + visited: Set[NodeT] = set() + components: List[Set[NodeT]] = [] + + for node in graph.nodes(): + if node in visited: + continue + + # Start a new weakly connected component + component: Set[NodeT] = set() + stack = [node] + + while stack: + current = stack.pop() + if current in visited: + continue + + visited.add(current) + component.add(current) + + for neighbor in graph.neighbors(current): + if neighbor not in visited: + stack.append(neighbor) + + components.append(component) + + return components + + def _next_stream(self, gpu_stream: int) -> int: + """ + Compute the next CUDA stream index according to the concurrency configuration. + + Behavior depends on the configured max_concurrent_streams value: + - If 0: unlimited streams allowed, so increment the stream index by one. + - If -1: default setting, always return stream 0 (no concurrency). + - Otherwise: cycle through stream indices from 0 up to max_concurrent_streams - 1. + + Parameters + ---------- + gpu_stream : int + The current CUDA stream index. + + Returns + ------- + int + The next CUDA stream index based on the concurrency policy. + """ + if self._max_concurrent_streams == 0: + return gpu_stream + 1 + elif self._max_concurrent_streams == -1: + return 0 + else: + return (gpu_stream + 1) % self._max_concurrent_streams + + def _requires_gpu_stream(self, state: SDFGState, component: Set[NodeT]) -> bool: + """ + Check whether a connected component in an SDFG state should be assigned + a GPU stream. + + A component requires a GPU stream if it contains at least one of: + - An AccessNode with GPU global memory storage, + - A MapEntry scheduled on a GPU device, + - A Tasklet whose code includes the stream placeholder. + + Parameters + ---------- + state : SDFGState + The state containing the component. + component : Set[NodeT] + The set of nodes that form the connected component. + + Returns + ------- + bool + True if the component requires a GPU stream, False otherwise. 
+ """ + + def gpu_relevant(node, parent) -> bool: + if (isinstance(node, nodes.AccessNode) and node.desc(parent).storage == dace.dtypes.StorageType.GPU_Global): + return True + + elif (isinstance(node, nodes.MapEntry) and node.map.schedule == dace.dtypes.ScheduleType.GPU_Device): + return True + + elif (isinstance(node, nodes.Tasklet) and STREAM_PLACEHOLDER in node.code.as_string): + return True + + return False + + for node in component: + if isinstance(node, nodes.NestedSDFG): + if any(gpu_relevant(node, parent) for node, parent in node.sdfg.all_nodes_recursive()): + return True + + else: + if gpu_relevant(node, state): + return True + + return False diff --git a/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py new file mode 100644 index 0000000000..829775253d --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/gpu_stream_topology_simplification.py @@ -0,0 +1,277 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, List, Set, Tuple, Type, Union +import copy + +import dace +from dace import SDFG, SDFGState, dtypes, properties +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import get_gpu_stream_array_name, get_gpu_stream_connector_name +from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets +from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets + + +@properties.make_properties +@transformation.explicit_cf_compatible +class GPUStreamTopologySimplification(ppl.Pass): + """ + Simplifies an SDFG after GPU stream nodes have been added. + + This pass is optional; the SDFG works without it, but it cleans up + the topology by merging adjacent or redundant GPU stream AccessNodes. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + depending_passes = { + NaiveGPUStreamScheduler, + InsertGPUStreams, + ConnectGPUStreamsToKernels, + ConnectGPUStreamsToTasklets, + InsertGPUStreamSyncTasklets, + } + + return depending_passes + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + """ + Simplify the SDFG topology by merging adjacent GPU stream nodes. + """ + self._merge_close_gpustream_nodes(sdfg) + + self._merge_gpustreams_special_case(sdfg) + return {} + + def _merge_close_gpustream_nodes(self, sdfg: SDFG) -> None: + """ + Merge "close" GPU stream AccessNodes in the SDFG. + + This function looks for a predecessor GPU stream AccessNode that can be merged + with any successor GPU stream AccessNodes of its grand-predecessors. 
+ + Example: + + Consider two GPU copy tasklets connected via distinct GPU stream AccessNodes: + the corresponding subgraph looks like this: + + -> Sink GPU Source GPU -> + ¦ ¦ + Tasklet ------> Data AccessNode -----> Tasklet + + This function would merge the sink and source node to simplify the SDFG. + """ + for sub_sdfg in sdfg.all_sdfgs_recursive(): + for state in sub_sdfg.states(): + for node in state.nodes(): + + # Skip AccessNodes + if isinstance(node, nodes.AccessNode): + continue + + # Find GPU stream AccessNode predecessors with no incoming edges + # (i.e. source GPU stream AccessNodes) + node_predecessors = state.predecessors(node) + preceeding_gpustream_sources = [ + pre for pre in node_predecessors if isinstance(pre, nodes.AccessNode) + and pre.desc(state).dtype == dtypes.gpuStream_t and state.in_degree(pre) == 0 + ] + + # Skip if there are no preceding GPU stream sources + if len(preceeding_gpustream_sources) == 0: + continue + + # If multiple GPU stream sources exist, merge them; otherwise, use the single source + if len(preceeding_gpustream_sources) > 1: + combined_stream_node = preceeding_gpustream_sources.pop() + for preceeding_gpu_stream in preceeding_gpustream_sources: + # Note: there are no ingoing edges + for out_edge in state.out_edges(preceeding_gpu_stream): + _, src_conn, dst, dst_conn, data = out_edge + state.add_edge(combined_stream_node, src_conn, dst, dst_conn, data) + state.remove_edge(out_edge) + state.remove_node(preceeding_gpu_stream) + + else: + combined_stream_node = preceeding_gpustream_sources.pop() + + # Merge grand-predecessors' successors sink GPU streams with predecessor source GPU stream + node_grand_predecessors = [ + grand_pred for pred in node_predecessors for grand_pred in state.predecessors(pred) + ] + node_gp_successors_streams = [ + succ_of_gp for gp in node_grand_predecessors for succ_of_gp in state.successors(gp) + if isinstance(succ_of_gp, nodes.AccessNode) + and succ_of_gp.desc(state).dtype == dtypes.gpuStream_t and state.out_degree(succ_of_gp) == 0 + ] + + # remove duplicates + node_gp_successors_streams = list(set(node_gp_successors_streams)) + + for gp_succ_stream in node_gp_successors_streams: + for edge in state.in_edges(gp_succ_stream): + src, src_conn, _, dst_conn, data = edge + state.add_edge(src, src_conn, combined_stream_node, dst_conn, data) + state.remove_edge(edge) + # Note: the grand-predecessor's successor GPU stream is a sink node and has no + # outgoing edges + state.remove_node(gp_succ_stream) + + def _merge_gpustreams_special_case(self, sdfg: SDFG) -> None: + """ + Special-case simplification of GPU stream AccessNodes. + + This pass detects the following pattern: + - A GPU stream AccessNode `X` has a predecessor and a successor (i.e. at least one of both). + - Between the predecessor and successor lie one or more tasklets. + - These tasklets use their own distinct GPU stream AccessNodes (not `X`), + which are connected only to the tasklet itself. + + To simplify the topology, redundant streams are merged: + - A single unified input GPU stream connects to the predecessor and replaces (merges) + the per-tasklet input streams. + - A single unified output GPU stream connects to the successor and replaces (merges) + the per-tasklet output streams. + + + The simplification is easier to understand visually than in words. + Inspect the intermediate SDFGs produced by the minimal example below + to see the effect of the stream merging. 
+ + Example + ------- + @dace.program + def example(A: dace.uint32[128], B: dace.uint32[128], + C: dace.uint32[128], D: dace.uint32[128]): + for i in dace.map[0:128:1]: + B[i] = A[i] + for i in dace.map[0:128:1]: + D[i] = C[i] + + sdfg = example.to_sdfg() + sdfg.apply_gpu_transformations() + """ + # Get the name of the GPU stream arry + gpustream_array_name = get_gpu_stream_array_name() + + #------------------------- Preprocess: Gather Information ---------------------------- + + # For each GPU Stream AccessNode having a predecessor and a successor: + # Determine with which Tasklet Source and which Tasklet sink nodes lie between its predecessor + # and its successor + merge_source_gpustream: Dict[Tuple[nodes.AccessNode, SDFGState], List[nodes.AccessNode]] = dict() + merge_sink_gpustream: Dict[Tuple[nodes.AccessNode, SDFGState], List[nodes.AccessNode]] = dict() + + for node, state in sdfg.all_nodes_recursive(): + + # Skip non-tasklets + if not isinstance(node, nodes.Tasklet): + continue + + # The tasklets of interest should have exactly one preceeding source GPU node and one following sink GPU node + # If not, we skip + node_predecessors = state.predecessors(node) + node_successors = state.successors(node) + downstream_gpustream_sinks = [ + succ for succ in node_successors if isinstance(succ, nodes.AccessNode) + and succ.desc(state).dtype == dtypes.gpuStream_t and state.out_degree(succ) == 0 + ] + upstream_gpustream_sources = [ + pre for pre in node_predecessors if isinstance(pre, nodes.AccessNode) + and pre.desc(state).dtype == dtypes.gpuStream_t and state.in_degree(pre) == 0 + ] + + # Skip not considered case + if not (len(upstream_gpustream_sources) == len(downstream_gpustream_sinks) + and len(upstream_gpustream_sources) == 1): + continue + + # Look for potential predecessor of a "passthrough" GPU Stream AccessNode + # which would also be the grand-predeccessor of the current node (=tasklet) + candidate_predecessor = [] + for pred in node_predecessors: + for grand_pred in state.predecessors(pred): + + # Current nodes grand pred is a candidate of a predecessor of a "passthrough" GPU Stream AccessNode + candidate = grand_pred + + # A PassThrough GPU stream node can only have MapExits and Tasklets as candidate predecessors + if not (isinstance(candidate, nodes.MapExit) and candidate.map.schedule + == dtypes.ScheduleType.GPU_Device or isinstance(candidate, nodes.Tasklet)): + continue + + has_passthrough_gpustream = any( + (isinstance(succ, nodes.AccessNode) and succ.desc(state).dtype == dtypes.gpuStream_t) and ( + state.in_degree(succ) > 0 and state.out_degree(succ) > 0) + for succ in state.successors(candidate)) + + if has_passthrough_gpustream: + candidate_predecessor.append(candidate) + + # Not "close" passthrough GPU node exists if no candidate predecessor exists + if len(candidate_predecessor) == 0: + continue + + # Niche case, more than one "close" passthrough GPU node exists: Out of scope + # Ignore this case (note: This Pass only makes the Graph visually nicer, so skipping has + # no effect on correctness) + if len(candidate_predecessor) > 1: + continue + + # Get the Kernel Exits GPU stream + candidate_predecessor = candidate_predecessor[0] + passthrough_gpu_node = [ + succ for succ in state.successors(candidate_predecessor) + if isinstance(succ, nodes.AccessNode) and succ.desc(state).dtype == dtypes.gpuStream_t + ][0] + + # Collect and store the GPU stream merging information + pre_gpustream: nodes.AccessNode = upstream_gpustream_sources[0] # Note: Len is 1 + succ_gpustream: 
nodes.AccessNode = downstream_gpustream_sinks[0] # Note: Len is 1 + if (passthrough_gpu_node, state) in merge_source_gpustream: + merge_source_gpustream[(passthrough_gpu_node, state)].append(pre_gpustream) + merge_sink_gpustream[(passthrough_gpu_node, state)].append(succ_gpustream) + else: + merge_source_gpustream[(passthrough_gpu_node, state)] = [pre_gpustream] + merge_sink_gpustream[(passthrough_gpu_node, state)] = [succ_gpustream] + + #------------------------- Merge the GPU Stream AccessNodes ---------------------------- + for passthrough_gpu_node, state in merge_sink_gpustream.keys(): + + # Add new AccessNodes which merge the other loose streams + unified_in_stream = state.add_access(gpustream_array_name) + unified_out_stream = state.add_access(gpustream_array_name) + + for in_edge in state.in_edges(passthrough_gpu_node): + src, src_conn, _, dst_conn, memlet = in_edge + state.add_edge(src, src_conn, unified_in_stream, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(in_edge) + + for out_edge in state.out_edges(passthrough_gpu_node): + _, src_conn, dst, dst_conn, memlet = out_edge + state.add_edge(unified_out_stream, src_conn, dst, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(out_edge) + + for source_stream in merge_source_gpustream[passthrough_gpu_node, state]: + for out_edge in state.out_edges(source_stream): + _, src_conn, dst, dst_conn, memlet = out_edge + state.add_edge(unified_in_stream, src_conn, dst, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(out_edge) + state.remove_node(source_stream) + + for sink_stream in merge_sink_gpustream[passthrough_gpu_node, state]: + for in_edge in state.in_edges(sink_stream): + src, src_conn, _, dst_conn, memlet = in_edge + state.add_edge(src, src_conn, unified_out_stream, dst_conn, copy.deepcopy(memlet)) + state.remove_edge(in_edge) + state.remove_node(sink_stream) + + state.remove_node(passthrough_gpu_node) diff --git a/dace/transformation/passes/gpu_specialization/helpers/__init__.py b/dace/transformation/passes/gpu_specialization/helpers/__init__.py new file mode 100644 index 0000000000..1469adb5ea --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/helpers/__init__.py @@ -0,0 +1 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. diff --git a/dace/transformation/passes/gpu_specialization/helpers/copy_strategies.py b/dace/transformation/passes/gpu_specialization/helpers/copy_strategies.py new file mode 100644 index 0000000000..2d4e287562 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/helpers/copy_strategies.py @@ -0,0 +1,525 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from abc import ABC, abstractmethod +from typing import Any, Dict, Tuple, Union +from dace import SDFG, SDFGState, data, dtypes, subsets +from dace import memlet as mm +from dace.codegen import common +from dace.codegen.targets import cpp +from dace.codegen.targets.cpp import sym2cpp +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import generate_sync_debug_call +from dace.dtypes import StorageType +from dace.sdfg import nodes, scope_contains_scope +from dace.sdfg.graph import MultiConnectorEdge +from dace.transformation import helpers + + +class CopyContext: + """ + Encapsulates inputs required for copy operations and exposes helper + methods to derive additional information. This keeps copy strategies + lightweight by letting them focus only on the relevant logic. 
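    Example (illustrative sketch; `sdfg`, `state`, `src_node`, `dst_node` and `edge`
    are assumed to come from an existing GPU-transformed SDFG and are not defined here):

        ctx = CopyContext(sdfg, state, src_node, dst_node, edge)
        strategy = OutOfKernelCopyStrategy()
        if strategy.applicable(ctx):
            copy_code = strategy.generate_copy(ctx)  # backend Memcpy*Async call(s) as a string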
+ """ + + def __init__(self, sdfg: SDFG, state: SDFGState, src_node: nodes.Node, dst_node: nodes.Node, + edge: MultiConnectorEdge[mm.Memlet]): + + # Store the basic context as attributes + self.sdfg = sdfg + self.state = state + self.src_node = src_node + self.dst_node = dst_node + self.edge = edge + + memlet = edge.data + + self.copy_shape = memlet.subset.size_exact() + if isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode): + copy_shape, src_strides, dst_strides, src_expr, dst_expr = self.get_accessnode_to_accessnode_copy_info() + else: + copy_shape = memlet.subset.size_exact() + src_strides = dst_strides = src_expr = dst_expr = None + + self.copy_shape = copy_shape + self.src_strides = src_strides + self.dst_strides = dst_strides + self.src_expr = src_expr + self.dst_expr = dst_expr + + def get_storage_type(self, node: nodes.Node): + """ + Return the storage type associated with a given SDFG node. + + Tasklets are assumed to use register storage, while AccessNodes + return the storage type from their data descriptor. Raises + NotImplementedError for unsupported node types. + """ + if isinstance(node, nodes.Tasklet): + storage_type = StorageType.Register + + elif isinstance(node, nodes.AccessNode): + storage_type = node.desc(self.sdfg).storage + + else: + raise NotImplementedError(f"Unsupported node type {type(node)} for storage type retrieval; " + "expected AccessNode or Tasklet. Please extend this method accordingly.") + + return storage_type + + def get_assigned_gpustream(self) -> str: + """ + Return the GPU stream expression assigned to both source and destination nodes. + Defaults to `__dace_current_stream` placeholder, which can be changed by the scheduling pass + """ + # 2. Generate GPU stream expression + gpustream = "__dace_current_stream" + gpustream_expr = gpustream + + return gpustream_expr + + def get_memory_location(self) -> Tuple[str, str]: + """ + Determine whether the source and destination nodes reside in device or host memory. + + Uses the storage type of each node to classify it as either 'Device' + (GPU global memory) or 'Host' (all other storage types). + Used for GPU related copies outside the kernel (e.g. to construct + cudaMemcpyHostToDevice for example). + + Returns + ------- + Tuple[str, str] + (src_location, dst_location) where each is either 'Device' or 'Host'. + """ + src_storage = self.get_storage_type(self.src_node) + dst_storage = self.get_storage_type(self.dst_node) + src_location = 'Device' if src_storage == dtypes.StorageType.GPU_Global else 'Host' + dst_location = 'Device' if dst_storage == dtypes.StorageType.GPU_Global else 'Host' + + return src_location, dst_location + + def get_ctype(self) -> Any: + """ + Determine the C data type (ctype) of the source or destination node. + + The ctype is resolved from the data descriptor of the first node + (source or destination) that is an AccessNode (assumed to be the same + if both are AccessNodes). + + Returns + ------- + Any + The C type string (e.g., "float*", "int32") associated with the node. + + Raises + ------ + NotImplementedError + If neither the source nor the destination node is an AccessNode. + """ + sdfg = self.sdfg + src_node, dst_node = self.src_node, self.dst_node + + if isinstance(src_node, nodes.AccessNode): + return src_node.desc(sdfg).ctype + + if isinstance(dst_node, nodes.AccessNode): + return dst_node.desc(sdfg).ctype + + raise NotImplementedError( + f"Cannot determine ctype: neither src nor dst node is an AccessNode. 
" + f"Got src_node type: {type(src_node).__name__}, dst_node type: {type(dst_node).__name__}. " + "Please extend this case or fix the issue.") + + def get_accessnode_to_accessnode_copy_info(self): + """ + Compute copy shape, absolute strides, and pointer expressions for a copy + between two AccessNodes. Tries to mimic + cpp.memlet_copy_to_absolute_strides without requiring a dispatcher. + + Returns + ------- + (copy_shape, src_strides, dst_strides, src_expr, dst_expr) + + Raises + ------ + TypeError + If either endpoint is not an AccessNode. + NotImplementedError + If a descriptor is not Scalar or Array. + """ + + # ---------------------------- helpers ---------------------------- + def _collapse_strides(strides, subset): + """Remove size-1 dims; keep tile strides; default to [1] if none remain.""" + n = len(subset) + collapsed = [st for st, sz in zip(strides, subset.size()) if sz != 1] + collapsed.extend(strides[n:]) # include tiles + if len(collapsed) == 0: + return [1] + return collapsed + + def _ptr_name(desc, name): + if desc.transient and desc.lifetime in (dtypes.AllocationLifetime.Persistent, + dtypes.AllocationLifetime.External): + return f'__state->__{sdfg.cfg_id}_{name}' + return name + + def _expr_for(desc, name, subset): + ptr = _ptr_name(desc, name) + + if isinstance(desc, data.Scalar): + # GPU scalar special-case + if desc.storage in dtypes.GPU_STORAGES: + parent = state.sdfg.parent_nsdfg_node + if parent is not None and name in parent.in_connectors: + return f"&{ptr}" + return ptr + # CPU (or other) scalars + return f"&{ptr}" + + if isinstance(desc, data.Array): + offset = cpp.cpp_offset_expr(desc, subset) + return f"{ptr} + {offset}" if offset != "0" else ptr + + raise NotImplementedError( + f"Expected {name} to be either data.Scalar or data.Array, but got {type(desc).__name__}.") + + # ---------------------------- Get copy info ---------------------------- + # Get needed information + src_node, dst_node = self.src_node, self.dst_node + sdfg, edge, state = self.sdfg, self.edge, self.state + memlet, copy_shape = self.edge.data, self.copy_shape + + # Guard - only applicable if src and dst are AccessNodes + if not (isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode)): + raise TypeError( + f"get_accessnode_to_accessnode_copy_info requires both source and destination " + f"to be AccessNode instances, but got {type(src_node).__name__} and {type(dst_node).__name__}.") + + # Get node descriptors + src_nodedesc = src_node.desc(sdfg) + dst_nodedesc = dst_node.desc(sdfg) + + # Resolve subsets (fallback to full range) + src_subset = memlet.get_src_subset(edge, state) + dst_subset = memlet.get_dst_subset(edge, state) + + if src_subset is None: + src_subset = subsets.Range.from_array(src_nodedesc) + + if dst_subset is None: + dst_subset = src_subset + # dst_subset = subsets.Range.from_array(dst_nodedesc) + + # Get strides + src_strides = src_subset.absolute_strides(src_nodedesc.strides) + dst_strides = dst_subset.absolute_strides(dst_nodedesc.strides) + + # Try to convert to a degenerate/strided ND copy first + result = cpp.ndcopy_to_strided_copy( + copy_shape, + src_nodedesc.shape, + src_strides, + dst_nodedesc.shape, + dst_strides, + memlet.subset, + src_subset, + dst_subset, + ) + + if result is not None: + copy_shape, src_strides, dst_strides = result + else: + src_strides = _collapse_strides(src_strides, src_subset) + dst_strides = _collapse_strides(dst_strides, dst_subset) + copy_shape = [s for s in copy_shape if s != 1] or [1] + + # Extend copy shape 
to the largest among the data dimensions, + # and extend other array with the appropriate strides + if len(dst_strides) != len(copy_shape) or len(src_strides) != len(copy_shape): + if memlet.data == src_node.data: + copy_shape, dst_strides = cpp.reshape_strides(src_subset, src_strides, dst_strides, copy_shape) + elif memlet.data == dst_node.data: + copy_shape, src_strides = cpp.reshape_strides(dst_subset, dst_strides, src_strides, copy_shape) + + return copy_shape, src_strides, dst_strides, src_node.data, dst_node.data + + +class CopyStrategy(ABC): + """Abstract base class for memory copy strategies.""" + + @abstractmethod + def applicable(self, copy_context: CopyContext) -> bool: + """ + Return True if this strategy can handle the given memory copy. + """ + raise NotImplementedError('Abstract class') + + @abstractmethod + def generate_copy(self, copy_context: CopyContext) -> str: + """ + Generates and returns the copy code for the supported pattern. + """ + raise NotImplementedError('Abstract class') + + +class OutOfKernelCopyStrategy(CopyStrategy): + """ + Copy strategy for memory transfers that occur outside of kernel execution. + + This pattern often occurs when generating host-to-device copies for kernel inputs + (since kernels cannot access host memory directly), and device-to-host copies + to retrieve results for further processing. + """ + + def applicable(self, copy_context: CopyContext) -> bool: + """ + Determines whether the data movement is a host<->device memory copy. + + This function returns True if: + - We are not currently generating kernel code + - The copy occurs between two AccessNodes + - The data descriptors of source and destination are not views. + - The storage types of either src or dst is CPU_Pinned or GPU_Device + - We do not have a CPU-to-CPU copy + """ + # Retrieve needed information + state = copy_context.state + src_node, dst_node = copy_context.src_node, copy_context.dst_node + + # 1. Ensure copy is not occuring within a kernel + scope_dict = state.scope_dict() + deeper_node = dst_node if scope_contains_scope(scope_dict, src_node, dst_node) else src_node + + parent_map_tuple = helpers.get_parent_map(state, deeper_node) + while parent_map_tuple is not None: + parent_map, parent_state = parent_map_tuple + if parent_map.map.schedule in dtypes.GPU_SCHEDULES + dtypes.EXPERIMENTAL_GPU_SCHEDULES: + return False + else: + parent_map_tuple = helpers.get_parent_map(parent_state, parent_map) + + # 2. Check whether copy is between two AccessNodes + if not (isinstance(src_node, nodes.AccessNode) and isinstance(dst_node, nodes.AccessNode)): + return False + + # 3. The data descriptors of source and destination are not views + if isinstance(src_node.desc(state), data.View) or isinstance(dst_node.desc(state), data.View): + return False + + # 4. Check that one StorageType of either src or dst is CPU_Pinned or GPU_Device + src_storage = copy_context.get_storage_type(src_node) + dst_storage = copy_context.get_storage_type(dst_node) + if not (src_storage in (StorageType.GPU_Global, StorageType.CPU_Pinned) + or dst_storage in (StorageType.GPU_Global, StorageType.CPU_Pinned)): + return False + + # 5. 
Check that this is not a CPU to CPU copy + cpu_storage_types = [StorageType.CPU_Heap, StorageType.CPU_ThreadLocal, StorageType.CPU_Pinned] + if src_storage in cpu_storage_types and dst_storage in cpu_storage_types: + return False + + return True + + def generate_copy(self, copy_context: CopyContext) -> str: + """Execute host-device copy with CUDA memory operations""" + + # Guard + memlet = copy_context.edge.data + if memlet.wcr is not None: + src_location, dst_location = copy_context.get_memory_location() + raise NotImplementedError(f'Accumulate {src_location} to {dst_location} not implemented') + + # Based on the copy dimension, call appropiate helper function + num_dims = len(copy_context.copy_shape) + if num_dims == 1: + copy_call = self._generate_1d_copy(copy_context) + + elif num_dims == 2: + copy_call = self._generate_2d_copy(copy_context) + + else: + # sanity check + assert num_dims > 2, f"Expected copy shape with more than 2 dimensions, but got {num_dims}." + copy_call = self._generate_nd_copy(copy_context) + + return copy_call + + def _generate_1d_copy(self, copy_context: CopyContext) -> str: + """ + Generates a 1D memory copy between host and device using the GPU backend. + + Uses {backend}MemcpyAsync for contiguous memory. For strided memory, + {backend}Memcpy2DAsync is leveraged to efficiently handle the stride along one dimension. + """ + # ----------- Retrieve relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + + src_location, dst_location = copy_context.get_memory_location() + is_contiguous_copy = (src_strides[-1] == 1) and (dst_strides[-1] == 1) + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + + # ----------------- Generate backend call -------------------- + + if is_contiguous_copy: + # Memory is linear: can use {backend}MemcpyAsync + copysize = ' * '.join(sym2cpp(copy_shape)) + copysize += f' * sizeof({ctype})' + kind = f'{backend}Memcpy{src_location}To{dst_location}' + call = f'DACE_GPU_CHECK({backend}MemcpyAsync(_out_{dst_expr}, _in_{src_expr}, {copysize}, {kind}, {gpustream}));\n' + + else: + # Memory is strided: use {backend}Memcpy2DAsync with dpitch/spitch + # This allows copying a strided 1D region + dpitch = f'{sym2cpp(dst_strides[0])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[0])} * sizeof({ctype})' + width = f'sizeof({ctype})' + height = sym2cpp(copy_shape[0]) + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync(_out_{dst_expr}, {dpitch}, _in_{src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Potentially snychronization required if syncdebug is set to true in configurations + call = call + generate_sync_debug_call() + return call + + def _generate_2d_copy(self, copy_context: CopyContext) -> None: + """ + Generates a 2D memory copy using {backend}Memcpy2DAsync. + + Three main cases are handled: + - Copy between row-major stored arrays with contiguous rows. + - Copy between column-major stored arrays with contiguous columns. + - A special case where a 2D copy can still be represented. + + Raises: + NotImplementedError: Raised if the source and destination strides do not match any of the handled patterns. + Such cases indicate an unsupported 2D copy and should be examined separately. 
+ They can be implemented if valid, or a more descriptive error should be raised if the path should not occur. + + Note: + {backend}Memcpy2DAsync supports strided copies along only one dimension (row or column), + but not both simultaneously. + """ + + # ----------- Extract relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + src_location, dst_location = copy_context.get_memory_location() + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + + # ----------------- Generate backend call if supported -------------------- + # Case: Row-major layout, rows are not strided. + if (src_strides[1] == 1) and (dst_strides[1] == 1): + dpitch = f'{sym2cpp(dst_strides[0])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[0])} * sizeof({ctype})' + width = f'{sym2cpp(copy_shape[1])} * sizeof({ctype})' + height = f'{sym2cpp(copy_shape[0])}' + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync(_out_{dst_expr}, {dpitch}, _in_{src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Case: Column-major layout, no columns are strided. + elif (src_strides[0] == 1) and (dst_strides[0] == 1): + dpitch = f'{sym2cpp(dst_strides[1])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[1])} * sizeof({ctype})' + width = f'{sym2cpp(copy_shape[0])} * sizeof({ctype})' + height = f'{sym2cpp(copy_shape[1])}' + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync(_out_{dst_expr}, {dpitch}, _in_{src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Special case + elif (src_strides[0] / src_strides[1] == copy_shape[1] and dst_strides[0] / dst_strides[1] == copy_shape[1]): + # Consider as an example this copy: A[0:I, 0:J, K] -> B[0:I, 0:J] with + # copy shape [I, J], src_strides[J*K, K], dst_strides[J, 1]. This can be represented with a + # {backend}Memcpy2DAsync call! + + dpitch = f'{sym2cpp(dst_strides[1])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[1])} * sizeof({ctype})' + width = f'sizeof({ctype})' + height = sym2cpp(copy_shape[0] * copy_shape[1]) + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + call = f'DACE_GPU_CHECK({backend}Memcpy2DAsync(_out_{dst_expr}, {dpitch}, _in_{src_expr}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + else: + raise NotImplementedError( + f"Unsupported 2D memory copy: shape={copy_shape}, src_strides={src_strides}, dst_strides={dst_strides}." + "Please implement this case if it is valid, or raise a more descriptive error if this path should not be taken." + ) + + # Potentially snychronization required if syncdebug is set to true in configurations + call = call + generate_sync_debug_call() + return call + + def _generate_nd_copy(self, copy_context: CopyContext) -> None: + """ + Generates GPU code for copying N-dimensional arrays using 2D memory copies. + + Uses {backend}Memcpy2DAsync for the last two dimensions, with nested loops + for any outer dimensions. Expects the copy to be contiguous and between + row-major storage locations. 
+ """ + # ----------- Extract relevant copy parameters -------------- + backend: str = common.get_gpu_backend() + + # Due to applicable(), src and dst node must be AccessNodes + copy_shape, src_strides, dst_strides, src_expr, dst_expr = copy_context.get_accessnode_to_accessnode_copy_info() + + src_location, dst_location = copy_context.get_memory_location() + ctype = copy_context.get_ctype() + gpustream = copy_context.get_assigned_gpustream() + num_dims = len(copy_shape) + + # ----------- Guard for unsupported Pattern -------------- + if not (src_strides[-1] == 1) and (dst_strides[-1] == 1): + src_node, dst_node = copy_context.src_node, copy_context.dst_node + src_storage = copy_context.get_storage_type(src_node) + dst_storage = copy_context.get_storage_type(dst_node) + raise NotImplementedError( + "N-dimensional GPU memory copies, that are strided or contain column-major arrays, are currently not supported.\n" + f" Source node: {src_node} (storage: {src_storage})\n" + f" Destination node: {copy_context.dst_node} (storage: {dst_storage})\n" + f" Source strides: {src_strides}\n" + f" Destination strides: {dst_strides}\n" + f" copy shape: {copy_shape}\n") + + # ----------------- Generate and write backend call(s) -------------------- + + call = "" + # Write for-loop headers + for dim in range(num_dims - 2): + call += f"for (int __copyidx{dim} = 0; __copyidx{dim} < {copy_shape[dim]}; ++__copyidx{dim}) {{\n" + + # Write Memcopy2DAsync + offset_src = ' + '.join(f'(__copyidx{d} * ({sym2cpp(s)}))' for d, s in enumerate(src_strides[:-2])) + offset_dst = ' + '.join(f'(__copyidx{d} * ({sym2cpp(s)}))' for d, s in enumerate(dst_strides[:-2])) + + src = f'{src_expr} + {offset_src}' + dst = f'{dst_expr} + {offset_dst}' + + dpitch = f'{sym2cpp(dst_strides[-2])} * sizeof({ctype})' + spitch = f'{sym2cpp(src_strides[-2])} * sizeof({ctype})' + width = f'{sym2cpp(copy_shape[-1])} * sizeof({ctype})' + height = sym2cpp(copy_shape[-2]) + kind = f'{backend}Memcpy{src_location}To{dst_location}' + + # Generate call and write it + call += f'DACE_GPU_CHECK({backend}Memcpy2DAsync(_out_{dst}, {dpitch}, _in_{src}, {spitch}, {width}, {height}, {kind}, {gpustream}));\n' + + # Potentially snychronization required if syncdebug is set to true in configurations + call += generate_sync_debug_call() + + # Write for-loop footers + for dim in range(num_dims - 2): + call += "\n}" + + # Return the code + return call diff --git a/dace/transformation/passes/gpu_specialization/helpers/gpu_helpers.py b/dace/transformation/passes/gpu_specialization/helpers/gpu_helpers.py new file mode 100644 index 0000000000..f7f49a8943 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/helpers/gpu_helpers.py @@ -0,0 +1,43 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from dace import Config +from dace.codegen import common + + +def get_gpu_stream_array_name() -> str: + return "gpu_streams" + + +def get_gpu_stream_connector_name() -> str: + return "__stream_" + + +def get_dace_runtime_gpu_stream_name() -> str: + return "__dace_current_stream" + + +def get_default_gpu_stream_name() -> str: + return "__default_stream" + + +def generate_sync_debug_call() -> str: + """ + Generate backend sync and error-check calls as a string if + synchronous debugging is enabled. + + Parameters + ---------- + backend : str + Backend API prefix (e.g., 'cuda'). + + Returns + ------- + str + The generated debug call code, or an empty string if debugging is disabled. 
+ """ + backend: str = common.get_gpu_backend() + sync_call: str = "" + if Config.get_bool('compiler', 'cuda', 'syncdebug'): + sync_call = (f"DACE_GPU_CHECK({backend}GetLastError());\n" + f"DACE_GPU_CHECK({backend}DeviceSynchronize());\n") + + return sync_call diff --git a/dace/transformation/passes/gpu_specialization/insert_explicit_gpu_global_memory_copies.py b/dace/transformation/passes/gpu_specialization/insert_explicit_gpu_global_memory_copies.py new file mode 100644 index 0000000000..34cd37de4a --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/insert_explicit_gpu_global_memory_copies.py @@ -0,0 +1,254 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, List, Set, Tuple, Type, Union +import copy + +import dace +from dace import SDFG, SDFGState, dtypes, properties +from dace.transformation.passes.gpu_specialization.helpers.copy_strategies import CopyContext, OutOfKernelCopyStrategy +from dace.sdfg.graph import Edge, MultiConnectorEdge +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import get_gpu_stream_connector_name + + +def create_viewed_copy_kernel(parent_state: dace.SDFGState, src_node: dace.nodes.AccessNode, + dst_node: dace.nodes.AccessNode, edge: Edge[dace.Memlet]) -> dace.SDFG: + # Currently only 1D and 2D copies are supported + map_ranges = dict() + for i, dim in enumerate(edge.data.subset): + map_ranges[f"i{i}"] = f"0:{dim[1]+1-dim[0]}:{dim[2]}" + + access_expr = ",".join(f"i{i}" for i in range(len(edge.data.subset))) + + src_desc = parent_state.sdfg.arrays[src_node.data] + dst_desc = parent_state.sdfg.arrays[dst_node.data] + + # Add new arrays for the copy SDFG + # Determine src and dst subsets + src_subset = edge.data.subset if edge.data.data == src_node.data else edge.data.other_subset + dst_subset = edge.data.other_subset if edge.data.data == src_node.data else edge.data.subset + + # Collect the new shapes + src_shape = [e + 1 - b for b, e, s in src_subset] + dst_shape = [e + 1 - b for b, e, s in dst_subset] + + # Preserve strides as-is + src_strides = src_desc.strides + dst_strides = dst_desc.strides + + _, src_view = parent_state.sdfg.add_view("view_" + src_node.data, src_shape, src_desc.dtype, src_desc.storage, + src_strides) + _, dst_view = parent_state.sdfg.add_view("view_" + dst_node.data, dst_shape, dst_desc.dtype, dst_desc.storage, + dst_strides) + + # In nested SDFG we add "view_" prefix + view_src_node = parent_state.add_access("view_" + src_node.data) + view_dst_node = parent_state.add_access("view_" + dst_node.data) + + # Create string subset expressions to return + src_subset_expr = ", ".join([f"{b}:{e+1}:1" for b, e, s in src_subset]) + dst_subset_expr = ", ".join([f"{b}:{e+1}:1" for b, e, s in dst_subset]) + + # Add copy kernel + tasklet, map_entry, map_exit = parent_state.add_mapped_tasklet( + name="gpu_copy_kernel_fallback", + map_ranges=map_ranges, + inputs={"_in": dace.memlet.Memlet(f"{view_src_node.data}[{access_expr}]")}, + outputs={"_out": dace.memlet.Memlet(f"{view_dst_node.data}[{access_expr}]")}, + code="_out = _in", + schedule=dtypes.ScheduleType.GPU_Device, + unroll_map=False, + language=dtypes.Language.Python, + external_edges=True, + propagate=True, + input_nodes={view_src_node.data: view_src_node}, + output_nodes={view_dst_node.data: view_dst_node}, + ) + + return view_src_node, src_subset_expr, view_dst_node, dst_subset_expr + + +@properties.make_properties 
+@transformation.explicit_cf_compatible +class InsertExplicitGPUGlobalMemoryCopies(ppl.Pass): + """ + This pass inserts explicit copy tasklets for data transfers that need to be handled + by the GPU and occur outside a kernel (for example, copying data from host memory + to the GPU before executing a kernel). + + It identifies such copy locations and inserts the corresponding tasklets. For each + memlet path describing a copy, the first edge is duplicated: one edge goes from the original + source to the tasklet, and the other from the tasklet to the original destination, while + the original edge is removed. + + This is experimental and could later serve as inspiration for making all copies explicit. + Considerations for future work include allowing tasklets to access array addresses + from connectors and describing in memlets how data will be moved, since currently + tasklets only support value inputs. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + depending_passes = set() + return depending_passes + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Tasklets | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]) -> Dict: + """ + Inserts out-of-kernel GPU copy tasklets into the SDFG based on GPU stream scheduling. + Out-of-kernel copies are copies which are handled by the GPU and occur out of a kernel + function. + + Parameters + ---------- + sdfg : SDFG + The SDFG to transform by adding out-of-kernel GPU copy tasklets. + pipeline_results : Dict[str, Any] + Results from previous transformation passes, including GPU stream assignments. + + Returns + ------- + dict + Currently returns an empty dictionary. + """ + # Prepare GPU stream + + # Initialize the strategy for copies that occur outside of kernel execution + out_of_kernel_copy = OutOfKernelCopyStrategy() + + # Get all data copies to process the out of kernel copies + copy_worklist = self.find_all_data_copies(sdfg) + + for copy_sdfg, state, src_node, dst_node, edge in copy_worklist: + + copy_context = CopyContext(copy_sdfg, state, src_node, dst_node, edge) + + # Only insert copy tasklets for GPU related copies occuring out of the + # kernel (i.e. 
a GPU_device scheduled map) + if not out_of_kernel_copy.applicable(copy_context): + continue + + # If the subset has more than 2 dimensions and is not contiguous (represented as a 1D memcpy) then fallback to a copy kernel + if len(edge.data.subset) > 2 and not edge.data.subset.is_contiguous_subset( + state.sdfg.arrays[edge.data.data]): + + # If other subset is not None, we do not need a nested SDFG + if edge.data.other_subset is None: + # Currently only 1D and 2D copies are supported + map_ranges = dict() + for i, dim in enumerate(edge.data.subset): + map_ranges[f"i{i}"] = f"{dim[0]}:{dim[1]+1}:{dim[2]}" + access_expr = ",".join(f"i{i}" for i in range(len(edge.data.subset))) + + tasklet, map_entry, map_exit = state.add_mapped_tasklet( + name="gpu_copy_kernel_fallback", + map_ranges=map_ranges, + inputs={"_in": dace.memlet.Memlet(f"{src_node.data}[{access_expr}]")}, + outputs={"_out": dace.memlet.Memlet(f"{dst_node.data}[{access_expr}]")}, + code="_out = _in", + schedule=dtypes.ScheduleType.GPU_Device, + unroll_map=False, + language=dtypes.Language.Python, + external_edges=True, + propagate=True, + input_nodes={src_node.data: src_node}, + output_nodes={dst_node.data: dst_node}, + ) + # Add connectors to the out edge of map_entry and in edge of map_exit + state.remove_edge(edge) + else: + view_src_node, src_subset_expr, view_dst_node, dst_subset_expr = create_viewed_copy_kernel( + state, src_node, dst_node, edge) + state.remove_edge(edge) + state.add_edge(src_node, None, view_src_node, "views", + dace.Memlet(f"{src_node.data}[{src_subset_expr}]")) + state.add_edge(view_dst_node, "views", dst_node, None, + dace.Memlet(f"{dst_node.data}[{dst_subset_expr}]")) + else: + # Generatae the copy call + code = out_of_kernel_copy.generate_copy(copy_context) + + # Prepare GPU ustream connectors and the stream to be accessed from the + # GPU stream array + # Create the tasklet and add GPU stream related connectors + tasklet = state.add_tasklet("gpu_copy", {"_in_" + src_node.data}, {"_out_" + dst_node.data}, + code, + language=dtypes.Language.CPP) + + # Put the tasklet in between the edge + dst_node_pred, dst_node_conn, _, dst_conn, memlet = edge + + if memlet.other_subset is None: + src_memlet = copy.deepcopy(memlet) + src_memlet.data = src_node.data + state.add_edge(dst_node_pred, dst_node_conn, tasklet, "_in_" + src_node.data, src_memlet) + dst_memlet = copy.deepcopy(memlet) + dst_memlet.data = dst_node.data + state.add_edge(tasklet, "_out_" + dst_node.data, dst_node, dst_conn, dst_memlet) + state.remove_edge(edge) + else: + src_subset = memlet.subset if edge.data.data == src_node.data else memlet.other_subset + dst_subset = memlet.other_subset if edge.data.data == src_node.data else memlet.subset + state.add_edge(dst_node_pred, dst_node_conn, tasklet, "_in_" + src_node.data, + dace.Memlet(data=src_node.data, subset=src_subset)) + state.add_edge(tasklet, "_out_" + dst_node.data, dst_node, dst_conn, + dace.Memlet(data=dst_node.data, subset=dst_subset)) + state.remove_edge(edge) + + return {} + + def find_all_data_copies( + self, sdfg: SDFG + ) -> List[Tuple[SDFG, SDFGState, dace.nodes.Node, dace.nodes.Node, MultiConnectorEdge[dace.Memlet]]]: + """ + Finds and returns all data copies in the SDFG as tuples containing the SDFG, state, source node, + destination node, and the first memlet edge of in the memlet path between source and destination node. + + Parameters + ---------- + sdfg : SDFG + The SDFG to analyze for potential data copies. 
+ + Returns + ------- + List[Tuple[SDFG, SDFGState, dace.nodes.Node, dace.nodes.Node, MultiConnectorEdge[dace.Memlet]]] + A list of tuples representing the data copy, each containing: + - The SDFG containing the copy + - The state in which the copy occurs + - The source node of the copy + - The destination node of the copy + - The first memlet edge representing the data movement + """ + copy_worklist: List[Tuple[SDFG, SDFGState, dace.nodes.Node, dace.nodes.Node, + MultiConnectorEdge[dace.Memlet]]] = [] + visited_edges: Set[MultiConnectorEdge[dace.Memlet]] = set() + + for sub_sdfg in sdfg.all_sdfgs_recursive(): + for state in sub_sdfg.states(): + for edge in state.edges(): + + # Skip edges that were already processed + if edge in visited_edges: + continue + + # Get the memlet path and mark all edges in the path as visited + memlet_path = state.memlet_path(edge) + visited_edges.update(set(memlet_path)) + + # Get source and destination noces + first_edge = memlet_path[0] + last_edge = memlet_path[-1] + src_node = first_edge.src + dst_node = last_edge.dst + + # Skip empty memlets + if first_edge.data.subset is None: + continue + + # Add copy to the worklist + copy_worklist.append((sub_sdfg, state, src_node, dst_node, first_edge)) + + return copy_worklist diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py new file mode 100644 index 0000000000..eb779de12a --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_stream_sync_tasklets.py @@ -0,0 +1,287 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, List, Set, Tuple, Type, Union +import copy + +import dace +from dace import dtypes, properties, SDFG, SDFGState +from dace.codegen import common +from dace.config import Config +from dace.sdfg import nodes +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.helpers import is_within_schedule_types +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import get_gpu_stream_array_name, get_gpu_stream_connector_name +from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams, get_dace_runtime_gpu_stream_name +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets + + +@properties.make_properties +@transformation.explicit_cf_compatible +class InsertGPUStreamSyncTasklets(ppl.Pass): + """ + Inserts GPU stream synchronization tasklets in an SDFG where needed. + + This pass uses a heuristic approach to find locations matching specific patterns + that require synchronization. Additional locations can be added easily if new + cases are discovered. 
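+
+    A minimal usage sketch (assumed setup; the accompanying tests run a larger pipeline
+    that additionally inserts explicit copies and simplifies the stream topology)::
+
+        from dace.transformation.pass_pipeline import Pipeline
+
+        pipeline = Pipeline([
+            NaiveGPUStreamScheduler(),
+            InsertGPUStreams(),
+            ConnectGPUStreamsToKernels(),
+            ConnectGPUStreamsToTasklets(),
+            InsertGPUStreamSyncTasklets(),
+        ])
+        pipeline.apply_pass(sdfg, {})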
+ """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler, InsertGPUStreams, ConnectGPUStreamsToKernels, ConnectGPUStreamsToTasklets} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Tasklets | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + """ + Inserts GPU stream synchronization tasklets at required locations + after certain nodes and at the end of a state, for GPU streams used in the state. + """ + stream_assignments: Dict[nodes.Node, int] = pipeline_results['NaiveGPUStreamScheduler'] + + # Get sync locations + sync_state, sync_node = self._identify_sync_locations(sdfg, stream_assignments) + + # Synchronize after a node when required + self._insert_gpu_stream_sync_after_node(sdfg, sync_node, stream_assignments) + + # Synchronize all used streams at the end of a state + self._insert_gpu_stream_sync_at_state_end(sdfg, sync_state, stream_assignments) + return {} + + def _identify_sync_locations( + self, sdfg: SDFG, + stream_assignments: Dict[nodes.Node, int]) -> Tuple[Dict[SDFGState, Set[int]], Dict[nodes.Node, SDFGState]]: + """ + Heuristically identifies GPU stream synchronization points in an SDFG. + + Parameters + ---------- + sdfg : SDFG + The SDFG to analyze. + stream_assignments : Dict[nodes.Node, int] + Mapping of nodes to their assigned GPU stream ids. + + Returns + ------- + Tuple[Dict[SDFGState, Set[int]], Dict[nodes.Node, SDFGState]] + - **sync_state**: Maps each state to the set of stream IDs that should be + synchronized at the end of the state. + - **sync_node**: The keys of this dictionary are nodes after which synchronization + is needed, and their corresponding value is the state they belong to. 
+ """ + + # ------------------ Helper predicates ----------------------------- + + def is_gpu_global_accessnode(node, state): + return isinstance(node, nodes.AccessNode) and node.desc( + state.parent).storage == dtypes.StorageType.GPU_Global + + def is_nongpu_accessnode(node, state): + return isinstance(node, nodes.AccessNode) and node.desc( + state.parent).storage not in dtypes.GPU_KERNEL_ACCESSIBLE_STORAGES + + def is_kernel_exit(node): + return isinstance(node, nodes.ExitNode) and node.schedule == dtypes.ScheduleType.GPU_Device + + def is_sink_node(node, state): + return state.out_degree(node) == 0 + + def edge_within_kernel(state, src, dst): + gpu_schedules = dtypes.GPU_SCHEDULES + dtypes.EXPERIMENTAL_GPU_SCHEDULES + src_in_kernel = is_within_schedule_types(state, src, gpu_schedules) + dst_in_kernel = is_within_schedule_types(state, dst, gpu_schedules) + return src_in_kernel and dst_in_kernel + + def is_tasklet_with_stream_use(src): + return isinstance(src, nodes.Tasklet) and get_dace_runtime_gpu_stream_name() in src.code.as_string + + # ------------------ Sync detection logic ----------------------------- + + sync_state: Dict[SDFGState, Set[int]] = {} + sync_node: Dict[nodes.Node, SDFGState] = {} + + for edge, state in sdfg.all_edges_recursive(): + src, dst = edge.src, edge.dst + + # Ensure state is initialized in sync_state + if state not in sync_state: + sync_state[state] = set() + + # --- Heuristics for when to sync --- + if (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) and is_sink_node(dst, state) + and not edge_within_kernel(state, src, dst)): + sync_state[state].add(stream_assignments[dst]) + + elif (is_gpu_global_accessnode(src, state) and is_nongpu_accessnode(dst, state) + and not is_sink_node(dst, state) and not edge_within_kernel(state, src, dst)): + sync_node[dst] = state + sync_state[state].add(stream_assignments[dst]) + + elif (is_nongpu_accessnode(src, state) and is_gpu_global_accessnode(dst, state) + and not edge_within_kernel(state, src, dst)): + sync_state[state].add(stream_assignments[dst]) + + elif (is_kernel_exit(src) and is_gpu_global_accessnode(dst, state) and not is_sink_node(dst, state)): + sync_state[state].add(stream_assignments[src]) + sync_state[state].add(stream_assignments[src]) + + elif (is_kernel_exit(src) and is_gpu_global_accessnode(dst, state) and is_sink_node(dst, state)): + sync_state[state].add(stream_assignments[dst]) + + elif is_tasklet_with_stream_use(src): + sync_state[state].add(stream_assignments[src]) + + else: + continue + + # Check that state is indeed a SDFGState when added to the dictionary, to be on the safe side + if not isinstance(state, SDFGState): + raise NotImplementedError(f"Unexpected parent type '{type(state).__name__}' for edge '{edge}'. " + "Expected 'SDFGState'. Please handle this case explicitly.") + + # Remove states with no syncs + sync_state = {state: streams for state, streams in sync_state.items() if len(streams) > 0} + + return sync_state, sync_node + + def _insert_gpu_stream_sync_at_state_end(self, sdfg: SDFG, sync_state: Dict[SDFGState, Set[int]], + stream_assignments: Dict[nodes.Node, int]) -> None: + """ + Inserts GPU stream synchronization tasklets at the end of SDFG states. + + For each state that requires synchronization, this method: + + 1. Generates a tasklet that synchronizes all assigned GPU streams using + the appropriate backend (e.g., CUDA). + 2. Ensures all other operations in the state complete before synchronization + by connecting all sink nodes to the tasklet. + 3. 
Guarantees that only a single GPU stream AccessNode connects to the sync + tasklet, creating one if needed. + + Parameters + ---------- + sdfg : SDFG + The top level SDFG. + sync_state : Dict[SDFGState, Set[int] + Mapping of states to sets of stream IDs that require synchronization at the end of the state. + stream_assignments : Dict[nodes.Node, int] + Mapping of nodes to their assigned GPU stream IDs. + """ + # Prepare GPU stream info and backend + stream_array_name = get_gpu_stream_array_name() + stream_var_name_prefix = get_gpu_stream_connector_name() + backend: str = common.get_gpu_backend() + + for state, streams in sync_state.items(): + + #----------------- Generate GPU stream synchronization Tasklet ----------------- + + # Build synchronization calls for all streams used in this state + sync_code_lines = [] + for stream in streams: + gpu_stream_var_name = f"{stream_var_name_prefix}{stream}" + sync_call = f"DACE_GPU_CHECK({backend}StreamSynchronize({gpu_stream_var_name}));" + sync_code_lines.append(sync_call) + sync_code = "\n".join(sync_code_lines) + + # Create the tasklet + tasklet = state.add_tasklet(name=f"gpu_stream_{stream}_synchronization", + inputs=set(), + outputs=set(), + code=sync_code, + language=dtypes.Language.CPP) + + # ----------------- Connect sink nodes to the synchronization tasklet ----------------- + + # 1. Seperate GPU stream sink nodes and other sink nodes + stream_sink_nodes: List[nodes.AccessNode] = [] + non_stream_sink_nodes: List[nodes.Node] = [] + for sink_node in state.sink_nodes(): + if isinstance(sink_node, nodes.AccessNode) and sink_node.desc(state).dtype == dtypes.gpuStream_t: + stream_sink_nodes.append(sink_node) + + elif sink_node != tasklet: + non_stream_sink_nodes.append(sink_node) + + # 2. Connect non-stream sink nodes to the sync tasklet + for sink_node in non_stream_sink_nodes: + state.add_edge(sink_node, None, tasklet, None, dace.Memlet()) + + # 3. Connect a single GPU stream sink node (create or merge if needed) + if len(stream_sink_nodes) == 0: + combined_stream_node = state.add_access(stream_array_name) + + else: + combined_stream_node = stream_sink_nodes.pop() + for stream_node in stream_sink_nodes: + for edge in state.in_edges(stream_node): + state.add_edge(edge.src, edge.src_conn, combined_stream_node, edge.dst_conn, edge.data) + state.remove_edge(edge) + state.remove_node(stream_node) + + # Connect back to output stream node + output_stream_node = state.add_access(combined_stream_node.data) + for stream in streams: + accessed_gpu_stream = f"{stream_array_name}[{stream}]" + conn = f"{stream_var_name_prefix}{stream}" # Note: Same as "gpu_stream_var_name" from tasklet + + tasklet.add_in_connector(conn, dtypes.gpuStream_t) + state.add_edge(combined_stream_node, None, tasklet, conn, dace.Memlet(accessed_gpu_stream)) + state.add_edge(tasklet, None, output_stream_node, None, dace.Memlet(None)) + + def _insert_gpu_stream_sync_after_node(self, sdfg: SDFG, sync_node: Dict[nodes.Node, SDFGState], + stream_assignments: Dict[nodes.Node, int]) -> None: + """ + Insert a GPU stream synchronization tasklet immediately after specified nodes. + + Parameters + ---------- + sdfg : SDFG + The top level SDFG. + sync_node : Dict[nodes.Node, SDFGState] + Mapping of nodes to their parent state. After after the node a GPU stream synchronization should occur. + stream_assignments : Dict[nodes.Node, int] + Mapping of nodes to their assigned GPU stream IDs. 
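+
+        For a node assigned to stream 0 on the ``cuda`` backend (illustrative; the
+        connector name uses the ``__stream_`` prefix from ``gpu_helpers``), the inserted
+        tasklet contains::
+
+            DACE_GPU_CHECK(cudaStreamSynchronize(__stream_0));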
+ """ + # Prepare GPU stream info and backend + stream_array_name = get_gpu_stream_array_name() + stream_var_name_prefix = get_gpu_stream_connector_name() + backend: str = common.get_gpu_backend() + + for node, state in sync_node.items(): + + #----------------- Generate GPU stream synchronization Tasklet ----------------- + + # Get assigned GPU stream + stream = stream_assignments.get(node, "nullptr") + if stream == "nullptr": + raise NotImplementedError("Using the default 'nullptr' gpu stream is not supported yet.") + + # Create the tasklet + stream_var_name = f"{stream_var_name_prefix}{stream}" + sync_call = f"DACE_GPU_CHECK({backend}StreamSynchronize({stream_var_name}));\n" + tasklet = state.add_tasklet(name=f"gpu_stream_{stream}_synchronization", + inputs=set(), + outputs=set(), + code=sync_call, + language=dtypes.Language.CPP) + + #----------------- Place tasklet between node and successors, link GPU streams ---------------- + + # 1. Put the tasklet between the node and its successors + for succ in state.successors(node): + state.add_edge(tasklet, None, succ, None, dace.Memlet()) + state.add_edge(node, None, tasklet, None, dace.Memlet()) + + # 2. Connect tasklet to GPU stream AccessNodes + in_stream = state.add_access(stream_array_name) + out_stream = state.add_access(stream_array_name) + accessed_stream = f"{stream_array_name}[{stream}]" + state.add_edge(in_stream, None, tasklet, stream_var_name, dace.Memlet(accessed_stream)) + state.add_edge(tasklet, None, out_stream, None, dace.Memlet(None)) + tasklet.add_in_connector(stream_var_name, dtypes.gpuStream_t, force=True) diff --git a/dace/transformation/passes/gpu_specialization/insert_gpu_streams.py b/dace/transformation/passes/gpu_specialization/insert_gpu_streams.py new file mode 100644 index 0000000000..93495fd406 --- /dev/null +++ b/dace/transformation/passes/gpu_specialization/insert_gpu_streams.py @@ -0,0 +1,153 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +from typing import Any, Dict, Set, Type, Union + +import dace +from dace import SDFG, dtypes, properties +from dace.config import Config +from dace.sdfg import is_devicelevel_gpu +from dace.sdfg.nodes import AccessNode, MapEntry, MapExit, Node, Tasklet +from dace.transformation import pass_pipeline as ppl, transformation +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.helpers.gpu_helpers import get_dace_runtime_gpu_stream_name, get_gpu_stream_array_name, get_gpu_stream_connector_name + + +@properties.make_properties +@transformation.explicit_cf_compatible +class InsertGPUStreams(ppl.Pass): + """ + Inserts a GPU stream array into the top-level SDFG and propagates it to all + nested SDFGs that require it, including intermediate SDFGs along the hierarchy. + + This pass guarantees that every relevant SDFG has the array defined, avoiding + duplication and allowing subsequent passes in the GPU stream pipeline to rely + on its presence without redefining it. + """ + + def depends_on(self) -> Set[Union[Type[ppl.Pass], ppl.Pass]]: + return {NaiveGPUStreamScheduler} + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.AccessNodes | ppl.Modifies.Memlets + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return False + + def apply_pass(self, sdfg: SDFG, pipeline_results: Dict[str, Any]): + """ + Ensure that a GPU stream array is available in all SDFGs that require it. 
+ + The pass creates the array once at the top-level SDFG and propagates it + down the hierarchy by inserting matching arrays in child SDFGs and wiring + them through nested SDFG connectors. This way, all SDFGs share a consistent + reference to the same GPU stream array. + """ + + # Extract stream array name and number of streams to allocate + stream_array_name = get_gpu_stream_array_name() + stream_assignments: Dict[Node, Union[int, str]] = pipeline_results['NaiveGPUStreamScheduler'] + num_assigned_streams = max(stream_assignments.values(), default=0) + 1 + + # Add the GPU stream array at the top level + sdfg.add_transient(stream_array_name, (num_assigned_streams, ), + dtype=dace.dtypes.gpuStream_t, + storage=dace.dtypes.StorageType.Register) + + # Ensure GPU stream array is defined where required + for child_sdfg in self.find_child_sdfgs_requiring_gpu_stream(sdfg): + + # Skip if this child already has the array (inserted higher up in the hierarchy) + if stream_array_name in child_sdfg.arrays: + continue + + # Add the array to the child SDFG + inner_sdfg = child_sdfg + inner_sdfg.add_array(stream_array_name, (num_assigned_streams, ), + dtype=dace.dtypes.gpuStream_t, + storage=dace.dtypes.StorageType.Register) + + # Walk up the hierarchy until the array is found, inserting it into each parent + outer_sdfg = inner_sdfg.parent_sdfg + while stream_array_name not in outer_sdfg.arrays: + + # Insert array in parent SDFG + outer_sdfg.add_array(stream_array_name, (num_assigned_streams, ), + dtype=dace.dtypes.gpuStream_t, + storage=dace.dtypes.StorageType.Register) + + # Connect parent SDFG array to nested SDFG node + inner_nsdfg_node = inner_sdfg.parent_nsdfg_node + inner_parent_state = inner_sdfg.parent + inner_nsdfg_node.add_in_connector(stream_array_name, dtypes.gpuStream_t) + inp_gpu_stream: AccessNode = inner_parent_state.add_access(stream_array_name) + inner_parent_state.add_edge(inp_gpu_stream, None, inner_nsdfg_node, stream_array_name, + dace.Memlet(stream_array_name)) + + # Continue climbing up the hierarchy + inner_sdfg = outer_sdfg + outer_sdfg = outer_sdfg.parent_sdfg + + # Ensure final connection from the first parent that had the array down to this SDFG + inner_nsdfg_node = inner_sdfg.parent_nsdfg_node + inner_parent_state = inner_sdfg.parent + inner_nsdfg_node.add_in_connector(stream_array_name, dtypes.gpuStream_t) + inp_gpu_stream: AccessNode = inner_parent_state.add_access(stream_array_name) + inner_parent_state.add_edge(inp_gpu_stream, None, inner_nsdfg_node, stream_array_name, + dace.Memlet(f"{stream_array_name}[0:{num_assigned_streams}]")) + + outer_sdfg = inner_sdfg.parent_sdfg + + return {} + + def find_child_sdfgs_requiring_gpu_stream(self, sdfg) -> Set[SDFG]: + """ + Identify all child SDFGs that require a GPU stream array in their + array descriptor store. A child SDFG requires a GPU stream if: + + - It launches GPU kernels (MapEntry/MapExit with GPU_Device schedule). + - It contains special Tasklets (e.g., from library node expansion) that + use the GPU stream they are assigned to in the code. + - It accesses GPU global memory outside device-level GPU scopes, which + implies memory copies or kernel data feeds. + + Parameters + ---------- + sdfg : SDFG + The root SDFG to inspect. + + Returns + ------- + Set[SDFG] + The set of child SDFGs that need a GPU stream array in their array descriptor + store. 
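+
+        For example, a child SDFG that contains a ``MapEntry`` scheduled as
+        ``GPU_Device`` (Case 1) is returned here, and ``apply_pass`` then threads the
+        shared GPU stream array down to it through nested SDFG connectors.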
+ """ + requiring_gpu_stream = set() + for child_sdfg in sdfg.all_sdfgs_recursive(): + + # Skip the root SDFG itself + if child_sdfg is sdfg: + continue + + for state in child_sdfg.states(): + for node in state.nodes(): + + # Case 1: Kernel launch nodes + if isinstance(node, (MapEntry, MapExit)) and node.map.schedule == dtypes.ScheduleType.GPU_Device: + requiring_gpu_stream.add(child_sdfg) + break + + # Case 2: Tasklets that use GPU stream in their code + if isinstance(node, Tasklet) and get_dace_runtime_gpu_stream_name() in node.code.as_string: + requiring_gpu_stream.add(child_sdfg) + break + + # Case 3: Accessing GPU global memory outside device-level scopes + if (isinstance(node, AccessNode) and node.desc(state).storage == dtypes.StorageType.GPU_Global + and not is_devicelevel_gpu(state.sdfg, state, node)): + requiring_gpu_stream.add(child_sdfg) + break + + # Stop scanning this SDFG once a reason is found + if child_sdfg in requiring_gpu_stream: + break + + return requiring_gpu_stream diff --git a/tests/gpu_specialization/explicit_global_memory_copy_test.py b/tests/gpu_specialization/explicit_global_memory_copy_test.py new file mode 100644 index 0000000000..92cefed48a --- /dev/null +++ b/tests/gpu_specialization/explicit_global_memory_copy_test.py @@ -0,0 +1,331 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +import dace +import pytest +import numpy as np +from typing import Tuple +from dace.transformation.passes.gpu_specialization.insert_explicit_gpu_global_memory_copies import InsertExplicitGPUGlobalMemoryCopies + + +def _get_sdfg(name_str: str, dimension: Tuple[int], copy_strides: Tuple[int]) -> dace.SDFG: + sdfg = dace.SDFG(name_str) + state = sdfg.add_state("state0", is_start_block=True) + for arr_name in ["A", "B"]: + sdfg.add_array(arr_name, dimension, dace.float32, dace.dtypes.StorageType.GPU_Global) + a = state.add_access("A") + b = state.add_access("B") + copy_str = ", ".join([f"0:{dimension[i]}:{copy_strides[i]}" for i in range(len(dimension))]) + state.add_edge(a, None, b, None, dace.Memlet(f"A[{copy_str}]")) + sdfg.validate() + return sdfg + + +def _get_sdfg_with_other_subset(name_str: str, dimension: Tuple[int], copy_strides: Tuple[int]) -> dace.SDFG: + sdfg = dace.SDFG(name_str) + state = sdfg.add_state("state0", is_start_block=True) + for arr_name in ["A", "B"]: + sdfg.add_array(arr_name, dimension, dace.float32, dace.dtypes.StorageType.GPU_Global) + a = state.add_access("A") + b = state.add_access("B") + # copy_str = ", ".join([f"0:{dimension[i]}:{copy_strides[i]}" for i in range(len(dimension))]) + src_subset = dace.subsets.Range([((dimension[i] // 2), dimension[i] - 1, copy_strides[i]) + for i in range(len(dimension))]) + dst_subset = dace.subsets.Range([(0, (dimension[i] // 2) - 1, copy_strides[i]) for i in range(len(dimension))]) + state.add_edge(a, None, b, None, dace.Memlet(data="B", subset=dst_subset, other_subset=src_subset)) + sdfg.validate() + return sdfg + + +def _count_tasklets(sdfg: dace.SDFG) -> int: + """Count the number of tasklets in the SDFG.""" + count = 0 + for state in sdfg.nodes(): + for node in state.nodes(): + if isinstance(node, dace.nodes.Tasklet): + count += 1 + return count + + +def _count_nsdfgs(sdfg: dace.SDFG) -> int: + """Count the number of nested SDFGs in the SDFG.""" + count = 0 + for state in sdfg.nodes(): + for node in state.nodes(): + if isinstance(node, dace.nodes.NestedSDFG): + count += 1 + return count + + +@pytest.mark.gpu +def test_1d_copy(): + """Test 1D unit stride copy.""" + import cupy as cp 
+ + dimension = (8, ) + copy_strides = (1, ) + + sdfg = _get_sdfg("test_1d_copy", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = A[::copy_strides[0]] + cp.testing.assert_array_equal(B, expected) + assert num_tasklets == 1 + + +@pytest.mark.gpu +def test_1d_copy_w_other_subset(): + """Test 1D unit stride copy.""" + import cupy as cp + + dimension = (8, ) + copy_strides = (1, ) + + sdfg = _get_sdfg_with_other_subset("test_1d_copy_w_other_subset", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + sdfg.save("x.sdfg") + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = A[4:8:copy_strides[0]] + cp.testing.assert_array_equal(B[0:4], expected) + assert num_tasklets == 1 + + +@pytest.mark.gpu +def test_2d_copy(): + """Test 2D unit stride copy with other subset not None.""" + import cupy as cp + + dimension = (8, 8) + copy_strides = (1, 1) + + sdfg = _get_sdfg("test_2d_copy", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + assert num_tasklets == 1 + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = A[::copy_strides[0], ::copy_strides[1]] + cp.testing.assert_array_equal(B, expected) + + assert num_tasklets == 1 + + +@pytest.mark.gpu +def test_2d_copy_with_other_subset(): + """Test 2D unit stride copy with other subset not None.""" + import cupy as cp + + dimension = (8, 8) + copy_strides = (1, 1) + + sdfg = _get_sdfg_with_other_subset("test_2d_copy_with_other_subset", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = A[4:8:copy_strides[0], 4:8:copy_strides[1]] + cp.testing.assert_array_equal(B[0:4, 0:4], expected) + assert num_tasklets == 1 + + +@pytest.mark.gpu +def test_3d_copy(): + """Test 3D unit stride copy.""" + import cupy as cp + + dimension = (8, 4, 4) + copy_strides = (1, 1, 1) + + sdfg = _get_sdfg("test_3d_copy", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = A[::copy_strides[0], ::copy_strides[1], ::copy_strides[2]] + cp.testing.assert_array_equal(B, expected) + + assert num_tasklets == 1 + + +@pytest.mark.gpu +@pytest.mark.parametrize("stride", [2, 4]) +def test_1d_strided_copy(stride): + """Test 1D strided copy with varying strides.""" + import cupy as cp + + dimension = (8, ) + copy_strides = (stride, ) + + sdfg = _get_sdfg(f"test_1d_strided_copy_s{stride}", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + assert num_tasklets == 1 + + # Test with cupy + A = 
cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness - only elements at stride intervals should be copied + expected = cp.zeros_like(A) + expected[::stride] = A[::stride] + cp.testing.assert_array_equal(B[::stride], expected[::stride]) + + +@pytest.mark.gpu +@pytest.mark.parametrize("stride_1,stride_2", [(2, 1), (4, 1), (1, 2), (1, 4)]) +def test_2d_strided_copy(stride_1, stride_2): + """Test 2D strided copy. First dimension is unit stride, second is strided.""" + import cupy as cp + + dimension = (8, 4) + copy_strides = (stride_1, stride_2) + + sdfg = _get_sdfg(f"test_2d_strided_copy_s{stride_1}_{stride_2}", dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + assert num_tasklets == 1 + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = cp.zeros_like(A) + expected[::stride_1, ::stride_2] = A[::stride_1, ::stride_2] + cp.testing.assert_array_equal(B[::stride_1, ::stride_2], expected[::stride_1, ::stride_2]) + + +@pytest.mark.gpu +@pytest.mark.parametrize("stride_1,stride_2,stride_3", [(1, 2, 2), (1, 2, 4), (1, 4, 2), (4, 1, 1), (4, 2, 1), + (2, 2, 1)]) +def test_3d_strided_copy(stride_1, stride_2, stride_3): + """Test 3D strided copy. First dimension is unit stride, others are strided.""" + import cupy as cp + + dimension = (8, 4, 4) + copy_strides = (stride_1, stride_2, stride_3) + + sdfg = _get_sdfg(f"test_3d_strided_copy_s{stride_1}_{stride_2}_{stride_3}", dimension, copy_strides) + sdfg.save("x1.sdfg") + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + sdfg.save("x2.sdfg") + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + assert num_tasklets == 1 + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + expected = cp.zeros_like(A) + expected[::stride_1, ::stride_2, ::stride_3] = A[::stride_1, ::stride_2, ::stride_3] + cp.testing.assert_array_equal(B, expected) + + +@pytest.mark.gpu +@pytest.mark.parametrize("stride_1,stride_2,stride_3", [ + (1, 2, 2), + (1, 2, 4), + (1, 4, 2), + (2, 2, 1), +]) +def test_3d_strided_copy_w_other_subset(stride_1, stride_2, stride_3): + """Test 3D strided copy. 
First dimension is unit stride, others are strided.""" + import cupy as cp + + dimension = (8, 8, 8) + copy_strides = (stride_1, stride_2, stride_3) + + sdfg = _get_sdfg_with_other_subset(f"test_3d_strided_copy_s{stride_1}_{stride_2}_{stride_3}_w_other_subset", + dimension, copy_strides) + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + + # Count tasklets + num_tasklets = _count_tasklets(sdfg) + assert num_tasklets == 1 + + # Test with cupy + A = cp.random.rand(*dimension).astype(np.float32) + B = cp.zeros_like(A) + + sdfg(A=A, B=B) + + # Verify correctness + cp.testing.assert_array_equal(B[0:4:copy_strides[0], 0:4:copy_strides[1], 0:4:copy_strides[2]], + A[4:8:copy_strides[0], 4:8:copy_strides[1], 4:8:copy_strides[2]]) + + +@pytest.mark.gpu +def test_independent_copies(): + + @dace.program + def independent_copies(A: dace.uint32[128], B: dace.uint32[128], C: dace.uint32[128], D: dace.uint32[128]): + for i in dace.map[0:128:1]: + B[i] = A[i] + for i in dace.map[0:128:1]: + D[i] = C[i] + + sdfg = independent_copies.to_sdfg() + sdfg.apply_gpu_transformations() + sdfg.validate() + sdfg.save("s1.sdfg") + + InsertExplicitGPUGlobalMemoryCopies().apply_pass(sdfg, {}) + sdfg.save("s2.sdfg") + + sdfg.validate() + sdfg.compile() diff --git a/tests/gpu_specialization/explicit_gpu_stream_management_test.py b/tests/gpu_specialization/explicit_gpu_stream_management_test.py new file mode 100644 index 0000000000..8c81d19bd3 --- /dev/null +++ b/tests/gpu_specialization/explicit_gpu_stream_management_test.py @@ -0,0 +1,208 @@ +# Copyright 2019-2026 ETH Zurich and the DaCe authors. All rights reserved. +import pytest + +import dace +from dace.codegen import common +from dace.transformation.pass_pipeline import Pipeline +from dace.transformation.passes.gpu_specialization.gpu_stream_scheduling import NaiveGPUStreamScheduler +from dace.transformation.passes.gpu_specialization.insert_explicit_gpu_global_memory_copies import InsertExplicitGPUGlobalMemoryCopies +from dace.transformation.passes.gpu_specialization.insert_gpu_streams import InsertGPUStreams +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_kernels import ConnectGPUStreamsToKernels +from dace.transformation.passes.gpu_specialization.connect_gpu_streams_to_tasklets import ConnectGPUStreamsToTasklets +from dace.transformation.passes.gpu_specialization.insert_gpu_stream_sync_tasklets import InsertGPUStreamSyncTasklets +from dace.transformation.passes.gpu_specialization.gpu_stream_topology_simplification import GPUStreamTopologySimplification + +gpu_stream_pipeline = Pipeline([ + InsertExplicitGPUGlobalMemoryCopies(), + NaiveGPUStreamScheduler(), + InsertGPUStreams(), + ConnectGPUStreamsToKernels(), + ConnectGPUStreamsToTasklets(), + InsertGPUStreamSyncTasklets(), + GPUStreamTopologySimplification(), +]) + +backend = common.get_gpu_backend() + + +@pytest.mark.gpu +def test_basic(): + """ + A simple memory copy program. + Since the SDFG has a single connected component, exactly one GPU stream is used + and must be synchronized at the end of the state. For each synchronized stream, + the pipeline introduces a memlet from the synchronization tasklet to a GPU stream + AccessNode. Therefore, it is sufficient to verify there is only one sink node with one ingoing + edge, verify its dtype, and check for the presence of a preceeding synchronization tasklet. 
+ """ + + @dace.program + def simple_copy(A: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global, + B: dace.uint32[128] @ dace.dtypes.StorageType.GPU_Global): + for i in dace.map[0:128:1] @ dace.dtypes.ScheduleType.GPU_Device: + B[i] = A[i] + + sdfg = simple_copy.to_sdfg() + gpu_stream_pipeline.apply_pass(sdfg, {}) + + state = sdfg.states()[0] + sink_nodes = state.sink_nodes() + node = sink_nodes[0] + assert ( + len(sink_nodes) == 1 and len(state.in_edges(node)) == 1 and isinstance(node, dace.nodes.AccessNode) + and node.desc(state).dtype == dace.dtypes.gpuStream_t + ), ("Only one sink node with should exist, which is a GPU stream AccessNode and it should have one ingoing edge.") + + assert (isinstance(pre, dace.nodes.Tasklet) and f"{backend}StreamSynchronize(" in pre.code.as_string + for pre in state.predecessors(node)), ("At then end of each state any used stream must be synchronized.") + + sdfg.compile() + + +@pytest.mark.gpu +def test_extended(): + """ + A program that performs two independent memory copies. + The input arrays reside in host memory, and `gpu_transformations()` is applied to + the program. As a result, the data is first copied to GPU global memory, after + which the two copies are executed on the GPU. Since these copies form two + independent connected components in the resulting SDFG, the naive GPU stream + scheduler assigns them to different GPU streams. + This test verifies that exactly two GPU streams are used, that both streams are + synchronized at the end of the state, and that the corresponding asynchronous + memory copy tasklets are correctly associated with their assigned streams. + """ + + @dace.program + def independent_copies(A: dace.uint32[128], B: dace.uint32[128], C: dace.uint32[128], D: dace.uint32[128]): + for i in dace.map[0:128:1]: + B[i] = A[i] + for i in dace.map[0:128:1]: + D[i] = C[i] + + sdfg = independent_copies.to_sdfg() + + # Transform such that program can run on GPU and apply GPU stream pipeline + sdfg.apply_gpu_transformations() + gpu_stream_pipeline.apply_pass(sdfg, {}) + + # Test 1: Two GPU streams were used since we use the Naive Stream scheduler + state = sdfg.states()[0] + sink_nodes = state.sink_nodes() + node = sink_nodes[0] + assert (len(sink_nodes) == 1 and len(state.in_edges(node)) == 2 and isinstance(node, dace.nodes.AccessNode) + and node.desc(state).dtype == dace.dtypes.gpuStream_t), ( + "Only one sink node with should exist, which is a GPU stream AccessNode and it " + "should have two ingoing edges as original graph consisted of two connected components.") + + # Test 2: We synchronize at the end of the state + assert (isinstance(pre, dace.nodes.Tasklet) and f"{backend}StreamSynchronize(" in pre.code.as_string + for pre in state.predecessors(node)), ("At then end of each state any used stream must be synchronized.") + + # Test 3: Check that we have memory copy tasklets (as we perform two "Main Memory -> GPU GLobal" + # memory copies and two "GPU Global -> Main Memory" memory copies by applying the gpu tranformation) + # and that they use the name of the in connector of the GPU stream in the copy call + memcopy_tasklets = [ + n for n in state.nodes() if isinstance(n, dace.nodes.Tasklet) and f"{backend}MemcpyAsync(" in n.code.as_string + ] + for tasklet in memcopy_tasklets: + assert len(tasklet.in_connectors) == 2, ("Memcpy tasklets must have one connector " + "corresponding to the GPU stream and copy-in node.") + + sdfg.compile() + + +@pytest.mark.gpu +def test_numerical_correctness(): + """ + Test that verifies numerical 
correctness by comparing CPU and GPU program outputs. + The test creates a simple computation (element-wise multiplication and addition), + runs it on both CPU and GPU, and verifies that the results match within tolerance. + """ + import numpy as np + + @dace.program + def compute(A: dace.float32[128], B: dace.float32[128], C: dace.float32[128]): + for i in dace.map[0:128:1]: + C[i] = A[i] * 2.0 + B[i] + + # Create test data + rng = np.random.default_rng(42) + A = rng.random(128, dtype=np.float32) + B = rng.random(128, dtype=np.float32) + C_cpu = np.zeros(128, dtype=np.float32) + C_gpu = np.zeros(128, dtype=np.float32) + + # Run on CPU + sdfg_cpu = compute.to_sdfg() + sdfg_cpu(A=A.copy(), B=B.copy(), C=C_cpu) + + # Run on GPU + sdfg_gpu = compute.to_sdfg() + sdfg_gpu.apply_gpu_transformations() + gpu_stream_pipeline.apply_pass(sdfg_gpu, {}) + sdfg_gpu(A=A.copy(), B=B.copy(), C=C_gpu) + + # Verify numerical correctness + assert np.allclose( + C_cpu, C_gpu, rtol=1e-5, + atol=1e-7), (f"CPU and GPU results do not match. Max difference: {np.max(np.abs(C_cpu - C_gpu))}") + + # Verify expected result + expected = A * 2.0 + B + assert np.allclose(C_cpu, expected, rtol=1e-5, atol=1e-7), ("CPU result does not match expected computation") + assert np.allclose(C_gpu, expected, rtol=1e-5, atol=1e-7), ("GPU result does not match expected computation") + + +@pytest.mark.gpu +def test_numerical_correctness_complex(): + """ + Test numerical correctness for a more complex computation involving + multiple operations and dependencies between array elements. + """ + import numpy as np + + @dace.program + def complex_compute(A: dace.float64[128], B: dace.float64[128], C: dace.float64[128], D: dace.float64[128]): + # First map: C = A * B + for i in dace.map[0:128:1]: + C[i] = A[i] * B[i] + + # Second map: D = C + A (depends on result of first map) + for i in dace.map[0:128:1]: + D[i] = C[i] + A[i] + + # Create test data + rng = np.random.default_rng(123) + A = rng.random(128, dtype=np.float64) + B = rng.random(128, dtype=np.float64) + C_cpu = np.zeros(128, dtype=np.float64) + D_cpu = np.zeros(128, dtype=np.float64) + C_gpu = np.zeros(128, dtype=np.float64) + D_gpu = np.zeros(128, dtype=np.float64) + + # Run on CPU + sdfg_cpu = complex_compute.to_sdfg() + sdfg_cpu(A=A.copy(), B=B.copy(), C=C_cpu, D=D_cpu) + + # Run on GPU + sdfg_gpu = complex_compute.to_sdfg() + sdfg_gpu.apply_gpu_transformations() + gpu_stream_pipeline.apply_pass(sdfg_gpu, {}) + sdfg_gpu(A=A.copy(), B=B.copy(), C=C_gpu, D=D_gpu) + + # Verify numerical correctness for intermediate result C + assert np.allclose( + C_cpu, C_gpu, rtol=1e-12, + atol=1e-14), (f"CPU and GPU results for C do not match. Max difference: {np.max(np.abs(C_cpu - C_gpu))}") + + # Verify numerical correctness for final result D + assert np.allclose( + D_cpu, D_gpu, rtol=1e-12, + atol=1e-14), (f"CPU and GPU results for D do not match. Max difference: {np.max(np.abs(D_cpu - D_gpu))}") + + # Verify expected results + expected_C = A * B + expected_D = expected_C + A + assert np.allclose(D_cpu, expected_D, rtol=1e-12, atol=1e-14), ("CPU result does not match expected computation") + assert np.allclose(D_gpu, expected_D, rtol=1e-12, atol=1e-14), ("GPU result does not match expected computation")