-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathrpc_context.h
343 lines (276 loc) · 10.6 KB
/
rpc_context.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
#pragma once
#include <string>
#include <boost/type_traits/is_detected.hpp>
#include "yb/rpc/rpc_header.pb.h"
#include "yb/rpc/serialization.h"
#include "yb/rpc/service_if.h"
#include "yb/ash/wait_state.h"
#include "yb/util/memory/arena.h"
#include "yb/util/ref_cnt_buffer.h"
#include "yb/util/status.h"
#include "yb/util/status_fwd.h"
#include "yb/util/monotime.h"
#include "yb/util/net/sockaddr.h"
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace yb {
class Trace;
class WriteBuffer;
namespace rpc {
class YBInboundCall;
class RpcCallParams {
public:
virtual ~RpcCallParams() = default;
virtual Status ParseRequest(Slice param, const RefCntBuffer& buffer) = 0;
virtual AnyMessageConstPtr SerializableResponse() = 0;
};
class RpcCallPBParams : public RpcCallParams {
public:
Status ParseRequest(Slice param, const RefCntBuffer& buffer) override;
AnyMessageConstPtr SerializableResponse() override;
virtual google::protobuf::Message& request() = 0;
virtual google::protobuf::Message& response() = 0;
static google::protobuf::Message* CastMessage(const AnyMessagePtr& msg);
static const google::protobuf::Message* CastMessage(const AnyMessageConstPtr& msg);
};
template <class Req, class Resp>
class RpcCallPBParamsImpl : public RpcCallPBParams {
public:
using RequestType = Req;
using ResponseType = Resp;
RpcCallPBParamsImpl() = default;
Req& request() override {
return req_;
}
Resp& response() override {
return resp_;
}
private:
Req req_;
Resp resp_;
};
class RpcCallLWParams : public RpcCallParams {
public:
Status ParseRequest(Slice param, const RefCntBuffer& buffer) override;
AnyMessageConstPtr SerializableResponse() override;
virtual LightweightMessage& request() = 0;
virtual LightweightMessage& response() = 0;
static LightweightMessage* CastMessage(const AnyMessagePtr& msg);
static const LightweightMessage* CastMessage(const AnyMessageConstPtr& msg);
private:
RefCntBuffer buffer_;
};
template <class Req, class Resp>
class RpcCallLWParamsImpl : public RpcCallLWParams {
public:
using RequestType = Req;
using ResponseType = Resp;
Req& request() override {
return req_;
}
Resp& response() override {
return resp_;
}
RpcCallLWParamsImpl() : req_(&arena_), resp_(&arena_) {}
private:
ThreadSafeArena arena_;
Req req_;
Resp resp_;
};
template <class T>
using MutableErrorDetector = decltype(boost::declval<T&>().mutable_error());
template <bool>
struct ResponseErrorHelper;
template <>
struct ResponseErrorHelper<true> {
template <class T>
static auto Apply(T* t) {
return t->mutable_error();
}
};
template <>
struct ResponseErrorHelper<false> {
template <class T>
static auto Apply(T* t) {
return t->mutable_status();
}
};
template <class T>
auto ResponseError(T* t) {
return ResponseErrorHelper<boost::is_detected_v<MutableErrorDetector, T>>::Apply(t);
}
// The context provided to a generated ServiceIf. This provides
// methods to respond to the RPC. In the future, this will also
// include methods to access information about the caller: e.g
// authentication info, tracing info, and cancellation status.
//
// This is the server-side analogue to the RpcController class.
class RpcContext {
public:
// Create an RpcContext. This is called only from generated code
// and is not a public API.
RpcContext(std::shared_ptr<YBInboundCall> call,
std::shared_ptr<RpcCallParams> params);
explicit RpcContext(std::shared_ptr<LocalYBInboundCall> call);
RpcContext(RpcContext&& rhs) = default;
RpcContext& operator=(RpcContext&& rhs) = default;
RpcContext(const RpcContext&) = delete;
void operator=(const RpcContext&) = delete;
~RpcContext();
explicit operator bool() const {
return call_ != nullptr;
}
// Return the trace buffer for this call.
Trace* trace();
// Ensure that this call has a trace associated with it.
void EnsureTraceCreated();
// Send a response to the call. The service may call this method
// before or after returning from the original handler method,
// and it may call this method from a different thread.
//
// The response should be prepared already in the response PB pointer
// which was passed to the handler method.
//
// After this method returns, this RpcContext object is destroyed. The request
// and response protobufs are also destroyed.
void RespondSuccess();
static void RespondSuccess(InboundCall* call);
// Respond with an error to the client. This sends back an error with the code
// ERROR_APPLICATION. Because there is no more specific error code passed back
// to the client, most applications should create a custom error PB extension
// and use RespondApplicationError(...) below. This method should only be used
// for unexpected errors where the server doesn't expect the client to do any
// more advanced handling.
//
// After this method returns, this RpcContext object is destroyed. The request
// and response protobufs are also destroyed.
void RespondFailure(const Status &status);
// Respond with an RPC-level error. This typically manifests to the client as
// a remote error, one whose handling is agnostic to the particulars of the
// sent RPC. For example, ERROR_SERVER_TOO_BUSY usually causes the client to
// retry the RPC at a later time.
//
// After this method returns, this RpcContext object is destroyed. The request
// and response protobufs are also destroyed.
void RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status);
// Respond with an application-level error. This causes the caller to get a
// RemoteError status with the provided string message. Additionally, a
// service-specific error extension is passed back to the client. The
// extension must be registered with the ErrorStatusPB protobuf. For
// example:
//
// message MyServiceError {
// extend yb.rpc.ErrorStatusPB {
// optional MyServiceError my_service_error_ext = 101;
// }
// // Add any extra fields or status codes you want to pass back to
// // the client here.
// required string extra_error_data = 1;
// }
//
// NOTE: the numeric '101' above must be an integer greater than 101
// and must be unique across your code base.
//
// Given the above definition in your service protobuf file, you would
// use this method like:
//
// MyServiceError err;
// err.set_extra_error_data("foo bar");
// ctx->RespondApplicationError(MyServiceError::my_service_error_ext.number(),
// "Some error occurred", err);
//
// The client side may then retreieve the error by calling:
// const MyServiceError& err_details =
// controller->error_response()->GetExtension(MyServiceError::my_service_error_ext);
//
// After this method returns, this RpcContext object is destroyed. The request
// and response protobufs are also destroyed.
void RespondApplicationError(int error_ext_id, const std::string& message,
const google::protobuf::Message& app_error_pb);
Sidecars& sidecars();
// Return the remote endpoint which sent the current RPC call.
const Endpoint& remote_address() const;
// Return the local endpoint which received the current RPC call.
const Endpoint& local_address() const;
// A string identifying the requestor -- both the user info and the IP address.
// Suitable for use in log messages.
std::string requestor_string() const;
// Return an upper bound on the client timeout deadline. This does not
// account for transmission delays between the client and the server.
// If the client did not specify a deadline, returns MonoTime::Max().
CoarseTimePoint GetClientDeadline() const;
MonoTime ReceiveTime() const;
RpcCallParams& params() {
return *params_;
}
const std::shared_ptr<RpcCallParams>& shared_params() const {
return params_;
}
// Panic the server. This logs a fatal error with the given message, and
// also includes the current RPC request, requestor, trace information, etc,
// to make it easier to debug.
//
// Call this via the PANIC_RPC() macro.
void Panic(const char* filepath, int line_number, const std::string& message)
__attribute__((noreturn));
// Returns true if the call has been responded.
bool responded() const { return responded_; }
// Closes connection that received this request.
void CloseConnection();
void ListenConnectionShutdown(const std::function<void()>& listener);
std::string ToString() const;
const ash::WaitStateInfoPtr& wait_state() const;
Result<RefCntSlice> ExtractSidecar(size_t idx) const;
private:
std::shared_ptr<YBInboundCall> call_;
std::shared_ptr<RpcCallParams> params_;
bool responded_ = false;
};
void PanicRpc(RpcContext* context, const char* file, int line_number, const std::string& message);
#define PANIC_RPC(rpc_context, message) \
do { \
yb::rpc::PanicRpc((rpc_context), __FILE__, __LINE__, (message)); \
} while (false)
inline std::string RequestorString(yb::rpc::RpcContext* rpc) {
if (rpc) {
return rpc->requestor_string();
} else {
return "internal request";
}
}
} // namespace rpc
} // namespace yb