diff --git a/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection.py b/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection.py index 23318563de..06f96129cc 100644 --- a/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection.py +++ b/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection.py @@ -188,7 +188,9 @@ def run( log.debug("advection run - start") log.debug("communication of prep_adv cell field: mass_flx_ic - start") - self._exchange.exchange_and_wait(dims.CellDim, prep_adv.mass_flx_ic) + self._exchange.exchange( + dims.CellDim, prep_adv.mass_flx_ic, stream=decomposition.DEFAULT_STREAM + ) log.debug("communication of prep_adv cell field: mass_flx_ic - end") log.debug("running stencil copy_cell_kdim_field - start") @@ -272,7 +274,11 @@ def run( log.debug("advection run - start") log.debug("communication of prep_adv cell field: mass_flx_ic - start") - self._exchange.exchange_and_wait(dims.CellDim, prep_adv.mass_flx_ic) + self._exchange.exchange( + dims.CellDim, + prep_adv.mass_flx_ic, + stream=decomposition.DEFAULT_STREAM, + ) log.debug("communication of prep_adv cell field: mass_flx_ic - end") # reintegrate density for conservation of mass @@ -365,7 +371,11 @@ def run( # exchange updated tracer values, originally happens only if iforcing /= inwp log.debug("communication of advection cell field: p_tracer_new - start") - self._exchange.exchange_and_wait(dims.CellDim, p_tracer_new) + self._exchange.exchange( + dims.CellDim, + p_tracer_new, + stream=decomposition.DEFAULT_STREAM, + ) log.debug("communication of advection cell field: p_tracer_new - end") # finalize step diff --git a/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection_horizontal.py b/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection_horizontal.py index 200bed37db..02ad6807c5 100644 --- a/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection_horizontal.py +++ b/model/atmosphere/advection/src/icon4py/model/atmosphere/advection/advection_horizontal.py @@ -155,7 +155,7 @@ def apply_flux_limiter( ) log.debug("communication of advection cell field: r_m - start") - self._exchange.exchange_and_wait(dims.CellDim, self._r_m) + self._exchange.exchange(dims.CellDim, self._r_m, stream=decomposition.DEFAULT_STREAM) log.debug("communication of advection cell field: r_m - end") # limit outward fluxes diff --git a/model/atmosphere/diffusion/src/icon4py/model/atmosphere/diffusion/diffusion.py b/model/atmosphere/diffusion/src/icon4py/model/atmosphere/diffusion/diffusion.py index 4e87d3de17..883795ec6b 100644 --- a/model/atmosphere/diffusion/src/icon4py/model/atmosphere/diffusion/diffusion.py +++ b/model/atmosphere/diffusion/src/icon4py/model/atmosphere/diffusion/diffusion.py @@ -386,7 +386,7 @@ def __init__( self._cell_params = cell_params self.halo_exchange_wait = decomposition.create_halo_exchange_wait( - self._exchange + self._exchange, ) # wait on a communication handle self.rd_o_cvd: float = constants.GAS_CONSTANT_DRY_AIR / ( constants.CPD - constants.GAS_CONSTANT_DRY_AIR @@ -731,11 +731,12 @@ def _sync_cell_fields(self, prognostic_state): IF ( linit .OR. (iforcing /= inwp .AND. 
iforcing /= iaes) ) THEN
         """
         log.debug("communication of prognostic cell fields: theta, w, exner - start")
-        self._exchange.exchange_and_wait(
+        self._exchange.exchange(
             dims.CellDim,
             prognostic_state.w,
             prognostic_state.theta_v,
             prognostic_state.exner,
+            stream=decomposition.DEFAULT_STREAM,
         )
         log.debug("communication of prognostic cell fields: theta, w, exner - done")
 
@@ -772,12 +773,17 @@ def _do_diffusion_step(
         log.debug("rbf interpolation 1: end")
 
         # 2. HALO EXCHANGE -- CALL sync_patch_array_mult u_vert and v_vert
+        # TODO(phimuell, muellch): Is asynchronous mode okay here?
+        # NOTE: We do not specify a stream here but rely on the default argument.
+        # We do this to ensure that the orchestrator works; it is not aware
+        # of streams.
         log.debug("communication rbf extrapolation of vn - start")
         self._exchange(
             self.u_vert,
             self.v_vert,
             dim=dims.VertexDim,
-            wait=True,
+            full_exchange=True,
+            # stream=decomposition.DEFAULT_STREAM,  # noqa: ERA001 # See NOTE above.
         )
         log.debug("communication rbf extrapolation of vn - end")
 
@@ -817,7 +823,13 @@ def _do_diffusion_step(
         # TODO(halungge): move this up and do asynchronous exchange
         if self.config.type_vn_diffu > 1:
             log.debug("communication rbf extrapolation of z_nable2_e - start")
-            self._exchange(self.z_nabla2_e, dim=dims.EdgeDim, wait=True)
+            # TODO(phimuell, muellch): Is asynchronous mode okay here?
+            self._exchange(
+                self.z_nabla2_e,
+                dim=dims.EdgeDim,
+                full_exchange=True,
+                # stream=decomposition.DEFAULT_STREAM,  # noqa: ERA001 # See NOTE above.
+            )
             log.debug("communication rbf extrapolation of z_nable2_e - end")
 
         log.debug("2nd rbf interpolation: start")
@@ -827,12 +839,14 @@ def _do_diffusion_step(
         log.debug("2nd rbf interpolation: end")
 
         # 6. HALO EXCHANGE -- CALL sync_patch_array_mult (Vertex Fields)
+        # TODO(phimuell, muellch): Is asynchronous mode okay here?
         log.debug("communication rbf extrapolation of z_nable2_e - start")
         self._exchange(
             self.u_vert,
             self.v_vert,
             dim=dims.VertexDim,
-            wait=True,
+            full_exchange=True,
+            # stream=decomposition.DEFAULT_STREAM,  # noqa: ERA001 # See NOTE above.
         )
         log.debug("communication rbf extrapolation of z_nable2_e - end")
 
@@ -848,7 +862,12 @@ def _do_diffusion_step(
         log.debug("running stencils 04 05 06 (apply_diffusion_to_vn): end")
 
         log.debug("communication of prognistic.vn : start")
-        handle_edge_comm = self._exchange(prognostic_state.vn, dim=dims.EdgeDim, wait=False)
+        handle_edge_comm = self._exchange(
+            prognostic_state.vn,
+            dim=dims.EdgeDim,
+            full_exchange=False,
+            # stream=decomposition.DEFAULT_STREAM,  # noqa: ERA001 # See NOTE above.
+        )
 
         log.debug(
             "running stencils 07 08 09 10 (apply_diffusion_to_w_and_compute_horizontal_gradients_for_turbulence): start"
         )
@@ -894,7 +913,8 @@ def _do_diffusion_step(
         log.debug("running stencil 13 to 16 apply_diffusion_to_theta_and_exner: end")
 
         self.halo_exchange_wait(
-            handle_edge_comm
+            handle_edge_comm,
+            # stream=decomposition.DEFAULT_STREAM,  # noqa: ERA001 # See NOTE above.
         )  # need to do this here, since we currently only use 1 communication object.
log.debug("communication of prognogistic.vn - end") diff --git a/model/atmosphere/dycore/src/icon4py/model/atmosphere/dycore/solve_nonhydro.py b/model/atmosphere/dycore/src/icon4py/model/atmosphere/dycore/solve_nonhydro.py index b1b2c2dade..2d15ac70cc 100644 --- a/model/atmosphere/dycore/src/icon4py/model/atmosphere/dycore/solve_nonhydro.py +++ b/model/atmosphere/dycore/src/icon4py/model/atmosphere/dycore/solve_nonhydro.py @@ -1169,8 +1169,11 @@ def run_predictor_step( ) log.debug("exchanging prognostic field 'vn' and local field 'rho_at_edges_on_model_levels'") - self._exchange.exchange_and_wait( - dims.EdgeDim, prognostic_states.next.vn, z_fields.rho_at_edges_on_model_levels + self._exchange.exchange( + dims.EdgeDim, + prognostic_states.next.vn, + z_fields.rho_at_edges_on_model_levels, + stream=decomposition.DEFAULT_STREAM, ) self._compute_horizontal_velocity_quantities_and_fluxes( @@ -1241,12 +1244,19 @@ def run_predictor_step( log.debug( "exchanging prognostic field 'w' and local field 'dwdz_at_cells_on_model_levels'" ) - self._exchange.exchange_and_wait( - dims.CellDim, prognostic_states.next.w, z_fields.dwdz_at_cells_on_model_levels + self._exchange.exchange( + dims.CellDim, + prognostic_states.next.w, + z_fields.dwdz_at_cells_on_model_levels, + stream=decomposition.DEFAULT_STREAM, ) else: log.debug("exchanging prognostic field 'w'") - self._exchange.exchange_and_wait(dims.CellDim, prognostic_states.next.w) + self._exchange.exchange( + dims.CellDim, + prognostic_states.next.w, + stream=decomposition.DEFAULT_STREAM, + ) def run_corrector_step( self, @@ -1334,7 +1344,11 @@ def run_corrector_step( ) log.debug("exchanging prognostic field 'vn'") - self._exchange.exchange_and_wait(dims.EdgeDim, (prognostic_states.next.vn)) + self._exchange.exchange( + dims.EdgeDim, + prognostic_states.next.vn, + stream=decomposition.DEFAULT_STREAM, + ) self._compute_averaged_vn_and_fluxes( spatially_averaged_vn=self.z_vn_avg, @@ -1401,9 +1415,10 @@ def run_corrector_step( r_nsubsteps=r_nsubsteps, ) log.debug("exchange prognostic fields 'rho' , 'exner', 'w'") - self._exchange.exchange_and_wait( + self._exchange.exchange( dims.CellDim, prognostic_states.next.rho, prognostic_states.next.exner, prognostic_states.next.w, + stream=decomposition.DEFAULT_STREAM, ) diff --git a/model/common/src/icon4py/model/common/decomposition/definitions.py b/model/common/src/icon4py/model/common/decomposition/definitions.py index e02cb5a1a7..342f5d2e02 100644 --- a/model/common/src/icon4py/model/common/decomposition/definitions.py +++ b/model/common/src/icon4py/model/common/decomposition/definitions.py @@ -14,7 +14,7 @@ from collections.abc import Sequence from enum import Enum from types import ModuleType -from typing import Any, Literal, Protocol, overload, runtime_checkable +from typing import Any, Literal, Protocol, TypeAlias, overload, runtime_checkable import dace # type: ignore[import-untyped] import gt4py.next as gtx @@ -28,6 +28,74 @@ log = logging.getLogger(__name__) +# TODO(all): Currently this is the only place that uses/needs stream. Move them to an +# appropriate location once the need arises. + + +class CupyLikeStream(Protocol): + """The type follows the CuPy convention of a stream. + + This means they have an attribute `ptr` that returns the address of the + underlying GPU stream. + See: https://docs.cupy.dev/en/stable/reference/generated/cupy.cuda.Stream.html#cupy-cuda-stream + + Todo: + Drop once we fully translated to CuPy 13. + """ + + @property + def ptr(self) -> int: ... 
+
+
+class CudaStreamProtocolLike(Protocol):
+    """The type follows the CUDA stream protocol.
+
+    This means it provides a method called `__cuda_stream__()` returning a pair of
+    integers. The first is the protocol version and the second is the
+    address of the stream.
+    See: https://nvidia.github.io/cuda-python/cuda-core/latest/interoperability.html#cuda-stream-protocol
+    """
+
+    def __cuda_stream__(self) -> tuple[int, int]: ...
+
+
+StreamLike: TypeAlias = CupyLikeStream | CudaStreamProtocolLike
+
+
+@dataclasses.dataclass(frozen=True)
+class Stream:
+    """Stream object used in ICON4Py.
+
+    Args:
+        ptr: The address of the underlying stream.
+    """
+
+    ptr: int
+
+    def __cuda_stream__(self) -> tuple[int, int]:
+        return 1, self.ptr
+
+
+DEFAULT_STREAM = Stream(0)
+"""Default stream of the device.
+
+Its availability is not tied to a particular device; it is thus also present in a
+purely CPU setting, where it is safe to use and usually represents fully blocking
+semantics.
+"""
+
+
+class Block:
+    pass
+
+
+BLOCK = Block()
+"""
+Constant used by `ExchangeResult.finish()` to indicate that blocking semantics should
+be used, i.e. wait until the exchange has fully finished, not merely until it has been
+scheduled on the device.
+"""
+
 
 class ProcessProperties(Protocol):
     comm: Any
@@ -151,24 +219,161 @@ def global_index(
 
 
 class ExchangeResult(Protocol):
-    def wait(self) -> None: ...
+    def finish(
+        self,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None:
+        """Wait on the halo exchange.
+
+        Finishes the halo exchange represented by this `ExchangeResult`; see
+        `ExchangeRuntime.start()` for more.
+        When the function returns, the exchange has not necessarily completed yet,
+        but has been scheduled on the device using stream `stream`. This means
+        that all further work submitted to `stream` will wait until the exchange
+        has completed. By default `DEFAULT_STREAM` is used.
+
+        In case `stream` is `BLOCK`, the function will only return after the exchange
+        has been completed.
 
-    def is_ready(self) -> bool: ...
+        For fields located on the host the only supported behaviour is `stream=BLOCK`.
+        """
+        ...
+
+    def is_ready(self) -> bool:
+        """Check whether the communication has finished.
+
+        For an exchange involving device memory, calling this function is equivalent
+        to the call `self.finish(stream=BLOCK)`.
+        """
+        ...
 
 
 @runtime_checkable
 class ExchangeRuntime(Protocol):
     @overload
-    def exchange(self, dim: gtx.Dimension, *fields: gtx.Field) -> ExchangeResult: ...
+    def start(
+        self,
+        dim: gtx.Dimension,
+        *buffers: data_alloc.NDArray,
+        stream: StreamLike,
+    ) -> ExchangeResult: ...
 
     @overload
-    def exchange(self, dim: gtx.Dimension, *buffers: data_alloc.NDArray) -> ExchangeResult: ...
+    def start(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field,
+        stream: StreamLike = DEFAULT_STREAM,
+    ) -> ExchangeResult:
+        """Initiate a halo exchange.
+
+        The exchange will synchronize with `stream`, i.e. not start before all work
+        previously submitted to `stream` has finished. `stream` defaults to
+        `DEFAULT_STREAM`. To complete the exchange `finish()` must be called on the
+        returned `ExchangeResult`. There is also the `exchange()` function which
+        combines these two steps into one.
+
+        Once the function returns it is safe to reuse the memory of `fields` (is this
+        still true for NCCL?).
+
+        Note:
+            For fields on the host the exchange will begin immediately, regardless
+            of which stream has been passed.
+        """
+        ...
 
    @overload
-    def exchange_and_wait(self, dim: gtx.Dimension, *fields: gtx.Field) -> None: ...
+    def exchange(
+        self,
+        dim: gtx.Dimension,
+        *buffers: data_alloc.NDArray,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None: ...
 
     @overload
-    def exchange_and_wait(self, dim: gtx.Dimension, *buffers: data_alloc.NDArray) -> None: ...
+    def exchange(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None: ...
+
+    def exchange(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field | data_alloc.NDArray,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None:
+        """Perform a full halo exchange.
+
+        The exchange will synchronize with `stream`, i.e. not start before tasks
+        previously submitted to it are done. The function returns before the exchange
+        has been completed, but it will synchronize with `stream`, i.e. work submitted
+        to `stream` will not start before the exchange has finished.
+
+        It is possible to split the exchange into a send part, for which
+        `exchange_request = self.start()` can be used, and a receive part, for which
+        `exchange_request.finish()` can be used.
+
+        In case `stream` is `BLOCK`, the function will only return once the exchange
+        has been completed entirely. In this case the send part will be performed as
+        if `DEFAULT_STREAM` had been passed.
+
+        Note:
+            The protocol supplies a default implementation.
+        """
+        ex_req = self.start(
+            dim,
+            *fields,
+            stream=(DEFAULT_STREAM if stream is BLOCK else stream),  # type: ignore[arg-type]
+        )
+        ex_req.finish(stream)
+
+    @overload
+    def __call__(
+        self,
+        *fields: gtx.Field | data_alloc.NDArray,
+        dim: gtx.Dimension,
+        full_exchange: Literal[True],
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None: ...
+
+    @overload
+    def __call__(
+        self,
+        *fields: gtx.Field | data_alloc.NDArray,
+        dim: gtx.Dimension,
+        full_exchange: Literal[False],
+        stream: StreamLike = DEFAULT_STREAM,
+    ) -> ExchangeResult: ...
+
+    def __call__(
+        self,
+        *fields: gtx.Field | data_alloc.NDArray,
+        dim: gtx.Dimension,
+        full_exchange: bool = True,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None | ExchangeResult:
+        """Perform either a full exchange or a partial exchange.
+
+        If `full_exchange` is `True` then this function is equivalent to
+        `self.exchange()`; otherwise it behaves like `self.start()` and the exchange
+        result object is returned.
+
+        Note:
+            - This function is deprecated and should no longer be used.
+            - The order of `*fields` and `dim` is reversed compared to `exchange()`.
+            - The protocol supplies a default implementation.
+        """
+        ex_req = self.start(
+            dim,
+            *fields,
+            stream=(DEFAULT_STREAM if stream is BLOCK else stream),  # type: ignore[arg-type]
+        )
+        if not full_exchange:
+            return ex_req
+        ex_req.finish(stream=stream)
+        return None
 
     def get_size(self) -> int: ...
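
A hedged sketch of the intended call sequence for the split API above, mirroring how diffusion.py starts the `vn` edge exchange, runs stencils, and only then waits; `exchange_runtime`, `vn_field`, and `update_owned_entries` are hypothetical placeholders, not names from this patch:

    # Start the exchange; it synchronizes with DEFAULT_STREAM (sketch).
    handle = exchange_runtime.start(dims.EdgeDim, vn_field, stream=DEFAULT_STREAM)
    # Work that does not read the halo entries may proceed in the meantime.
    update_owned_entries(vn_field)
    # Schedule the wait; later work submitted to the stream sees fresh halos.
    handle.finish(stream=DEFAULT_STREAM)

    # Host-side callers that need the halos immediately should block instead:
    exchange_runtime.exchange(dims.EdgeDim, vn_field, stream=BLOCK)
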
@@ -179,44 +384,26 @@ def __str__(self) -> str:
 
 
 @dataclasses.dataclass
-class SingleNodeExchange:
-    def exchange(
-        self, dim: gtx.Dimension, *fields: gtx.Field | data_alloc.NDArray
+class SingleNodeExchange(ExchangeRuntime):
+    def start(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field | data_alloc.NDArray,
+        stream: StreamLike = DEFAULT_STREAM,
     ) -> ExchangeResult:
         return SingleNodeResult()
 
-    def exchange_and_wait(
-        self, dim: gtx.Dimension, *fields: gtx.Field | data_alloc.NDArray
-    ) -> None:
-        return None
-
     def my_rank(self) -> int:
         return 0
 
     def get_size(self) -> int:
         return 1
 
-    def __call__(self, *args: Any, dim: gtx.Dimension, wait: bool = True) -> ExchangeResult | None:  # type: ignore[return] # return statment in else condition
-        """Perform a halo exchange operation.
-
-        Args:
-            args: The fields to be exchanged.
-
-        Keyword Args:
-            dim: The dimension along which the exchange is performed.
-            wait: If True, the operation will block until the exchange is completed (default: True).
-        """
-
-        res = self.exchange(dim, *args)
-        if wait:
-            res.wait()
-        else:
-            return res
-
     # Implementation of DaCe SDFGConvertible interface
     # For more see [dace repo]/dace/frontend/python/common.py#[class SDFGConvertible]
+    # NOTE: Streams are not supported here.
     def dace__sdfg__(
-        self, *args: Any, dim: gtx.Dimension, wait: bool = True
+        self, *args: Any, dim: gtx.Dimension, full_exchange: bool = True
     ) -> dace.sdfg.sdfg.SDFG:
         sdfg = DummyNestedSDFG().__sdfg__()
         sdfg.name = "_halo_exchange_"
@@ -236,9 +423,20 @@ def dace__sdfg_signature__(self) -> tuple[Sequence[str], Sequence[str]]:
 
 class HaloExchangeWaitRuntime(Protocol):
     """Protocol for halo exchange wait."""
 
-    def __call__(self, communication_handle: ExchangeResult) -> None:
-        """Wait on the communication handle."""
-        ...
+    def __call__(
+        self,
+        communication_handle: ExchangeResult,
+        stream: StreamLike | Block = DEFAULT_STREAM,
+    ) -> None:
+        """Call `finish()` on the provided communication handle.
+
+        Args:
+            stream: The stream forwarded to the `finish()` call, defaults to `DEFAULT_STREAM`.
+
+        Note:
+            - The protocol provides a default implementation.
+        """
+        communication_handle.finish(stream=stream)
 
     def __sdfg__(self, *args: Any, **kwargs: dict[str, Any]) -> dace.sdfg.sdfg.SDFG:
         """DaCe related: SDFGConvertible interface."""
         ...
@@ -254,15 +452,16 @@ def __sdfg_signature__(self) -> tuple[Sequence[str], Sequence[str]]:
 
 
 @dataclasses.dataclass
-class HaloExchangeWait:
+class HaloExchangeWait(HaloExchangeWaitRuntime):
     exchange_object: SingleNodeExchange
 
     # maintain the same interface with the MPI counterpart
-    def __call__(self, communication_handle: SingleNodeResult) -> None:
-        communication_handle.wait()
-
     # Implementation of DaCe SDFGConvertible interface
     def dace__sdfg__(
-        self, *args: Any, dim: gtx.Dimension, wait: bool = True
+        self,
+        *args: Any,
+        dim: gtx.Dimension,
+        full_exchange: bool = True,
+        stream: StreamLike | Block = DEFAULT_STREAM,
     ) -> dace.sdfg.sdfg.SDFG:
         sdfg = DummyNestedSDFG().__sdfg__()
         sdfg.name = "_halo_exchange_wait_"
@@ -274,7 +473,7 @@ def dace__sdfg_closure__(self, reevaluate: dict[str, str] | None = None) -> dict
     def dace__sdfg_signature__(self) -> tuple[Sequence[str], Sequence[str]]:
         return DummyNestedSDFG().__sdfg_signature__()
 
-    __sdfg__ = dace__sdfg__
+    __sdfg__ = dace__sdfg__  # type: ignore[assignment]
     __sdfg_closure__ = dace__sdfg_closure__
     __sdfg_signature__ = dace__sdfg_signature__
@@ -289,8 +488,8 @@ def create_single_node_halo_exchange_wait(runtime: SingleNodeExchange) -> HaloEx
     return HaloExchangeWait(runtime)
 
 
-class SingleNodeResult:
-    def wait(self) -> None:
+class SingleNodeResult(ExchangeResult):
+    def finish(self, stream: StreamLike | Block = DEFAULT_STREAM) -> None:
         pass
 
     def is_ready(self) -> bool:
diff --git a/model/common/src/icon4py/model/common/decomposition/mpi_decomposition.py b/model/common/src/icon4py/model/common/decomposition/mpi_decomposition.py
index 7d77432a8f..b893ce2fce 100644
--- a/model/common/src/icon4py/model/common/decomposition/mpi_decomposition.py
+++ b/model/common/src/icon4py/model/common/decomposition/mpi_decomposition.py
@@ -124,7 +124,7 @@ def comm_size(self) -> int:  # type: ignore [override]
         return self.comm.Get_size()
 
 
-class GHexMultiNodeExchange:
+class GHexMultiNodeExchange(definitions.ExchangeRuntime):
     max_num_of_fields_to_communicate_dace: Final[int] = (
         10  # maximum number of fields to perform halo exchange on (DaCe-related)
     )
@@ -244,53 +244,45 @@ def _get_applied_pattern(self, dim: gtx.Dimension, f: gtx.Field | data_alloc.NDA
         assert f.ndim in (1, 2), "Buffers must be 1d or 2d"
         return self._patterns[dim](self._make_field_descriptor(dim, f))
 
-    def exchange(
-        self, dim: gtx.Dimension, *fields: gtx.Field | data_alloc.NDArray
+    def start(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field | data_alloc.NDArray,
+        stream: definitions.StreamLike = definitions.DEFAULT_STREAM,
     ) -> MultiNodeResult:
-        """
-        Exchange method that slices the fields based on the dimension and then performs halo exchange.
-        """
+        """Synchronize with `stream` and start the halo exchange of `*fields`."""
         assert (
             dim in dims.MAIN_HORIZONTAL_DIMENSIONS.values()
         ), f"first dimension must be one of ({dims.MAIN_HORIZONTAL_DIMENSIONS.values()})"
         applied_patterns = [self._get_applied_pattern(dim, f) for f in fields]
-        # With https://github.com/ghex-org/GHEX/pull/186, ghex will schedule/sync work on the default stream,
-        # otherwise we need an explicit device synchronize here.
-        handle = self._comm.exchange(applied_patterns)
+        if not ghex.__config__["gpu"]:
+            # No GPU support; fall back to the regular exchange function.
+            handle = self._comm.exchange(applied_patterns)
+        else:
+            assert stream is not None
+            handle = self._comm.schedule_exchange(
+                patterns=applied_patterns,
+                stream=stream,
+            )
         log.debug(f"exchange for {len(fields)} fields of dimension ='{dim.value}' initiated.")
         return MultiNodeResult(handle, applied_patterns)
 
-    def exchange_and_wait(
-        self, dim: gtx.Dimension, *fields: gtx.Field | data_alloc.NDArray
+    def exchange(
+        self,
+        dim: gtx.Dimension,
+        *fields: gtx.Field | data_alloc.NDArray,
+        stream: definitions.StreamLike | definitions.Block = definitions.DEFAULT_STREAM,
     ) -> None:
-        res = self.exchange(dim, *fields)
-        res.wait()
+        # Fall back to the default implementation provided by the protocol.
+        super().exchange(dim, *fields, stream=stream)
         log.debug(f"exchange for {len(fields)} fields of dimension ='{dim.value}' done.")
 
-    def __call__(self, *args: Any, dim: gtx.Dimension, wait: bool = True) -> MultiNodeResult | None:  # type: ignore[return] # return statment in else condition
-        """Perform a halo exchange operation.
-
-        Args:
-            args: The fields to be exchanged.
-
-        Keyword Args:
-            dim: The dimension along which the exchange is performed.
-            wait: If True, the operation will block until the exchange is completed (default: True).
-        """
-        if dim is None:
-            raise ValueError("Need to define a dimension.")
-
-        res = self.exchange(dim, *args)
-        if wait:
-            res.wait()
-        else:
-            return res
-
     # Implementation of DaCe SDFGConvertible interface
     def dace__sdfg__(
-        self, *args: Any, dim: gtx.Dimension, wait: bool = True
+        self, *args: Any, dim: gtx.Dimension, full_exchange: bool = True
     ) -> dace.sdfg.sdfg.SDFG:
+        # NOTE: Streams are not supported here.
         if len(args) > GHexMultiNodeExchange.max_num_of_fields_to_communicate_dace:
             raise ValueError(
                 f"Maximum number of fields to communicate is {GHexMultiNodeExchange.max_num_of_fields_to_communicate_dace}. Adapt the max number accordingly."
@@ -307,7 +299,14 @@ def dace__sdfg__(
         }  # Field name : Data Descriptor
 
         halo_exchange.add_halo_tasklet(
-            sdfg, state, global_buffers, self, dim, id(self), wait, self.num_of_halo_tasklets
+            sdfg,
+            state,
+            global_buffers,
+            self,
+            dim,
+            id(self),
+            full_exchange,
+            self.num_of_halo_tasklets,
         )
 
         sdfg.arg_names.extend(self.__sdfg_signature__()[0])
@@ -332,19 +331,16 @@ def dace__sdfg_signature__(self) -> tuple[Sequence[str], Sequence[str]]:
 
 
 @dataclass
-class HaloExchangeWait:
-    exchange_object: GHexMultiNodeExchange
-
+class HaloExchangeWait(definitions.HaloExchangeWaitRuntime):
     buffer_name: ClassVar[str] = "communication_handle"  # DaCe-related
-
-    def __call__(self, communication_handle: MultiNodeResult) -> None:
-        """Wait on the communication handle."""
-        communication_handle.wait()
+    exchange_object: GHexMultiNodeExchange
 
     # Implementation of DaCe SDFGConvertible interface
     def dace__sdfg__(
-        self, *args: Any, dim: gtx.Dimension, wait: bool = True
+        self, *args: Any, dim: gtx.Dimension, full_exchange: bool = True
     ) -> dace.sdfg.sdfg.SDFG:
+        # Streams are not supported by the orchestrator, which is why the
+        # SDFG-convertible methods do not accept a `stream` argument.
         sdfg = dace.SDFG("_halo_exchange_wait_")
         state = sdfg.add_state()
 
             "_halo_exchange_wait_",
             inputs=None,
             outputs=None,
-            code=f"h_{id(self.exchange_object)}.wait();\n//__out = 1234;",
+            code=f"h_{id(self.exchange_object)}.wait();",
             language=dace.dtypes.Language.CPP,
             side_effects=False,
         )
@@ -369,33 +365,16 @@ def dace__sdfg__(
         state.add_edge(
             buffer, None, tasklet, "IN_" + buffer_name, dace.Memlet(buffer_name, subset="0")
         )
-
-        """
-        # noqa: ERA001
-
-        # Dummy return, otherwise dead-dataflow-elimination kicks in. Return something to generate code.
-        sdfg.add_scalar(name="__return", dtype=dace.int32)
-        ret = state.add_write("__return")
-        state.add_edge(tasklet, "__out", ret, None, dace.Memlet(data="__return", subset="0"))
-        tasklet.out_connectors["__out"] = dace.int32
-        """
-
         sdfg.arg_names.extend(self.__sdfg_signature__()[0])
-
         return sdfg
 
     def dace__sdfg_closure__(self, reevaluate: dict[str, str] | None = None) -> dict[str, Any]:
         return {}
 
     def dace__sdfg_signature__(self) -> tuple[Sequence[str], Sequence[str]]:
-        return (
-            [
-                HaloExchangeWait.buffer_name,
-            ],
-            [],
-        )
+        return ([HaloExchangeWait.buffer_name], [])
 
-    __sdfg__ = dace__sdfg__
+    __sdfg__ = dace__sdfg__  # type: ignore[assignment]
     __sdfg_closure__ = dace__sdfg_closure__
     __sdfg_signature__ = dace__sdfg_signature__
 
@@ -406,12 +385,23 @@ def create_multinode_halo_exchange_wait(runtime: GHexMultiNodeExchange) -> HaloE
 
 
 @dataclass
-class MultiNodeResult:
+class MultiNodeResult(definitions.ExchangeResult):
     handle: Any
     pattern_refs: Any
 
-    def wait(self) -> None:
-        self.handle.wait()
+    def finish(
+        self,
+        stream: definitions.StreamLike | definitions.Block = definitions.DEFAULT_STREAM,
+    ) -> None:
+        """Finish the initiated halo exchange and synchronize with `stream`, or block if `stream` is `BLOCK`."""
+        if (not ghex.__config__["gpu"]) or stream is definitions.BLOCK:
+            # No GPU support or blocking wait requested -> use normal `wait()`.
+            self.handle.wait()
+        else:
+            # Stream given, perform a scheduled wait.
+            self.handle.schedule_wait(stream)
+
+        # TODO(msimberg, phimuell, havogt): Is it safe to delete that here, even in the scheduled mode?
del self.pattern_refs def is_ready(self) -> bool: diff --git a/model/common/src/icon4py/model/common/interpolation/interpolation_factory.py b/model/common/src/icon4py/model/common/interpolation/interpolation_factory.py index a22b909fd8..137d6e8cac 100644 --- a/model/common/src/icon4py/model/common/interpolation/interpolation_factory.py +++ b/model/common/src/icon4py/model/common/interpolation/interpolation_factory.py @@ -145,7 +145,9 @@ def _register_computed_fields(self) -> None: geofac_n2s = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_geofac_n2s, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim), + exchange=functools.partial( + self._exchange.exchange, dims.CellDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.GEOFAC_N2S,), @@ -166,7 +168,9 @@ def _register_computed_fields(self) -> None: geofac_grdiv = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_geofac_grdiv, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.GEOFAC_GRDIV,), @@ -233,7 +237,11 @@ def _register_computed_fields(self) -> None: cell_average_weight = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_mass_conserving_bilinear_cell_average_weight, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim), + exchange=functools.partial( + self._exchange.exchange, + dims.CellDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.C_BLN_AVG,), @@ -280,7 +288,11 @@ def _register_computed_fields(self) -> None: pos_on_tplane_e_x_y = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_pos_on_tplane_e_x_y, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, + dims.EdgeDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.POS_ON_TPLANE_E_X, attrs.POS_ON_TPLANE_E_Y), @@ -310,7 +322,11 @@ def _register_computed_fields(self) -> None: cell_average_weight = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_mass_conserving_bilinear_cell_average_weight_torus, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim), + exchange=functools.partial( + self._exchange.exchange, + dims.CellDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.C_BLN_AVG,), @@ -350,7 +366,11 @@ def _register_computed_fields(self) -> None: pos_on_tplane_e_x_y = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_pos_on_tplane_e_x_y_torus, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, + dims.EdgeDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.POS_ON_TPLANE_E_X, attrs.POS_ON_TPLANE_E_Y), @@ -366,7 +386,9 @@ def _register_computed_fields(self) -> None: c_lin_e = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_c_lin_e, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.C_LIN_E,), @@ -387,7 +409,9 @@ def _register_computed_fields(self) -> None: geofac_grg = factory.NumpyDataProvider( 
func=functools.partial( interpolation_fields.compute_geofac_grg, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim), + exchange=functools.partial( + self._exchange.exchange, dims.CellDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.GEOFAC_GRG_X, attrs.GEOFAC_GRG_Y), @@ -411,7 +435,9 @@ def _register_computed_fields(self) -> None: e_flx_avg = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_e_flx_avg, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.E_FLX_AVG,), @@ -444,7 +470,11 @@ def _register_computed_fields(self) -> None: cells_aw_verts = factory.NumpyDataProvider( func=functools.partial( interpolation_fields.compute_cells_aw_verts, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.VertexDim), + exchange=functools.partial( + self._exchange.exchange, + dims.VertexDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.CELL_AW_VERTS,), @@ -471,7 +501,9 @@ def _register_computed_fields(self) -> None: rbf_vec_coeff_c = factory.NumpyDataProvider( func=functools.partial( rbf.compute_rbf_interpolation_coeffs_cell, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim), + exchange=functools.partial( + self._exchange.exchange, dims.CellDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.RBF_VEC_COEFF_C1, attrs.RBF_VEC_COEFF_C2), @@ -510,7 +542,9 @@ def _register_computed_fields(self) -> None: rbf_vec_coeff_e = factory.NumpyDataProvider( func=functools.partial( rbf.compute_rbf_interpolation_coeffs_edge, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim), + exchange=functools.partial( + self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK + ), array_ns=self._xp, ), fields=(attrs.RBF_VEC_COEFF_E,), @@ -548,7 +582,11 @@ def _register_computed_fields(self) -> None: rbf_vec_coeff_v = factory.NumpyDataProvider( func=functools.partial( rbf.compute_rbf_interpolation_coeffs_vertex, - exchange=functools.partial(self._exchange.exchange_and_wait, dims.VertexDim), + exchange=functools.partial( + self._exchange.exchange, + dims.VertexDim, + stream=decomposition.BLOCK, + ), array_ns=self._xp, ), fields=(attrs.RBF_VEC_COEFF_V1, attrs.RBF_VEC_COEFF_V2), diff --git a/model/common/src/icon4py/model/common/interpolation/rbf_interpolation.py b/model/common/src/icon4py/model/common/interpolation/rbf_interpolation.py index 4dcb0670c0..ce966a1612 100644 --- a/model/common/src/icon4py/model/common/interpolation/rbf_interpolation.py +++ b/model/common/src/icon4py/model/common/interpolation/rbf_interpolation.py @@ -17,6 +17,7 @@ from gt4py.next import astype from icon4py.model.common import dimension as dims, type_alias as ta +from icon4py.model.common.decomposition import definitions as decomposition from icon4py.model.common.grid import base as base_grid from icon4py.model.common.utils import data_allocation as data_alloc @@ -460,7 +461,7 @@ def index_offset(f): rbf_vec_coeff[j][horizontal_start:] /= array_ns.sum( nxnx[j] * rbf_vec_coeff[j][horizontal_start:], axis=1 )[:, array_ns.newaxis] - exchange(*rbf_vec_coeff) + exchange(*rbf_vec_coeff, stream=decomposition.BLOCK) return rbf_vec_coeff diff --git a/model/common/src/icon4py/model/common/metrics/metrics_factory.py b/model/common/src/icon4py/model/common/metrics/metrics_factory.py index 
813a6d4fd8..6aca9f4852 100644
--- a/model/common/src/icon4py/model/common/metrics/metrics_factory.py
+++ b/model/common/src/icon4py/model/common/metrics/metrics_factory.py
@@ -151,7 +151,9 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
             func=functools.partial(
                 v_grid.compute_vertical_coordinate,
                 array_ns=self._xp,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim),
+                exchange=functools.partial(
+                    self._exchange.exchange, dims.CellDim, stream=decomposition.BLOCK
+                ),
             ),
             fields=(attrs.CELL_HEIGHT_ON_HALF_LEVEL,),
             domain=(dims.CellDim, dims.KHalfDim),
@@ -640,7 +642,9 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
         max_flat_index_provider = factory.NumpyDataProvider(
             func=functools.partial(
                 mf.compute_flat_max_idx,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim),
+                exchange=functools.partial(
+                    self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK
+                ),
                 array_ns=self._xp,
             ),
             deps={
@@ -756,7 +760,9 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
             func=functools.partial(
                 compute_zdiff_gradp_dsl.compute_zdiff_gradp_dsl,
                 array_ns=self._xp,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim),
+                exchange=functools.partial(
+                    self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK
+                ),
             ),
             deps={
                 "z_mc": attrs.Z_MC,
@@ -787,7 +793,7 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
             func=functools.partial(
                 compute_coeff_gradekin.compute_coeff_gradekin,
                 array_ns=self._xp,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim),
+                exchange=functools.partial(self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK),
             ),
             domain=(dims.EdgeDim, dims.E2CDim),
             fields=(attrs.COEFF_GRADEKIN,),
@@ -817,7 +823,9 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
             func=functools.partial(
                 weight_factors.compute_wgtfacq_e_dsl,
                 array_ns=self._xp,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.EdgeDim),
+                exchange=functools.partial(
+                    self._exchange.exchange, dims.EdgeDim, stream=decomposition.BLOCK
+                ),
             ),
             deps={
                 "z_ifc": attrs.CELL_HEIGHT_ON_HALF_LEVEL,
@@ -879,7 +887,9 @@ def _register_computed_fields(self) -> None:  # noqa: PLR0915 [too-many-statemen
             func=functools.partial(
                 compute_diffusion_metrics.compute_max_nbhgt_array_ns,
                 array_ns=self._xp,
-                exchange=functools.partial(self._exchange.exchange_and_wait, dims.CellDim),
+                exchange=functools.partial(
+                    self._exchange.exchange, dims.CellDim, stream=decomposition.BLOCK
+                ),
             ),
             deps={
                 "z_mc": attrs.Z_MC,
diff --git a/model/common/src/icon4py/model/common/orchestration/decorator.py b/model/common/src/icon4py/model/common/orchestration/decorator.py
index 834ef47270..16de033401 100644
--- a/model/common/src/icon4py/model/common/orchestration/decorator.py
+++ b/model/common/src/icon4py/model/common/orchestration/decorator.py
@@ -297,7 +297,7 @@ def wait(comm_handle: int | decomposition.ExchangeResult):
         if isinstance(comm_handle, int):
             pass
         else:
-            comm_handle.wait()
+            comm_handle.finish()
 
 
 def to_dace_annotations(fuse_func: Callable) -> dict[str, Any]:
diff --git a/model/common/src/icon4py/model/common/states/factory.py b/model/common/src/icon4py/model/common/states/factory.py
index c3d0f7656c..4a0012d58c 100644
--- a/model/common/src/icon4py/model/common/states/factory.py
+++ b/model/common/src/icon4py/model/common/states/factory.py
@@ -100,7 +100,10 @@ class NeedsExchange(Protocol):
def needs_exchange(self) -> bool: ... def exchange( - self, fields: Mapping[str, state_utils.FieldType], exchange: decomposition.ExchangeRuntime + self, + fields: Mapping[str, state_utils.FieldType], + exchange: decomposition.ExchangeRuntime, + stream: decomposition.StreamLike | decomposition.Block = decomposition.DEFAULT_STREAM, ) -> None: log.debug(f"provider for fields {fields.keys()} needs exchange {self.needs_exchange()}") if self.needs_exchange(): @@ -113,7 +116,7 @@ def exchange( first_dim in dims.MAIN_HORIZONTAL_DIMENSIONS.values() ), f"1st dimension {first_dim} needs to be one of (CellDim, EdgeDim, VertexDim) for exchange" with as_exchangeable_field(field) as buffer: - exchange.exchange_and_wait(first_dim, buffer) + exchange.exchange(first_dim, buffer, stream=stream) log.debug(f"exchanged buffer for {name}") diff --git a/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py b/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py index 0f0d4c59f1..ed715b2f18 100644 --- a/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py +++ b/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py @@ -263,7 +263,7 @@ def test_exchange_on_dummy_data( dimension, definitions.DecompositionInfo.EntryType.OWNED ) assert np.all(input_field.asnumpy() == number) - exchange.exchange_and_wait(dimension, input_field) + exchange.exchange(dimension, input_field, stream=definitions.BLOCK) result = input_field.asnumpy() print(f"rank={processor_props.rank} - num of halo points ={halo_points.shape}") print( @@ -311,7 +311,7 @@ def test_halo_exchange_for_sparse_field( print( f"{processor_props.rank}/{processor_props.comm_size}: size of computed field {result.asnumpy().shape}" ) - exchange.exchange_and_wait(dims.CellDim, result) + exchange.exchange(dims.CellDim, result, stream=definitions.BLOCK) assert test_helpers.dallclose(result.asnumpy(), field_ref.asnumpy()) diff --git a/model/common/tests/common/utils.py b/model/common/tests/common/utils.py index f3d689155d..602e187dc4 100644 --- a/model/common/tests/common/utils.py +++ b/model/common/tests/common/utils.py @@ -6,9 +6,10 @@ # Please, refer to the LICENSE file in the root directory. 
# SPDX-License-Identifier: BSD-3-Clause +from typing import Any from icon4py.model.common.utils import data_allocation as data_alloc -def dummy_exchange(*field: data_alloc.NDArray) -> None: +def dummy_exchange(*field: data_alloc.NDArray, stream: Any = None) -> None: return None diff --git a/pyproject.toml b/pyproject.toml index e349356eb7..febd9f3b29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -361,7 +361,8 @@ url = 'https://gridtools.github.io/pypi/' [tool.uv.sources] dace = {index = "gridtools"} -ghex = {git = "https://github.com/msimberg/GHEX.git", branch = "async-mpi"} +# ghex = {git = "https://github.com/msimberg/GHEX.git", branch = "async-mpi"} +ghex = {git = "https://github.com/philip-paul-mueller/GHEX/", branch = "phimuell__async-mpi-2"} # gt4py = {git = "https://github.com/GridTools/gt4py", branch = "main"} # gt4py = {index = "test.pypi"} icon4py-atmosphere-advection = {workspace = true} diff --git a/uv.lock b/uv.lock index 7d547ead32..6f06916761 100644 --- a/uv.lock +++ b/uv.lock @@ -29,11 +29,11 @@ members = [ [[package]] name = "aenum" -version = "3.1.16" +version = "3.1.15" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/09/7a/61ed58e8be9e30c3fe518899cc78c284896d246d51381bab59b5db11e1f3/aenum-3.1.16.tar.gz", hash = "sha256:bfaf9589bdb418ee3a986d85750c7318d9d2839c1b1a1d6fe8fc53ec201cf140", size = 137693, upload-time = "2026-01-12T22:34:38.819Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/f8/33e75863394f42e429bb553e05fda7c59763f0fd6848de847a25b3fbccf6/aenum-3.1.15.tar.gz", hash = "sha256:8cbd76cd18c4f870ff39b24284d3ea028fbe8731a58df3aa581e434c575b9559", size = 134730, upload-time = "2023-06-27T00:19:52.546Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e3/52/6ad8f63ec8da1bf40f96996d25d5b650fdd38f5975f8c813732c47388f18/aenum-3.1.16-py3-none-any.whl", hash = "sha256:9035092855a98e41b66e3d0998bd7b96280e85ceb3a04cc035636138a1943eaf", size = 165627, upload-time = "2025-04-25T03:17:58.89Z" }, + { url = "https://files.pythonhosted.org/packages/d0/fa/ca0c66b388624ba9dbbf35aab3a9f326bfdf5e56a7237fe8f1b600da6864/aenum-3.1.15-py3-none-any.whl", hash = "sha256:e0dfaeea4c2bd362144b87377e2c61d91958c5ed0b4daf89cb6f45ae23af6288", size = 137633, upload-time = "2023-06-27T00:19:55.112Z" }, ] [[package]] @@ -1055,11 +1055,11 @@ wheels = [ [[package]] name = "dill" -version = "0.4.0" +version = "0.3.9" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/12/80/630b4b88364e9a8c8c5797f4602d0f76ef820909ee32f0bacb9f90654042/dill-0.4.0.tar.gz", hash = "sha256:0633f1d2df477324f53a895b02c901fb961bdbf65a17122586ea7019292cbcf0", size = 186976, upload-time = "2025-04-16T00:41:48.867Z" } +sdist = { url = "https://files.pythonhosted.org/packages/70/43/86fe3f9e130c4137b0f1b50784dd70a5087b911fe07fa81e53e0c4c47fea/dill-0.3.9.tar.gz", hash = "sha256:81aa267dddf68cbfe8029c42ca9ec6a4ab3b22371d1c450abc54422577b4512c", size = 187000, upload-time = "2024-09-29T00:03:20.958Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/50/3d/9373ad9c56321fdab5b41197068e1d8c25883b3fea29dd361f9b55116869/dill-0.4.0-py3-none-any.whl", hash = "sha256:44f54bf6412c2c8464c14e8243eb163690a9800dbe2c367330883b19c7561049", size = 119668, upload-time = "2025-04-16T00:41:47.671Z" }, + { url = "https://files.pythonhosted.org/packages/46/d1/e73b6ad76f0b1fb7f23c35c6d95dbc506a9c8804f43dda8cb5b0fa6331fd/dill-0.3.9-py3-none-any.whl", hash = 
"sha256:468dff3b89520b474c0397703366b7b95eebe6303f108adf9b19da1f702be87a", size = 119418, upload-time = "2024-09-29T00:03:19.344Z" }, ] [[package]] @@ -1309,14 +1309,14 @@ wheels = [ [[package]] name = "fparser" -version = "0.2.1" +version = "0.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "setuptools-scm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/9e/b5/b130088d66fc2a0ef3a83c6694137782059a90c178ddcceda3de9a373cb5/fparser-0.2.1.tar.gz", hash = "sha256:1ca89a760ef23747fc54c53918c03d9165026736d9f0ea6347885bd79fe4be85", size = 439050, upload-time = "2025-09-29T08:34:13.533Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f6/af/570c36d7bc374646ab82f579e2bf9d24a619cc53d83f95b38b0992de3492/fparser-0.2.0.tar.gz", hash = "sha256:3901d31c104062c4e532248286929e7405e43b79a6a85815146a176673e69c82", size = 433559, upload-time = "2024-11-26T08:19:10.683Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c5/ca/9030a90ba06c778f66cf2e8c071fb9a9b80ba034433361f5567d7c5aa322/fparser-0.2.1-py3-none-any.whl", hash = "sha256:bd410ccf96e98699cf86115f07758a9c2a73a059dd7bbcd14f5736003747dca8", size = 643858, upload-time = "2025-09-29T08:34:11.98Z" }, + { url = "https://files.pythonhosted.org/packages/7a/91/03999b30650f5621dd5ec9e8245024dea1b71c4e28e52e0c7300aa0c769d/fparser-0.2.0-py3-none-any.whl", hash = "sha256:49fab105e3a977b9b9d5d4489649287c5060e94c688f9936f3d5af3a45d6f4eb", size = 639408, upload-time = "2024-11-26T08:19:08.856Z" }, ] [[package]] @@ -1362,7 +1362,7 @@ wheels = [ [[package]] name = "ghex" version = "0.4.1" -source = { git = "https://github.com/msimberg/GHEX.git?branch=async-mpi#6d896166994cedbcfc50da1873239a5edb212e3f" } +source = { git = "https://github.com/philip-paul-mueller/GHEX/?branch=phimuell__async-mpi-2#80c0650fdae40bdd40e0435e5687267bada4cdd2" } dependencies = [ { name = "mpi4py" }, { name = "numpy" }, @@ -1891,7 +1891,7 @@ requires-dist = [ { name = "cupy-cuda12x", marker = "extra == 'cuda12'", specifier = ">=13.0" }, { name = "dace", specifier = "==43!2026.1.21", index = "https://gridtools.github.io/pypi/" }, { name = "datashader", marker = "extra == 'io'", specifier = ">=0.16.1" }, - { name = "ghex", marker = "extra == 'distributed'", git = "https://github.com/msimberg/GHEX.git?branch=async-mpi" }, + { name = "ghex", marker = "extra == 'distributed'", git = "https://github.com/philip-paul-mueller/GHEX/?branch=phimuell__async-mpi-2" }, { name = "gt4py", specifier = "==1.1.3" }, { name = "gt4py", extras = ["cuda11"], marker = "extra == 'cuda11'" }, { name = "gt4py", extras = ["cuda12"], marker = "extra == 'cuda12'" }, @@ -3975,16 +3975,16 @@ wheels = [ [[package]] name = "setuptools-scm" -version = "9.2.2" +version = "8.1.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "packaging" }, { name = "setuptools" }, { name = "tomli", marker = "python_full_version < '3.11' or (extra == 'extra-7-icon4py-cuda11' and extra == 'extra-7-icon4py-cuda12')" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/7b/b1/19587742aad604f1988a8a362e660e8c3ac03adccdb71c96d86526e5eb62/setuptools_scm-9.2.2.tar.gz", hash = "sha256:1c674ab4665686a0887d7e24c03ab25f24201c213e82ea689d2f3e169ef7ef57", size = 203385, upload-time = "2025-10-19T22:08:05.608Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4f/a4/00a9ac1b555294710d4a68d2ce8dfdf39d72aa4d769a7395d05218d88a42/setuptools_scm-8.1.0.tar.gz", hash = "sha256:42dea1b65771cba93b7a515d65a65d8246e560768a66b9106a592c8e7f26c8a7", size 
= 76465, upload-time = "2024-05-06T15:07:56.934Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/3d/ea/ac2bf868899d0d2e82ef72d350d97a846110c709bacf2d968431576ca915/setuptools_scm-9.2.2-py3-none-any.whl", hash = "sha256:30e8f84d2ab1ba7cb0e653429b179395d0c33775d54807fc5f1dd6671801aef7", size = 62975, upload-time = "2025-10-19T22:08:04.007Z" }, + { url = "https://files.pythonhosted.org/packages/a0/b9/1906bfeb30f2fc13bb39bf7ddb8749784c05faadbd18a21cf141ba37bff2/setuptools_scm-8.1.0-py3-none-any.whl", hash = "sha256:897a3226a6fd4a6eb2f068745e49733261a21f70b1bb28fce0339feb978d9af3", size = 43666, upload-time = "2024-05-06T15:07:55.071Z" }, ] [[package]] @@ -4303,14 +4303,14 @@ wheels = [ [[package]] name = "sympy" -version = "1.14.0" +version = "1.12.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "mpmath" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/83/d3/803453b36afefb7c2bb238361cd4ae6125a569b4db67cd9e79846ba2d68c/sympy-1.14.0.tar.gz", hash = "sha256:d3d3fe8df1e5a0b42f0e7bdf50541697dbe7d23746e894990c030e2b05e72517", size = 7793921, upload-time = "2025-04-27T18:05:01.611Z" } +sdist = { url = "https://files.pythonhosted.org/packages/41/8a/0d1bbd33cd3091c913d298746e56f40586fa954788f51b816c6336424675/sympy-1.12.1.tar.gz", hash = "sha256:2877b03f998cd8c08f07cd0de5b767119cd3ef40d09f41c30d722f6686b0fb88", size = 6722359, upload-time = "2024-05-29T17:56:07.991Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" }, + { url = "https://files.pythonhosted.org/packages/61/53/e18c8c97d0b2724d85c9830477e3ebea3acf1dcdc6deb344d5d9c93a9946/sympy-1.12.1-py3-none-any.whl", hash = "sha256:9b2cbc7f1a640289430e13d2a56f02f867a1da0190f2f99d8968c2f74da0e515", size = 5743129, upload-time = "2024-05-29T17:56:05.445Z" }, ] [[package]]
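
For reviewers experimenting with the new stream types: a minimal interop sketch, assuming CuPy is installed and a GPU-enabled GHEX build; the commented `start()` call at the end is hypothetical usage, not code from this patch.

    import cupy as cp

    from icon4py.model.common.decomposition import definitions as decomposition

    # A cupy.cuda.Stream satisfies `CupyLikeStream` through its `ptr` attribute,
    # and wrapping that pointer in `Stream` yields an object that also speaks
    # the CUDA stream protocol via `__cuda_stream__()`.
    cp_stream = cp.cuda.Stream(non_blocking=True)
    wrapped = decomposition.Stream(cp_stream.ptr)
    assert wrapped.__cuda_stream__() == (1, cp_stream.ptr)

    # Either object can be passed wherever `StreamLike` is expected, e.g.:
    # exchange_runtime.start(dims.CellDim, field, stream=wrapped)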