Commit

refs #10: Revive and clean up PPC calculation codes.
achimnol committed Jul 3, 2015
1 parent b4b1cad commit 49102c4
Showing 5 changed files with 12 additions and 38 deletions.
3 changes: 0 additions & 3 deletions lib/coprocessor.cc
@@ -57,7 +57,6 @@ static void coproc_task_input_cb(struct ev_loop *loop, struct ev_async *watcher,
     if (task != nullptr) {
         assert(task->cctx != nullptr);
         task->coproc_ctx = ctx;
-        task->offload_start = rte_rdtsc();
         task->copy_h2d();
         task->execute();
         /* We separate d2h copy step since CUDA implicitly synchronizes
@@ -114,8 +113,6 @@ static void coproc_task_done_cb(struct ev_loop *loop, struct ev_async *watcher,
         OffloadTask *task = ctx->task_done_queue->front();
         ctx->task_done_queue->pop_front();
         if (task->poll_d2h_copy_finished()) {
-            task->offload_cost += (rte_rdtsc() - task->offload_start);
-            task->offload_start = 0;
             task->notify_completion();
         } else
             ctx->task_done_queue->push_back(task);
22 changes: 4 additions & 18 deletions lib/elementgraph.cc
@@ -47,13 +47,10 @@ void ElementGraph::flush_offloaded_tasks()
         while (!ready_tasks[dev_idx].empty()) {
             OffloadTask *task = ready_tasks[dev_idx].front();
             ready_tasks[dev_idx].pop_front();
-            //task->offload_start = rte_rdtsc();

             /* Start offloading! */
             // TODO: create multiple cctx_list and access them via dev_idx for hetero-device systems.
-            //if (!ctx->cctx_list.empty()) {
             ComputeContext *cctx = ctx->cctx_list.front();
-            //ctx->cctx_list.pop_front();
             if (cctx->state == ComputeContext::READY) {

                 /* Grab a compute context. */
@@ -78,17 +75,11 @@
                 task->dbid_h2d[dbid] = k;
             }

-            //size_t total_num_pkts = 0;
             for (PacketBatch *batch : task->batches) {
-                //total_num_pkts = batch->count;
                 if (batch->datablock_states == nullptr)
                     assert(0 == rte_mempool_get(ctx->dbstate_pool, (void **) &batch->datablock_states));
-                //assert(task->offload_start);
-                //task->offload_cost += (rte_rdtsc() - task->offload_start);
-                task->offload_start = 0;
             }
             //print_ratelimit("avg.# pkts sent to GPU", total_num_pkts, 100);
-            //assert(total_num_pkts > 0);

             task->prepare_read_buffer();
             task->prepare_write_buffer();
@@ -176,7 +167,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)
     int input_port = batch->input_port;
     int batch_disposition = CONTINUE_TO_PROCESS;
     int64_t lb_decision = anno_get(&batch->banno, NBA_BANNO_LB_DECISION);
-    uint64_t _cpu_start = rte_rdtsc();
+    uint64_t now = rdtscp(); // The starting timestamp of the current element.

     /* Check if we can and should offload. */
     if (!batch->has_results) {
@@ -246,25 +237,21 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)
                  * 10 x avg.task completion time.
                  */
                 assert(task->batches.size() > 0);
-                uint64_t now = _cpu_start;
                 if (task->batches.size() == ctx->num_coproc_ppdepth
-                    //|| (ctx->load_balancer != nullptr && ctx->load_balancer->is_changed_to_cpu)
-                    //|| (now - task->batches[0]->recv_timestamp) / (float) rte_get_tsc_hz()
-                    //   > ctx->inspector->avg_task_completion_sec[dev_idx] * 10
                    )//|| (ctx->io_ctx->mode == IO_EMUL && !ctx->stop_task_batching))
                 {
                     //printf("avg task completion time: %.6f sec\n", ctx->inspector->avg_task_completion_sec[dev_idx]);

                     offloadable->tasks[dev_idx] = nullptr; // Let the element be able to take next pkts/batches.
-                    task->begin_timestamp = now;
-                    task->offload_start = _cpu_start;

+                    task->offload_start = rdtscp();
                     ready_tasks[dev_idx].push_back(task);
 #ifdef USE_NVPROF
                     nvtxRangePop();
 #endif
                     flush_offloaded_tasks();
                 }
+                flush_offloaded_tasks();

                 /* At this point, the batch is already consumed to the task
                  * or delayed. */
@@ -275,8 +262,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)
             } else {
                 /* If not offloaded, run the element's CPU-version handler. */
                 batch_disposition = current_elem->_process_batch(input_port, batch);
-                double _cpu_end = rte_rdtsc();
-                batch->compute_time += (_cpu_end - _cpu_start);
+                batch->compute_time += (rdtscp() - now);
             }
         } else {
             /* If not offloadable, run the element's CPU-version handler. */
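The net effect of the lib/elementgraph.cc changes is that element-level timing now relies on a single rdtscp()-based timestamp: now is taken once at the top of ElementGraph::run(), reused as task->offload_start when an offload task is enqueued, and subtracted from a second rdtscp() reading to charge CPU cycles to batch->compute_time. Below is a minimal, self-contained sketch of this TSC-interval pattern; the read_tsc_ordered() helper is a hypothetical stand-in for NBA's rdtscp() (whose definition is not part of this diff), built on the x86 __rdtscp intrinsic.

    #include <cstdint>
    #include <x86intrin.h>

    // Hypothetical helper, illustrative only: __rdtscp waits for earlier
    // instructions to retire before reading the TSC, so the timestamp
    // brackets the measured region more tightly than a plain rdtsc.
    static inline uint64_t read_tsc_ordered()
    {
        unsigned int aux;                  // receives IA32_TSC_AUX (ignored here)
        return __rdtscp(&aux);
    }

    // Usage pattern mirroring the hunks above:
    //   uint64_t now = read_tsc_ordered();                  // start of the element
    //   ... run the element's CPU-side handler ...
    //   batch->compute_time += read_tsc_ordered() - now;    // elapsed cycles
    //
    // Cycle counts can be converted to seconds with rte_get_tsc_hz(), as io.cc does.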
21 changes: 7 additions & 14 deletions lib/io.cc
@@ -199,6 +199,7 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_async *watcher,
                                             (void **) tasks,
                                             ctx->task_completion_queue_size);
     print_ratelimit("# done tasks", nr_tasks, 100);
+    uint64_t now = rdtscp();

     for (unsigned t = 0; t < nr_tasks && !io_ctx->loop_broken; t++) {
         /* We already finished postprocessing.
@@ -209,25 +210,24 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_async *watcher,
         nvtxRangePush("task");
 #endif

-        assert(task->offload_start == 0);
-        //task->offload_start = rte_rdtsc();
         /* Run postprocessing handlers. */
         task->postprocess();

         /* Update statistics. */
-        float time_spent = (rte_rdtsc() - task->begin_timestamp) / (float) rte_get_tsc_hz();
+        float time_spent = (now - task->offload_start) / (float) rte_get_tsc_hz();
         uint64_t task_count = ctx->inspector->dev_finished_task_count[task->local_dev_idx];
         ctx->inspector->avg_task_completion_sec[task->local_dev_idx] \
             = (ctx->inspector->avg_task_completion_sec[task->local_dev_idx] * task_count + time_spent) / (task_count + 1);
         ctx->inspector->dev_finished_task_count[task->local_dev_idx] ++;
         ctx->inspector->dev_finished_batch_count[task->local_dev_idx] += task->batches.size();

         /* Enqueue batches for later processing. */
-        //task->offload_cost += (rte_rdtsc() - task->offload_start);
-        task->offload_start = 0;
-        double task_time = (task->offload_cost);
+        uint64_t task_cycles = now - task->offload_start;
+        uint64_t total_batch_size = 0;
+        for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++)
+            total_batch_size += task->batches[b]->count;
         for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) {
-            task->batches[b]->compute_time += task_time / ((ctx->num_coproc_ppdepth));
+            task->batches[b]->compute_time += task_cycles / total_batch_size;
             ctx->elem_graph->enqueue_postproc_batch(task->batches[b], task->elem,
                                                     task->input_ports[b]);
         }
@@ -679,13 +679,6 @@ void io_tx_batch(struct io_thread_context *ctx, PacketBatch *batch)
         }
     }
     double &thruput = ctx->comp_ctx->inspector->tx_pkt_thruput;
-    //ctx->LB_THRUPUT_WINDOW_SIZE = 16384; // (1 << 16);
-    ////if (tick - ctx->last_tx_tick > rte_get_tsc_hz() * 0.1) {
-    //    double thr = (double) ctx->global_tx_cnt * 1e3 / (tick - ctx->last_tx_tick);
-    //    thruput = (thruput * (ctx->LB_THRUPUT_WINDOW_SIZE - 1) + thr) / ctx->LB_THRUPUT_WINDOW_SIZE;
-    //    ctx->global_tx_cnt = 0;
-    //    ctx->last_tx_tick = tick;
-    ////}
     //#ifdef NBA_CPU_MICROBENCH
     //    {
     //        long long ctr[5];
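The lib/io.cc hunk at -209 is where the PPC (per-packet cost) figure from the commit title is revived: the cycles spent on one offload task, now - task->offload_start, are divided by the total packet count of all batches in the task, and that per-packet share is added to each batch's compute_time; the same hunk also keeps a running average of task completion time via avg = (avg * n + x) / (n + 1). A small self-contained sketch of the amortization step, using hypothetical stand-in types rather than NBA's actual classes:

    #include <cstdint>
    #include <vector>

    // Illustrative stand-in for the PacketBatch fields used here.
    struct Batch {
        uint64_t count;          // number of packets in the batch
        uint64_t compute_time;   // accumulated cycles charged to this batch
    };

    // Spread the cycles spent on one offload task across its batches on a
    // per-packet basis, as the new lines in comp_offload_task_completion_cb do.
    void amortize_task_cycles(std::vector<Batch*> &batches,
                              uint64_t offload_start, uint64_t now)
    {
        uint64_t task_cycles = now - offload_start;
        uint64_t total_batch_size = 0;
        for (Batch *b : batches)
            total_batch_size += b->count;
        if (total_batch_size == 0)
            return;                        // guard not present in the diff
        for (Batch *b : batches)
            b->compute_time += task_cycles / total_batch_size;
    }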
3 changes: 1 addition & 2 deletions lib/offloadtask.cc
@@ -37,7 +37,6 @@ OffloadTask::OffloadTask()
     completion_watcher = nullptr;
     completion_queue = nullptr;
     cctx = nullptr;
-    offload_cost = 0;
     offload_start = 0;
 }

@@ -130,7 +129,7 @@ bool OffloadTask::copy_h2d()
         int dbid_d = dbid_h2d[dbid];
         dbarray_h[dbid_d].total_item_count_in = 0;
         dbarray_h[dbid_d].total_item_count_out = 0;
-        assert(dbid_d < datablocks.size());
+        assert(dbid_d < (signed) datablocks.size());

         DataBlock *db = comp_ctx->datablock_registry[dbid];
         struct read_roi_info rri;
1 change: 0 additions & 1 deletion lib/offloadtask.hh
@@ -49,7 +49,6 @@ public:

 public:
     /* Initialized during execute(). */
-    uint64_t begin_timestamp;
     struct resource_param res;
     uint64_t offload_start;
     double offload_cost;
