diff --git a/src/c/backend/include/device.hpp b/src/c/backend/include/device.hpp
index cd4ab645..2e98a4ff 100644
--- a/src/c/backend/include/device.hpp
+++ b/src/c/backend/include/device.hpp
@@ -24,13 +24,13 @@ class DeviceRequirement;
 /**
  * @brief Architecture types for devices.
  */
-enum class DeviceType { INVALID = -2, All = -1, CPU = 0, CUDA = 1 };
+enum class DeviceType { INVALID = -2, Any = -1, CPU = 0, GPU = 1 };
 
 inline const constexpr std::array architecture_types{DeviceType::CPU,
-                                                     DeviceType::CUDA};
+                                                     DeviceType::GPU};
 inline const constexpr int NUM_DEVICE_TYPES = architecture_types.size();
 inline const std::array architecture_names{
-    "CPU", "CUDA"};
+    "CPU", "GPU"};
 
 /// Devices can be distinguished from other devices
 /// by a class type and its index.
@@ -145,7 +145,7 @@ class Device {
 class CUDADevice : public Device {
 public:
   CUDADevice(DevID_t dev_id, size_t mem_sz, size_t num_vcus, void *py_dev)
-      : Device(DeviceType::CUDA, dev_id, mem_sz, num_vcus, py_dev, 3) {}
+      : Device(DeviceType::GPU, dev_id, mem_sz, num_vcus, py_dev, 3) {}
 
 private:
 };
diff --git a/src/c/backend/include/device_manager.hpp b/src/c/backend/include/device_manager.hpp
index 9642aa5e..d47e91fe 100644
--- a/src/c/backend/include/device_manager.hpp
+++ b/src/c/backend/include/device_manager.hpp
@@ -44,12 +44,12 @@ class DeviceManager {
   }
 
   template <DeviceType T> int get_num_devices() {
-    if constexpr (T == DeviceType::All) {
+    if constexpr (T == DeviceType::Any) {
       return all_devices_.size();
     } else if constexpr (T == DeviceType::CPU) {
       return arch_devices_[static_cast<int>(DeviceType::CPU)].size();
-    } else if constexpr (T == DeviceType::CUDA) {
-      return arch_devices_[static_cast<int>(DeviceType::CUDA)].size();
+    } else if constexpr (T == DeviceType::GPU) {
+      return arch_devices_[static_cast<int>(DeviceType::GPU)].size();
     }
   }
 
@@ -57,19 +57,19 @@ class DeviceManager {
     switch (dev_type) {
     case DeviceType::CPU:
       return get_num_devices<DeviceType::CPU>();
-    case DeviceType::CUDA:
-      return get_num_devices<DeviceType::CUDA>();
+    case DeviceType::GPU:
+      return get_num_devices<DeviceType::GPU>();
     default:
-      return get_num_devices<DeviceType::All>();
+      return get_num_devices<DeviceType::Any>();
     }
   }
 
   template <DeviceType T> std::vector<Device *> &get_devices() {
     if constexpr (T == DeviceType::CPU) {
       return arch_devices_[static_cast<int>(DeviceType::CPU)];
-    } else if constexpr (T == DeviceType::CUDA) {
-      return arch_devices_[static_cast<int>(DeviceType::CUDA)];
-    } else if constexpr (T == DeviceType::All) {
+    } else if constexpr (T == DeviceType::GPU) {
+      return arch_devices_[static_cast<int>(DeviceType::GPU)];
+    } else if constexpr (T == DeviceType::Any) {
       return all_devices_;
     }
   }
@@ -87,10 +87,10 @@ class DeviceManager {
     switch (dev_type) {
     case DeviceType::CPU:
       return get_devices<DeviceType::CPU>();
-    case DeviceType::CUDA:
-      return get_devices<DeviceType::CUDA>();
+    case DeviceType::GPU:
+      return get_devices<DeviceType::GPU>();
     default:
-      return get_devices<DeviceType::All>();
+      return get_devices<DeviceType::Any>();
     }
   }
diff --git a/src/c/backend/parray_tracker.cpp b/src/c/backend/parray_tracker.cpp
index 960260bd..239142df 100644
--- a/src/c/backend/parray_tracker.cpp
+++ b/src/c/backend/parray_tracker.cpp
@@ -3,7 +3,7 @@
 PArrayTracker::PArrayTracker(DeviceManager *device_manager)
     : device_manager_(device_manager) {
   this->managed_parrays_.resize(
-      device_manager->template get_num_devices<DeviceType::All>());
+      device_manager->template get_num_devices<DeviceType::Any>());
 }
 
 void PArrayTracker::track_parray(const InnerPArray &parray, DevID_t dev_id) {
diff --git a/src/c/backend/policy.cpp b/src/c/backend/policy.cpp
index 1cf1c762..155afe20 100644
--- a/src/c/backend/policy.cpp
+++ b/src/c/backend/policy.cpp
@@ -160,7 +160,7 @@ bool LocalityLoadBalancingMappingPolicy::calc_score_mdevplacement(
   // multiple times. This vector marks an assigned device and filter it
   // out at the next device decision.
   std::vector<bool> is_dev_assigned(
-      this->device_manager_->get_num_devices<DeviceType::All>(), false);
+      this->device_manager_->get_num_devices<DeviceType::Any>(), false);
   // Iterate requirements of the devices specified in multi-device placement.
   // All of the member devices should be available.
   for (DevID_t did = 0; did < placement_reqs_vec.size(); ++did) {
@@ -208,7 +208,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
     std::vector<std::shared_ptr<DeviceRequirement>> *chosen_devices,
     const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
         &parray_list,
-    std::vector<std::shared_ptr<PlacementRequirementBase>> *placement_req_options_vec) {
+    std::vector<std::shared_ptr<PlacementRequirementBase>>
+        *placement_req_options_vec) {
   // A set of chosen devices to a task.
   Score_t best_score{-1};
   // If any device was chosen as a candidate device,
@@ -242,8 +243,8 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
       std::shared_ptr<DeviceRequirement> dev_req =
           std::dynamic_pointer_cast<DeviceRequirement>(base_req);
       Score_t score{0};
-      bool is_req_available = calc_score_devplacement(
-          task, dev_req, mapper, &score, parray_list[0]);
+      bool is_req_available = calc_score_devplacement(task, dev_req, mapper,
+                                                      &score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }
@@ -263,9 +264,9 @@ void LocalityLoadBalancingMappingPolicy::run_task_mapping(
       // std::cout << "[Mapper] Task name:" << task->get_name() << ", " <<
       //              "Checking arch requirement."
       //           << "\n";
-      bool is_req_available = calc_score_archplacement(
-          task, arch_req, mapper, chosen_dev_req, &chosen_dev_score,
-          parray_list[0]);
+      bool is_req_available =
+          calc_score_archplacement(task, arch_req, mapper, chosen_dev_req,
+                                   &chosen_dev_score, parray_list[0]);
       if (!is_req_available) {
         continue;
       }
diff --git a/src/python/parla/common/array.py b/src/python/parla/common/array.py
index c77264a2..77567dc9 100644
--- a/src/python/parla/common/array.py
+++ b/src/python/parla/common/array.py
@@ -77,7 +77,7 @@ def copy_from(self, src, target_device_id: int):
         current_context = get_current_context()
         current_device = current_context.devices[0]
 
-        is_gpu = (current_device.architecture == DeviceType.CUDA)
+        is_gpu = (current_device.architecture == DeviceType.GPU)
 
         if CUPY_ENABLED and isinstance(src, cupy.ndarray):
             if is_gpu and (src.flags['C_CONTIGUOUS'] or src.flags['F_CONTIGUOUS']):
@@ -121,7 +121,7 @@ def copy_from(self, src, target_device_id: int):
         current_context = get_current_context()
         current_device = current_context.devices[0]
 
-        is_gpu = (current_device.architecture == DeviceType.CUDA)
+        is_gpu = (current_device.architecture == DeviceType.GPU)
 
         if isinstance(src, cupy.ndarray) or isinstance(src, numpy.ndarray):
 
@@ -298,7 +298,7 @@ def clone_here(source, kind=None):
     current_device = current_content.devices[0]
 
     # FIXME: Make this a property of the device
-    if (current_device.architecture == DeviceType.CUDA) and CUPY_ENABLED:
+    if (current_device.architecture == DeviceType.GPU) and CUPY_ENABLED:
         AType = CupyArray()
     else:
         AType = NumpyArray()
diff --git a/src/python/parla/common/globals.py b/src/python/parla/common/globals.py
index a3a68318..72681c12 100644
--- a/src/python/parla/common/globals.py
+++ b/src/python/parla/common/globals.py
@@ -83,7 +83,7 @@ class DeviceType(IntEnum):
     INVALID = -2
     ANY = -1
     CPU = 0
-    CUDA = 1
+    GPU = 1
 
 
 class AccessMode(IntEnum):
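The Python enum mirrors the C++ rename without changing any values, so code that compares devices or architectures against raw integers keeps working. A minimal standalone sketch (not the module itself) of why the rename is backward compatible:

```python
from enum import IntEnum

class DeviceType(IntEnum):
    # Same values as the patched globals.py above: GPU replaces CUDA
    # but keeps the value 1, so int-based comparisons and any tables
    # indexed by the enum are unaffected.
    INVALID = -2
    ANY = -1
    CPU = 0
    GPU = 1

assert DeviceType.GPU == 1            # PyDevice.__eq__ accepts plain ints
assert DeviceType.GPU.name == "GPU"   # only the spelling changes
```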
diff --git a/src/python/parla/cython/device.pxd b/src/python/parla/cython/device.pxd
index c165f07d..bddeea9c 100644
--- a/src/python/parla/cython/device.pxd
+++ b/src/python/parla/cython/device.pxd
@@ -8,14 +8,6 @@ from libcpp.vector cimport vector
 
 cdef extern from "include/device.hpp" nogil:
-    cdef enum DeviceType:
-        All "DeviceType::All"
-        CPU "DeviceType::CPU"
-        CUDA "DeviceType::CUDA"
-        # TODO(hc): For now, we only support CUDA gpu devices.
-        # Laster, it would be extended to more gpu types
-        # like for AMD
-
     cdef cppclass Device:
         Device(string, int, long, long, void*) except +
         int get_id() except +
diff --git a/src/python/parla/cython/device.pyx b/src/python/parla/cython/device.pyx
index cc123c5d..66d70e23 100644
--- a/src/python/parla/cython/device.pyx
+++ b/src/python/parla/cython/device.pyx
@@ -11,7 +11,7 @@ cimport cython
 from parla.common.globals import _Locals as Locals
 from parla.common.globals import cupy, CUPY_ENABLED
-from parla.common.globals import DeviceType as PyDeviceType
+from parla.common.globals import DeviceType
 from parla.common.globals import VCU_BASELINE, get_device_manager
 
 from abc import ABCMeta, abstractmethod
@@ -81,13 +81,13 @@ class DeviceConfiguration:
     """
     A dataclass to represent a device configuration.
     """
-    type: PyDeviceType
+    type: DeviceType = DeviceType.CPU
     id: int = 0
     memory: long = 0
     vcus: int = 1000
 
     __annotations__ = {
-        "type": PyDeviceType,
+        "type": DeviceType,
         "id": int,
         "memory": long,
         "vcus": int
@@ -112,9 +112,9 @@ class PyDevice:
     This class is to abstract a single device in Python and manages
     a device context as a task runs in Python.
     """
-    def __init__(self, dev_type: PyDeviceType, dev_type_name, dev_id: int):
+    def __init__(self, dev_type: DeviceType, dev_type_name, dev_id: int):
         self._dev_type = dev_type
-        self._device_name = dev_type_name + ":" + str(dev_id)
+        self._device_name = f"{dev_type_name}[{str(dev_id)}]"
         self._device = self
         self._device_id = dev_id
 
@@ -203,7 +203,7 @@ class PyDevice:
         return hash(self._device_name)
 
     def __eq__(self, other) -> bool:
-        if isinstance(other, int) or isinstance(other, PyDeviceType):
+        if isinstance(other, int) or isinstance(other, DeviceType):
             return self.architecture == other
         elif isinstance(other, PyDevice):
             return self._device_name == other._device_name
@@ -233,7 +233,7 @@ class PyCUDADevice(PyDevice):
     """
 
     def __init__(self, dev_id: int = 0, mem_sz: long = 0, num_vcus: long = 1):
-        super().__init__(DeviceType.CUDA, "CUDA", dev_id)
+        super().__init__(DeviceType.GPU, "GPU", dev_id)
         #TODO(wlr): If we ever support VECs, we might need to move this device initialization
         self._cy_device = CyCUDADevice(dev_id, mem_sz, num_vcus, self)
@@ -322,7 +322,7 @@ class PyArchitecture(metaclass=ABCMeta):
         return self._devices
 
     def __eq__(self, o: object) -> bool:
-        if isinstance(o, int) or isinstance(o, PyDeviceType):
+        if isinstance(o, int) or isinstance(o, DeviceType):
             return self.id == o
         elif isinstance(o, type(self)):
             return (self.id == o.id)
@@ -333,7 +333,7 @@ class PyArchitecture(metaclass=ABCMeta):
         return hash(self._id)
 
     def __repr__(self):
-        return type(self).__name__
+        return f"{self.name}[-1]"
 
     def __mul__(self, num_archs: int):
         arch_ps = [self for i in range(0, num_archs)]
@@ -396,7 +396,7 @@ class ImportableArchitecture(PyArchitecture):
         return self._architecture_type
 
     def __repr__(self):
-        return type(self).__name__
+        return f"{self.name}[-1]"
 
     def __mul__(self, num_archs: int):
         #architecture = get_device_manager().get_architecture(self._architecture_type)
@@ -414,16 +414,16 @@ class ImportableArchitecture(PyArchitecture):
 
 class PyCUDAArchitecture(PyArchitecture):
     def __init__(self):
-        super().__init__("CUDAArch", DeviceType.CUDA)
+        super().__init__("GPU", DeviceType.GPU)
 
 class ImportableCUDAArchitecture(PyCUDAArchitecture, ImportableArchitecture):
     def __init__(self):
-        ImportableArchitecture.__init__(self, "CUDAArch", DeviceType.CUDA)
+        ImportableArchitecture.__init__(self, "GPU", DeviceType.GPU)
 
 class PyCPUArchitecture(PyArchitecture):
     def __init__(self):
-        super().__init__("CPUArch", PyDeviceType.CPU)
+        super().__init__("CPU", DeviceType.CPU)
 
     def add_device(self, device):
         assert isinstance(device, PyCPUDevice)
@@ -431,7 +431,7 @@ class PyCPUArchitecture(PyArchitecture):
 
 class ImportableCPUArchitecture(PyCPUArchitecture, ImportableArchitecture):
     def __init__(self):
-        ImportableArchitecture.__init__(self, "CPUArch", DeviceType.CPU)
+        ImportableArchitecture.__init__(self, "CPU", DeviceType.CPU)
 
 
 # TODO(hc): use dataclass later.
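With these changes a device renders as `GPU[0]` rather than `CUDA:0`, and an architecture renders with index `-1` (meaning "any device of this type", matching `DeviceType.ANY`). A small sketch of the naming convention, assuming the f-strings shown above:

```python
def device_name(arch_name: str, dev_id: int) -> str:
    # Same formatting as PyDevice.__init__ / PyArchitecture.__repr__ above.
    return f"{arch_name}[{dev_id}]"

assert device_name("GPU", 0) == "GPU[0]"    # was "CUDA:0" before this patch
assert device_name("GPU", -1) == "GPU[-1]"  # architecture repr: any GPU
```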
diff --git a/src/python/parla/cython/device_manager.pyx b/src/python/parla/cython/device_manager.pyx
index c9d94009..be11d1d9 100644
--- a/src/python/parla/cython/device_manager.pyx
+++ b/src/python/parla/cython/device_manager.pyx
@@ -153,7 +153,7 @@ class PyDeviceManager:
         #self.register_devices_to_cpp()
 
         # Initialize Device Hardware Queues
-        self.stream_pool = StreamPool(self.get_devices(DeviceType.CUDA))
+        self.stream_pool = StreamPool(self.get_devices(DeviceType.GPU))
 
     def __dealloc__(self):
         for arch in self.py_registered_archs:
diff --git a/src/python/parla/cython/scheduler.pyx b/src/python/parla/cython/scheduler.pyx
index 48b0b63c..4424b546 100644
--- a/src/python/parla/cython/scheduler.pyx
+++ b/src/python/parla/cython/scheduler.pyx
@@ -140,7 +140,7 @@ class WorkerThread(ControllableThread, SchedulerContext):
 
         #TODO(wlr): Fix this in device_manager (see todo there)
         if CUPY_ENABLED:
-            gpu_arch = device_manager.py_registered_archs[DeviceType.CUDA]
+            gpu_arch = device_manager.py_registered_archs[DeviceType.GPU]
             ngpus = len(gpu_arch)
 
             for index in range(ngpus):
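The worker thread resolves GPUs through the same `DeviceType.GPU` key. A sketch of the lookup pattern used above (`device_manager` stands in for the real `PyDeviceManager` instance):

```python
def count_gpus(device_manager) -> int:
    # py_registered_archs is indexed by the DeviceType IntEnum, and the
    # architecture object reports how many devices were registered on it.
    gpu_arch = device_manager.py_registered_archs[DeviceType.GPU]
    return len(gpu_arch)
```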
""" - return [device_env.get_parla_device().id for device_env in self.device_dict[DeviceType.CUDA]] + return [device_env.get_parla_device().id for device_env in self.device_dict[DeviceType.GPU]] @property def gpu_id(self): """ Returns the CUDA_VISIBLE_DEVICES id of the first GPU device in this environment. """ - return self.device_dict[DeviceType.CUDA][0].get_parla_device().id + return self.device_dict[DeviceType.GPU][0].get_parla_device().id def __repr__(self): return f"TaskEnvironment({self.env_list})" @@ -886,7 +884,7 @@ class TaskEnvironment: if envlist is None: envlist = self.contexts - for env in envlist: + for idx, env in enumerate(envlist): env.__enter__() yield env env.__exit__(None, None, None) @@ -914,7 +912,7 @@ class TaskEnvironment: return self.devices[0] def get_cupy_devices(self): - return [dev.device for dev in self.get_devices(DeviceType.CUDA)] + return [dev.device for dev in self.get_devices(DeviceType.GPU)] def synchronize(self, events=False, tags=['default'], return_to_pool=True): #print(f"Synchronizing {self}..", flush=True) @@ -1167,6 +1165,11 @@ class TerminalEnvironment(TaskEnvironment): def architecture(self): return self._arch_type + @property + def id(self): + return self._device.id + + def __eq__(self, other): if isinstance(other, int) or isinstance(other, PyDevice): return self._device == other diff --git a/src/python/parla/cython/variants.pyx b/src/python/parla/cython/variants.pyx index 55e076a2..072660a6 100644 --- a/src/python/parla/cython/variants.pyx +++ b/src/python/parla/cython/variants.pyx @@ -29,7 +29,7 @@ class _VariantFunction(object): functools.update_wrapper(self, func) - def variant(self, spec_list, override=False, architecture=None, max_amount=8): + def variant(self, spec_list=None, override=False, architecture=None, max_amount=8): """! @brief Decorator to declare a variant of this function for a specific architecture. 
diff --git a/src/python/parla/utility/execute.py b/src/python/parla/utility/execute.py
index 36825aaf..9562c6f9 100644
--- a/src/python/parla/utility/execute.py
+++ b/src/python/parla/utility/execute.py
@@ -11,10 +11,7 @@
 from .threads import Propagate
 
-from .graphs import LogState, DeviceType, MovementType, DataInitType, TaskID, TaskRuntimeInfo, TaskDataInfo, TaskInfo, DataInfo, TaskTime, TimeSample
-from .graphs import RunConfig, GraphConfig, TaskConfig, TaskConfigs, SerialConfig, IndependentConfig, ReductionConfig, ReductionScatterConfig
-from .graphs import generate_serial_graph, generate_independent_graph, generate_reduction_graph, generate_reduction_scatter_graph, shuffle_tasks
-from .graphs import read_pgraph, parse_blog
+from .graphs import *
 
 import os
 import tempfile
@@ -23,8 +20,8 @@
 import itertools
 
 from parla import Parla, spawn, TaskSpace, parray
-from parla import sleep_gil as lock_sleep
-from parla import sleep_nogil as free_sleep
+from parla import sleep_gil
+from parla import sleep_nogil
 from parla.common.array import clone_here
 from parla.common.globals import get_current_devices, get_current_stream, cupy, CUPY_ENABLED, get_current_context
 from parla.common.parray.from_data import asarray
@@ -84,43 +81,56 @@ class GPUInfo():
     # cycles_per_second = 47994628114801.04
     cycles_per_second = 1949802881.4819772
 
-    def update_cycles(self, cycles=None):
+    def update_cycles(self, cycles: float = None):
         if cycles is None:
             cycles = estimate_frequency()
         self.cycles_per_second = cycles
 
-    def get_cycles_per_second(self):
+    def get_cycles(self) -> float:
         return self.cycles_per_second
 
 
 _GPUInfo = GPUInfo()
 
+@specialize
+def free_sleep(duration: float, config: RunConfig = None):
+    sleep_nogil(duration)
+
+@free_sleep.variant(architecture=gpu)
+def free_sleep_gpu(duration: float, config: RunConfig = None):
+    """
+    Assumes all GPUs on the system are the same.
+    """
+    device = get_current_devices()[0]
+    stream = get_current_stream()
+
+    cycles_per_second = _GPUInfo.get_cycles()
+    ticks = int(cycles_per_second * duration)
+    gpu_bsleep_nogil(device.id, ticks, stream)
+
+    if config.inner_sync:
+        stream.synchronize()
+
+@specialize
+def lock_sleep(duration: float, config: RunConfig = None):
+    sleep_gil(duration)
+
+@lock_sleep.variant(architecture=gpu)
+def lock_sleep_gpu(duration: float, config: RunConfig = None):
+    """
+    Assumes all GPUs on the system are the same.
+    """
+    device = get_current_devices()[0]
+    stream = get_current_stream()
+
+    cycles_per_second = _GPUInfo.get_cycles()
+    ticks = int(cycles_per_second * duration)
+    gpu_bsleep_gil(device.id, ticks, stream)
+
+    if config.inner_sync:
+        stream.synchronize()
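Both GPU variants above convert a wall-clock duration into busy-wait cycles using the calibrated frequency stored on `GPUInfo`. The arithmetic, with an illustrative duration:

```python
cycles_per_second = 1949802881.4819772     # calibrated default from GPUInfo
duration = 0.5                             # seconds to busy-sleep
ticks = int(cycles_per_second * duration)  # 974901440 cycles passed to gpu_bsleep_*
```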
 
-def get_placement_set_from(ps_str_set, num_gpus):
-    ps_set = []
-    # TODO(hc): This assumes a single device task.
-    for ps_str in ps_str_set[0]:
-        dev_type = int(ps_str)
-        if dev_type == DeviceType.ANY_GPU_DEVICE:
-            ps_set.append(gpu)
-        elif dev_type == DeviceType.CPU_DEVICE:
-            ps_set.append(cpu)
-        # TODO(hc): just assume that system has 4 gpus.
-        elif dev_type == DeviceType.GPU_0:
-            ps_set.append(gpu(0))
-        elif dev_type == DeviceType.GPU_1:
-            ps_set.append(gpu(1))
-        elif dev_type == DeviceType.GPU_2:
-            ps_set.append(gpu(2))
-        elif dev_type == DeviceType.GPU_3:
-            ps_set.append(gpu(3))
-        elif dev_type >= DeviceType.USER_CHOSEN_DEVICE:
-            gpu_idx = (dev_type - DeviceType.USER_CHOSEN_DEVICE) % num_gpus
-            ps_set.append(gpu(gpu_idx))
-        else:
-            raise ValueError("Does not support this placement:", dev_type)
-    return tuple(ps_set)
 
 def generate_data(data_config: Dict[int, DataInfo], data_scale: float, data_movement_type) -> List[np.ndarray]:
@@ -175,26 +185,68 @@ def generate_data(data_config: Dict[int, DataInfo], data_scale: float, data_move
     return data_list
 
-@specialize
-def synthetic_kernel(total_time: int, gil_fraction: Union[Fraction, float], gil_accesses: int, config: RunConfig):
+def get_kernel_info(info: TaskRuntimeInfo, config: RunConfig = None) -> Tuple[Tuple[float, float], int]:
+    task_time = info.task_time
+    gil_fraction = info.gil_fraction
+    gil_accesses = info.gil_accesses
+
+    if config is not None:
+        if config.task_time is not None:
+            task_time = config.task_time
+        if config.gil_accesses is not None:
+            gil_accesses = config.gil_accesses
+        if config.gil_fraction is not None:
+            gil_fraction = config.gil_fraction
+
+    kernel_time = task_time / max(gil_accesses, 1)
+    free_time = kernel_time * (1 - gil_fraction)
+    gil_time = kernel_time * gil_fraction
+
+    return (free_time, gil_time), gil_accesses
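`get_kernel_info` splits the per-access kernel time into a GIL-free part and a GIL-holding part. A worked example with illustrative values:

```python
task_time, gil_fraction, gil_accesses = 1000.0, 0.25, 4
kernel_time = task_time / max(gil_accesses, 1)  # 250.0 per GIL access
free_time = kernel_time * (1 - gil_fraction)    # 187.5 spent outside the GIL
gil_time = kernel_time * gil_fraction           # 62.5 spent holding the GIL
```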
+
+def convert_context_to_devices(context):
+    device_list = []
+    for device in context.devices:
+        if device.architecture.name == "CPU":
+            dev = Device(Architecture.CPU, device.id)
+        elif device.architecture.name == "GPU":
+            dev = Device(Architecture.GPU, device.id)
+        else:
+            raise ValueError(f"Invalid architecture: {device.architecture.name}")
+        device_list.append(dev)
+    return tuple(device_list)
+
+
+def synthetic_kernel(runtime_info: TaskPlacementInfo, config: RunConfig):
     """
     A simple synthetic kernel that simulates a task that takes a given amount of time
     and accesses the GIL a given number of times. The GIL is accessed in a fraction of
     the total time given.
     """
+
     if config.verbose:
         task_internal_start_t = time.perf_counter()
 
-    # Simulate task work
-    kernel_time = total_time / gil_accesses
-    free_time = kernel_time * (1 - gil_fraction)
-    gil_time = kernel_time * gil_fraction
-
-    #print(f"gil accesses: {gil_accesses}, free time: {free_time}, gil time: {gil_time}", flush=True)
-
-    for i in range(gil_accesses):
-        free_sleep(free_time)
-        lock_sleep(gil_time)
+    context = get_current_context()
+    devices = convert_context_to_devices(context)
+    details = runtime_info[devices]
+
+    if len(devices) == 0:
+        raise ValueError("No devices provided to busy sleep kernel.")
+    if len(devices) != len(details):
+        raise ValueError("Not enough TaskRuntimeInfo provided to busy sleep kernel. Must be equal to number of devices.")
+
+    info = []
+    for idx, device in enumerate(devices):
+        if isinstance(details, TaskRuntimeInfo):
+            info.append(details)
+        else:
+            info.append(details[idx])
+
+        if info[idx] is None:
+            raise ValueError(f"TaskRuntimeInfo cannot be None for {device}. Please check the runtime info passed to the task.")
+
+    waste_time(info, config)
 
     if config.verbose:
         task_internal_end_t = time.perf_counter()
@@ -203,342 +255,184 @@ def synthetic_kernel(total_time: int, gil_fraction: Union[Fraction, float], gil_
 
     return None
 
+@specialize
+def waste_time(info_list: List[TaskRuntimeInfo], config: RunConfig):
+
+    if len(info_list) == 0:
+        raise ValueError("No TaskRuntimeInfo provided to busy sleep kernel.")
+
+    info = info_list[0]
 
-@synthetic_kernel.variant(architecture=gpu)
-def synthetic_kernel_gpu(total_time: int, gil_fraction: Union[Fraction, float], gil_accesses: int, config: RunConfig):
-    """
-    A simple synthetic kernel that simulates a task that takes a given amount of time
-    and accesses the GIL a given number of times. The GIL is accessed in a fraction of
-    the total time given.
-    """
-    if config.verbose:
-        task_internal_start_t = time.perf_counter()
-
-    # Simulate task work
-    cycles_per_second = _GPUInfo.get_cycles_per_second()
-    kernel_time = total_time / gil_accesses
-
-    free_time = kernel_time * (1 - gil_fraction)
-    gil_time = kernel_time * gil_fraction
-
-    free_ticks = int((free_time/(10**6))*cycles_per_second)
-    gil_ticks = int((gil_time/(10**6))*cycles_per_second)
-
-    # print(f"gil accesses: {gil_accesses}, free time: {free_time}, gil time: {gil_time}")
-    for i in range(gil_accesses):
-        print(dev_id[0]().device_id, parla_cuda_stream.stream, flush=True)
-        gpu_bsleep_nogil(dev_id[0]().device_id, int(
-            ticks), parla_cuda_stream.stream)
-        parla_cuda_stream.stream.synchronize()
-        lock_sleep(gil_time)
-
-    context = get_current_context()
-
-    for device in context.loop():
-        device_idx = device.gpu_id
-        stream = device.cupy_stream
+    (free_time, gil_time), gil_accesses = get_kernel_info(info, config=config)
+
+    if gil_accesses == 0:
+        free_sleep(free_time)
+        return
+
+    else:
         for i in range(gil_accesses):
+            free_sleep(free_time)
+            lock_sleep(gil_time)
 
-            gpu_bsleep_nogil(device_idx, free_ticks, stream)
-            gpu_bsleep_gil(device_idx, gil_ticks, stream)
+@waste_time.variant(architecture=gpu)
+def waste_time_gpu(info_list: List[TaskRuntimeInfo], config: RunConfig):
 
-        if config.inner_sync:
-            stream.synchronize()
+    context = get_current_context()
+
+    if len(info_list) < len(context.devices):
+        raise ValueError("Not enough TaskRuntimeInfo provided to busy sleep kernel. Must be equal to number of devices.")
+
+    for idx, device in enumerate(context.loop()):
+        print("Device: ", device)
+        info = info_list[idx]
+        (free_time, gil_time), gil_accesses = get_kernel_info(info, config=config)
+
+        if gil_accesses == 0:
+            free_sleep(free_time, config=config)
+        else:
+            for i in range(gil_accesses):
+                free_sleep(free_time, config=config)
+                lock_sleep(gil_time, config=config)
 
     if config.outer_sync:
         context.synchronize()
 
-    if config.verbose:
-        task_internal_end_t = time.perf_counter()
-        task_internal_duration = task_internal_end_t - task_internal_start_t
-        return task_internal_duration
-
-    task_internal_end_t = time.perf_counter()
-    task_internal_duration = task_internal_end_t - task_internal_start_t
-    # print("Wall clock duration:", task_internal_duration, ", user passed total time:", total_time, ", ticks:", ticks , flush=True)
-
-    return None
 
+def build_parla_device(mapping: Device, runtime_info: TaskRuntimeInfo):
 
-def create_task_no_data(task, taskspaces, config, data_list=None):
-
-    try:
-        # Task ID
-        task_idx = task.task_id.task_idx
-        taskspace = taskspaces[task.task_id.taskspace]
-
-        # Dependency Info
-        dependencies = [taskspaces[dep.taskspace][dep.task_idx]
-                        for dep in task.task_dependencies]
-
-        # Valid Placement Set
-        num_gpus = config.num_gpus
-        placement_key = task.task_runtime.keys()
-        placement_set_str = list(placement_key)
-        placement_set = get_placement_set_from(placement_set_str, num_gpus)
+    if mapping.architecture == Architecture.CPU:
+        arch = cpu
+    elif mapping.architecture == Architecture.GPU:
+        arch = gpu
+    elif mapping.architecture == Architecture.ANY:
+        raise NotImplementedError("ANY architecture not supported for Parla devices.")
+    else:
+        raise ValueError(f"Invalid architecture: {mapping.architecture}")
 
-        print("placement key: ", placement_key)
-        print("placement set str: ", placement_set_str)
-        print("placement set: ", placement_set)
+    device_memory = runtime_info.memory
+    device_fraction = runtime_info.device_fraction
 
-        # TODO: This needs rework with Device support
-        # TODO(hc): This assumes that this task is a single task
-        #           and does not have multiple placement options.
-        runtime_info = task.task_runtime[placement_set_str[0]]
+    # Instantiate the Parla device object (may require scheduler to be active)
+    if mapping.device_id != -1:
+        device = arch(mapping.device_id)[{'memory': device_memory, 'vcus': device_fraction}]
+    else:
+        device = arch[{'memory': device_memory, 'vcus': device_fraction}]
+
+    return device
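`build_parla_device` turns an `(architecture, device_id)` mapping into a Parla placement spec. A sketch of the two shapes it produces, grounded in the indexing syntax above (memory/vcus values are illustrative):

```python
pinned = gpu(1)[{'memory': 2**30, 'vcus': 500}]  # device_id != -1: a specific GPU
anywhere = gpu[{'memory': 2**30, 'vcus': 500}]   # device_id == -1: any GPU
```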
 
-        # Task Constraints
-        device_fraction = runtime_info.device_fraction
-        if config.device_fraction is not None:
-            device_fraction = config.device_fraction
+def build_parla_device_tuple(mapping: Device | Tuple[Device], runtime_info: TaskPlacementInfo):
 
-        # Task Work
-        total_time = runtime_info.task_time
-        gil_accesses = runtime_info.gil_accesses
-        gil_fraction = runtime_info.gil_fraction
+    if isinstance(mapping, Device):
+        mapping = (mapping,)
+
+    device_constraints = runtime_info[mapping]
 
-        if config.task_time is not None:
-            total_time = config.task_time
+    if device_constraints is None:
+        raise ValueError(f"Device constraints cannot be None for {mapping}. Please check the runtime info passed to the task.")
+    if len(device_constraints) != len(mapping):
+        raise ValueError(f"Device constraints must be the same length as the mapping. Please check the runtime info passed to the task.")
+
+    device_list = []
 
-        if config.gil_accesses is not None:
-            gil_accesses = config.gil_accesses
+    for idx, device in enumerate(mapping):
+        if device is None:
+            raise ValueError(f"Device cannot be None in mapping. Please check the runtime info passed to the task.")
+
+        parla_device = build_parla_device(device, device_constraints[idx])
+        device_list.append(parla_device)
+
+    return tuple(device_list)
 
-        if config.gil_fraction is not None:
-            gil_fraction = config.gil_fraction
+def build_parla_placement(mapping: Device | Tuple[Device] | None, task_placement_info: TaskPlacementInfo):
 
-        # print("task idx:", task_idx, " dependencies:", dependencies, " vcu:", device_fraction,
-        #       " placement:", placement_set, " placement key:", placement_set_str)
+    if mapping is None:
+        mapping_list = task_placement_info.locations
+        mapping_list = [build_parla_device_tuple(mapping, task_placement_info) for mapping in mapping_list]
+        return mapping_list
+
+    return [build_parla_device_tuple(mapping, task_placement_info)]
+
-        @spawn(taskspace[task_idx], dependencies=dependencies, vcus=device_fraction, placement=[placement_set])
-        async def task_func():
-            if config.verbose:
-                print(f"+{task.task_id} Running", flush=True)
+def parse_task_info(task: TaskInfo, taskspaces: Dict[str, TaskSpace], config: RunConfig, data_list: List):
+    """
+    Parse a task's configuration into Parla objects to launch the task.
+    """
 
-            elapsed = synthetic_kernel(total_time, gil_fraction,
-                                       gil_accesses, config=config)
+    # Task ID
+    task_idx = task.id.task_idx
+    taskspace = taskspaces[task.id.taskspace]
+    task_name = task.id
 
-            if config.verbose:
-                print(f"-{task.task_id} Finished: {elapsed} seconds", flush=True)
+    # Dependency Info (List of Parla Tasks)
+    dependencies = [taskspaces[dep.taskspace][dep.task_idx] for dep in task.dependencies]
 
-    except Exception as e:
-        print(f"Failed creating Task {task.task_id}: {e}", flush=True)
-    finally:
-        return
+    # Valid Placement Set
+    placement_info = task.runtime
+    placement_list = build_parla_placement(task.mapping, placement_info)
 
+    # Data information
+    data_information = task.data_dependencies
 
-def create_task_eager_data(task, taskspaces, config=None, data_list=None):
-    try:
-        # Task ID
-        task_idx = task.task_id.task_idx
-        taskspace = taskspaces[task.task_id.taskspace]
-
-        # Dependency Info
-        dependencies = [taskspaces[dep.taskspace][dep.task_idx]
-                        for dep in task.task_dependencies]
-
-        # Valid Placement Set
-        num_gpus = config.num_gpus
-        placement_key = task.task_runtime.keys()
-        placement_set_str = list(placement_key)
-        placement_set = get_placement_set_from(placement_set_str, num_gpus)
-
-        # In/out/inout data information
-        # read: list, write: list, read_write: list
-        data_information = task.data_dependencies
+    #TODO(wlr): Fix Lazy Movement
+    if config.movement_type == MovementType.EAGER_MOVEMENT:
         read_data_list = data_information.read
         write_data_list = data_information.write
         rw_data_list = data_information.read_write
+    else:
+        read_data_list = []
+        write_data_list = []
+        rw_data_list = []
+
+    # Remove duplicated data blocks between in/out and inout
+    if len(read_data_list) > 0 and len(rw_data_list) > 0:
+        read_data_list = list(
+            set(read_data_list).difference(set(rw_data_list)))
+    if len(write_data_list) > 0 and len(rw_data_list) > 0:
+        write_data_list = list(
+            set(write_data_list).difference(set(rw_data_list)))
+
+    # Construct data blocks.
+    INOUT = [] if len(rw_data_list) == 0 else [
+        data_list[d] for d in rw_data_list
+    ]
+
+    IN = [] if len(read_data_list) == 0 else [
+        data_list[d] for d in read_data_list
+    ]
+
+    OUT = [] if len(write_data_list) == 0 else [
+        data_list[d] for d in write_data_list
+    ]
+
+    return task_name, (task_idx, taskspace, dependencies, placement_list), (IN, OUT, INOUT), placement_info
 
-        # Remove duplicated data blocks between in/out and inout
-        if len(read_data_list) > 0 and len(rw_data_list) > 0:
-            read_data_list = list(
-                set(read_data_list).difference(set(rw_data_list)))
-        if len(write_data_list) > 0 and len(rw_data_list) > 0:
-            write_data_list = list(
-                set(write_data_list).difference(set(rw_data_list)))
-
-        """
-        print("RW data list:", rw_data_list)
-        print("R data list:", read_data_list)
-        print("W data list:", write_data_list)
-        print("Data list:", data_list)
-        """
-
-        # Construct data blocks.
-        INOUT = [] if len(rw_data_list) == 0 else [
-            (data_list[d], 0) for d in rw_data_list]
-        IN = [] if len(read_data_list) == 0 else [(data_list[d], 0)
-                                                  for d in read_data_list]
-        OUT = [] if len(write_data_list) == 0 else [(data_list[d], 0)
-                                                    for d in write_data_list]
-
-        # TODO: This needs rework with Device support
-        # TODO(hc): This assumes that this task is a single task
-        #           and does not have multiple placement options.
-        runtime_info = task.task_runtime[placement_set_str[0]]
-
-        # Task Constraints
-        device_fraction = runtime_info.device_fraction
-        if config.device_fraction is not None:
-            device_fraction = config.device_fraction
-
-        # Task Work
-        total_time = runtime_info.task_time
-        gil_accesses = runtime_info.gil_accesses
-        gil_fraction = runtime_info.gil_fraction
-
-        if config.task_time is not None:
-            total_time = config.task_time
-
-        if config.gil_accesses is not None:
-            gil_accesses = config.gil_accesses
-
-        if config.gil_fraction is not None:
-            gil_fraction = config.gil_fraction
-
-        # print("Eager data in:", IN, " out:", OUT, " inout:", INOUT, flush=True)
-        """
-        print("task idx:", task_idx, " dependencies:", dependencies, " vcu:", device_fraction,
-              " placement:", placement_set)
-        """
-
-        # TODO(hc): Add data checking.
-        @spawn(taskspace[task_idx], dependencies=dependencies, vcus=device_fraction, placement=[placement_set], input=IN, output=OUT, inout=INOUT)
-        async def task_func():
-            if config.verbose:
-                print(f"+{task.task_id} Running", flush=True)
-
-            elapsed = synthetic_kernel(total_time, gil_fraction,
-                                       gil_accesses, config=config)
-
-            if config.verbose:
-                print(f"-{task.task_id} Finished: {elapsed} seconds", flush=True)
-
-    except Exception as e:
-        print(f"Failed creating Task {task.task_id}: {e}", flush=True)
-    finally:
-        return
-
-
-def create_task_lazy_data(task, taskspaces, config=None, data_list=None):
-
+def create_task(task_name, task_info, data_info, runtime_info, config: RunConfig):
     try:
-        # Task ID
-        task_idx = task.task_id.task_idx
-        taskspace = taskspaces[task.task_id.taskspace]
-
-        # Dependency Info
-        dependencies = [taskspaces[dep.taskspace][dep.task_idx]
-                        for dep in task.task_dependencies]
-
-        # Valid Placement Set
-        num_gpus = config.num_gpus
-        placement_key = task.task_runtime.keys()
-        placement_set_str = list(placement_key)
-        placement_set = get_placement_set_from(placement_set_str, num_gpus)
-
-        # In/out/inout data information
-        # read: list, write: list, read_write: list
-        data_information = task.data_dependencies
-        read_data_list = data_information.read
-        write_data_list = data_information.write
-        rw_data_list = data_information.read_write
-
-        # Remove duplicated data blocks between in/out and inout
-        if len(read_data_list) > 0 and len(rw_data_list) > 0:
-            read_data_list = list(
-                set(read_data_list).difference(set(rw_data_list)))
-        if len(write_data_list) > 0 and len(rw_data_list) > 0:
-            write_data_list = list(
-                set(write_data_list).difference(set(rw_data_list)))
-
-        """
-        print("RW data list:", rw_data_list)
-        print("R data list:", read_data_list)
-        print("W data list:", write_data_list)
-        print("Data list:", data_list)
-        """
-
-        # TODO: This needs rework with Device support
-        # TODO(hc): This assumes that this task is a single task
-        #           and does not have multiple placement options.
-        runtime_info = task.task_runtime[placement_set_str[0]]
-
-        # Task Constraints
-        device_fraction = runtime_info.device_fraction
-        if config.device_fraction is not None:
-            device_fraction = config.device_fraction
-
-        # Task Work
-        total_time = runtime_info.task_time
-        gil_accesses = runtime_info.gil_accesses
-        gil_fraction = runtime_info.gil_fraction
-
-        if config.task_time is not None:
-            total_time = config.task_time
-
-        if config.gil_accesses is not None:
-            gil_accesses = config.gil_accesses
-
-        if config.gil_fraction is not None:
-            gil_fraction = config.gil_fraction
-        print("task idx:", task_idx, " dependencies:", dependencies, " vcu:", device_fraction,
-              " placement:", placement_set)
-
-        @spawn(taskspace[task_idx], dependencies=dependencies, vcus=device_fraction, placement=[placement_set])
+        task_idx, T, dependencies, placement_set = task_info
+        IN, OUT, INOUT = data_info
+
+        @spawn(T[task_idx], dependencies=dependencies, placement=placement_set, input=IN, output=OUT, inout=INOUT)
         async def task_func():
-            if config.verbose:
-                print(f"+{task.task_id} Running", flush=True)
-
-            local_data = dict()
-            for d in itertools.chain(read_data_list, rw_data_list):
-                data = data_list[d]
-                where = -1 if isinstance(data, np.ndarray) else data.device.id
-                local_data[d] = clone_here(data)
-                old = None
-                if config.do_check:
-                    old = np.copy(data[0, 1])
-                    local_data[d][0, 1] = -old
-                if config.verbose:
-                    print(
-                        f"=Task {task_idx} moved Data[{d}] from Device[{where}]. Block=[{local_data[d][0, 0]}] | Value=[{local_data[d][0, 1]}], <{old}>", flush=True)
 
-            elapsed = synthetic_kernel(total_time, gil_fraction,
-                                       gil_accesses, config=config)
+            if config.verbose:
+                print(f"+{task_name} Running", flush=True)
 
-            for d in itertools.chain(write_data_list, rw_data_list):
-                data_list[d] = local_data[d]
+            elapsed = synthetic_kernel(runtime_info, config=config)
 
             if config.verbose:
-                print(f"-{task.task_id} Finished: {elapsed} seconds", flush=True)
+                print(f"-{task_name} Finished: {elapsed} seconds", flush=True)
 
     except Exception as e:
-        print(f"Failed creating Task {task.task_id}: {e}", flush=True)
+        print(f"Failed creating Task {task_name}: {e}", flush=True)
     finally:
        return
 
 def execute_tasks(taskspaces, tasks: Dict[TaskID, TaskInfo], run_config: RunConfig, data_list=None):
 
     spawn_start_t = time.perf_counter()
 
     # Spawn tasks
     for task, details in tasks.items():
-        # print("task:", task, ", details:", details)
-        if run_config.movement_type == MovementType.NO_MOVEMENT:
-            # print("No data movement")
-            create_task_no_data(details, taskspaces,
-                                config=run_config, data_list=data_list)
-        elif run_config.movement_type == MovementType.EAGER_MOVEMENT:
-            # print("Eager data movement")
-            create_task_eager_data(details, taskspaces,
-                                   config=run_config, data_list=data_list)
-        elif run_config.movement_type == MovementType.LAZY_MOVEMENT:
-            # print("Lazy data movement")
-            create_task_lazy_data(details, taskspaces,
-                                  config=run_config, data_list=data_list)
+        task_name, task_info, data_info, runtime_info = parse_task_info(details, taskspaces, run_config, data_list)
+        create_task(task_name, task_info, data_info, runtime_info, run_config)
 
     spawn_end_t = time.perf_counter()
 
@@ -547,7 +441,7 @@ def execute_graph(data_config: Dict[int, DataInfo], tasks: Dict[TaskID, TaskInfo], run_config: RunConfig, timing: List[TimeSample]):
 
-    @spawn(vcus=0)
+    @spawn(vcus=0, placement=cpu)
     async def main_task():
         graph_times = []
 
@@ -560,7 +454,7 @@ async def main_task():
         taskspaces = {}
 
         for task, details in tasks.items():
-            space_name = details.task_id.taskspace
+            space_name = details.id.taskspace
             if space_name not in taskspaces:
                 taskspaces[space_name] = TaskSpace(space_name)
 
@@ -617,7 +511,7 @@ def verify_order(log_times: Dict[TaskID, TaskTime], truth_graph: Dict[TaskID, Li
 
         details = truth_graph[task]
 
-        for dependency in details.task_dependencies:
+        for dependency in details.dependencies:
             if log_times[task].start_t < log_times[dependency].end_t:
                 print("Task {task} started before dependency {dependency}")
                 return False
 
@@ -634,7 +528,7 @@ def verify_dependencies(log_graph: Dict[TaskID, List[TaskID]], truth_graph: Dict
 
         details = truth_graph[task]
 
-        for dependency in details.task_dependencies:
+        for dependency in details.dependencies:
             if dependency not in log_graph:
                 print(
                     f"Dependency {dependency} of task {task} not in log graph")
 
@@ -666,7 +560,7 @@ def verify_time(log_times: Dict[TaskID, TaskTime], truth_graph: Dict[TaskID, Lis
 
         # TODO: This needs to be fixed for device support
         device_idx = (-1,)  # CPU
-        expected_time = details.task_runtime[device_idx].task_time
+        expected_time = details.runtime[device_idx].task_time
         observed_time = log_times[task].duration / 1000
 
         if observed_time > expected_time * factor:
diff --git a/src/python/parla/utility/graphs.py b/src/python/parla/utility/graphs.py
index 35ebff40..cdd2713e 100644
--- a/src/python/parla/utility/graphs.py
+++ b/src/python/parla/utility/graphs.py
@@ -4,14 +4,12 @@
 @brief Provides the core classes for representing and generating synthetic task graphs.
 """
 
-import pprint
-import os
-from ast import literal_eval as make_tuple
+
 from fractions import Fraction
 
 import numpy as np
-import subprocess
 import re
-
-from typing import NamedTuple, Union, List, Dict, Tuple
+
+from typing import NamedTuple, Union, List, Dict, Tuple, FrozenSet
 from dataclasses import dataclass, field
 import tempfile
 import time
@@ -19,256 +17,27 @@
 from collections import defaultdict
 
-# Synthetic Graphs ENUMS
-
-# Assume that a system has 4 gpus.
-num_gpus = 4
-
-
-class DeviceType(IntEnum):
-    """
-    Used to specify the valid placement of a device in a synthetic task graph
-    """
-    ANY_DEVICE = -2
-    CPU_DEVICE = -1
-    ANY_GPU_DEVICE = 0
-    GPU_0 = 1
-    GPU_1 = 2
-    GPU_2 = 3
-    GPU_3 = 4
-    USER_CHOSEN_DEVICE = 5
-
-
-class LogState(IntEnum):
-    """
-    Specifies the meaning of a log line. Used for parsing the log file.
-    """
-    ADDING_DEPENDENCIES = 0
-    ADD_CONSTRAINT = 1
-    ASSIGNED_TASK = 2
-    START_TASK = 3
-    RUNAHEAD_TASK = 4
-    NOTIFY_DEPENDENTS = 6
-    COMPLETED_TASK = 6
-    UNKNOWN = 7
-
-
-class MovementType(IntEnum):
-    """
-    Used to specify the type of data movement to be used in a synthetic task graph execution.
-    """
-    NO_MOVEMENT = 0
-    LAZY_MOVEMENT = 1
-    EAGER_MOVEMENT = 2
-
-
-class DataInitType(IntEnum):
-    """
-    Used to specify the data movement pattern and initialization in a synthetic task graph execution.
-    """
-    NO_DATA = 0
-    INDEPENDENT_DATA = 1
-    OVERLAPPED_DATA = 2
-
-
-class TaskID(NamedTuple):
-    """
-    The identifier for a task in a synthetic task graph.
-    """
-    taskspace: str = "T"  # The task space the task belongs to
-    task_idx: Tuple[int] = (0,)  # The index of the task in the task space
-    # How many times the task has been spawned (continuation number)
-    instance: int = 0
-
-
-class TaskRuntimeInfo(NamedTuple):
-    """
-    The collection of important runtime information / constraints for a task in a synthetic task graph.
-    """
-    task_time: float
-    device_fraction: Union[float, Fraction]
-    gil_accesses: int
-    gil_fraction: Union[float, Fraction]
-    memory: int
-
-
-class TaskDataInfo(NamedTuple):
-    """
-    The data dependencies for a task in a synthetic task graph.
-    """
-    read: list[int]
-    write: list[int]
-    read_write: list[int]
-
-
-class TaskInfo(NamedTuple):
-    """
-    The collection of important information for a task in a synthetic task graph.
-    """
-    task_id: TaskID
-    task_runtime: Dict[Tuple[int, ...], TaskRuntimeInfo]
-    task_dependencies: list[TaskID]
-    data_dependencies: TaskDataInfo
-
-
-class DataInfo(NamedTuple):
-    """
-    The collection of important information for a data object in a synthetic task graph.
-    """
-    idx: int
-    size: int
-    location: int
-
-
-class TaskTime(NamedTuple):
-    """
-    The parsed timing information from a task from an execution log.
-    """
-    assigned_t: float
-    start_t: float
-    end_t: float
-    duration: float
-
-
-class TimeSample(NamedTuple):
-    """
-    A collection of timing information.
-    """
-    mean: float
-    median: float
-    std: float
-    min: float
-    max: float
-    n: int
-
-
-@dataclass
-class TaskConfig:
-    """
-    Constraint configuration for a task on a device type in a synthetic task graph.
-    """
-    task_time: int = 1000
-    gil_accesses: int = 1
-    gil_fraction: float = 0
-    device_fraction: float = 0.25
-    memory: int = 0
-
-
-@dataclass
-class TaskConfigs:
-    """
-    Holds the map of devices to task configurations for a synthetic task graph.
-    """
-    configurations: Dict[Tuple, TaskConfig] = field(default_factory=dict)
-
-    def add(self, device_id, TaskConfig):
-
-        if not isinstance(device_id, tuple):
-            device_id = (device_id,)
-
-        device_id = tuple(int(d) for d in device_id)
-
-        self.configurations[device_id] = TaskConfig
-
-    def remove(self, device_id):
-        del self.configurations[device_id]
-
-
-@dataclass
-class GraphConfig:
-    """
-    Configures information about generating the synthetic task graph.
-    """
-    task_config: TaskConfigs = None
-    fixed_placement: bool = False
-    data_pattern: int = DataInitType.NO_DATA
-    total_data_width: int = 2**23
-    data_partitions: int = 1
-    num_gpus: int = 4
-
-
-@dataclass
-class IndependentConfig(GraphConfig):
-    """
-    Used to configure the generation of an independent synthetic task graph.
-    """
-    task_count: int = 1
-
+from .types import *
+from .load import *
 
-@dataclass
-class SerialConfig(GraphConfig):
-    """
-    Used to configure the generation of a serial synthetic task graph.
-    """
-    steps: int = 1  # Number of steps in the serial graph chain
-    # Number of dependency backlinks per task (used for stress testing)
-    dependency_count: int = 1
-    chains: int = 1  # Number of chains to generate that can run in parallel
-
-
-@dataclass
-class ReductionConfig(GraphConfig):
-    """
-    Used to configure the generation of a reduction synthetic task graph.
-    """
-    levels: int = 8  # Number of levels in the tree
-    branch_factor: int = 2  # Number of children per node
+# try:
+#     from rich import print
+#     from rich.traceback import install
+#     install(show_locals=False, max_frames=2)
+# except ImportError:
+#     pass
 
+# Synthetic Graphs ENUMS
 
-@dataclass
-class ReductionScatterConfig(GraphConfig):
-    """
-    Used to configure the generation of a reduction-scatter task graph.
-    """
-    # The total number of tasks.
-    # The number of tasks for each level is calculated based on this.
-    # e.g., 1000 total tasks and 4 levels, then about 333 tasks exist for each level
-    # with 2 bridge tasks.
-    task_count: int = 1
-    levels: int = 4  # Number of levels in the tree
+graph_generators = []
 
-
-@dataclass
-class RunConfig:
-    """
-    Configuration object for executing a synthetic task graph.
-    """
-    outer_iterations: int = 1  # Number of times to launch the Parla runtime and execute the task graph
-    # Number of times to execute the task graph within the same Parla runtime
-    inner_iterations: int = 1
-    inner_sync: bool = False  # Whether to synchronize after each kernel launch
-    outer_sync: bool = False  # Whether to synchronize at the end of the task
-    verbose: bool = False  # Whether to print the task graph to the console
-    device_fraction: float = None  # VCUs
-    data_scale: float = None  # Scaling factor to increase the size of the data objects
-    threads: int = None  # Number of threads to use for the Parla runtime
-    # Total time for all tasks (this overrides the time in the graphs)
-    task_time: float = None
-    # Fraction of time spent in the GIL (this overrides the time in the graphs)
-    gil_fraction: float = None
-    # Number of kernel launches/GIL accesses per task (this overrides the time in the graphs)
-    gil_accesses: int = None
-    movement_type: int = MovementType.NO_MOVEMENT  # The data movement pattern to use
-    logfile: str = "testing.blog"  # The log file location
-    do_check: bool = False  # If this is true, validate configuration/execution
-    num_gpus: int = 4
-
-    # TODO(hc): it is duplicated with GraphConfig.
-    #Comment(wlr): No it's not. This represents the number of GPUs to run on. GraphConfig is the number of GPUs to define in the graph.
-
-
-task_filter = re.compile(r'InnerTask\{ .*? \}')
-
-
-def convert_to_dictionary(task_list: List[TaskInfo]) -> Dict[TaskID, TaskInfo]:
+def register_graph_generator(func):
     """
-    Converts a task list to a task graph dictionary
+    Registers a graph generator function to be used for generating synthetic task graphs.
     """
-    task_dict = dict()
-    for task in task_list:
-        task_dict[task.task_id] = task
-
-    return task_dict
+    graph_generators.append(func)
+    return func
 
 
 def shuffle_tasks(tasks: Dict[TaskID, TaskInfo]) -> Dict[TaskID, TaskInfo]:
@@ -280,426 +49,221 @@ def shuffle_tasks(tasks: Dict[TaskID, TaskInfo]) -> Dict[TaskID, TaskInfo]:
     return convert_to_dictionary(task_list)
 
-def extract(string: str) -> Union[int, Fraction]:
-    """
-    Extracts string as decimal or int
-    """
-    if "." in string:
-        return Fraction(string)
-    else:
-        return int(string)
+def get_data_placement(idx, config):
+    data_config = config.data_config
+    if data_config.architecture == Architecture.CPU:
+        return Device(Architecture.CPU, 0)
+    if data_config.architecture == Architecture.GPU:
+        return Device(Architecture.GPU, idx % config.n_devices)
 
-def read_pgraph(filename: str) -> Tuple[Dict[int, DataInfo], Dict[TaskID, TaskInfo]]:
+
+def check_config(config: GraphConfig):
     """
-    Reads a pgraph file and returns:
-    1. A list of the nodes in the graph
-    2. The initial data configuration
+    Raise warnings for invalid configuration specifications.
     """
+    if config is None:
+        raise ValueError(
+            f"Graph Configuration file must be specified: {config}")
 
-    task_list = []
-    data_config = dict()
-
-    with open(filename, 'r') as graph_file:
-
-        lines = graph_file.readlines()
-
-        # Read the initial data configuration
-        data_info = lines.pop(0)
-        data_info = data_info.split(',')
-        idx = 0
-        for data in data_info:
-            info = data.strip().strip("{}").strip().split(":")
-            size = int(info[0].strip())
-            location = int(info[1].strip())
-            data_config[idx] = DataInfo(idx, size, location)
-            idx += 1
-
-        # print("Data Config", data_config)
-        # Read the task graph
-        for line in lines:
-
-            task = line.split("|")
-            # Breaks into [task_id, task_runtime, task_dependencies, data_dependencies]
-
-            # Process task id (can't be empty)
-            ids = task[0].strip()
-            ids = make_tuple(ids)
-
-            if not isinstance(ids, tuple):
-                ids = (ids,)
-
-            if isinstance(ids[0], str) and ids[0].isalpha():
-                taskspace = ids[0]
-                idx = ids[1]
-
-                if not isinstance(idx, tuple):
-                    idx = (idx, )
-
-                task_ids = TaskID(taskspace, idx, 0)
-            else:
-                taskspace = "T"
-                task_ids = TaskID(taskspace, ids, 0)
-
-            # Process task runtime (can't be empty)
-            configurations = task[1].strip().split("},")
-            task_runtime = dict()
-            for config in configurations:
-                config = config.strip().strip("{}").strip()
-                config = config.split(":")
-
-                targets = config[0].strip().strip("()").strip().split(",")
-                targets = [int(target.strip())
-                           for target in targets if target.strip() != ""]
-                target = tuple(targets)
-
-                details = config[1].strip().split(",")
-
-                details = [extract(detail.strip()) for detail in details]
-                details = TaskRuntimeInfo(*details)
-
-                task_runtime[target] = details
+    if config.task_config is None:
+        raise ValueError(
+            f"Task Configuration file must be specified: {config}")
 
+    if config.data_config is None:
+        raise ValueError(
+            f"Data Configuration file must be specified: {config}")
+
+
+@register_graph_generator
+def make_independent_graph(config: IndependentConfig) -> Tuple[TaskMap, DataMap]:
+    check_config(config)
+
+    data_config = config.data_config
+    configurations = config.task_config
+
+    task_dict = dict()
+    data_dict = dict()
+
+    # Generate configuration for data initialization
+    if data_config.pattern == DataInitType.NO_DATA:
+        data_placement = Device(Architecture.CPU, 0)
+        data_dict[0] = DataInfo(0, 1, data_placement)
+        num_data_blocks = 1
+    else:
+        if data_config.pattern == DataInitType.INDEPENDENT_DATA:
+            num_data_blocks = config.task_count
+        elif data_config.pattern == DataInitType.OVERLAPPED_DATA:
+            num_data_blocks = data_config.npartitions
+        else:
+            raise NotImplementedError(
+                f"Data pattern {data_config.pattern} not implemented for independent task graph.")
 
-            # Process task dependencies (can be empty)
-            if len(task) > 2:
-                dependencies = task[2].split(":")
-                if (len(dependencies) > 0) and (not dependencies[0].isspace()):
-                    task_dependencies = []
-
-                    for i in range(len(dependencies)):
-                        if not dependencies[i].isspace():
-                            ids = dependencies[i].strip()
-
-                            ids = make_tuple(ids)
-
-                            if not isinstance(ids, tuple):
-                                ids = (ids,)
-
-                            if isinstance(ids[0], str) and ids[0].isalpha():
-                                name, idx = ids[0], ids[1]
-
-                                if not isinstance(idx, tuple):
-                                    idx = (idx, )
-                                dep_id = TaskID(name, idx, 0)
-
-                            else:
-                                dep_id = TaskID(taskspace, ids, 0)
-
-                            task_dependencies.append(dep_id)
-                else:
-                    task_dependencies = []
-
-            else:
-                task_dependencies = []
-
-            task_dependencies = task_dependencies
-
-            # Process data dependencies (can be empty)
-            if len(task) > 3:
-                # Split into [read, write, read/write]
-                types = task[3].split(":")
-
-                check = [not t.isspace() for t in types]
-
-                if any(check):
-                    task_data = [[], [], []]
-
-                    for i in range(len(types)):
-                        if check[i]:
-                            data = types[i].strip().split(",")
-                            if not data[0].isspace():
-                                task_data[i] = [0 for _ in range(len(data))]
-
-                                for j in range(len(data)):
-                                    if not data[j].isspace():
-                                        task_data[i][j] = int(data[j])
-            else:
-                task_data = [[], [], []]
-
-            task_data = TaskDataInfo(*task_data)
-
-            task_tuple = TaskInfo(task_ids, task_runtime,
-                                  task_dependencies, task_data)
-
-            task_list.append(task_tuple)
-
-    task_graph = convert_to_dictionary(task_list)
+    data_size = data_config.total_width // num_data_blocks
 
-    return data_config, task_graph
+    for i in range(num_data_blocks):
+        data_placement = get_data_placement(i, config)
+        data_dict[i] = DataInfo(i, data_size, data_placement)
 
-def get_time(line: str) -> int:
-    logged_time = line.split('>>')[0].strip().strip("\`").strip('[]')
-    return int(logged_time)
+    # Build task graph
+    task_placement_info = configurations
+    for i in range(config.task_count):
 
-def check_log_line(line: str) -> int:
-    if "Running task" in line:
-        return LogState.START_TASK
-    elif "Notified dependents" in line:
-        return LogState.NOTIFY_DEPENDENTS
-    elif "Assigned " in line:
-        return LogState.ASSIGNED_TASK
-    elif "Runahead task" in line:
-        return LogState.RUNAHEAD_TASK
-    elif "Completed task" in line:
-        return LogState.COMPLETED_TASK
-    elif "Adding dependencies" in line:
-        return LogState.ADDING_DEPENDENCIES
-    elif "has constraints" in line:
-        return LogState.ADD_CONSTRAINT
-    else:
-        return LogState.UNKNOWN
+        # Task ID
+        task_id = TaskID("T", (i,), 0)
 
+        # Task Dependencies
+        task_dependencies = []
 
-def convert_task_id(task_id: str, instance: int = 0) -> TaskID:
-    id = task_id.strip().split('_')
-    taskspace = id[0]
-    task_idx = tuple([int(i) for i in id[1:]])
-    return TaskID(taskspace, task_idx, int(instance))
+        # Task Data Dependencies
+        if data_config.pattern == DataInitType.NO_DATA:
+            data_dependencies = TaskDataInfo()
+        if data_config.pattern == DataInitType.INDEPENDENT_DATA or data_config.pattern == DataInitType.OVERLAPPED_DATA:
+            data_dependencies = TaskDataInfo(
+                read=[DataAccess(i % num_data_blocks)])
 
+        # Task Mapping
+        if config.fixed_placement:
+            if config.placement_arch == Architecture.GPU:
+                task_mapping = Device(Architecture.GPU, i % config.n_devices)
+            elif config.placement_arch == Architecture.CPU:
+                task_mapping = Device(Architecture.CPU, 0)
+        else:
+            task_mapping = None
 
-def get_task_properties(line: str):
-    message = line.split('>>')[1].strip()
-    tasks = re.findall(task_filter, message)
-    tprops = []
-    for task in tasks:
-        properties = {}
-        task = task.strip('InnerTask{').strip('}').strip()
-        task_properties = task.split(',')
-        for prop in task_properties:
-            prop_name, prop_value = prop.strip().split(':')
-            properties[prop_name] = prop_value.strip()
+        task_dict[task_id] = TaskInfo(
+            task_id, task_placement_info, task_dependencies, data_dependencies, task_mapping)
 
-        # If ".dm." is in the task name, ignore it since
-        # this is a data movement task.
-        # TODO(hc): we may need to verify data movemnt task too.
+    return task_dict, data_dict
-        if ".dm." in properties['name']:
-            continue
-
-        properties['name'] = convert_task_id(
-            properties['name'], properties['instance'])
-
-        tprops.append(properties)
-
-    return tprops
 
+@register_graph_generator
+def make_serial_graph(config: SerialConfig) -> Tuple[TaskMap, DataMap]:
+    check_config(config)
+
+    data_config = config.data_config
+    configurations = config.task_config
+
+    task_dict = dict()
+    data_dict = dict()
 
-def parse_blog(filename: str = 'parla.blog') -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]:
+    # Generate configuration for data initialization
+    if data_config.pattern == DataInitType.NO_DATA:
+        data_placement = Device(Architecture.CPU, 0)
+        data_dict[0] = DataInfo(0, 1, data_placement)
+        num_data_blocks = 1
+    else:
+        if data_config.pattern == DataInitType.INDEPENDENT_DATA:
+            num_data_blocks = config.steps * config.chains
 
-    try:
-        result = subprocess.run(
-            ['bread', '-s', r"-f `[%r] >> %m`", filename], stdout=subprocess.PIPE)
+        elif data_config.pattern == DataInitType.OVERLAPPED_DATA:
+            num_data_blocks = config.chains
+        else:
+            raise NotImplementedError(
+                f"Data pattern {data_config.pattern} not implemented for serial task graph.")
 
-        output = result.stdout.decode('utf-8')
-    except subprocess.CalledProcessError as e:
-        raise Exception(e.output)
+    data_size = data_config.total_width // num_data_blocks
 
-    output = output.splitlines()
+    for i in range(num_data_blocks):
+        data_placement = get_data_placement(i, config)
+        data_dict[i] = DataInfo(i, data_size, data_placement)
 
-    task_start_times = {}
-    task_runahead_times = {}
-    task_notify_times = {}
-    task_end_times = {}
-    task_assigned_times = {}
-
-    task_start_order = []
-    task_end_order = []
-    task_runahead_order = []
-
-    task_times = {}
-    task_states = defaultdict(list)
-
-    task_dependencies = {}
-
-    final_instance_map = {}
+    # Build task graph
+    task_placement_info = configurations
+    for i in range(config.steps):
+        for j in range(config.chains):
 
-    for line in output:
-        line_type = check_log_line(line)
-        if line_type == LogState.START_TASK:
-            start_time = get_time(line)
-            task_properties = get_task_properties(line)
+            # Task ID:
+            task_id = TaskID("T", (i, j), 0)
 
-            if task_properties[0]["is_data_task"] == "1":
-                continue
+            # Task Dependencies
+            dependency_list = []
+            dependency_limit = min(i, config.dependency_count)
+            for k in range(1, dependency_limit+1):
+                assert (i-k >= 0)
+                dependency = TaskID("T", (i-k, j), 0)
+                dependency_list.append(dependency)
 
-            task_properties = task_properties[0]
-
-            task_start_times[task_properties['name']] = start_time
-            task_start_order.append(task_properties['name'])
-
-            current_name = task_properties['name']
-
-            base_name =
TaskID(current_name.taskspace, - current_name.task_idx, - 0) + # Task Dependencies + dependency_list = [] + dependency_limit = min(i, config.dependency_count) + for k in range(1, dependency_limit+1): + assert (i-k >= 0) + dependency = TaskID("T", (i-k, j), 0) + dependency_list.append(dependency) - if base_name in final_instance_map: - if current_name.instance > final_instance_map[base_name].instance: - final_instance_map[base_name] = current_name + # Task Data Dependencies + if data_config.pattern == DataInitType.NO_DATA: + data_dependencies = TaskDataInfo() else: - # if current_name.instance > 0: - # raise RuntimeError( - # "Instance number is not 0 for first instance of task") - final_instance_map[base_name] = base_name - - elif line_type == LogState.RUNAHEAD_TASK: - runahead_time = get_time(line) - task_properties = get_task_properties(line) - - if task_properties[0]["is_data_task"] == "1": - continue - - task_properties = task_properties[0] - - current_name = task_properties['name'] - base_name = TaskID(current_name.taskspace, - current_name.task_idx, - 0) - - task_runahead_times[base_name] = runahead_time - task_runahead_order.append(base_name) - - elif line_type == LogState.COMPLETED_TASK: - end_time = get_time(line) - task_properties = get_task_properties(line) - - if task_properties[0]["is_data_task"] == "1": - continue - - task_properties = task_properties[0] - - current_name = task_properties['name'] - base_name = TaskID(current_name.taskspace, - current_name.task_idx, - 0) - - task_end_times[base_name] = end_time - task_end_order.append(base_name) - - elif line_type == LogState.NOTIFY_DEPENDENTS: - notify_time = get_time(line) - task_properties = get_task_properties(line) - - if task_properties[0]["is_data_task"] == "1": - continue - - notifying_task = task_properties[0] - current_name = notifying_task['name'] - current_state = notifying_task['get_state'] - instance = notifying_task['instance'] - - if int(instance) > 0: - base_name = TaskID(current_name.taskspace, - current_name.task_idx, - 0) - task_states[base_name] += [current_state] - - task_states[current_name] += [current_state] - - elif line_type == LogState.ASSIGNED_TASK: - assigned_time = get_time(line) - task_properties = get_task_properties(line) - - if task_properties[0]["is_data_task"] == "1": - continue - - task_properties = task_properties[0] - - current_name = task_properties['name'] - base_name = TaskID(current_name.taskspace, - current_name.task_idx, - 0) - - task_assigned_times[base_name] = assigned_time - - elif line_type == LogState.ADDING_DEPENDENCIES: - task_properties = get_task_properties(line) - - if task_properties[0]["is_data_task"] == "1": - continue - - current_task = task_properties[0]['name'] - current_dependencies = [] + if data_config.pattern == DataInitType.INDEPENDENT_DATA: + inout_data_index = i * config.chains + j + elif data_config.pattern == DataInitType.OVERLAPPED_DATA: + inout_data_index = j + data_dependencies = TaskDataInfo( + read_write=[DataAccess(inout_data_index)]) + + # Task Mapping + if config.fixed_placement: + if config.placement_arch == Architecture.GPU: + task_mapping = Device( + Architecture.GPU, j % config.n_devices) + elif config.placement_arch == Architecture.CPU: + task_mapping = Device(Architecture.CPU, 0) + else: + task_mapping = None - for d in task_properties[1:]: - dependency = d['name'] - current_dependencies.append(dependency) + task_dict[task_id] = TaskInfo( + task_id, task_placement_info, dependency_list, data_dependencies, task_mapping) - 
task_dependencies[current_task] = current_dependencies
+    return task_dict, data_dict
-    for task in task_end_times:
-        assigned_t = task_assigned_times[task]
-        start_t = task_start_times[task]
-        # end_t = task_end_times[task]
-        end_t = task_end_times[task]
-        duration = end_t - start_t
-        task_times[task] = TaskTime(assigned_t, start_t, end_t, duration)
-    return task_times, task_dependencies, task_states
+def generate_reduction_graph(config: ReductionConfig) -> Tuple[TaskMap, DataMap]:
+    check_config(config)
+    data_config = config.data_config
+    configurations = config.task_config
-def generate_serial_graph(config: SerialConfig) -> str:
-    task_config = config.task_config
-    configurations = task_config.configurations
+    task_dict = dict()
+    data_dict = dict()
-    graph = ""
+    # Generate configuration for data initialization
-    data_config_string = ""
-    if config.data_pattern == DataInitType.NO_DATA:
-        data_config_string = "{1 : -1}\n"
-    elif config.data_pattern == DataInitType.OVERLAPPED_DATA:
-        config.data_partitions = 1
-        single_data_block_size = (
-            config.total_data_width // config.data_partitions)
-        for i in range(config.data_partitions):
-            data_config_string += f"{{ {single_data_block_size} : -1}}"
-            if i+1 < config.data_partitions:
-                data_config_string += f" , "
-    elif config.data_pattern == DataInitType.INDEPENDENT_DATA:
-        raise NotImplementedError("[Serial] Data patterns not implemented")
-    else:
-        raise ValueError(
-            f"[Serial] Not supported data configuration: {config.data_pattern}")
-    data_config_string += "\n"
+    # Build Task Graph
+    task_placement_info = configurations
-    if task_config is None:
-        raise ValueError("Task config must be specified")
+    for level in range(config.levels, -1, -1):
+        tasks_in_level = config.branch_factor ** level
+        subtree_segment = tasks_in_level / config.n_devices
-    configuration_string = ""
-    for device_id, task_config in configurations.items():
-        last_flag = 1 if device_id == list(
-            configurations.keys())[-1] else 0
-        if config.fixed_placement:
-            device_id = DeviceType.GPU_0
-            # Othrewise, expect any cpu or any gpu.
-            configuration_string += f"{{ {device_id} : {task_config.task_time}, {task_config.device_fraction}, {task_config.gil_accesses}, {task_config.gil_fraction}, {task_config.memory} }}"
+        for j in range(tasks_in_level):
+            # Task ID:
+            task_id = TaskID("T", (level, j), 0)
-        if last_flag == 0:
-            configuration_string += ", "
+            # Task Dependencies (the children, one level below in the tree)
+            dependency_list = []
+            if level < config.levels:
+                for k in range(config.branch_factor):
+                    dependency = TaskID(
+                        "T", (level+1, config.branch_factor*j + k), 0)
+                    dependency_list.append(dependency)
-    graph += data_config_string
-    for i in range(config.steps):  # height
-        inout_data_index = i
-        if config.data_pattern == DataInitType.OVERLAPPED_DATA:
-            inout_data_index = 0
-        for j in range(config.chains):  # width
-            dependency_string = ""
-            dependency_limit = min(i, config.dependency_count)
-            for k in range(1, dependency_limit+1):
-                assert (i-k >= 0)
-                dependency_string += f"{i-k, j}"
+            # Task Data Dependencies
+            if data_config.pattern == DataInitType.NO_DATA:
+                data_dependencies = TaskDataInfo([], [], [])
+            else:
+                if data_config.pattern == DataInitType.INDEPENDENT_DATA:
+                    inout_data_index = level * config.branch_factor + j
+                elif data_config.pattern == DataInitType.OVERLAPPED_DATA:
+                    inout_data_index = j
+                data_dependencies = TaskDataInfo([], [], [inout_data_index])
-                if k < dependency_limit:
-                    dependency_string += " : "
+            # Task Mapping
+            if config.fixed_placement:
+                if config.placement_arch == Architecture.GPU:
+                    task_mapping = Device(
+                        Architecture.GPU, int(j // subtree_segment))
+                elif config.placement_arch == Architecture.CPU:
+                    task_mapping = Device(Architecture.CPU, 0)
+            else:
+                task_mapping = None
-            graph += f"{i, j} | {configuration_string} | {dependency_string} | : : {inout_data_index} \n"
+            task_dict[task_id] = TaskInfo(
+                task_id, task_placement_info, dependency_list, data_dependencies, task_mapping)
+
+    return task_dict, data_dict
-    return graph
-#TODO(wlr): Refactor this. It is terribly hard to read
 def generate_reduction_graph(config: ReductionConfig) -> str:
     task_config = config.task_config
     num_gpus = config.num_gpus
@@ -730,7 +294,7 @@ def generate_reduction_graph(config: ReductionConfig) -> str:
     # but for now, we only consider a single device placement and so,
     # follow the old generator's graph generation rule.
-    device_id = DeviceType.ANY_GPU_DEVICE
+    device_id = Architecture.GPU
     for config_device_id, task_config in configurations.items():
         last_flag = 1 if config_device_id == list(
             configurations.keys())[-1] else 0
@@ -773,9 +337,9 @@ def generate_reduction_graph(config: ReductionConfig) -> str:
         write_dependency = f"{global_idx}"
         if config.fixed_placement:
             # USER_CHOSEN_DEVICE acts as an offset.
-            device_id = int(DeviceType.USER_CHOSEN_DEVICE + j // segment)
+            device_id = int(2 + j // segment)
         else:
-            assert device_id == DeviceType.CPU_DEVICE or device_id == DeviceType.ANY_GPU_DEVICE
+            assert device_id == Architecture.CPU or device_id == Architecture.GPU
         pre_configuration_string = f"{{ {device_id} : "
         configuration_string = pre_configuration_string + post_configuration_string
         graph += f"{reverse_level, j} | {configuration_string} | {dependency_string} | {read_dependency} : : {write_dependency} \n"
@@ -784,50 +348,6 @@ def generate_reduction_graph(config: ReductionConfig) -> str:
     return graph
-
-
-def generate_independent_graph(config: IndependentConfig) -> str:
-    task_config = config.task_config
-    configurations = task_config.configurations
-    num_gpus = config.num_gpus
-
-    graph = ""
-
-    data_config_string = ""
-    # TODO(hc): for now, assume that data allocation starts from cpu.
-    if config.data_pattern == DataInitType.NO_DATA:
-        data_config_string = f"{{1 : -1}}"
-    elif config.data_pattern == DataInitType.INDEPENDENT_DATA:
-        single_data_block_size = config.total_data_width
-        config.data_partitions = 64
-        for i in range(config.data_partitions):
-            data_config_string += f"{{{single_data_block_size} : -1}}"
-            if i+1 < config.data_partitions:
-                data_config_string += f", "
-    elif config.data_pattern == DataInitType.OVERLAPPED_DATA:
-        raise NotImplementedError(
-            "[Independent] Data patterns not implemented")
-    else:
-        raise ValueError("[Independent] Data patterns not implemented")
-    data_config_string += "\n"
-
-    if task_config is None:
-        raise ValueError("Task config must be specified")
-
-    graph += data_config_string
-    for i in range(config.task_count):
-        read_data_block = i % config.data_partitions
-        configuration_string = ""
-        for device_id, task_config in configurations.items():
-            last_flag = 1 if device_id == list(
-                configurations.keys())[-1] else 0
-            if config.fixed_placement:
-                device_id = int(DeviceType.USER_CHOSEN_DEVICE + i % num_gpus)
-            configuration_string += f"{{ {device_id} : {task_config.task_time}, {task_config.device_fraction}, {task_config.gil_accesses}, {task_config.gil_fraction}, {task_config.memory} }}"
-            if last_flag == 0:
-                configuration_string += ", "
-        graph += f"{i} | {configuration_string} | | {read_data_block} : :\n"
-    return graph
-
-
 def generate_reduction_scatter_graph(tgraph_config: ReductionScatterConfig) -> str:
     """
     Generate reduction-scatter graph input file.
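As context for the legacy generators above and the loaders added later in this patch, here is a minimal sketch of a `.pgraph` round-trip. It is illustrative only and not part of the diff; the `example` basename, sizes, and timings are made up.

```python
from parla.utility.load import read_from_pgraph
from parla.utility.types import TaskID

# One data line ("{size : location}" per block), then one task per line:
# "id | runtime | dependencies | read : write : read_write".
pgraph_text = (
    "{4096 : CPU[0]}, {4096 : GPU[0]}\n"
    "('T', (0,)) | {CPU[0] : 1000, 1, 1, 0, 0} |  | 0 : : 1\n"
    "('T', (1,)) | {GPU[0] : 500, 1, 1, 0, 0} | ('T', (0,)) | 1 : :\n"
)

with open("example.pgraph", "w") as file:
    file.write(pgraph_text)

# read_from_pgraph takes the basename and appends ".pgraph" itself.
tasks, data = read_from_pgraph("example")
assert tasks[TaskID("T", (1,), 0)].dependencies == [TaskID("T", (0,), 0)]
```

The five numbers after each device are the `TaskRuntimeInfo` slots in order: task_time, device_fraction, gil_accesses, gil_fraction, memory.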
@@ -959,11 +479,3 @@ def generate_reduction_scatter_graph(tgraph_config: ReductionScatterConfig) -> s bulk_task_dev_id = DeviceType.USER_CHOSEN_DEVICE task_id += 1 return graph - - -__all__ = [DeviceType, LogState, MovementType, DataInitType, TaskID, TaskRuntimeInfo, - TaskDataInfo, TaskInfo, DataInfo, TaskTime, TimeSample, read_pgraph, - parse_blog, TaskConfigs, RunConfig, shuffle_tasks, - generate_independent_graph, generate_serial_graph, generate_reduction_graph] - generate_independent_graph, generate_serial_graph, - generate_reduction_scatter_graph] diff --git a/src/python/parla/utility/load.py b/src/python/parla/utility/load.py new file mode 100644 index 00000000..b2cc1561 --- /dev/null +++ b/src/python/parla/utility/load.py @@ -0,0 +1,517 @@ +from .types import * +import yaml + + +def convert_to_dictionary(task_list: List[TaskInfo]) -> Dict[TaskID, TaskInfo]: + """ + Converts a task list to a task graph dictionary + """ + task_dict = dict() + for task in task_list: + task_dict[task.id] = task + + return task_dict + + +######################################## +# YAML Write +######################################## + +def write_object_to_dict(obj): + """ + Write a single task to an open YAML file + """ + sub_dict = {} + + def is_base(x): return isinstance( + x, (int, float, str, bool, type(None))) + + def is_base_str(x): return isinstance(x, (tuple, Architecture, Device)) + def is_base_value(x): return isinstance(x, (Decimal, Fraction)) + + def unpack_values(values): + if is_base_str(value): + return str(value) + elif is_base(value): + return value + elif is_base_value(value): + return numeric_to_str(value) + elif isinstance(value, TaskPlacementInfo): + return write_object_to_dict(value.info) + elif isinstance(value, list): + return [write_object_to_dict(x) for x in value] + else: + return write_object_to_dict(value) + + if isinstance(obj, Dict): + for key, value in obj.items(): + key = str(key) + sub_dict[key] = unpack_values(value) + elif is_base(obj): + return obj + else: + for slot in obj.__slots__: + value = getattr(obj, slot) + sub_dict[slot] = unpack_values(value) + + return sub_dict + + +def write_data_to_yaml(data: Dict[int, DataInfo], basename: str = "graph"): + """ + Write the data specifiers to a yaml file + """ + + datafile = basename + ".data.yaml" + + with open(datafile, "w") as file: + data = [write_object_to_dict(data) for data in data.values()] + yaml.dump(data, file, default_flow_style=False, sort_keys=False) + + +def write_tasks_to_yaml(tasks: Dict[TaskID, TaskInfo], basename: str = "graph"): + """ + Write the task graph to a yaml file + """ + + taskfile = basename + ".tasks.yaml" + + with open(taskfile, "w") as file: + tasks = [write_object_to_dict(task) for task in tasks.values()] + yaml.dump(tasks, file, default_flow_style=False, sort_keys=False) + + +def write_task_mapping_to_yaml(task_mapping: Dict[TaskID, Device | Tuple[Device]], basename: str = "graph"): + """ + Write the task -> device mapping to a yaml file + """ + + taskfile = basename + ".mapping.yaml" + + with open(taskfile, "w") as file: + maplist = [] + for task_id, device in task_mapping.items(): + mapping_dict = {"id": task_id, "mapping": device} + maplist.append(write_object_to_dict(mapping_dict)) + + yaml.dump(maplist, file, + default_flow_style=False, sort_keys=False) + + +def write_task_order_to_yaml(task_order: Dict[TaskID, int], basename: str = "graph"): + """ + Write the task -> order mapping to a yaml file + """ + + taskfile = basename + ".order.yaml" + + with open(taskfile, "w") as file: + 
maplist = [] + for task_id, order in task_order.items(): + mapping_dict = {"id": task_id, "order": order} + maplist.append(write_object_to_dict(mapping_dict)) + + yaml.dump(maplist, file, + default_flow_style=False, sort_keys=False) + + +def write_to_yaml(tasks: Optional[Dict[TaskID, TaskInfo]] = None, + data: Optional[Dict[int, DataInfo]] = None, + mapping: Optional[Dict[TaskID, + Device | Tuple[Device]]] = None, + order: Optional[Dict[TaskID, int]] = None, + basename: str = "graph"): + """ + Write the task graph to a yaml file + """ + + if tasks is not None: + write_tasks_to_yaml(tasks, basename) + + if data is not None: + write_data_to_yaml(data, basename) + + if mapping is not None: + write_task_mapping_to_yaml(mapping, basename) + + if order is not None: + write_task_order_to_yaml(order, basename) + + +######################################## +# YAML Read +######################################## + + +def read_tasks_from_dict(task_dict: Dict) -> TaskInfo: + """ + Read a task from a dictionary + """ + + task_id = make_task_id_from_dict(task_dict["id"]) + + task_runtime = make_task_placement_from_dict(task_dict["runtime"]) + task_dependencies = [make_task_id_from_dict(task) + for task in task_dict["dependencies"]] + data_dependencies = make_data_dependencies_from_dict( + task_dict["data_dependencies"]) + + if 'mapping' in task_dict: + task_mapping = device_from_string(task_dict["mapping"]) + else: + task_mapping = None + + if 'order' in task_dict: + task_order = int(task_dict["order"]) + else: + task_order = 0 + + return TaskInfo(id=task_id, runtime=task_runtime, dependencies=task_dependencies, data_dependencies=data_dependencies, mapping=task_mapping, order=task_order) + + +def read_mapping_from_dict(mapping_dict: Dict) -> Tuple[TaskID, Device | Tuple[Device]]: + """ + Read a task -> device mapping from a dictionary + """ + + task_id = make_task_id_from_dict(mapping_dict["id"]) + task_mapping = device_from_string(mapping_dict["mapping"]) + + return task_id, task_mapping + + +def read_order_from_dict(order_dict: Dict) -> Tuple[TaskID, int]: + """ + Read a task -> order mapping from a dictionary + """ + + task_id = make_task_id_from_dict(order_dict["id"]) + task_order = int(order_dict["order"]) + + return task_id, task_order + + +def read_data_from_yaml(basename: str = "graph") -> Dict[int, DataInfo]: + """ + Read the data specification from a yaml file + """ + + datafile = basename + ".data.yaml" + + try: + data_dict = dict() + with open(datafile, "r") as file: + data = yaml.load(file, Loader=yaml.FullLoader) + data = [make_data_info(data) for data in data] + + for data in data: + data_dict[data.id] = data + except FileNotFoundError: + raise Warning(f"Could not find data file {datafile}") + return None + + return data_dict + + +def read_tasks_from_yaml(basename: str = "graph") -> Dict[TaskID, TaskInfo]: + """ + Read the task graph from a yaml file + """ + taskfile = basename + ".tasks.yaml" + + try: + task_dict = dict() + with open(taskfile, "r") as file: + tasks = yaml.load(file, Loader=yaml.FullLoader) + tasks = [read_tasks_from_dict(task) for task in tasks] + + for task in tasks: + task_dict[task.id] = task + except FileNotFoundError: + raise Warning(f"Could not find task file {taskfile}") + return None + + return task_dict + + +def read_task_mapping_from_yaml(basename: str = "graph") -> Dict[TaskID, Device | Tuple[Device]]: + """ + Read the task -> device mapping from a yaml file + """ + taskfile = basename + ".mapping.yaml" + + try: + task_mapping = dict() + with open(taskfile, 
"r") as file: + mappings = yaml.load(file, Loader=yaml.FullLoader) + mappings = [read_mapping_from_dict( + mapping) for mapping in mappings] + + for mapping in mappings: + task_mapping[mapping.id] = mapping.mapping + except FileNotFoundError: + raise Warning(f"Could not find mapping file {taskfile}") + return None + + return task_mapping + + +def read_task_order_from_yaml(basename: str = "graph") -> Dict[TaskID, int]: + """ + Read the task -> order mapping from a yaml file + """ + taskfile = basename + ".order.yaml" + + task_order = dict() + try: + with open(taskfile, "r") as file: + orders = yaml.load(file, Loader=yaml.FullLoader) + orders = [read_order_from_dict( + order) for order in orders] + + for order in orders: + task_order[order.id] = order.order + except FileNotFoundError: + raise Warning(f"Could not find order file {taskfile}") + return None + + return task_order + + +def read_from_yaml(taskfile: Optional[str] = None, datafile: Optional[str] = None) -> Tuple[Optional[Dict[TaskID, TaskInfo]], Optional[Dict[int, DataInfo]]]: + """ + Read the task graph from a yaml file + """ + + if taskfile is None: + tasks = None + else: + tasks = read_tasks_from_yaml(taskfile) + + if datafile is None: + data = None + else: + data = read_data_from_yaml(datafile) + + return tasks, data + +########################################### +# Legacy "PGRAPH" Parla Graph Format Write +########################################### + + +def write_to_pgraph(tasks: Dict[TaskID, TaskInfo], data: Dict[int, DataInfo], basename: str = "graph"): + """ + Write the task graph to a pgraph file + """ + + taskfile = basename + ".pgraph" + + def info_to_comma(info): + comma = ", ".join( + [f"{getattr(info, slot)}" for slot in info.__slots__]) + return comma + + def unpack_runtime(runtime: TaskRuntimeInfo | Dict[Device | Tuple[Device], TaskRuntimeInfo]): + print("Unpacking runtime: ", type(runtime)) + if isinstance(runtime, TaskPlacementInfo): + return unpack_runtime(runtime.info) + elif isinstance(runtime, Dict): + return ", ".join([f"{{{device} : {unpack_runtime(r)}}}" for device, + r in runtime.items()]) + elif isinstance(runtime, TaskRuntimeInfo): + return info_to_comma(runtime) + elif isinstance(runtime, list): + raise NotImplementedError( + f"PGraph does not support lists of runtime configurations (Device configurations cannot vary by their index in the placement options)") + else: + raise ValueError(f"Unknown runtime type {runtime}") + + def unpack_id(task_id): + return f"('{task_id.taskspace}', {tuple(task_id.task_idx)})" + + def unpack_dependencies(dependencies: list[TaskID]): + return ": ".join([f" {unpack_id(task_id)} " for task_id in dependencies]) + + def unpack_data_dependencies(dependencies: TaskDataInfo): + read_data = dependencies.read + write_data = dependencies.write + read_write_data = dependencies.read_write + + read_string = ", ".join([f"{data}" for data in read_data]) + write_string = ", ".join([f"{data}" for data in write_data]) + read_write_string = ", ".join([f"{data}" for data in read_write_data]) + + return f"{read_string} : {write_string} : {read_write_string}" + + data_line = ", ".join( + [f"{{{data.size} : {data.location}}}" for data in data.values()]) + task_lines = [] + + for task in tasks.values(): + task_id = unpack_id(task.id) + task_runtime = unpack_runtime(task.runtime) + task_dependencies = unpack_dependencies(task.dependencies) + task_data_dependencies = unpack_data_dependencies( + task.data_dependencies) + + task_line = f"{task_id} | {task_runtime} | {task_dependencies} | 
{task_data_dependencies}" + task_lines.append(task_line) + + with open(taskfile, "w") as file: + print(data_line, file=file) + for task_line in task_lines: + print(task_line, file=file) + + +########################################## +# Legacy "PGRAPH" Parla Graph Format Read +########################################## + + +def read_from_pgraph(basename: str = "graph") -> Tuple[Dict[TaskID, TaskInfo], Dict[int, DataInfo]]: + + task_dict = dict() + data_dict = dict() + + filename = basename + ".pgraph" + + def extract_task_id(line: str) -> TaskID: + try: + ids = line.strip() + ids = make_tuple(ids) + + if not isinstance(ids, tuple): + ids = (ids,) + + if isinstance(ids[0], str) and ids[0].isalpha(): + taskspace = ids[0] + task_idx = ids[1] + + if not isinstance(task_idx, tuple): + task_idx = (task_idx,) + + else: + taskspace = "T" + task_idx = ids + + return TaskID(taskspace, task_idx, 0) + except Exception as e: + raise ValueError(f"Could not parse task id {line}: {e}") + + def extract_task_runtime(line: str) -> Dict[Device | Tuple[Device], TaskRuntimeInfo]: + try: + line = line.strip() + + configurations = line.split("},") + configurations = [config.strip().strip("{}").strip() + for config in configurations] + task_runtime = {} + for config in configurations: + targets, details = config.split(":") + targets = device_from_string(targets) + + details = [numeric_from_str(detail.strip()) + for detail in details.split(",")] + + task_runtime[targets] = TaskRuntimeInfo(*details) + + task_runtime = TaskPlacementInfo(info=task_runtime) + task_runtime.update() + + return task_runtime + except Exception as e: + raise ValueError(f"Could not parse task runtime {line}: {e}") + + def extract_task_dependencies(line: str) -> list[TaskID]: + try: + line = line.strip() + dependencies = line.split(":") + dependencies = [dependency.strip() for dependency in dependencies] + + if dependencies[0] == "": + return [] + + return [extract_task_id(dependency) for dependency in dependencies] + except Exception as e: + raise ValueError(f"Could not parse task dependencies {line}: {e}") + + def extract_data_dependencies(line: str) -> TaskDataInfo: + try: + line = line.strip() + dependencies = line.split(":") + dependencies = [dependency.strip() for dependency in dependencies] + + check_has = [(not dependency.isspace()) and (not dependency == '') + for dependency in dependencies] + + if not any(check_has): + return TaskDataInfo([], [], []) + + if len(dependencies) > 3: + raise ValueError( + f"Too many data movement types {dependencies}") + + if len(dependencies) < 1 or dependencies[0].isspace() or not check_has[0]: + read_data = [] + else: + read_data = [int(x.strip()) + for x in dependencies[0].split(",")] + + if len(dependencies) < 2 or dependencies[1].isspace() or not check_has[1]: + write_data = [] + else: + write_data = [int(x.strip()) + for x in dependencies[1].split(",")] + + if len(dependencies) < 3 or dependencies[2].isspace() or not check_has[2]: + read_write_data = [] + else: + read_write_data = [int(x.strip()) if ( + x) else None for x in dependencies[2].split(",")] + + return TaskDataInfo(read_data, write_data, read_write_data) + except Exception as e: + raise ValueError(f"Could not parse data dependencies {line}: {e}") + + with open(filename, "r") as file: + + lines = file.readlines() + + data_line = lines.pop(0) + data_line = data_line.strip() + data_line = data_line.split(",") + + for idx, data in enumerate(data_line): + data = data.strip() + data = data.strip("{}") + data = data.split(":") + data_size = 
int(data[0])
+            data_location = device_from_string(data[1])
+            data_info = DataInfo(idx, data_size, data_location)
+            data_dict[idx] = data_info
+
+        for task_line in lines:
+            task = task_line.strip()
+            task = task.split("|")
+
+            if len(task) < 2:
+                raise ValueError(f"Task line {task_line} is too short")
+
+            task_id = extract_task_id(task[0])
+            task_runtime = extract_task_runtime(task[1])
+            if len(task) > 2:
+                task_dependencies = extract_task_dependencies(task[2])
+            else:
+                task_dependencies = []
+
+            if len(task) > 3:
+                task_data_dependencies = extract_data_dependencies(task[3])
+            else:
+                task_data_dependencies = TaskDataInfo([], [], [])
+
+            task_info = TaskInfo(task_id, task_runtime,
+                                 task_dependencies, task_data_dependencies)
+            task_dict[task_id] = task_info
+
+    return task_dict, data_dict
diff --git a/src/python/parla/utility/logging.py b/src/python/parla/utility/logging.py
new file mode 100644
index 00000000..fba1b0d7
--- /dev/null
+++ b/src/python/parla/utility/logging.py
@@ -0,0 +1,233 @@
+from .types import *
+import re
+
+#########################################
+# Log Parsing States
+#########################################
+
+
+class LogState(IntEnum):
+    """
+    Specifies the meaning of a log line. Used for parsing the log file.
+    """
+    ADDING_DEPENDENCIES = 0
+    ADD_CONSTRAINT = 1
+    ASSIGNED_TASK = 2
+    START_TASK = 3
+    RUNAHEAD_TASK = 4
+    NOTIFY_DEPENDENTS = 5
+    COMPLETED_TASK = 6
+    UNKNOWN = 7
+
+task_filter = re.compile(r'InnerTask\{ .*? \}')
+
+
+def get_time(line: str) -> int:
+    logged_time = line.split('>>')[0].strip().strip('`').strip('[]')
+    return int(logged_time)
+
+
+def check_log_line(line: str) -> int:
+    if "Running task" in line:
+        return LogState.START_TASK
+    elif "Notified dependents" in line:
+        return LogState.NOTIFY_DEPENDENTS
+    elif "Assigned " in line:
+        return LogState.ASSIGNED_TASK
+    elif "Runahead task" in line:
+        return LogState.RUNAHEAD_TASK
+    elif "Completed task" in line:
+        return LogState.COMPLETED_TASK
+    elif "Adding dependencies" in line:
+        return LogState.ADDING_DEPENDENCIES
+    elif "has constraints" in line:
+        return LogState.ADD_CONSTRAINT
+    else:
+        return LogState.UNKNOWN
+
+
+def convert_task_id(task_id: str, instance: int = 0) -> TaskID:
+    id = task_id.strip().split('_')
+    taskspace = id[0]
+    task_idx = tuple([int(i) for i in id[1:]])
+    return TaskID(taskspace, task_idx, int(instance))
+
+
+def get_task_properties(line: str):
+    message = line.split('>>')[1].strip()
+    tasks = re.findall(task_filter, message)
+    tprops = []
+    for task in tasks:
+        properties = {}
+        task = task.strip('InnerTask{').strip('}').strip()
+        task_properties = task.split(',')
+        for prop in task_properties:
+            prop_name, prop_value = prop.strip().split(':')
+            properties[prop_name] = prop_value.strip()
+
+        # If ".dm." is in the task name, ignore it since
+        # this is a data movement task.
+        # TODO(hc): we may need to verify data movement tasks too.
+        if ".dm."
in properties['name']: + continue + + properties['name'] = convert_task_id( + properties['name'], properties['instance']) + + tprops.append(properties) + + return tprops + + +def parse_blog(filename: str = 'parla.blog') -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]: + + try: + result = subprocess.run( + ['bread', '-s', r"-f `[%r] >> %m`", filename], stdout=subprocess.PIPE) + + output = result.stdout.decode('utf-8') + except subprocess.CalledProcessError as e: + raise Exception(e.output) + + output = output.splitlines() + + task_start_times = {} + task_runahead_times = {} + task_notify_times = {} + task_end_times = {} + task_assigned_times = {} + + task_start_order = [] + task_end_order = [] + task_runahead_order = [] + + task_times = {} + task_states = defaultdict(list) + + task_dependencies = {} + + final_instance_map = {} + + for line in output: + line_type = check_log_line(line) + if line_type == LogState.START_TASK: + start_time = get_time(line) + task_properties = get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + task_properties = task_properties[0] + + task_start_times[task_properties['name']] = start_time + task_start_order.append(task_properties['name']) + + current_name = task_properties['name'] + + base_name = TaskID(current_name.taskspace, + current_name.task_idx, + 0) + + if base_name in final_instance_map: + if current_name.instance > final_instance_map[base_name].instance: + final_instance_map[base_name] = current_name + else: + # if current_name.instance > 0: + # raise RuntimeError( + # "Instance number is not 0 for first instance of task") + final_instance_map[base_name] = base_name + + elif line_type == LogState.RUNAHEAD_TASK: + runahead_time = get_time(line) + task_properties = get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + task_properties = task_properties[0] + + current_name = task_properties['name'] + base_name = TaskID(current_name.taskspace, + current_name.task_idx, + 0) + + task_runahead_times[base_name] = runahead_time + task_runahead_order.append(base_name) + + elif line_type == LogState.COMPLETED_TASK: + end_time = get_time(line) + task_properties = get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + task_properties = task_properties[0] + + current_name = task_properties['name'] + base_name = TaskID(current_name.taskspace, + current_name.task_idx, + 0) + + task_end_times[base_name] = end_time + task_end_order.append(base_name) + + elif line_type == LogState.NOTIFY_DEPENDENTS: + notify_time = get_time(line) + task_properties = get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + notifying_task = task_properties[0] + current_name = notifying_task['name'] + current_state = notifying_task['get_state'] + instance = notifying_task['instance'] + + if int(instance) > 0: + base_name = TaskID(current_name.taskspace, + current_name.task_idx, + 0) + task_states[base_name] += [current_state] + + task_states[current_name] += [current_state] + + elif line_type == LogState.ASSIGNED_TASK: + assigned_time = get_time(line) + task_properties = get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + task_properties = task_properties[0] + + current_name = task_properties['name'] + base_name = TaskID(current_name.taskspace, + current_name.task_idx, + 0) + + task_assigned_times[base_name] = assigned_time + + elif line_type == LogState.ADDING_DEPENDENCIES: + task_properties = 
get_task_properties(line) + + if task_properties[0]["is_data_task"] == "1": + continue + + current_task = task_properties[0]['name'] + current_dependencies = [] + + for d in task_properties[1:]: + dependency = d['name'] + current_dependencies.append(dependency) + + task_dependencies[current_task] = current_dependencies + + for task in task_end_times: + assigned_t = task_assigned_times[task] + start_t = task_start_times[task] + # end_t = task_end_times[task] + end_t = task_end_times[task] + duration = end_t - start_t + task_times[task] = TaskTime(assigned_t, start_t, end_t, duration) + + return task_times, task_dependencies, task_states diff --git a/src/python/parla/utility/types.py b/src/python/parla/utility/types.py new file mode 100644 index 00000000..00e85d60 --- /dev/null +++ b/src/python/parla/utility/types.py @@ -0,0 +1,668 @@ +from typing import List, Dict, Tuple, Union, Optional, Callable +from dataclasses import dataclass, field +from enum import IntEnum + +from collections import defaultdict + +from fractions import Fraction +from decimal import Decimal + +from ast import literal_eval as make_tuple + +######################################### +# Device Information +######################################### + + +class Architecture(IntEnum): + """ + Used to specify the architecture of a device in a synthetic task graph. + """ + ANY = -1 + CPU = 0 + GPU = 1 + + def __str__(self): + return self.name + + +@dataclass(frozen=True, slots=True) +class Device: + """ + Identifies a device in a synthetic task graph. + """ + # The architecture of the device + architecture: Architecture = Architecture.CPU + # The id of the device (-1 for any) + device_id: int = 0 + + def __str__(self): + return f"{self.architecture.name}[{self.device_id}]" + + def __repr__(self): + return str(self) + + +######################################### +# Data Information +######################################### + + +@dataclass(slots=True) +class DataInfo: + """ + The collection of information for a data object in a synthetic task graph. + + @field id: The id of the data object + @field size: The size of the data object + @field location: The initial allocation location of the data object (device or tuple of devices) + + Distribution is assumed to be uniform partitioning along the first dimension across all devices. + """ + + id: int + size: int + location: Device | Tuple[Device] + + +@dataclass(slots=True) +class DataAccess: + """ + The collection of information for a data access in a synthetic task graph. + + @field id: The id of the data object + @field pattern: The access pattern for the data object (slice, list, int, or None) + + Only access patterns along the first dimension are supported. + """ + id: int + pattern: slice | list[int] | int | None = None + + +@dataclass(slots=True) +class TaskDataInfo: + """ + The data dependencies for a task in a synthetic task graph. + + @field read: The list of data objects that are read by the task + @field write: The list of data objects that are written by the task (and not read). These don't really exist. 
+ @field read_write: The list of data objects that are read and written by the task + """ + read: list[int | DataAccess] = field(default_factory=list) + write: list[int | DataAccess] = field(default_factory=list) + read_write: list[int | DataAccess] = field(default_factory=list) + + +DataMap = Dict[int, DataInfo] + +######################################### +# Task Graph Information +######################################### + + +@dataclass(frozen=True, slots=True) +class TaskID: + """ + The identifier for a task in a synthetic task graph. + """ + taskspace: str = "T" # The task space the task belongs to + task_idx: Tuple[int] = (0,) # The index of the task in the task space + # How many times the task has been spawned (continuation number) + instance: int = 0 + + +@dataclass(slots=True) +class TaskRuntimeInfo: + """ + The collection of important runtime information / constraints for a task in a synthetic task graph. + """ + task_time: float = 0 + device_fraction: Union[float, Fraction] = 0 + gil_accesses: int = 0 + gil_fraction: Union[float, Fraction] = 0 + memory: int = 0 + + +@dataclass(slots=True) +class TaskPlacementInfo: + locations: list[Device | Tuple[Device]] = field(default_factory=list) + # info: Dict[NDevices, Dict[LocalIdx, Dict[Device, TaskRuntimeInfo]]] + info: Dict[Device | Tuple[Device], TaskRuntimeInfo | + Dict[Device, TaskRuntimeInfo]] = field(default_factory=dict) + lookup: defaultdict[defaultdict[dict]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict(dict))) + + def add(self, placement: Device | Tuple[Device], runtime_info: TaskRuntimeInfo | Dict[Device, TaskRuntimeInfo] | List[TaskRuntimeInfo]): + if isinstance(placement, Device): + placement = (placement,) + + if isinstance(placement, tuple) and isinstance(runtime_info, TaskRuntimeInfo): + for localidx, device in enumerate(placement): + self.lookup[len(placement)][localidx][device] = runtime_info + elif isinstance(placement, tuple) and isinstance(runtime_info, dict): + for localidx, device in enumerate(placement): + self.lookup[len(placement) + ][localidx][device] = runtime_info[device] + elif isinstance(placement, tuple) and isinstance(runtime_info, list): + if len(placement) != len(runtime_info): + raise ValueError( + f"Invalid placement and runtime_info. {placement} and {runtime_info} must be the same length.") + for localidx, device in enumerate(placement): + details = runtime_info[localidx] + self.lookup[len(placement)][localidx][device] = details + else: + raise ValueError( + f"Invalid runtime_info type: {type(runtime_info)}. Expected TaskRuntimeInfo or Dict[Device, TaskRuntimeInfo].") + + self.locations.append(placement) + self.info[placement] = runtime_info + + def remove(self, placement: Device | Tuple[Device]): + if isinstance(placement, Device): + placement = (placement,) + + if isinstance(placement, tuple): + for localidx, device in enumerate(placement): + del self.lookup[len(placement)][localidx][device] + else: + raise ValueError( + f"Invalid placement: {placement} of {type(placement)}. 
Expected Device or Tuple[Device]") + + del self.info[placement] + self.locations.remove(placement) + + def update(self): + """ + Convert self.info to self.lookup + """ + for device, details in self.info.items(): + if isinstance(device, Device): + device = (device,) + + if isinstance(details, dict): + for localidx, d in enumerate(device): + self.lookup[len(device)][localidx][d] = details[d] + elif isinstance(details, list): + if len(device) != len(details): + raise ValueError( + f"Invalid placement and runtime_info. {device} and {details} must be the same length.") + for localidx, d in enumerate(device): + self.lookup[len(device)][localidx][d] = details[localidx] + elif isinstance(details, TaskRuntimeInfo): + for localidx, d in enumerate(device): + self.lookup[len(device)][localidx][d] = details + + self.locations = list(self.info.keys()) + + return self + + def __len__(self): + return len(self.list) + + def __repr__(self): + return repr(self.info) + + def _get_any(self, device, lookup: Dict[Device, TaskRuntimeInfo]): + + if device in lookup: + return lookup[device] + + any_of_device = Device(device.architecture, -1) + if any_of_device in lookup: + return lookup[any_of_device] + + generic = Device(Architecture.ANY, -1) + if generic in lookup: + return lookup[generic] + + return None + + def __getitem__(self, placement: Device | Tuple[Device]) -> List[TaskRuntimeInfo]: + + if placement is None: + raise KeyError("Placement query cannot be None.") + + if isinstance(placement, Device): + placement = (placement,) + + if isinstance(placement, tuple): + runtime_info_list = [] + for idx, device in enumerate(placement): + runtime_info = self._get_any( + device, self.lookup[len(placement)][idx]) + if runtime_info is not None: + runtime_info_list.append(runtime_info) + else: + raise KeyError(f"RuntimeInfo not found for {placement}.") + else: + raise KeyError( + f"Invalid placement: {placement} of {type(placement)}. Expected Device or Tuple[Device]") + + return runtime_info_list + + def __contains__(self, placement: Device | Tuple[Device]) -> bool: + + if placement is None: + return False + + if isinstance(placement, Device): + placement = (placement,) + + if isinstance(placement, tuple): + for idx, device in enumerate(placement): + runtime_info = self._get_any( + device, self.info[len(placement)][idx]) + if runtime_info is None: + return False + else: + raise KeyError( + f"Invalid placement: {placement} of {type(placement)}. Expected Device or Tuple[Device]") + + return True + + +@dataclass(slots=True) +class TaskInfo: + """ + The collection of important information for a task in a synthetic task graph. + """ + id: TaskID + runtime: TaskPlacementInfo + dependencies: list[TaskID] + data_dependencies: TaskDataInfo + mapping: Device | Tuple[Device] | None = None + order: int = 0 + + +# Graph Type Aliases +TaskMap = Dict[TaskID, TaskInfo] + +######################################### +# Execution Records +######################################### + + +@dataclass(slots=True) +class TaskTime: + """ + The parsed timing information from a task from an execution log. + """ + assigned_t: float + start_t: float + end_t: float + duration: float + + +@dataclass(slots=True) +class TimeSample: + """ + A collection of timing information. 
+ """ + mean: float + median: float + std: float + min: float + max: float + n: int + + +######################################### +# Generic Synthetic Graph Configurations +######################################### + +class MovementType(IntEnum): + """ + Used to specify the type of data movement to be used in a synthetic task graph execution. + """ + NO_MOVEMENT = 0 + LAZY_MOVEMENT = 1 + EAGER_MOVEMENT = 2 + + +class DataInitType(IntEnum): + """ + Used to specify the data movement pattern and initialization in a synthetic task graph execution. + """ + NO_DATA = 0 + INDEPENDENT_DATA = 1 + OVERLAPPED_DATA = 2 + + +@dataclass(slots=True) +class DataGraphConfig: + pattern: int = DataInitType.NO_DATA + architecture = Architecture.CPU + total_width: int = 2**23 + npartitions: int = 1 + + +@dataclass(slots=True) +class GraphConfig: + """ + Configures information about generating the synthetic task graph. + + @field task_config: The runtime information for each task + @field fixed_placement: Whether to use a fixed placement mapping for the tasks + @field placement_arch: The architecture to use for fixed mapping + @field n_devices: The number of devices to use for fixed mapping + @field data_config: The data configuration for the graph + + """ + task_config: TaskPlacementInfo = field(default_factory=TaskPlacementInfo) + fixed_placement: bool = False + placement_arch = Architecture.GPU + n_devices: int = 4 + data_config: DataGraphConfig = field(default_factory=DataGraphConfig) + + +######################################### +# Specific Synthetic Graph Configurations +######################################### + +@dataclass(slots=True) +class IndependentConfig(GraphConfig): + """ + Used to configure the generation of an independent synthetic task graph. + + @field task_count: The number of tasks in the graph + """ + task_count: int = 1 + + +@dataclass(slots=True) +class SerialConfig(GraphConfig): + """ + Used to configure the generation of a serial synthetic task graph. + + @field steps: The number of steps in the graph + @field dependency_count: The number of dependencies per task + @field chains: The number of chains to generate that can run in parallel + + Example Graph (steps=3, dependency_count=1, chains=4): + --T(0,1)--T(0, 2)--T(0, 3)--> + --T(1,1)--T(1, 2)--T(1, 3)--> + --T(2,1)--T(2, 2)--T(2, 3)--> + --T(3,1)--T(3, 2)--T(3, 3)--> + + """ + steps: int = 1 + dependency_count: int = 1 + chains: int = 1 + + +@dataclass(slots=True) +class ReductionConfig(GraphConfig): + """ + Used to configure the generation of a reduction synthetic task graph. + + @field levels: The number of levels in the tree + @field branch_factor: The number of children per node + + Example Graph (levels=2, branch_factor=2): + T(0,1) + | \ + T(1,1) T(1,2) + | \ | \ + T(2,1) T(2,2) T(2,3) T(2,4) + """ + levels: int = 8 + branch_factor: int = 2 + + +@dataclass(slots=True) +class ReductionScatterConfig(GraphConfig): + """ + Used to configure the generation of a reduction-scatter task graph. + """ + # The total number of tasks. + # The number of tasks for each level is calculated based on this. + # e.g., 1000 total tasks and 4 levels, then about 333 tasks exist for each level + # with 2 bridge tasks. + task_count: int = 1 + levels: int = 4 # Number of levels in the tree + +######################################### +# Synthetic Graph Execution Configurations +######################################### + + +@dataclass(slots=True) +class RunConfig: + """ + Configuration object for executing a synthetic task graph. 
+ + @field outer_iterations: Number of times to launch the Parla runtime and execute the task graph + @field inner_iterations: Number of times to execute the task graph within the same Parla runtime + @field inner_sync: Whether to synchronize after each kernel launch + @field outer_sync: Whether to synchronize at the end of the task + @field verbose: Whether to print the task graph to the console + @field device_fraction: VCUs + @field data_scale: Scaling factor to increase the size of the data objects + @field threads: Number of threads to use for the Parla runtime + @field task_time: Total time for all tasks (this overrides the time in the graphs) + @field gil_fraction: Fraction of time spent in the GIL (this overrides the time in the graphs) + @field gil_accesses: Number of kernel launches/GIL accesses per task (this overrides the time in the graphs) + @field movement_type: The data movement pattern to use + @field logfile: The log file location + @field do_check: If this is true, validate configuration/execution + @field num_gpus: The number of GPUs to use for the execution + """ + outer_iterations: int = 1 + inner_iterations: int = 1 + inner_sync: bool = False + outer_sync: bool = False + verbose: bool = False + device_fraction: float = None # VCUs + data_scale: float = 1.0 + threads: int = 1 + task_time: Optional[float] = None + gil_fraction: Optional[float] = None + gil_accesses: Optional[int] = None + movement_type: int = MovementType.NO_MOVEMENT + logfile: str = "testing.blog" + do_check: bool = False + num_gpus: int = 4 + +######################################### +# Utility Functions & Conversions +######################################### + + +def apply_mapping(mapping: Dict[TaskID, Device | Tuple[Device]], tasks: TaskMap) -> TaskMap: + """ + Apply the mapping to the tasks + """ + for task_id, device in mapping.items(): + tasks[task_id].mapping = device + + return tasks + + +def apply_order(order: Dict[TaskID, int], tasks: TaskMap) -> TaskMap: + """ + Apply the order to the tasks + """ + for task_id, order in order.items(): + tasks[task_id].order = order + + return tasks + + +def extract_mapping(tasks: TaskMap) -> Dict[TaskID, Device | Tuple[Device]]: + """ + Extract the mapping from the tasks + """ + mapping = {} + for task_id, task in tasks.items(): + mapping[task_id] = task.mapping + + return mapping + + +def extract_order(tasks: TaskMap) -> Dict[TaskID, int]: + """ + Extract the order from the tasks + """ + order = {} + for task_id, task in tasks.items(): + order[task_id] = task.order + + return order + + +def get_base_task_id(task_id: TaskID) -> TaskID: + """ + Get the base task id for a task id. This is the task id with instance=0. + """ + return TaskID(taskspace=task_id.taskspace, task_idx=task_id.task_idx, instance=0) + + +def task_id_to_str(task_id: TaskID) -> str: + """ + Convert a task id to a string + """ + return f"{task_id.taskspace}[{task_id.task_idx}]" + + +def decimal_from_fraction(frac): + return frac.numerator / Decimal(frac.denominator) + + +def numeric_from_str(string: str) -> int | Fraction: + """ + Extracts string as decimal or int + """ + if "." 
in string: + return Fraction(string) + else: + return int(string) + + +def numeric_to_str(obj: Fraction | Decimal): + """ + Convert other numeric types to strings of the form "0.00" + """ + if isinstance(obj, Fraction): + return f"{decimal_from_fraction(obj):0.2f}" + elif isinstance(obj, Decimal): + return f"{obj:0.2f}" + else: + raise ValueError( + f"Unsupported numeric type {type(obj)} of value {obj}") + + +def sizeof_fmt(num, suffix="B"): + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: + if abs(num) < 1024.0: + return f"{num:3.1f}{unit}{suffix}" + num /= 1024.0 + return f"{num:.1f}Yi{suffix}" + + +def make_data_info(data_info: Dict) -> DataInfo: + data_idx = int(data_info["id"]) + data_size = int(data_info["size"]) + data_location = device_from_string(data_info["location"]) + + return DataInfo(data_idx, data_size, data_location) + + +def make_task_id_from_dict(task_id: Dict) -> TaskID: + taskspace = task_id["taskspace"] + task_idx = make_tuple(task_id["task_idx"]) + task_instance = int(task_id["instance"]) + return TaskID(taskspace, task_idx, task_instance) + + +def make_data_access_from_dict(data_access: Dict) -> DataAccess: + data_idx = int(data_access["id"]) + + if "pattern" in data_access: + data_pattern = data_access["pattern"] + if data_pattern is not None: + raise NotImplementedError( + "Access patterns currently not supported.") + return DataAccess(id=data_idx) + + +def make_data_dependencies_from_dict(data_dependencies: Dict) -> TaskDataInfo: + read_data = [make_data_access_from_dict( + x) for x in data_dependencies["read"]] + write_data = [make_data_access_from_dict( + x) for x in data_dependencies["write"]] + read_write_data = [make_data_access_from_dict( + x) for x in data_dependencies["read_write"]] + return TaskDataInfo(read_data, write_data, read_write_data) + + +def make_task_runtime_from_dict(task_runtime: Dict) -> TaskRuntimeInfo: + task_time = int(task_runtime["task_time"]) + device_fraction = Fraction(task_runtime["device_fraction"]) + gil_accesses = int(task_runtime["gil_accesses"]) + gil_fraction = Fraction(task_runtime["gil_fraction"]) + memory = int(task_runtime["memory"]) + + return TaskRuntimeInfo(task_time, device_fraction, gil_accesses, gil_fraction, memory) + + +def device_from_string(device_str: str) -> Device | Tuple[Device]: + """ + Convert a device string (or string of a device tuple) to a device set + """ + if device_str is None: + return None + + device_str = device_str.strip() + device_str = device_str.strip("()") + device_str = device_str.strip() + device_str = device_str.split(",") + device_str = [d.strip() for d in device_str] + + devices = [] + + for d in device_str: + if d.isspace() or d == "": + continue + + d = d.strip() + d = d.strip("]") + d = d.split("[") + + if d[0] == "CPU": + devices.append(Device(Architecture.CPU, int(d[1]))) + elif d[0] == "GPU": + devices.append(Device(Architecture.GPU, int(d[1]))) + elif d[0] == "ANY": + devices.append(Device(Architecture.ANY, int(d[1]))) + else: + raise ValueError(f"Unknown device type {d[0]} in {device_str}") + + if len(devices) == 1: + return devices[0] + else: + return tuple(devices) + + +def make_task_placement_from_dict(task_runtime: Dict) -> Dict[Device | Tuple[Device], TaskRuntimeInfo | Dict[Device | Tuple[Device], TaskRuntimeInfo]]: + """ + Parse the device runtime from a dictionary + """ + device_runtime = {} + for device_str, runtime in task_runtime.items(): + device = device_from_string(device_str) + + if 'task_time' in runtime: + device_runtime[device] = 
make_task_runtime_from_dict(runtime) + elif isinstance(runtime, list): + device_runtime[device] = [ + make_task_runtime_from_dict(r) for r in runtime] + elif isinstance(device, tuple): + device_runtime[device] = make_task_runtime_from_dict(runtime) + else: + raise ValueError( + f"Unknown device type {device} or Invalid runtime {runtime} configuration.") + + device_runtime = TaskPlacementInfo(info=device_runtime) + device_runtime.update() + + return device_runtime diff --git a/src/python/parla/utility/visualize.py b/src/python/parla/utility/visualize.py new file mode 100644 index 00000000..d3b2cce5 --- /dev/null +++ b/src/python/parla/utility/visualize.py @@ -0,0 +1,42 @@ +import networkx as nx +import pydot +import io +import matplotlib.image as mpimg +import matplotlib.pyplot as plt + +from .types import * + + +def convert_to_networkx(tasks: TaskMap): + G = nx.DiGraph() + for task in tasks.values(): + name = task_id_to_str(task.id) + G.add_node(task.id, label=name) + for dependency in task.dependencies: + G.add_edge(dependency, task.id) + + labels = {} + for task in tasks.values(): + name = task_id_to_str(task.id) + labels[task.id] = name + + return G, labels + + +def plot_networkx(G: nx.DiGraph, labels: Dict[TaskID, str]): + pos = nx.spring_layout(G, seed=5) + nx.draw_networkx_nodes(G, pos=pos, node_size=700) + nx.draw_networkx_edges(G, pos=pos) + nx.draw_networkx_labels(G, pos=pos, labels=labels) + plt.tight_layout() + plt.axis("off") + + +def plot_pydot(G: nx.DiGraph): + pg = nx.drawing.nx_pydot.to_pydot(G) + png_str = pg.create_png(prog='dot') + sio = io.BytesIO() + sio.write(png_str) + sio.seek(0) + img = mpimg.imread(sio) + implot = plt.imshow(img, aspect='equal') diff --git a/test_new_graphs.py b/test_new_graphs.py new file mode 100644 index 00000000..49498bcc --- /dev/null +++ b/test_new_graphs.py @@ -0,0 +1,480 @@ +from parla.utility.graphs import generate_single_device_serial, SerialConfig +from parla.utility.graphs import TaskConfig, TaskConfigs, DataGraphConfig, IndependentConfig, Device, Architecture +from parla.utility.graphs import RunConfig, TaskDataInfo, DataInfo, TaskInfo, TaskID, TaskRuntimeInfo +from typing import Tuple, Dict +from ast import literal_eval as make_tuple +from rich import print + +import yaml +from fractions import Fraction +from decimal import Decimal + + +def decimal_from_fraction(frac): + return frac.numerator / Decimal(frac.denominator) + +def convert_numerics_to_str(obj: Fraction | Decimal): + """ + Convert other numeric types to strings of the form "0.00" + """ + if isinstance(obj, Fraction): + return f"{decimal_from_fraction(obj):0.2f}" + elif isinstance(obj, Decimal): + return f"{obj:0.2f}" + else: + raise ValueError(f"Unsupported numeric type {type(obj)} of value {obj}") + + +def write_object_to_dict(obj): + """ + Write a single task to an open YAML file + """ + sub_dict = {} + + def is_base(x): return isinstance( + x, (int, float, str, bool, type(None))) + + def is_base_str(x): return isinstance(x, (tuple, Architecture, Device)) + + def is_base_value(x): return isinstance(x, (Decimal, Fraction)) + + def unpack_values(values): + if is_base_str(value): + return str(value) + elif is_base(value): + return value + elif is_base_value(value): + return convert_numerics_to_str(value) + elif isinstance(value, list): + return [write_object_to_dict(x) for x in value] + else: + return write_object_to_dict(value) + + if isinstance(obj, Dict): + for key, value in obj.items(): + key = str(key) + sub_dict[key] = unpack_values(value) + elif is_base(obj): + 
return obj + else: + for slot in obj.__slots__: + value = getattr(obj, slot) + sub_dict[slot] = unpack_values(value) + + return sub_dict + +def write_to_pgraph(tasks: Dict[TaskID, TaskInfo], data: Dict[int, DataInfo], basename: str = "graph"): + """ + Write the task graph to a pgraph file + """ + + taskfile = basename + ".pgraph" + + def info_to_comma(info): + comma = ", ".join( + [f"{getattr(info, slot)}" for slot in info.__slots__]) + return comma + + def unpack_runtime(runtime: TaskRuntimeInfo | Dict[Device | Tuple[Device], TaskRuntimeInfo]): + if isinstance(runtime, Dict): + return ", ".join([f"{{{device} : {unpack_runtime(r)}}}" for device, + r in runtime.items()]) + elif isinstance(runtime, TaskRuntimeInfo): + return info_to_comma(runtime) + else: + raise ValueError(f"Unknown runtime type {runtime}") + + def unpack_id(task_id): + return f"('{task_id.taskspace}', {tuple(task_id.task_idx)})" + + def unpack_dependencies(dependencies: list[TaskID]): + return ": ".join( [f" {unpack_id(task_id)} " for task_id in dependencies] ) + + def unpack_data_dependencies(dependencies: TaskDataInfo): + read_data = dependencies.read + write_data = dependencies.write + read_write_data = dependencies.read_write + + read_string = ", ".join([f"{data}" for data in read_data]) + write_string = ", ".join([f"{data}" for data in write_data]) + read_write_string = ", ".join([f"{data}" for data in read_write_data]) + + return f"{read_string} : {write_string} : {read_write_string}" + + + data_line = ", ".join([f"{{{data.size} : {data.location}}}" for data in data.values()]) + task_lines = [] + + + for task in tasks.values(): + task_id = unpack_id(task.task_id) + task_runtime = unpack_runtime(task.task_runtime) + task_dependencies = unpack_dependencies(task.task_dependencies) + task_data_dependencies = unpack_data_dependencies(task.data_dependencies) + + task_line = f"{task_id} | {task_runtime} | {task_dependencies} | {task_data_dependencies}" + task_lines.append(task_line) + + with open(taskfile, "w") as file: + print(data_line, file=file) + for task_line in task_lines: + print(task_line, file=file) + + +def extract(string: str) -> int | Fraction: + """ + Extracts string as decimal (in Fraction form) or int + """ + if "." 
in string: + return Fraction(string) + else: + return int(string) + + +def read_from_pgraph(basename: str = "graph") -> Tuple[Dict[TaskID, TaskInfo], Dict[int, DataInfo]]: + + task_dict = dict() + data_dict = dict() + + filename = basename + ".pgraph" + + def extract_task_id(line: str) -> TaskID: + try: + ids = line.strip() + ids = make_tuple(ids) + + if not isinstance(ids, tuple): + ids = (ids,) + + if isinstance(ids[0], str) and ids[0].isalpha(): + taskspace = ids[0] + task_idx = ids[1] + + if not isinstance(task_idx, tuple): + task_idx = (task_idx,) + + else: + taskspace = "T" + task_idx = ids + + return TaskID(taskspace, task_idx, 0) + except Exception as e: + raise ValueError(f"Could not parse task id {line}: {e}") + + def extract_task_runtime(line: str) -> Dict[Device | Tuple[Device], TaskRuntimeInfo]: + try: + line = line.strip() + + configurations = line.split("},") + configurations = [config.strip().strip("{}").strip() for config in configurations] + task_runtime = {} + for config in configurations: + targets, details = config.split(":") + targets = device_from_string(targets) + + details = [extract(detail.strip()) for detail in details.split(",")] + + task_runtime[targets] = TaskRuntimeInfo(*details) + + return task_runtime + except Exception as e: + raise ValueError(f"Could not parse task runtime {line}: {e}") + + def extract_task_dependencies(line: str) -> list[TaskID]: + try: + line = line.strip() + dependencies = line.split(":") + dependencies = [dependency.strip() for dependency in dependencies] + + if dependencies[0] == "": + return [] + + return [extract_task_id(dependency) for dependency in dependencies] + except Exception as e: + raise ValueError(f"Could not parse task dependencies {line}: {e}") + + + + def extract_data_dependencies(line: str) -> TaskDataInfo: + try: + line = line.strip() + dependencies = line.split(":") + dependencies = [dependency.strip() for dependency in dependencies] + + check_has = [(not dependency.isspace()) and (not dependency == '') + for dependency in dependencies] + + if not any(check_has): + return TaskDataInfo([], [], []) + + if len(dependencies) > 3: + raise ValueError(f"Too many data movement types {dependencies}") + + if len(dependencies) < 1 or dependencies[0].isspace() or not check_has[0]: + read_data = [] + else: + read_data = [int(x.strip()) for x in dependencies[0].split(",")] + + if len(dependencies) < 2 or dependencies[1].isspace() or not check_has[1]: + write_data = [] + else: + write_data = [int(x.strip()) for x in dependencies[1].split(",")] + + if len(dependencies) < 3 or dependencies[2].isspace() or not check_has[2]: + read_write_data = [] + else: + read_write_data = [int(x.strip()) if (x) else None for x in dependencies[2].split(",")] + + return TaskDataInfo(read_data, write_data, read_write_data) + except Exception as e: + raise ValueError(f"Could not parse data dependencies {line}: {e}") + + with open(filename, "r") as file: + + lines = file.readlines() + + data_line = lines.pop(0) + data_line = data_line.strip() + data_line = data_line.split(",") + + for idx, data in enumerate(data_line): + data = data.strip() + data = data.strip("{}") + data = data.split(":") + data_size = int(data[0]) + data_location = device_from_string(data[1]) + data_info = DataInfo(idx, data_size, data_location) + data_dict[idx] = data_info + + for task_line in lines: + task = task_line.strip() + task = task.split("|") + + if len(task) < 2: + raise ValueError(f"Task line {task_line} is too short") + + task_id = extract_task_id(task[0]) + task_runtime 
+            task_runtime = extract_task_runtime(task[1])
+
+            if len(task) > 2:
+                task_dependencies = extract_task_dependencies(task[2])
+            else:
+                task_dependencies = []
+
+            if len(task) > 3:
+                task_data_dependencies = extract_data_dependencies(task[3])
+            else:
+                task_data_dependencies = TaskDataInfo([], [], [])
+
+            task_info = TaskInfo(task_id, task_runtime, task_dependencies, task_data_dependencies)
+            task_dict[task_id] = task_info
+
+    return task_dict, data_dict
+
+
+def write_to_yaml(tasks: Dict[TaskID, TaskInfo], data: Dict[int, DataInfo], basename: str = "graph"):
+    """
+    Write the task graph to a yaml file.
+    """
+
+    taskfile = basename + ".tasks.yaml"
+    datafile = basename + ".data.yaml"
+
+    with open(taskfile, "w") as file:
+        task_dicts = [write_object_to_dict(task) for task in tasks.values()]
+        yaml.dump(task_dicts, file, default_flow_style=False, sort_keys=False)
+
+    with open(datafile, "w") as file:
+        data_dicts = [write_object_to_dict(d) for d in data.values()]
+        yaml.dump(data_dicts, file, default_flow_style=False, sort_keys=False)
+
+
+def read_from_yaml(basename: str = "graph") -> Tuple[Dict[TaskID, TaskInfo], Dict[int, DataInfo]]:
+    """
+    Read the task graph from a yaml file.
+    """
+
+    taskfile = basename + ".tasks.yaml"
+    datafile = basename + ".data.yaml"
+
+    task_dict = dict()
+    with open(taskfile, "r") as file:
+        tasks = yaml.load(file, Loader=yaml.FullLoader)
+        tasks = [read_tasks_from_dict(task) for task in tasks]
+
+        for task in tasks:
+            task_dict[task.task_id] = task
+
+    data_dict = dict()
+    with open(datafile, "r") as file:
+        data = yaml.load(file, Loader=yaml.FullLoader)
+        data = [make_data_info(d) for d in data]
+
+        for data_info in data:
+            data_dict[data_info.idx] = data_info
+
+    return task_dict, data_dict
+
+
+def make_data_info(data_info: Dict) -> DataInfo:
+    data_idx = int(data_info["idx"])
+    data_size = int(data_info["size"])
+    data_location = device_from_string(data_info["location"])
+
+    return DataInfo(data_idx, data_size, data_location)
+
+
+def make_task_id(task_id: Dict) -> TaskID:
+    taskspace = task_id["taskspace"]
+    task_idx = make_tuple(task_id["task_idx"])
+    task_instance = int(task_id["instance"])
+    return TaskID(taskspace, task_idx, task_instance)
+
+
+def make_data_dependencies(data_dependencies: Dict) -> TaskDataInfo:
+    read_data = [int(x) for x in data_dependencies["read"]]
+    write_data = [int(x) for x in data_dependencies["write"]]
+    read_write_data = [int(x) for x in data_dependencies["read_write"]]
+    return TaskDataInfo(read_data, write_data, read_write_data)
+
+
+def make_task_runtime(task_runtime: Dict) -> TaskRuntimeInfo:
+    task_time = int(task_runtime["task_time"])
+    device_fraction = Fraction(task_runtime["device_fraction"])
+    gil_accesses = int(task_runtime["gil_accesses"])
+    gil_fraction = Fraction(task_runtime["gil_fraction"])
+    memory = int(task_runtime["memory"])
+
+    return TaskRuntimeInfo(task_time, device_fraction, gil_accesses, gil_fraction, memory)
+
+
+def device_from_string(device_str: str) -> Device | Tuple[Device]:
+    """
+    Convert a device string (or string of a device tuple) to a device set.
+    """
+    if device_str is None:
+        return None
+
+    device_str = device_str.strip()
+    device_str = device_str.strip("()")
+    device_str = device_str.strip()
+    device_str = device_str.split(",")
+    device_str = [d.strip() for d in device_str]
+
+    devices = []
+
+    for d in device_str:
+        d = d.strip()
+        d = d.strip("]")
+        d = d.split("[")
+
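+        # Each token has the form "ARCH[index]"; after the strip and split,
+        # d[0] is the architecture name and d[1] is the device index.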
+        if d[0] == "CPU":
+            devices.append(Device(Architecture.CPU, int(d[1])))
+        elif d[0] == "GPU":
+            devices.append(Device(Architecture.GPU, int(d[1])))
+        elif d[0] == "ANY":
+            devices.append(Device(Architecture.ANY, int(d[1])))
+        else:
+            raise ValueError(f"Unknown device type {d[0]}")
+
+    if len(devices) == 1:
+        return devices[0]
+    else:
+        return tuple(devices)
+
+
+def parse_device_runtime(task_runtime: Dict) -> Dict[Device | Tuple[Device], TaskRuntimeInfo | Dict[Device | Tuple[Device], TaskRuntimeInfo]]:
+    """
+    Parse the device runtime from a dictionary.
+    """
+    device_runtime = {}
+    for device_str, runtime in task_runtime.items():
+        device = device_from_string(device_str)
+
+        if 'task_time' in runtime:
+            device_runtime[device] = make_task_runtime(runtime)
+        elif isinstance(device, tuple):
+            device_runtime[device] = parse_device_runtime(runtime)
+        else:
+            raise ValueError(f"Unknown device type {device} or invalid runtime {runtime} configuration.")
+
+    return device_runtime
+
+
+def read_tasks_from_dict(task_dict: Dict) -> TaskInfo:
+    """
+    Read a task from a dictionary.
+    """
+
+    task_id = make_task_id(task_dict["task_id"])
+
+    task_runtime = parse_device_runtime(task_dict["task_runtime"])
+    task_dependencies = [make_task_id(task) for task in task_dict["task_dependencies"]]
+    data_dependencies = make_data_dependencies(task_dict["data_dependencies"])
+
+    if 'task_mapping' in task_dict:
+        task_mapping = device_from_string(task_dict["task_mapping"])
+    else:
+        task_mapping = None
+
+    if 'task_order' in task_dict:
+        task_order = int(task_dict["task_order"])
+    else:
+        task_order = 0
+
+    return TaskInfo(task_id=task_id, task_runtime=task_runtime,
+                    task_dependencies=task_dependencies,
+                    data_dependencies=data_dependencies,
+                    task_mapping=task_mapping, task_order=task_order)
+
+
+def test_generate_single_device_independent():
+
+    cpu = Device(Architecture.CPU, 0)
+    gpu = Device(Architecture.GPU, -1)
+
+    gpu1 = Device(Architecture.GPU, 1)
+    gpu2 = Device(Architecture.GPU, 2)
+
+    # A few type checks:
+    print("Is Tuple: ", isinstance(cpu, Tuple))
+    print("Is Device: ", isinstance(cpu, Device))
+
+    task_configs = TaskConfigs()
+    task_configs.add((gpu1, gpu2), TaskConfig(task_time=10, gil_accesses=1))
+    task_configs.add(cpu, TaskConfig(task_time=1000, gil_accesses=1))
+    task_configs.add(gpu, TaskConfig(task_time=1000, gil_accesses=1))
+    data_config = DataGraphConfig(pattern=1)
+    config = SerialConfig(steps=2, task_config=task_configs,
+                          data_config=data_config, fixed_placement=False,
+                          n_devices=2)
+
+    tasks, data = generate_single_device_serial(config)
+
+    write_to_yaml(tasks, data)
+    tasks, data = read_from_yaml()
+    write_to_yaml(tasks, data)
+    tasks, data = read_from_yaml()
+
+    # write_to_pgraph(tasks, data)
+    # tasks, data = read_from_pgraph()
+
+    print(tasks)
+
+
+test_generate_single_device_independent()
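+
+
+# Illustrative sketch (hand-written, not generated output) of the .pgraph
+# layout that read_from_pgraph accepts; the sizes, devices, and ids below
+# are made up. First line: data blocks as "{size : location}". Each task
+# line: id | {device : time, fraction, gil_accesses, gil_fraction, memory}
+# | dependencies | read : write : read_write.
+#
+#   {1024 : CPU[0]}, {1024 : GPU[0]}
+#   ("T", 0) | {CPU[0] : 1000, 1, 1, 0, 0} |  |  :  :
+#   ("T", 1) | {CPU[0] : 1000, 1, 1, 0, 0} | ("T", 0) | 0 : 1 :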