Skip to content

Commit 3fda289

Browse files
authored
Add comments and readme file for eventstream module (#700)
1 parent 25ddf5e commit 3fda289

File tree

2 files changed

+226
-2
lines changed

2 files changed

+226
-2
lines changed

eventstream_rpc/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Eventstream RPC
2+
3+
This module implements client for eventstream RPC. The eventstream RPC client provides a base for defining custom RPC
4+
operations and implements interaction with RPC server.
5+
6+
## Client Connection
7+
8+
`ClientConnection` class represents the connection to the RPC server. It's responsible for connection lifetime and
9+
can send connection-level messages (e.g. sending PING or opening a new stream).
10+
11+
## Client Stream/Continuation
12+
13+
Application-level messages are supposed to be sent over streams. `ClientConnection::NewStream` creates a new stream which is represented
14+
by the `ClientContinuation` class.
15+
The new stream must be activated with `ClientContinuation::Activate` method. This method initiates sending messages over the wire.
16+
17+
Application-level messages can be sent using `ClientContinuation::SendMessage` method.
18+
19+
## Client Operation
20+
21+
Based upon a client continuation, this entity represents an abstract operation that the client can perform. Concrete operations
22+
should be implemented as classes inherited from the `ClientOperation` class.
23+
24+
Each operation has input data (referred as **Request**) and output data (referred as **Response**).
25+
Requests are sent by the `Activate` method.
26+
27+
### Example of a Client Operation
28+
29+
[DeleteThingShadowOperation](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html)
30+
defines an RPC operation for deleting a thing shadow.
31+
32+
[DeleteThingShadowOperation::Activate](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a7aca4de69329780dfa9ff17315e68b23)
33+
takes [DeleteThingShadowRequest](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_request.html)
34+
instance as an argument and sends it to the server.
35+
36+
After the request is done, you can use [DeleteThingShadowOperation::GetResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a261fa97489a6e2015e11f7c25aa13ee9)
37+
to get an instance of the [DeleteThingShadowResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html)
38+
class, an auxiliary wrapper class returning a response in case of success or an error in case of failure. To obtain
39+
an actual response, use [DeleteThingShadowResult::GetOperationResponse](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html#a6f4c4987311e016d20cf8e82808181fd)
40+
method.

eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h

Lines changed: 186 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ namespace Aws
223223
Crt::String StatusToString();
224224
};
225225

226+
/**
227+
* Handler interface for connection lifecycle events.
228+
*/
226229
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
227230
{
228231
public:
@@ -255,7 +258,9 @@ namespace Aws
255258
const Crt::Optional<Crt::ByteBuf> &payload);
256259
};
257260

258-
/* User data passed to callbacks for a new stream. */
261+
/**
262+
* User data passed to callbacks for a new stream.
263+
*/
259264
class AWS_EVENTSTREAMRPC_API ContinuationCallbackData
260265
{
261266
public:
@@ -273,6 +278,9 @@ namespace Aws
273278
Crt::Allocator *allocator;
274279
};
275280

281+
/**
282+
* Handler interface for continuation events.
283+
*/
276284
class AWS_EVENTSTREAMRPC_API ClientContinuationHandler
277285
{
278286
public:
@@ -299,23 +307,63 @@ namespace Aws
299307
ContinuationCallbackData *m_callbackData;
300308
};
301309

310+
/**
311+
* A wrapper for event-stream-rpc client continuation.
312+
*/
302313
class AWS_EVENTSTREAMRPC_API ClientContinuation final
303314
{
304315
public:
316+
/**
317+
* Create a new continuation.
318+
*
319+
* @note continuation_option's callbacks will not be invoked, and nothing will be sent across
320+
* the wire until Activate() is invoked.
321+
* @param connection Connection on which open a new stream.
322+
* @param continuationHandler A set of callbacks that will be invoked for continuation events.
323+
* @param allocator Allocator to use.
324+
*/
305325
ClientContinuation(
306326
ClientConnection *connection,
307327
ClientContinuationHandler &continuationHandler,
308328
Crt::Allocator *allocator) noexcept;
309329
~ClientContinuation() noexcept;
330+
331+
/**
332+
* Initiate a new client stream. Send new message for the new stream.
333+
* @param operation Name for the operation to be invoked by the peer endpoint.
334+
* @param headers Headers for the eventstream message.
335+
* @param payload Payload for the eventstream message.
336+
* @param messageType Message type for the message.
337+
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
338+
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
339+
* transport.
340+
* @return Future that will be resolved when the message has either been written to the wire or it fails.
341+
*/
310342
std::future<RpcError> Activate(
311343
const Crt::String &operation,
312344
const Crt::List<EventStreamHeader> &headers,
313345
const Crt::Optional<Crt::ByteBuf> &payload,
314346
MessageType messageType,
315347
uint32_t messageFlags,
316348
OnMessageFlushCallback onMessageFlushCallback) noexcept;
349+
350+
/**
351+
* Check if the continuation has been closed.
352+
* @return True if the continuation has been closed, false otherwise.
353+
*/
317354
bool IsClosed() noexcept;
318355
void Release() noexcept;
356+
357+
/**
358+
* Send message on the continuation.
359+
* @param headers List of additional event stream headers to include on the message.
360+
* @param payload Message payload.
361+
* @param messageType Message type for the message.
362+
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
363+
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
364+
* transport.
365+
* @return Future that will be resolved when the message has either been written to the wire or it fails.
366+
*/
319367
std::future<RpcError> SendMessage(
320368
const Crt::List<EventStreamHeader> &headers,
321369
const Crt::Optional<Crt::ByteBuf> &payload,
@@ -329,6 +377,7 @@ namespace Aws
329377
ClientContinuationHandler &m_continuationHandler;
330378
struct aws_event_stream_rpc_client_continuation_token *m_continuationToken;
331379
ContinuationCallbackData *m_callbackData;
380+
332381
static void s_onContinuationMessage(
333382
struct aws_event_stream_rpc_client_continuation_token *continuationToken,
334383
const struct aws_event_stream_rpc_message_args *messageArgs,
@@ -338,6 +387,9 @@ namespace Aws
338387
void *userData) noexcept;
339388
};
340389

390+
/**
391+
* Base class for types used by operations.
392+
*/
341393
class AWS_EVENTSTREAMRPC_API AbstractShapeBase
342394
{
343395
public:
@@ -351,6 +403,9 @@ namespace Aws
351403
Crt::Allocator *m_allocator;
352404
};
353405

406+
/**
407+
* Base class for errors used by operations.
408+
*/
354409
class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase
355410
{
356411
public:
@@ -394,6 +449,9 @@ namespace Aws
394449
RPC_ERROR
395450
};
396451

452+
/**
453+
* A wrapper for operation result.
454+
*/
397455
class AWS_EVENTSTREAMRPC_API TaggedResult
398456
{
399457
public:
@@ -410,9 +468,28 @@ namespace Aws
410468
*/
411469
operator bool() const noexcept;
412470

471+
/**
472+
* Get operation result.
473+
* @return A pointer to the resulting object in case of success, nullptr otherwise.
474+
*/
413475
AbstractShapeBase *GetOperationResponse() const noexcept;
476+
477+
/**
478+
* Get error for a failed operation.
479+
* @return A pointer to the error object in case of failure, nullptr otherwise.
480+
*/
414481
OperationError *GetOperationError() const noexcept;
482+
483+
/**
484+
* Get RPC-level error.
485+
* @return A pointer to the error object in case of RPC-level failure, nullptr otherwise.
486+
*/
415487
RpcError GetRpcError() const noexcept;
488+
489+
/**
490+
* Get the type of the result with which the operation has completed.
491+
* @return Result type.
492+
*/
416493
ResultType GetResultType() const noexcept { return m_responseType; }
417494

418495
private:
@@ -463,20 +540,67 @@ namespace Aws
463540
Crt::Allocator *allocator) const noexcept = 0;
464541
};
465542

543+
/**
544+
* All generated model types implement this interface, including errors.
545+
*/
466546
class AWS_EVENTSTREAMRPC_API OperationModelContext
467547
{
468548
public:
469549
OperationModelContext(const ServiceModel &serviceModel) noexcept;
550+
551+
/**
552+
* Parse the given string into an initial response object.
553+
* @param stringView String to parse the response from.
554+
* @param allocator Allocator to use.
555+
* @return The initial response object.
556+
*/
470557
virtual Crt::ScopedResource<AbstractShapeBase> AllocateInitialResponseFromPayload(
471558
Crt::StringView stringView,
472559
Crt::Allocator *allocator) const noexcept = 0;
560+
561+
/**
562+
* Parse the given string into a streaming response object.
563+
* @param stringView String to parse the response from.
564+
* @param allocator Allocator to use.
565+
* @return The streaming response object.
566+
*/
473567
virtual Crt::ScopedResource<AbstractShapeBase> AllocateStreamingResponseFromPayload(
474568
Crt::StringView stringView,
475569
Crt::Allocator *allocator) const noexcept = 0;
570+
571+
/**
572+
* Get the initial response type name.
573+
* @return The initial response type name.
574+
*/
476575
virtual Crt::String GetInitialResponseModelName() const noexcept = 0;
576+
577+
/**
578+
* Get the request type name.
579+
* @return The request type name.
580+
*/
477581
virtual Crt::String GetRequestModelName() const noexcept = 0;
582+
583+
/**
584+
* Get the streaming response type name.
585+
* @return The streaming response type name.
586+
*/
478587
virtual Crt::Optional<Crt::String> GetStreamingResponseModelName() const noexcept = 0;
588+
589+
/**
590+
* Returns the canonical operation name associated with this context across any client language.
591+
* Namespace included.
592+
* Example: aws.greengrass#SubscribeToTopic
593+
* @return The canonical operation name associated with this context across any client language.
594+
*/
479595
virtual Crt::String GetOperationName() const noexcept = 0;
596+
597+
/**
598+
* Parse the given string into an operation error.
599+
* @param errorModelName The model name.
600+
* @param stringView String to parse the error from.
601+
* @param allocator Allocator to use.
602+
* @return The operation error.
603+
*/
480604
Crt::ScopedResource<OperationError> AllocateOperationErrorFromPayload(
481605
const Crt::String &errorModelName,
482606
Crt::StringView stringView,
@@ -489,6 +613,9 @@ namespace Aws
489613
const ServiceModel &m_serviceModel;
490614
};
491615

616+
/**
617+
* Interface for an RPC operation.
618+
*/
492619
class AWS_EVENTSTREAMRPC_API ClientOperation : public ClientContinuationHandler
493620
{
494621
public:
@@ -504,18 +631,48 @@ namespace Aws
504631
bool operator=(const ClientOperation &clientOperation) noexcept = delete;
505632
bool operator=(ClientOperation &&clientOperation) noexcept = delete;
506633

634+
/**
635+
* Close the stream on which operation is sent.
636+
* @note This function sends a message with the message flag set to terminate the stream.
637+
* @param onMessageFlushCallback Callback to invoke when the closing message is flushed to the underlying
638+
* transport.
639+
* @return Future which will be resolved once the message is sent.
640+
*/
507641
std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;
642+
643+
/**
644+
* Get an operation result.
645+
* @return Future which will be resolved when the corresponding RPC request completes.
646+
*/
508647
std::future<TaggedResult> GetOperationResult() noexcept;
648+
649+
/**
650+
* Set the launch mode for executing operations. The mode is set to std::launch::deferred by default.
651+
* @param mode The launch mode to use.
652+
*/
509653
void WithLaunchMode(std::launch mode) noexcept;
510654

511655
protected:
656+
/**
657+
* Initiate a new client stream. Send the shape for the new stream.
658+
* @param shape A parameter for RPC operation.
659+
* @param onMessageFlushCallback Callback to invoke when the shape is flushed to the underlying transport.
660+
* @return Future which will be resolved once the message is sent.
661+
*/
512662
std::future<RpcError> Activate(
513663
const AbstractShapeBase *shape,
514664
OnMessageFlushCallback onMessageFlushCallback) noexcept;
515665
std::future<RpcError> SendStreamEvent(
516666
AbstractShapeBase *shape,
517667
OnMessageFlushCallback onMessageFlushCallback) noexcept;
668+
669+
/**
670+
* Returns the canonical model name associated with this operation across any client language.
671+
* Namespace included.
672+
* @return The model name.
673+
*/
518674
virtual Crt::String GetModelName() const noexcept = 0;
675+
519676
const OperationModelContext &m_operationModelContext;
520677
std::launch m_asyncLaunchMode;
521678

@@ -566,6 +723,9 @@ namespace Aws
566723
std::condition_variable m_closeReady;
567724
};
568725

726+
/**
727+
* Class representing a connection to an RPC server.
728+
*/
569729
class AWS_EVENTSTREAMRPC_API ClientConnection final
570730
{
571731
public:
@@ -576,6 +736,13 @@ namespace Aws
576736
ClientConnection(ClientConnection &&) noexcept;
577737
ClientConnection &operator=(ClientConnection &&) noexcept;
578738

739+
/**
740+
* Initiates a new outgoing event-stream-rpc connection.
741+
* @param connectionOptions Connection options.
742+
* @param connectionLifecycleHandler Handler to process connection lifecycle events.
743+
* @param clientBootstrap ClientBootstrap object to run the connection on.
744+
* @return Future that will be resolved when connection either succeeds or fails.
745+
*/
579746
std::future<RpcError> Connect(
580747
const ConnectionConfig &connectionOptions,
581748
ConnectionLifecycleHandler *connectionLifecycleHandler,
@@ -591,10 +758,23 @@ namespace Aws
591758
const Crt::Optional<Crt::ByteBuf> &payload,
592759
OnMessageFlushCallback onMessageFlushCallback) noexcept;
593760

761+
/**
762+
* Create a new stream.
763+
* @note Activate() must be called on the stream for it to actually initiate the new stream.
764+
* @param clientContinuationHandler Handler to process continuation events.
765+
* @return A newly created continuation.
766+
*/
594767
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;
595768

769+
/**
770+
* Close the connection.
771+
*/
596772
void Close() noexcept;
597773

774+
/**
775+
* Check if the connection is open.
776+
* @return True if the connection is open, false otherwise.
777+
*/
598778
bool IsOpen() const noexcept
599779
{
600780
if (this->m_underlyingConnection == nullptr)
@@ -608,7 +788,7 @@ namespace Aws
608788
}
609789

610790
/**
611-
* @return true if the instance is in a valid state, false otherwise.
791+
* @return true if the connection is open, false otherwise.
612792
*/
613793
operator bool() const noexcept { return IsOpen(); }
614794

@@ -660,6 +840,10 @@ namespace Aws
660840
void *userData) noexcept;
661841

662842
static void s_protocolMessageCallback(int errorCode, void *userData) noexcept;
843+
844+
/**
845+
* Sends a message on the connection. These must be connection level messages (not application messages).
846+
*/
663847
static std::future<RpcError> s_sendProtocolMessage(
664848
ClientConnection *connection,
665849
const Crt::List<EventStreamHeader> &headers,

0 commit comments

Comments
 (0)