@@ -263,7 +263,7 @@ ana::dataset::reader<DS> ana::dataflow::open(Args &&...args) {
263
263
// open dataset reader and processor for each thread
264
264
// slot for each partition range
265
265
this ->m_players = lockstep::get_node (
266
- [ds](dataset::range & part) { return ds->open_player (part); },
266
+ [ds](dataset::range * part) { return ds->open_player (* part); },
267
267
this ->m_parts );
268
268
this ->m_processors = lockstep::node<dataset::processor>(
269
269
m_parts.concurrency (), this ->m_weight / this ->m_norm );
@@ -275,8 +275,8 @@ template <typename DS, typename Val>
275
275
auto ana::dataflow::read (dataset::input<DS> &ds, const std::string &name)
276
276
-> lazy<read_column_t<DS, Val>> {
277
277
auto act = lockstep::get_node (
278
- [name, &ds](dataset::processor & proc, dataset::range & part) {
279
- return proc. template read <DS, Val>(std::ref (ds), part, name);
278
+ [name, &ds](dataset::processor * proc, dataset::range * part) {
279
+ return proc-> template read <DS, Val>(std::ref (ds), * part, name);
280
280
},
281
281
this ->m_processors , this ->m_parts );
282
282
auto lzy = lazy<read_column_t <DS, Val>>(*this , act);
@@ -288,8 +288,8 @@ template <typename Val>
288
288
auto ana::dataflow::constant (Val const &val)
289
289
-> lazy<ana::column::constant<Val>> {
290
290
auto act = lockstep::get_node (
291
- [&val](dataset::processor & proc) {
292
- return proc. template constant <Val>(val);
291
+ [&val](dataset::processor * proc) {
292
+ return proc-> template constant <Val>(val);
293
293
},
294
294
this ->m_processors );
295
295
auto lzy = lazy<column::constant<Val>>(*this , act);
@@ -301,8 +301,8 @@ template <typename Val, typename... Args>
301
301
auto ana::dataflow::constant (Args &&...args)
302
302
-> lazy<ana::column::constant<Val>> {
303
303
auto act = lockstep::get_node (
304
- [args...](dataset::processor & proc) {
305
- return proc. template constant <Val>(std::forward<Args>(args)...);
304
+ [args...](dataset::processor * proc) {
305
+ return proc-> template constant <Val>(std::forward<Args>(args)...);
306
306
},
307
307
this ->m_processors );
308
308
auto lzy = lazy<column::constant<Val>>(*this , act);
@@ -315,17 +315,17 @@ auto ana::dataflow::define(Args &&...args) {
315
315
return delayed<ana::column::template evaluator_t <Def>>(
316
316
*this ,
317
317
lockstep::get_node (
318
- [&args...](dataset::processor & proc) {
319
- return proc. template define <Def>(std::forward<Args>(args)...);
318
+ [&args...](dataset::processor * proc) {
319
+ return proc-> template define <Def>(std::forward<Args>(args)...);
320
320
},
321
321
this ->m_processors ));
322
322
}
323
323
324
324
template <typename F> auto ana::dataflow::define (F const &callable) {
325
325
return delayed<ana::column::template evaluator_t <F>>(
326
326
*this , lockstep::get_node (
327
- [callable](dataset::processor & proc) {
328
- return proc. template define (callable);
327
+ [callable](dataset::processor * proc) {
328
+ return proc-> template define (callable);
329
329
},
330
330
this ->m_processors ));
331
331
}
@@ -369,8 +369,8 @@ auto ana::dataflow::select(const std::string &name, F callable)
369
369
-> delayed<selection::template custom_applicator_t<F>> {
370
370
return delayed<selection::template custom_applicator_t <F>>(
371
371
*this , lockstep::get_node (
372
- [name, callable](dataset::processor & proc) {
373
- return proc. template select <Sel>(nullptr , name, callable);
372
+ [name, callable](dataset::processor * proc) {
373
+ return proc-> template select <Sel>(nullptr , name, callable);
374
374
},
375
375
this ->m_processors ));
376
376
}
@@ -380,8 +380,8 @@ auto ana::dataflow::channel(const std::string &name, F callable)
380
380
-> delayed<selection::template custom_applicator_t<F>> {
381
381
return delayed<selection::template custom_applicator_t <F>>(
382
382
*this , lockstep::get_node (
383
- [name = name , callable = callable ](dataset::processor & proc) {
384
- return proc. template channel <Sel>(nullptr , name, callable);
383
+ [name, callable](dataset::processor * proc) {
384
+ return proc-> template channel <Sel>(nullptr , name, callable);
385
385
},
386
386
this ->m_processors ));
387
387
}
@@ -390,8 +390,8 @@ template <typename Cnt, typename... Args>
390
390
auto ana::dataflow::agg (Args &&...args) -> delayed<aggregation::booker<Cnt>> {
391
391
return delayed<aggregation::booker<Cnt>>(
392
392
*this , lockstep::get_node (
393
- [&args...](dataset::processor & proc) {
394
- return proc. template book <Cnt>(std::forward<Args>(args)...);
393
+ [&args...](dataset::processor * proc) {
394
+ return proc-> template book <Cnt>(std::forward<Args>(args)...);
395
395
},
396
396
this ->m_processors ));
397
397
}
@@ -417,8 +417,8 @@ auto ana::dataflow::select(lazy<selection> const &prev, const std::string &name,
417
417
return delayed<selection::template custom_applicator_t <F>>(
418
418
*this ,
419
419
lockstep::get_node (
420
- [name, callable](dataset::processor & proc, selection const & prev) {
421
- return proc. template select <Sel>(& prev, name, callable);
420
+ [name, callable](dataset::processor * proc, selection const * prev) {
421
+ return proc-> template select <Sel>(prev, name, callable);
422
422
},
423
423
this ->m_processors , prev));
424
424
}
@@ -428,21 +428,21 @@ auto ana::dataflow::channel(lazy<selection> const &prev,
428
428
const std::string &name, F callable)
429
429
-> delayed<selection::template custom_applicator_t<F>> {
430
430
return delayed<selection::template custom_applicator_t <F>>(
431
- *this , lockstep::get_node (
432
- [name = name, callable = callable](dataset::processor &proc,
433
- selection const & prev) {
434
- return proc. template channel <Sel>(& prev, name, callable);
435
- },
436
- this ->m_processors , prev));
431
+ *this ,
432
+ lockstep::get_node (
433
+ [name, callable](dataset::processor *proc, selection const * prev) {
434
+ return proc-> template channel <Sel>(prev, name, callable);
435
+ },
436
+ this ->m_processors , prev));
437
437
}
438
438
439
439
template <typename Def, typename ... Cols>
440
440
auto ana::dataflow::evaluate_column (delayed<column::evaluator<Def>> const &calc,
441
441
lazy<Cols> const &...columns) -> lazy<Def> {
442
442
auto act = lockstep::get_node (
443
- [](dataset::processor & proc, column::evaluator<Def> & calc,
444
- Cols const & ...cols ) {
445
- return proc. template evaluate_column (calc, cols...);
443
+ [](dataset::processor * proc, column::evaluator<Def> * calc,
444
+ Cols const * ...cols ) {
445
+ return proc-> template evaluate_column (* calc, * cols...);
446
446
},
447
447
this ->m_processors , calc, columns...);
448
448
auto lzy = lazy<Def>(*this , act);
@@ -455,9 +455,9 @@ auto ana::dataflow::apply_selection(
455
455
delayed<selection::applicator<Eqn>> const &calc,
456
456
lazy<Cols> const &...columns) -> lazy<selection> {
457
457
auto act = lockstep::get_node (
458
- [](dataset::processor & proc, selection::applicator<Eqn> & calc,
459
- Cols & ...cols ) {
460
- return proc. template apply_selection (calc, cols...);
458
+ [](dataset::processor * proc, selection::applicator<Eqn> * calc,
459
+ Cols * ...cols ) {
460
+ return proc-> template apply_selection (* calc, * cols...);
461
461
},
462
462
this ->m_processors , calc, columns...);
463
463
auto lzy = lazy<selection>(*this , act);
@@ -473,8 +473,8 @@ auto ana::dataflow::select_aggregation(
473
473
// its status
474
474
this ->reset ();
475
475
auto act = lockstep::get_node (
476
- [](dataset::processor & proc, aggregation::booker<Cnt> & bkr,
477
- const selection & sel) { return proc. select_aggregation (bkr, sel); },
476
+ [](dataset::processor * proc, aggregation::booker<Cnt> * bkr,
477
+ const selection * sel) { return proc-> select_aggregation (* bkr, * sel); },
478
478
this ->m_processors , bkr, sel);
479
479
auto lzy = lazy<Cnt>(*this , act);
480
480
this ->add_operation (std::move (act));
@@ -492,10 +492,11 @@ auto ana::dataflow::select_aggregations(
492
492
using delayed_bookkeeper_type = delayed<aggregation::bookkeeper<Cnt>>;
493
493
auto bkpr = delayed_bookkeeper_type (
494
494
*this , lockstep::get_node (
495
- [this ](dataset::processor & proc, aggregation::booker<Cnt> & bkr,
496
- Sels const & ...sels ) {
495
+ [this ](dataset::processor * proc, aggregation::booker<Cnt> * bkr,
496
+ Sels const * ...sels ) {
497
497
// get bookkeeper and aggregations
498
- auto bkpr_and_cntrs = proc.select_aggregations (bkr, sels...);
498
+ auto bkpr_and_cntrs =
499
+ proc->select_aggregations (*bkr, *sels...);
499
500
500
501
// add each aggregation to this dataflow
501
502
for (auto &&cntr : bkpr_and_cntrs.second ) {
@@ -506,8 +507,6 @@ auto ana::dataflow::select_aggregations(
506
507
return std::move (bkpr_and_cntrs.first );
507
508
},
508
509
this ->m_processors , bkr, sels...));
509
- // lockstep::node<aggregation::booker<Cnt>>(bkr), sels...));
510
-
511
510
return bkpr;
512
511
}
513
512
@@ -522,15 +521,15 @@ inline void ana::dataflow::analyze() {
522
521
m_source->initialize ();
523
522
524
523
this ->m_mtcfg .run (
525
- [](dataset::processor & proc, dataset::player & plyr,
526
- const dataset::range & part) { proc. process (plyr, part); },
524
+ [](dataset::processor * proc, dataset::player * plyr,
525
+ const dataset::range * part) { proc-> process (* plyr, * part); },
527
526
this ->m_processors , this ->m_players , this ->m_parts );
528
527
529
528
m_source->finalize ();
530
529
531
530
// clear aggregations so they are not run more than once
532
531
this ->m_processors .call_all_slots (
533
- [](dataset::processor & proc) { proc. clear_aggregations (); });
532
+ [](dataset::processor * proc) { proc-> clear_aggregations (); });
534
533
}
535
534
536
535
inline void ana::dataflow::reset () { m_analyzed = false ; }
0 commit comments