@@ -71,7 +71,7 @@ class RPCBase {
71
71
// Id - The function's unique identifier.
72
72
// ErrorReturn - The return type for blocking calls.
73
73
// readResult - Deserialize a result from a channel.
74
- // abandon - Abandon a promised (asynchronous) result.
74
+ // abandon - Abandon a promised result.
75
75
// respond - Retun a result on the channel.
76
76
template <typename FunctionIdT, FunctionIdT FuncId, typename FnT>
77
77
class FunctionHelper {};
@@ -109,6 +109,10 @@ class RPCBase {
109
109
inconvertibleErrorCode ()));
110
110
}
111
111
112
+ static void consumeAbandoned (std::future<ErrorReturn> &P) {
113
+ consumeError (P.get ().takeError ());
114
+ }
115
+
112
116
template <typename ChannelT, typename SequenceNumberT>
113
117
static Error respond (ChannelT &C, SequenceNumberT SeqNo,
114
118
ErrorReturn &Result) {
@@ -153,6 +157,10 @@ class RPCBase {
153
157
inconvertibleErrorCode ()));
154
158
}
155
159
160
+ static void consumeAbandoned (std::future<ErrorReturn> &P) {
161
+ consumeError (P.get ());
162
+ }
163
+
156
164
template <typename ChannelT, typename SequenceNumberT>
157
165
static Error respond (ChannelT &C, SequenceNumberT SeqNo,
158
166
ErrorReturn &Result) {
@@ -366,28 +374,25 @@ class RPC : public RPCBase {
366
374
template <FunctionIdT FuncId, typename FnT>
367
375
using Function = FunctionHelper<FunctionIdT, FuncId, FnT>;
368
376
369
- // / Return type for asynchronous call primitives.
377
+ // / Return type for non-blocking call primitives.
370
378
template <typename Func>
371
- using AsyncCallResult = std::future<typename Func::ErrorReturn>;
379
+ using NonBlockingCallResult = std::future<typename Func::ErrorReturn>;
372
380
373
- // / Return type for asynchronous call-with-seq primitives.
381
+ // / Return type for non-blocking call-with-seq primitives.
374
382
template <typename Func>
375
- using AsyncCallWithSeqResult =
376
- std::pair<AsyncCallResult <Func>, SequenceNumberT>;
383
+ using NonBlockingCallWithSeqResult =
384
+ std::pair<NonBlockingCallResult <Func>, SequenceNumberT>;
377
385
378
- // / Serialize Args... to channel C, but do not call C.send().
379
- // /
380
- // / Returns an error (on serialization failure) or a pair of:
381
- // / (1) A future Expected<T> (or future<Error> for void functions), and
382
- // / (2) A sequence number.
386
+ // / Call Func on Channel C. Does not block, does not call send. Returns a pair
387
+ // / of a future result and the sequence number assigned to the result.
383
388
// /
384
389
// / This utility function is primarily used for single-threaded mode support,
385
390
// / where the sequence number can be used to wait for the corresponding
386
- // / result. In multi-threaded mode the appendCallAsync method, which does not
391
+ // / result. In multi-threaded mode the appendCallNB method, which does not
387
392
// / return the sequence numeber, should be preferred.
388
393
template <typename Func, typename ... ArgTs>
389
- Expected<AsyncCallWithSeqResult <Func>>
390
- appendCallAsyncWithSeq (ChannelT &C, const ArgTs &... Args) {
394
+ Expected<NonBlockingCallWithSeqResult <Func>>
395
+ appendCallNBWithSeq (ChannelT &C, const ArgTs &... Args) {
391
396
auto SeqNo = SequenceNumberMgr.getSequenceNumber ();
392
397
std::promise<typename Func::ErrorReturn> Promise;
393
398
auto Result = Promise.get_future ();
@@ -397,21 +402,23 @@ class RPC : public RPCBase {
397
402
if (auto Err = CallHelper<ChannelT, SequenceNumberT, Func>::call (C, SeqNo,
398
403
Args...)) {
399
404
abandonOutstandingResults ();
405
+ Func::consumeAbandoned (Result);
400
406
return std::move (Err);
401
407
} else
402
- return AsyncCallWithSeqResult <Func>(std::move (Result), SeqNo);
408
+ return NonBlockingCallWithSeqResult <Func>(std::move (Result), SeqNo);
403
409
}
404
410
405
- // / The same as appendCallAsyncWithSeq , except that it calls C.send() to
411
+ // / The same as appendCallNBWithSeq , except that it calls C.send() to
406
412
// / flush the channel after serializing the call.
407
413
template <typename Func, typename ... ArgTs>
408
- Expected<AsyncCallWithSeqResult <Func>>
409
- callAsyncWithSeq (ChannelT &C, const ArgTs &... Args) {
410
- auto Result = appendCallAsyncWithSeq <Func>(C, Args...);
414
+ Expected<NonBlockingCallWithSeqResult <Func>>
415
+ callNBWithSeq (ChannelT &C, const ArgTs &... Args) {
416
+ auto Result = appendCallNBWithSeq <Func>(C, Args...);
411
417
if (!Result)
412
418
return Result;
413
419
if (auto Err = C.send ()) {
414
420
abandonOutstandingResults ();
421
+ Func::consumeAbandoned (Result->first );
415
422
return std::move (Err);
416
423
}
417
424
return Result;
@@ -421,30 +428,54 @@ class RPC : public RPCBase {
421
428
// / Returns an error if serialization fails, otherwise returns a
422
429
// / std::future<Expected<T>> (or a future<Error> for void functions).
423
430
template <typename Func, typename ... ArgTs>
424
- Expected<AsyncCallResult <Func>> appendCallAsync (ChannelT &C,
425
- const ArgTs &... Args) {
426
- auto ResAndSeqOrErr = appendCallAsyncWithSeq <Func>(C, Args...);
427
- if (ResAndSeqOrErr )
428
- return std::move (ResAndSeqOrErr ->first );
429
- return ResAndSeqOrErr .getError ();
431
+ Expected<NonBlockingCallResult <Func>> appendCallNB (ChannelT &C,
432
+ const ArgTs &... Args) {
433
+ auto FutureResAndSeqOrErr = appendCallNBWithSeq <Func>(C, Args...);
434
+ if (FutureResAndSeqOrErr )
435
+ return std::move (FutureResAndSeqOrErr ->first );
436
+ return FutureResAndSeqOrErr .getError ();
430
437
}
431
438
432
- // / The same as appendCallAsync , except that it calls C.send to flush the
439
+ // / The same as appendCallNB , except that it calls C.send to flush the
433
440
// / channel after serializing the call.
434
441
template <typename Func, typename ... ArgTs>
435
- Expected<AsyncCallResult<Func>> callAsync (ChannelT &C,
436
- const ArgTs &... Args) {
437
- auto ResAndSeqOrErr = callAsyncWithSeq<Func>(C, Args...);
438
- if (ResAndSeqOrErr)
439
- return std::move (ResAndSeqOrErr->first );
440
- return ResAndSeqOrErr.getError ();
442
+ Expected<NonBlockingCallResult<Func>> callNB (ChannelT &C,
443
+ const ArgTs &... Args) {
444
+ auto FutureResAndSeqOrErr = callNBWithSeq<Func>(C, Args...);
445
+ if (FutureResAndSeqOrErr)
446
+ return std::move (FutureResAndSeqOrErr->first );
447
+ return FutureResAndSeqOrErr.getError ();
448
+ }
449
+
450
+ // / Call Func on Channel C. Blocks waiting for a result. Returns an Error
451
+ // / for void functions or an Expected<T> for functions returning a T.
452
+ // /
453
+ // / This function is for use in threaded code where another thread is
454
+ // / handling responses and incoming calls.
455
+ template <typename Func, typename ... ArgTs>
456
+ typename Func::ErrorReturn callB (ChannelT &C, const ArgTs &... Args) {
457
+ if (auto FutureResOrErr = callNBWithSeq (C, Args...)) {
458
+ if (auto Err = C.send ()) {
459
+ abandonOutstandingResults ();
460
+ Func::consumeAbandoned (*FutureResOrErr);
461
+ return std::move (Err);
462
+ }
463
+ return FutureResOrErr->get ();
464
+ } else
465
+ return FutureResOrErr.takeError ();
441
466
}
442
467
443
- // / This can be used in single-threaded mode.
468
+ // / Call Func on Channel C. Block waiting for a result. While blocked, run
469
+ // / HandleOther to handle incoming calls (Response calls will be handled
470
+ // / implicitly before calling HandleOther). Returns an Error for void
471
+ // / functions or an Expected<T> for functions returning a T.
472
+ // /
473
+ // / This function is for use in single threaded mode when the calling thread
474
+ // / must act as both sender and receiver.
444
475
template <typename Func, typename HandleFtor, typename ... ArgTs>
445
476
typename Func::ErrorReturn
446
477
callSTHandling (ChannelT &C, HandleFtor &HandleOther, const ArgTs &... Args) {
447
- if (auto ResultAndSeqNoOrErr = callAsyncWithSeq <Func>(C, Args...)) {
478
+ if (auto ResultAndSeqNoOrErr = callNBWithSeq <Func>(C, Args...)) {
448
479
auto &ResultAndSeqNo = *ResultAndSeqNoOrErr;
449
480
if (auto Err = waitForResult (C, ResultAndSeqNo.second , HandleOther))
450
481
return std::move (Err);
@@ -453,7 +484,8 @@ class RPC : public RPCBase {
453
484
return ResultAndSeqNoOrErr.takeError ();
454
485
}
455
486
456
- // This can be used in single-threaded mode.
487
+ // / Call Func on Channel C. Block waiting for a result. Returns an Error for
488
+ // / void functions or an Expected<T> for functions returning a T.
457
489
template <typename Func, typename ... ArgTs>
458
490
typename Func::ErrorReturn callST (ChannelT &C, const ArgTs &... Args) {
459
491
return callSTHandling<Func>(C, handleNone, Args...);
0 commit comments