Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix data movement task dependencies with clang-formatting #110

Merged
merged 6 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/c/backend/include/phases.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class MemoryReserver : virtual public SchedulerPhase {
* @param task The task to create data movement tasks for.
*/
void create_datamove_tasks(InnerTask *task);
void create_datamove_tasks2(InnerTask *task);
};

/**
Expand Down
15 changes: 14 additions & 1 deletion src/c/backend/include/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class InnerTask {
/* Task Assigned Device Set*/
std::vector<Device *> assigned_devices;

/*Resource Requirements for each assigned device*/
/* Resource Requirements for each assigned device*/
std::unordered_map<int, ResourcePool_t> device_constraints;

/* Task is data movement task */
Expand All @@ -293,6 +293,15 @@ class InnerTask {
std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
parray_list;

/* A list of dependency tasks of a parray for this task's dependent tasks.
To be specific, a task sets dependencies of a parray for dependent tasks.
If this task's access permission to a parray includes write, it sets
itself as the dependency of the parray.
If this task's access permission to the parray is read-only, it pulls
this list of the dependencies to this map.
*/
std::unordered_map<uint64_t, std::vector<InnerTask*>> parray_dependencies_map;

InnerTask();
InnerTask(long long int id, void *py_task);
InnerTask(std::string name, long long int id, void *py_task);
Expand Down Expand Up @@ -623,6 +632,10 @@ class InnerTask {
void begin_multidev_req_addition();
void end_multidev_req_addition();

std::vector<InnerTask*>& get_parray_dependencies(uint64_t parray_parent_id) {
return this->parray_dependencies_map[parray_parent_id];
}

PlacementRequirementCollections &get_placement_req_options() {
return placement_req_options_;
}
Expand Down
83 changes: 82 additions & 1 deletion src/c/backend/phases.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,87 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
task->add_dependencies(data_tasks, true);
}



// TODO(hc): need to think about better naming before it is merged.
// first, need peer-review on this.
//
// Create one data-movement task per (device, PArray) pair held by `task`,
// wire up each data task's dependencies from the PArray-level dependency
// lists recorded on this task's compute-task dependencies, and finally make
// `task` depend on all of the created data tasks. Also updates this task's
// own per-PArray dependency lists (see parray_dependencies_map in
// runtime.hpp) so that downstream tasks can repeat the same lookup.
void MemoryReserver::create_datamove_tasks2(InnerTask *task) {
  // Get a list of the parrays the current task holds.
  // Outer index i = assigned device slot; inner index j = PArray on that device.
  const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
      &parray_list = task->parray_list;
  std::string task_base_name = task->get_name();
  std::vector<InnerTask *> data_tasks;
  data_tasks.reserve(parray_list.size());

  for (size_t i = 0; i < parray_list.size(); ++i) {
    for (size_t j = 0; j < parray_list[i].size(); ++j) {
      // Create a data movement task for each PArray.
      parray::InnerPArray *parray = parray_list[i][j].first;
      AccessMode access_mode = parray_list[i][j].second;
      // NOTE(review): the name only encodes i, so two PArrays on the same
      // device slot get identical ".dm.<i>" names — confirm that is intended.
      InnerDataTask *datamove_task = new InnerDataTask(
          // TODO(hc): id should be updated!
          task_base_name + ".dm." + std::to_string(i), 0, parray, access_mode,
          i);
      // Dependency tracking is keyed by the *parent* PArray, so slices/
      // sub-arrays of the same parent share one dependency list.
      uint64_t parray_parent_id = parray->get_parent_parray()->id;
      // Get dependencies
      std::vector<void *> compute_task_dependencies = task->get_dependencies();
      std::vector<InnerTask *> data_task_dependencies;
      for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
        InnerTask *parray_dependency =
            static_cast<InnerTask *>(compute_task_dependencies[k]);
        // Get dependencies of a parray having `parray_parent_id` that have
        // registered to the traversed dependency task
        std::vector<InnerTask*>& dep_parray_dependencies =
            parray_dependency->get_parray_dependencies(parray_parent_id);

        // NOTE(review): this inner loop pushes `parray_dependency` (and, for
        // IN mode, appends it to this task's per-PArray list) once per entry
        // in dep_parray_dependencies, producing duplicates whenever that list
        // has more than one element. If the intent is "depend on
        // parray_dependency iff its list is non-empty", a single guarded
        // push would suffice — confirm before changing.
        for (size_t t = 0; t < dep_parray_dependencies.size(); ++t) {
          data_task_dependencies.push_back(parray_dependency);
          // If the current processing parray's access mode is READ ONLY,
          // add this dependency as a dependency for this parray.
          if (access_mode == AccessMode::IN) {
            // Read-only: propagate the upstream writer(s) so later readers
            // chain to them rather than to this task.
            task->get_parray_dependencies(parray_parent_id).push_back(parray_dependency);
          }
        }
      }

      // If the current processing parray's access mode is not READ ONLY,
      // add itself as a dependency for this parray.
      if (access_mode != AccessMode::IN) {
        task->get_parray_dependencies(parray_parent_id).push_back(task);
      }

      // TODO(hc): pass false to add_dependencies() as optimization.
      datamove_task->add_dependencies(data_task_dependencies, true);
      // Copy assigned devices to a compute task to a data movement task.
      // TODO(hc): When we support xpy, it should be devices corresponding
      // to placements of the local partition.
      auto device = task->get_assigned_devices()[i];
      datamove_task->add_assigned_device(device);

      // Data movement consumes no memory/VCU resources here; the (0, 0, 1)
      // tuple reserves a single copy-engine-style slot on the device.
      datamove_task->device_constraints.emplace(
          std::piecewise_construct,
          std::forward_as_tuple(device->get_global_id()),
          std::forward_as_tuple(0, 0, 1));

      data_tasks.push_back(datamove_task);
      // Add the created data movement task to a reserved task queue.
      this->scheduler->increase_num_active_tasks();
      this->reserved_tasks_buffer.push_back(datamove_task);
    }
  }

  // Create dependencies between data move task and compute tasks.
  task->add_dependencies(data_tasks, true);
}




void MemoryReserver::run(SchedulerPhase *next_phase) {
NVTX_RANGE("MemoryReserver::run", NVTX_COLOR_LIGHT_GREEN)

Expand All @@ -263,7 +344,7 @@ void MemoryReserver::run(SchedulerPhase *next_phase) {
if (can_reserve) {
this->reserve_resources(task);
this->reservable_tasks->pop();
this->create_datamove_tasks(task);
this->create_datamove_tasks2(task);
this->reserved_tasks_buffer.push_back(task);
} else {
// TODO:(wlr) we need some break condition to allow the scheduler to
Expand Down
2 changes: 1 addition & 1 deletion src/python/parla/common/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def process_crosspys(
_out.append((parray, i))
else:
raise TypeError(
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
)
return _out

Expand Down
1 change: 1 addition & 0 deletions src/python/parla/common/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def print_config():
print("Default Runahead Behavior: ", default_sync, flush=True)
print("VCU Precision: ", VCU_BASELINE, flush=True)


class DeviceType(IntEnum):
"""
This class declares device types.
Expand Down
16 changes: 7 additions & 9 deletions src/python/parla/common/parray/coherence.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ class MemoryOperation:

# Flag
SWITCH_DEVICE_FLAG = (
101
) # if the flag is set, it means dst is not the current device
101 # if the flag is set, it means dst is not the current device
)
LOAD_SUBARRAY = (
102
) # if the flag is set, it means a subarray of src should be loaded
102 # if the flag is set, it means a subarray of src should be loaded
)
ENSURE_IS_COMPLETE = (
103
) # if the flag is set, check data will also check if the data is complete
103 # if the flag is set, check data will also check if the data is complete
)

def __init__(self, inst: int = NOOP, dst: int = -1, src: int = -1, flag: int = []):
self.inst = inst
Expand Down Expand Up @@ -125,9 +125,7 @@ def __init__(self, init_owner: int, num_gpu: int, cyparray_state: CyPArrayState)
self._is_complete[CPU_INDEX] = None

self._local_states[init_owner] = self.MODIFIED # initial state is MODIFIED
self.owner = (
init_owner
) # the device that has the complete copy (take the role of main memory)
self.owner = init_owner # the device that has the complete copy (take the role of main memory)
self._versions[init_owner] = 0 # the first version is 0
self._is_complete[init_owner] = True # the copy is complete
self._latest_version = 0 # the latest version in the system
Expand Down
9 changes: 3 additions & 6 deletions src/python/parla/common/parray/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,11 @@ def get_array(self, device_idx: Optional[int] = None) -> ndarray:
"""

if device_idx is None:
device_idx = self._current_device_index
device_idx = self._current_device_index

if self._slices: # so this is a sub-parray object
# index into origin array by saved slices
ret = self._array.get_by_global_slices(
device_idx, self._slices[0]
)
ret = self._array.get_by_global_slices(device_idx, self._slices[0])
for s in self._slices[1:]:
ret = ret[s]
return ret
Expand Down Expand Up @@ -214,13 +212,12 @@ def _current_device_index(self) -> int:
# to avoid import gpu context, which is slow to setup.
return device.device.id # device.device should be a cupy.cuda.Device object


# Public API:

    def set_name(self, name: str) -> None:
        """Set a human-readable name for this PArray (stored on ``self._name``)."""
        self._name = name

def get(self, device: Optional[PyDevice] = None) -> 'np.ndarray' | 'cp.ndarray':
def get(self, device: Optional[PyDevice] = None) -> "np.ndarray" | "cp.ndarray":
if device is None:
return self.array
else:
Expand Down
4 changes: 2 additions & 2 deletions src/python/parla/common/parray/from_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ def get_parray(object, count=0): # recursively process Sequence or Dictionary
elif isinstance(object, dict):
accumulator = {}
for key, value in object.items():
accumulator[key] = get_parray(value, count+1)
accumulator[key] = get_parray(value, count + 1)
return accumulator
elif isinstance(object, (list, tuple, set)):
accumulator = []
for item in object:
accumulator.append(get_parray(item, count+1))
accumulator.append(get_parray(item, count + 1))
return type(object)(accumulator)
else:
raise TypeError(f"Unsupported Type: {type(object)}")
Expand Down
6 changes: 0 additions & 6 deletions src/python/parla/cython/tasks.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1789,9 +1789,3 @@ class BackendTaskSpace(TaskSpace):

def wait(self):
self.inner_space.wait()






6 changes: 3 additions & 3 deletions src/python/parla/utility/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ class RunConfig:
"""

outer_iterations: int = (
1
) # Number of times to launch the Parla runtime and execute the task graph
1 # Number of times to launch the Parla runtime and execute the task graph
)
# Number of times to execute the task graph within the same Parla runtime
inner_iterations: int = 1
inner_sync: bool = False # Whether to synchronize after each kernel launch
Expand Down Expand Up @@ -504,7 +504,7 @@ def get_task_properties(line: str):


def parse_blog(
filename: str = "parla.blog"
filename: str = "parla.blog",
) -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]:
try:
result = subprocess.run(
Expand Down