@@ -262,9 +262,9 @@ ana::dataset::reader<DS> ana::dataflow::open(Args &&...args) {
262
262
263
263
// open dataset reader and processor for each thread
264
264
// slot for each partition range
265
- this ->m_players = lockstep::invoke_node (
265
+ this ->m_players = lockstep::get_node (
266
266
[ds](dataset::range &part) { return ds->open_player (part); },
267
- this ->m_parts . get_view () );
267
+ this ->m_parts );
268
268
this ->m_processors = lockstep::node<dataset::processor>(
269
269
m_parts.concurrency (), this ->m_weight / this ->m_norm );
270
270
@@ -274,11 +274,11 @@ ana::dataset::reader<DS> ana::dataflow::open(Args &&...args) {
274
274
template <typename DS, typename Val>
275
275
auto ana::dataflow::read (dataset::input<DS> &ds, const std::string &name)
276
276
-> lazy<read_column_t<DS, Val>> {
277
- auto act = this -> m_processors . get_lockstep_node (
277
+ auto act = lockstep::get_node (
278
278
[name, &ds](dataset::processor &proc, dataset::range &part) {
279
279
return proc.template read <DS, Val>(std::ref (ds), part, name);
280
280
},
281
- this ->m_parts . get_view () );
281
+ this ->m_processors , this -> m_parts );
282
282
auto lzy = lazy<read_column_t <DS, Val>>(*this , act);
283
283
this ->add_operation (std::move (act));
284
284
return lzy;
@@ -287,10 +287,11 @@ auto ana::dataflow::read(dataset::input<DS> &ds, const std::string &name)
287
287
template <typename Val>
288
288
auto ana::dataflow::constant (Val const &val)
289
289
-> lazy<ana::column::constant<Val>> {
290
- auto act =
291
- this -> m_processors . get_lockstep_node ( [&val](dataset::processor &proc) {
290
+ auto act = lockstep::get_node (
291
+ [&val](dataset::processor &proc) {
292
292
return proc.template constant <Val>(val);
293
- });
293
+ },
294
+ this ->m_processors );
294
295
auto lzy = lazy<column::constant<Val>>(*this , act);
295
296
this ->add_operation (std::move (act));
296
297
return lzy;
@@ -299,10 +300,11 @@ auto ana::dataflow::constant(Val const &val)
299
300
template <typename Val, typename ... Args>
300
301
auto ana::dataflow::constant (Args &&...args)
301
302
-> lazy<ana::column::constant<Val>> {
302
- auto act =
303
- this -> m_processors . get_lockstep_node ( [args...](dataset::processor &proc) {
303
+ auto act = lockstep::get_node (
304
+ [args...](dataset::processor &proc) {
304
305
return proc.template constant <Val>(std::forward<Args>(args)...);
305
- });
306
+ },
307
+ this ->m_processors );
306
308
auto lzy = lazy<column::constant<Val>>(*this , act);
307
309
this ->add_operation (std::move (act));
308
310
return lzy;
@@ -312,18 +314,20 @@ template <typename Def, typename... Args>
312
314
auto ana::dataflow::define (Args &&...args) {
313
315
return delayed<ana::column::template evaluator_t <Def>>(
314
316
*this ,
315
- this -> m_processors . get_lockstep_node (
317
+ lockstep::get_node (
316
318
[&args...](dataset::processor &proc) {
317
319
return proc.template define <Def>(std::forward<Args>(args)...);
318
- }));
320
+ },
321
+ this ->m_processors ));
319
322
}
320
323
321
324
template <typename F> auto ana::dataflow::define (F const &callable) {
322
325
return delayed<ana::column::template evaluator_t <F>>(
323
- *this , this -> m_processors . get_lockstep_node (
324
- [callable = callable ](dataset::processor &proc) {
326
+ *this , lockstep::get_node (
327
+ [callable](dataset::processor &proc) {
325
328
return proc.template define (callable);
326
- }));
329
+ },
330
+ this ->m_processors ));
327
331
}
328
332
329
333
inline auto ana::dataflow::filter (const std::string &name) {
@@ -364,29 +368,32 @@ template <typename Sel, typename F>
364
368
auto ana::dataflow::select (const std::string &name, F callable)
365
369
-> delayed<selection::template custom_applicator_t<F>> {
366
370
return delayed<selection::template custom_applicator_t <F>>(
367
- *this , this -> m_processors . get_lockstep_node (
368
- [name = name, callable = callable](dataset::processor &proc) {
371
+ *this , lockstep::get_node (
372
+ [name, callable](dataset::processor &proc) {
369
373
return proc.template select <Sel>(nullptr , name, callable);
370
- }));
374
+ },
375
+ this ->m_processors ));
371
376
}
372
377
373
378
template <typename Sel, typename F>
374
379
auto ana::dataflow::channel (const std::string &name, F callable)
375
380
-> delayed<selection::template custom_applicator_t<F>> {
376
381
return delayed<selection::template custom_applicator_t <F>>(
377
- *this , this -> m_processors . get_lockstep_node (
382
+ *this , lockstep::get_node (
378
383
[name = name, callable = callable](dataset::processor &proc) {
379
384
return proc.template channel <Sel>(nullptr , name, callable);
380
- }));
385
+ },
386
+ this ->m_processors ));
381
387
}
382
388
383
389
template <typename Cnt, typename ... Args>
384
390
auto ana::dataflow::agg (Args &&...args) -> delayed<aggregation::booker<Cnt>> {
385
391
return delayed<aggregation::booker<Cnt>>(
386
- *this , this -> m_processors . get_lockstep_node (
392
+ *this , lockstep::get_node (
387
393
[&args...](dataset::processor &proc) {
388
394
return proc.template book <Cnt>(std::forward<Args>(args)...);
389
- }));
395
+ },
396
+ this ->m_processors ));
390
397
}
391
398
392
399
template <typename Sel>
@@ -408,36 +415,36 @@ auto ana::dataflow::select(lazy<selection> const &prev, const std::string &name,
408
415
F callable)
409
416
-> delayed<selection::template custom_applicator_t<F>> {
410
417
return delayed<selection::template custom_applicator_t <F>>(
411
- *this , this -> m_processors . get_lockstep_node (
412
- [name = name, callable = callable](dataset::processor &proc,
413
- selection const &prev) {
414
- return proc.template select <Sel>(&prev, name, callable);
415
- },
416
- prev));
418
+ *this ,
419
+ lockstep::get_node (
420
+ [name, callable](dataset::processor &proc, selection const &prev) {
421
+ return proc.template select <Sel>(&prev, name, callable);
422
+ },
423
+ this -> m_processors , prev));
417
424
}
418
425
419
426
template <typename Sel, typename F>
420
427
auto ana::dataflow::channel (lazy<selection> const &prev,
421
428
const std::string &name, F callable)
422
429
-> delayed<selection::template custom_applicator_t<F>> {
423
430
return delayed<selection::template custom_applicator_t <F>>(
424
- *this , this -> m_processors . get_lockstep_node (
431
+ *this , lockstep::get_node (
425
432
[name = name, callable = callable](dataset::processor &proc,
426
433
selection const &prev) {
427
434
return proc.template channel <Sel>(&prev, name, callable);
428
435
},
429
- prev));
436
+ this -> m_processors , prev));
430
437
}
431
438
432
439
template <typename Def, typename ... Cols>
433
440
auto ana::dataflow::evaluate_column (delayed<column::evaluator<Def>> const &calc,
434
441
lazy<Cols> const &...columns) -> lazy<Def> {
435
- auto act = this -> m_processors . get_lockstep_node (
442
+ auto act = lockstep::get_node (
436
443
[](dataset::processor &proc, column::evaluator<Def> &calc,
437
444
Cols const &...cols ) {
438
445
return proc.template evaluate_column (calc, cols...);
439
446
},
440
- calc. get_view () , columns...);
447
+ this -> m_processors , calc, columns...);
441
448
auto lzy = lazy<Def>(*this , act);
442
449
this ->add_operation (std::move (act));
443
450
return lzy;
@@ -447,12 +454,12 @@ template <typename Eqn, typename... Cols>
447
454
auto ana::dataflow::apply_selection (
448
455
delayed<selection::applicator<Eqn>> const &calc,
449
456
lazy<Cols> const &...columns) -> lazy<selection> {
450
- auto act = this -> m_processors . get_lockstep_node (
457
+ auto act = lockstep::get_node (
451
458
[](dataset::processor &proc, selection::applicator<Eqn> &calc,
452
459
Cols &...cols ) {
453
460
return proc.template apply_selection (calc, cols...);
454
461
},
455
- calc. get_view () , columns...);
462
+ this -> m_processors , calc, columns...);
456
463
auto lzy = lazy<selection>(*this , act);
457
464
this ->add_operation (std::move (act));
458
465
return lzy;
@@ -465,10 +472,10 @@ auto ana::dataflow::select_aggregation(
465
472
// any time a new aggregation is booked, means the dataflow must run: so reset
466
473
// its status
467
474
this ->reset ();
468
- auto act = this -> m_processors . get_lockstep_node (
475
+ auto act = lockstep::get_node (
469
476
[](dataset::processor &proc, aggregation::booker<Cnt> &bkr,
470
477
const selection &sel) { return proc.select_aggregation (bkr, sel); },
471
- bkr. get_view () , sel);
478
+ this -> m_processors , bkr, sel);
472
479
auto lzy = lazy<Cnt>(*this , act);
473
480
this ->add_operation (std::move (act));
474
481
return lzy;
@@ -484,7 +491,7 @@ auto ana::dataflow::select_aggregations(
484
491
485
492
using delayed_bookkeeper_type = delayed<aggregation::bookkeeper<Cnt>>;
486
493
auto bkpr = delayed_bookkeeper_type (
487
- *this , this -> m_processors . get_lockstep_node (
494
+ *this , lockstep::get_node (
488
495
[this ](dataset::processor &proc, aggregation::booker<Cnt> &bkr,
489
496
Sels const &...sels ) {
490
497
// get bookkeeper and aggregations
@@ -498,7 +505,7 @@ auto ana::dataflow::select_aggregations(
498
505
// take the bookkeeper
499
506
return std::move (bkpr_and_cntrs.first );
500
507
},
501
- bkr. get_view () , sels...));
508
+ this -> m_processors , bkr, sels...));
502
509
// lockstep::node<aggregation::booker<Cnt>>(bkr), sels...));
503
510
504
511
return bkpr;
@@ -514,11 +521,10 @@ inline void ana::dataflow::analyze() {
514
521
515
522
m_source->initialize ();
516
523
517
- this ->m_processors .run_slots (
518
- this ->m_mtcfg ,
524
+ this ->m_mtcfg .run (
519
525
[](dataset::processor &proc, dataset::player &plyr,
520
526
const dataset::range &part) { proc.process (plyr, part); },
521
- this ->m_players . get_view () , this ->m_parts . get_view () );
527
+ this ->m_processors , this -> m_players , this ->m_parts );
522
528
523
529
m_source->finalize ();
524
530
0 commit comments