16
16
17
17
#include < grpc/event_engine/endpoint_config.h>
18
18
#include < grpc/event_engine/extensible.h>
19
+ #include < grpc/event_engine/internal/write_event.h>
19
20
#include < grpc/event_engine/memory_allocator.h>
20
21
#include < grpc/event_engine/port.h>
21
22
#include < grpc/event_engine/slice_buffer.h>
22
23
#include < grpc/support/port_platform.h>
23
24
25
+ #include < bitset>
26
+ #include < initializer_list>
24
27
#include < vector>
25
28
26
29
#include " absl/functional/any_invocable.h"
@@ -180,13 +183,26 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
180
183
// / EventEngine Endpoint Read API call.
181
184
// /
182
185
// / Passed as argument to an Endpoint \a Read
183
- struct ReadArgs {
186
+ class ReadArgs final {
187
+ public:
188
+ ReadArgs () = default ;
189
+ ReadArgs (const ReadArgs&) = delete ;
190
+ ReadArgs& operator =(const ReadArgs&) = delete ;
191
+ ReadArgs (ReadArgs&&) = default ;
192
+ ReadArgs& operator =(ReadArgs&&) = default ;
193
+
184
194
// A suggestion to the endpoint implementation to read at-least the
185
195
// specified number of bytes over the network connection before marking
186
196
// the endpoint read operation as complete. gRPC may use this argument
187
197
// to minimize the number of endpoint read API calls over the lifetime
188
198
// of a connection.
189
- int64_t read_hint_bytes;
199
+ void set_read_hint_bytes (int64_t read_hint_bytes) {
200
+ read_hint_bytes_ = read_hint_bytes;
201
+ }
202
+ int64_t read_hint_bytes () const { return read_hint_bytes_; }
203
+
204
+ private:
205
+ int64_t read_hint_bytes_ = 1 ;
190
206
};
191
207
// / Reads data from the Endpoint.
192
208
// /
@@ -212,20 +228,110 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
212
228
// / statuses to \a on_read. For example, callbacks might expect to receive
213
229
// / CANCELLED on endpoint shutdown.
214
230
virtual bool Read (absl::AnyInvocable<void (absl::Status)> on_read,
215
- SliceBuffer* buffer, const ReadArgs* args) = 0;
231
+ SliceBuffer* buffer, ReadArgs args) = 0;
232
+ // // The set of write events that can be reported by an Endpoint.
233
+ using WriteEvent = ::grpc_event_engine::experimental::internal::WriteEvent;
234
+ // / An output WriteMetric consists of a key and a value.
235
+ // / The space of keys can be queried from the endpoint via the
236
+ // / \a AllWriteMetrics, \a GetMetricName and \a GetMetricKey APIs.
237
+ // / The value is an int64_t that is implementation-defined. Check with the
238
+ // / endpoint implementation documentation for the semantics of each metric.
239
+ struct WriteMetric {
240
+ size_t key;
241
+ int64_t value;
242
+ };
243
+ using WriteEventCallback = absl::AnyInvocable<void (
244
+ WriteEvent, absl::Time, std::vector<WriteMetric>) const >;
245
+ // A bitmask of the events that the caller is interested in.
246
+ // Each bit corresponds to an entry in WriteEvent.
247
+ using WriteEventSet = std::bitset<static_cast <int >(WriteEvent::kCount )>;
248
+ // A sink to receive write events.
249
+ // The requested metrics are the keys of the metrics that the caller is
250
+ // interested in. The on_event callback will be called on each event
251
+ // requested.
252
+ class WriteEventSink final {
253
+ public:
254
+ WriteEventSink (absl::Span<const size_t > requested_metrics,
255
+ std::initializer_list<WriteEvent> requested_events,
256
+ WriteEventCallback on_event)
257
+ : requested_metrics_(requested_metrics),
258
+ on_event_ (std::move(on_event)) {
259
+ for (auto event : requested_events) {
260
+ requested_events_mask_.set (static_cast <int >(event));
261
+ }
262
+ }
263
+
264
+ absl::Span<const size_t > requested_metrics () const {
265
+ return requested_metrics_;
266
+ }
267
+
268
+ bool requested_event (WriteEvent event) const {
269
+ return requested_events_mask_.test (static_cast <int >(event));
270
+ }
271
+
272
+ WriteEventSet requested_events_mask () const {
273
+ return requested_events_mask_;
274
+ }
275
+
276
+ WriteEventCallback TakeEventCallback () { return std::move (on_event_); }
277
+
278
+ private:
279
+ absl::Span<const size_t > requested_metrics_;
280
+ WriteEventSet requested_events_mask_;
281
+ // The callback to be called on each event.
282
+ WriteEventCallback on_event_;
283
+ };
216
284
// / A struct representing optional arguments that may be provided to an
217
285
// / EventEngine Endpoint Write API call.
218
286
// /
219
287
// / Passed as argument to an Endpoint \a Write
220
- struct WriteArgs {
288
+ class WriteArgs final {
289
+ public:
290
+ WriteArgs () = default ;
291
+ WriteArgs (const WriteArgs&) = delete ;
292
+ WriteArgs& operator =(const WriteArgs&) = delete ;
293
+ WriteArgs (WriteArgs&&) = default ;
294
+ WriteArgs& operator =(WriteArgs&&) = default ;
295
+
296
+ // A sink to receive write events.
297
+ std::optional<WriteEventSink> TakeMetricsSink () {
298
+ auto sink = std::move (metrics_sink_);
299
+ metrics_sink_.reset ();
300
+ return sink;
301
+ }
302
+
303
+ bool has_metrics_sink () const { return metrics_sink_.has_value (); }
304
+
305
+ void set_metrics_sink (WriteEventSink sink) {
306
+ metrics_sink_ = std::move (sink);
307
+ }
308
+
221
309
// Represents private information that may be passed by gRPC for
222
310
// select endpoints expected to be used only within google.
223
- void * google_specific = nullptr ;
311
+ // TODO(ctiller): Remove this method once all callers are migrated to
312
+ // metrics sink.
313
+ void * GetDeprecatedAndDiscouragedGoogleSpecificPointer () {
314
+ return google_specific_;
315
+ }
316
+
317
+ void SetDeprecatedAndDiscouragedGoogleSpecificPointer (void * pointer) {
318
+ google_specific_ = pointer;
319
+ }
320
+
224
321
// A suggestion to the endpoint implementation to group data to be written
225
322
// into frames of the specified max_frame_size. gRPC may use this
226
323
// argument to dynamically control the max sizes of frames sent to a
227
324
// receiver in response to high receiver memory pressure.
228
- int64_t max_frame_size;
325
+ int64_t max_frame_size () const { return max_frame_size_; }
326
+
327
+ void set_max_frame_size (int64_t max_frame_size) {
328
+ max_frame_size_ = max_frame_size;
329
+ }
330
+
331
+ private:
332
+ std::optional<WriteEventSink> metrics_sink_;
333
+ void * google_specific_ = nullptr ;
334
+ int64_t max_frame_size_ = 1024 * 1024 ;
229
335
};
230
336
// / Writes data out on the connection.
231
337
// /
@@ -248,11 +354,22 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
248
354
// / statuses to \a on_writable. For example, callbacks might expect to
249
355
// / receive CANCELLED on endpoint shutdown.
250
356
virtual bool Write (absl::AnyInvocable<void (absl::Status)> on_writable,
251
- SliceBuffer* data, const WriteArgs* args) = 0;
357
+ SliceBuffer* data, WriteArgs args) = 0;
252
358
// / Returns an address in the format described in DNSResolver. The returned
253
359
// / values are expected to remain valid for the life of the Endpoint.
254
360
virtual const ResolvedAddress& GetPeerAddress () const = 0;
255
361
virtual const ResolvedAddress& GetLocalAddress () const = 0;
362
+ // / Returns the list of write metrics that the endpoint supports.
363
+ // / The keys are used to identify the metrics in the GetMetricName and
364
+ // / GetMetricKey APIs. The current value of the metric can be queried by
365
+ // / adding a WriteEventSink to the WriteArgs of a Write call.
366
+ virtual std::vector<size_t > AllWriteMetrics () = 0;
367
+ // / Returns the name of the write metric with the given key.
368
+ // / If the key is not found, returns std::nullopt.
369
+ virtual std::optional<absl::string_view> GetMetricName (size_t key) = 0;
370
+ // / Returns the key of the write metric with the given name.
371
+ // / If the name is not found, returns std::nullopt.
372
+ virtual std::optional<size_t > GetMetricKey (absl::string_view name) = 0;
256
373
};
257
374
258
375
// / Called when a new connection is established.
@@ -334,7 +451,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
334
451
// / when the object is destroyed and all pending callbacks will be called
335
452
// / shortly. If cancellation races with request completion, implementations
336
453
// / may choose to either cancel or satisfy the request.
337
- class DNSResolver {
454
+ class DNSResolver : public Extensible {
338
455
public:
339
456
// / Optional configuration for DNSResolvers.
340
457
struct ResolverOptions {
0 commit comments