Skip to content

Commit 7cdbd39

Browse files
author
Bret Ambrose
committed
Checkpoint before refactoring impl
1 parent 1d02362 commit 7cdbd39

File tree

4 files changed

+429
-452
lines changed

4 files changed

+429
-452
lines changed

eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h

Lines changed: 69 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ namespace Aws
4141
class ClientOperation;
4242
class ClientConnection;
4343
class ClientContinuation;
44+
class ClientContinuationHandler;
4445

4546
using HeaderValueType = aws_event_stream_header_value_type;
4647
using MessageType = aws_event_stream_rpc_message_type;
@@ -212,10 +213,11 @@ namespace Aws
212213

213214
struct AWS_EVENTSTREAMRPC_API RpcError
214215
{
216+
explicit operator bool() const noexcept { return baseStatus == EVENT_STREAM_RPC_SUCCESS; }
217+
Crt::String StatusToString();
218+
215219
EventStreamRpcStatusCode baseStatus;
216220
int crtError;
217-
operator bool() const noexcept { return baseStatus == EVENT_STREAM_RPC_SUCCESS; }
218-
Crt::String StatusToString();
219221
};
220222

221223
/**
@@ -232,20 +234,21 @@ namespace Aws
232234
* is invoked, the `ClientConnection` is ready to be used for sending messages.
233235
*/
234236
virtual void OnConnectCallback();
237+
235238
/**
236239
* Invoked upon connection shutdown.
237240
* @param status The status upon disconnection. It can be treated as a bool
238241
* with true implying a successful disconnection.
239242
*/
240243
virtual void OnDisconnectCallback(RpcError status);
244+
241245
/**
242246
* Invoked upon receiving an error. Use the return value to determine
243-
* whether or not to force the connection to close. Keep in mind that once
244-
* closed, the `ClientConnection` can no longer send messages.
245-
* @param status The status upon disconnection. It can be treated as a bool
246-
* with true implying a successful disconnection.
247+
* whether or not to force the connection to close.
248+
* @param status Details about the error encountered.
247249
*/
248250
virtual bool OnErrorCallback(RpcError status);
251+
249252
/**
250253
* Invoked upon receiving a ping from the server. The `headers` and `payload`
251254
* refer to what is contained in the ping message.
@@ -255,6 +258,61 @@ namespace Aws
255258
const Crt::Optional<Crt::ByteBuf> &payload);
256259
};
257260

261+
262+
class ClientConnectionImpl;
263+
264+
/**
265+
* Class representing a connection to an RPC server.
266+
*/
267+
class AWS_EVENTSTREAMRPC_API ClientConnection final
268+
{
269+
public:
270+
271+
explicit ClientConnection(Crt::Allocator *allocator = Crt::g_allocator) noexcept;
272+
~ClientConnection() noexcept;
273+
274+
/**
275+
* Initiates a new outgoing event-stream-rpc connection.
276+
* @param connectionOptions Connection options.
277+
* @param connectionLifecycleHandler Handler to process connection lifecycle events.
278+
* @param clientBootstrap ClientBootstrap object to run the connection on.
279+
* @return Future that will be resolved when connection either succeeds or fails.
280+
*/
281+
std::future<RpcError> Connect(
282+
const ConnectionConfig &connectionOptions,
283+
ConnectionLifecycleHandler *connectionLifecycleHandler,
284+
Crt::Io::ClientBootstrap &clientBootstrap) noexcept;
285+
286+
/**
287+
* Create a new stream.
288+
* @note Activate() must be called on the stream for it to actually initiate the new stream.
289+
* @param clientContinuationHandler Handler to process continuation events.
290+
* @return A newly created continuation.
291+
*/
292+
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;
293+
294+
/**
295+
* Close the connection.
296+
*/
297+
void Close() noexcept;
298+
299+
/**
300+
* Check if the connection is open.
301+
* @return True if the connection is open, false otherwise.
302+
*/
303+
bool IsOpen() const noexcept;
304+
305+
/**
306+
* Returns the C connection object, if it exists.
307+
* @return the C connection object, if it exists.
308+
*/
309+
struct aws_event_stream_rpc_client_connection *GetUnderlyingHandle() const noexcept;
310+
311+
private:
312+
313+
std::shared_ptr<ClientConnectionImpl> m_impl;
314+
};
315+
258316
/**
259317
* User data passed to callbacks for a new stream.
260318
*/
@@ -301,7 +359,7 @@ namespace Aws
301359

302360
private:
303361
friend class ClientContinuation;
304-
ContinuationCallbackData *m_callbackData;
362+
std::shared_ptr<ContinuationCallbackData> m_callbackData;
305363
};
306364

307365
/**
@@ -320,7 +378,7 @@ namespace Aws
320378
* @param allocator Allocator to use.
321379
*/
322380
ClientContinuation(
323-
ClientConnection *connection,
381+
struct aws_event_stream_rpc_client_connection *connection,
324382
ClientContinuationHandler &continuationHandler,
325383
Crt::Allocator *allocator) noexcept;
326384
~ClientContinuation() noexcept;
@@ -372,7 +430,9 @@ namespace Aws
372430
Crt::Allocator *m_allocator;
373431
ClientContinuationHandler &m_continuationHandler;
374432
struct aws_event_stream_rpc_client_continuation_token *m_continuationToken;
375-
ContinuationCallbackData *m_callbackData;
433+
std::shared_ptr<ContinuationCallbackData> m_callbackData;
434+
435+
void Release();
376436

377437
static void s_onContinuationMessage(
378438
struct aws_event_stream_rpc_client_continuation_token *continuationToken,
@@ -722,146 +782,6 @@ namespace Aws
722782
std::condition_variable m_closeReady;
723783
};
724784

725-
/**
726-
* Class representing a connection to an RPC server.
727-
*/
728-
class AWS_EVENTSTREAMRPC_API ClientConnection final
729-
{
730-
public:
731-
ClientConnection(Crt::Allocator *allocator = Crt::g_allocator) noexcept;
732-
~ClientConnection() noexcept;
733-
ClientConnection(const ClientConnection &) noexcept = delete;
734-
ClientConnection &operator=(const ClientConnection &) noexcept = delete;
735-
ClientConnection(ClientConnection &&) noexcept;
736-
ClientConnection &operator=(ClientConnection &&) noexcept;
737785

738-
/**
739-
* Initiates a new outgoing event-stream-rpc connection.
740-
* @param connectionOptions Connection options.
741-
* @param connectionLifecycleHandler Handler to process connection lifecycle events.
742-
* @param clientBootstrap ClientBootstrap object to run the connection on.
743-
* @return Future that will be resolved when connection either succeeds or fails.
744-
*/
745-
std::future<RpcError> Connect(
746-
const ConnectionConfig &connectionOptions,
747-
ConnectionLifecycleHandler *connectionLifecycleHandler,
748-
Crt::Io::ClientBootstrap &clientBootstrap) noexcept;
749-
750-
std::future<RpcError> SendPing(
751-
const Crt::List<EventStreamHeader> &headers,
752-
const Crt::Optional<Crt::ByteBuf> &payload,
753-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
754-
755-
std::future<RpcError> SendPingResponse(
756-
const Crt::List<EventStreamHeader> &headers,
757-
const Crt::Optional<Crt::ByteBuf> &payload,
758-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
759-
760-
/**
761-
* Create a new stream.
762-
* @note Activate() must be called on the stream for it to actually initiate the new stream.
763-
* @param clientContinuationHandler Handler to process continuation events.
764-
* @return A newly created continuation.
765-
*/
766-
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;
767-
768-
/**
769-
* Close the connection.
770-
*/
771-
void Close() noexcept;
772-
773-
/**
774-
* Check if the connection is open.
775-
* @return True if the connection is open, false otherwise.
776-
*/
777-
bool IsOpen() const noexcept
778-
{
779-
if (this->m_underlyingConnection == nullptr)
780-
{
781-
return false;
782-
}
783-
else
784-
{
785-
return aws_event_stream_rpc_client_connection_is_open(this->m_underlyingConnection);
786-
}
787-
}
788-
789-
/**
790-
* @return true if the connection is open, false otherwise.
791-
*/
792-
operator bool() const noexcept { return IsOpen(); }
793-
794-
private:
795-
friend class ClientContinuation;
796-
friend std::future<RpcError> ClientOperation::Close(OnMessageFlushCallback onMessageFlushCallback) noexcept;
797-
enum ClientState
798-
{
799-
DISCONNECTED = 1,
800-
CONNECTING_SOCKET,
801-
WAITING_FOR_CONNECT_ACK,
802-
CONNECTED,
803-
DISCONNECTING,
804-
};
805-
/* This recursive mutex protects m_clientState & m_connectionWillSetup */
806-
std::recursive_mutex m_stateMutex;
807-
Crt::Allocator *m_allocator;
808-
struct aws_event_stream_rpc_client_connection *m_underlyingConnection;
809-
ClientState m_clientState;
810-
ConnectionLifecycleHandler *m_lifecycleHandler;
811-
ConnectMessageAmender m_connectMessageAmender;
812-
std::promise<void> m_connectionSetupPromise;
813-
bool m_connectionWillSetup;
814-
std::promise<RpcError> m_connectAckedPromise;
815-
std::promise<RpcError> m_closedPromise;
816-
bool m_onConnectCalled;
817-
RpcError m_closeReason;
818-
OnMessageFlushCallback m_onConnectRequestCallback;
819-
Crt::Io::SocketOptions m_socketOptions;
820-
ConnectionConfig m_connectionConfig;
821-
std::future<RpcError> SendProtocolMessage(
822-
const Crt::List<EventStreamHeader> &headers,
823-
const Crt::Optional<Crt::ByteBuf> &payload,
824-
MessageType messageType,
825-
uint32_t messageFlags,
826-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
827-
828-
static void s_onConnectionShutdown(
829-
struct aws_event_stream_rpc_client_connection *connection,
830-
int errorCode,
831-
void *userData) noexcept;
832-
static void s_onConnectionSetup(
833-
struct aws_event_stream_rpc_client_connection *connection,
834-
int errorCode,
835-
void *userData) noexcept;
836-
static void s_onProtocolMessage(
837-
struct aws_event_stream_rpc_client_connection *connection,
838-
const struct aws_event_stream_rpc_message_args *messageArgs,
839-
void *userData) noexcept;
840-
841-
static void s_protocolMessageCallback(int errorCode, void *userData) noexcept;
842-
843-
/**
844-
* Sends a message on the connection. These must be connection level messages (not application messages).
845-
*/
846-
static std::future<RpcError> s_sendProtocolMessage(
847-
ClientConnection *connection,
848-
const Crt::List<EventStreamHeader> &headers,
849-
const Crt::Optional<Crt::ByteBuf> &payload,
850-
MessageType messageType,
851-
uint32_t messageFlags,
852-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
853-
854-
static std::future<RpcError> s_sendPing(
855-
ClientConnection *connection,
856-
const Crt::List<EventStreamHeader> &headers,
857-
const Crt::Optional<Crt::ByteBuf> &payload,
858-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
859-
860-
static std::future<RpcError> s_sendPingResponse(
861-
ClientConnection *connection,
862-
const Crt::List<EventStreamHeader> &headers,
863-
const Crt::Optional<Crt::ByteBuf> &payload,
864-
OnMessageFlushCallback onMessageFlushCallback) noexcept;
865-
};
866786
} // namespace Eventstreamrpc
867787
} // namespace Aws

0 commit comments

Comments
 (0)