Skip to content

Commit 9fc2f57

Browse files
authored
Support wildcards streams in request-response stream client (#381)
1 parent 36896f6 commit 9fc2f57

File tree

6 files changed

+1315
-221
lines changed

6 files changed

+1315
-221
lines changed

include/aws/mqtt/private/request-response/request_response_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_releas
2020

2121
AWS_EXTERN_C_END
2222

23-
#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */
23+
#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
2+
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
3+
4+
/**
5+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
6+
* SPDX-License-Identifier: Apache-2.0.
7+
*/
8+
9+
#include <aws/common/byte_buf.h>
10+
#include <aws/common/hash_table.h>
11+
#include <aws/common/linked_list.h>
12+
#include <aws/mqtt/exports.h>
13+
14+
struct aws_mqtt_rr_incoming_publish_event;
15+
16+
/*
17+
* Handles subscriptions for request-response client.
18+
* Lifetime of this struct is bound to request-response client.
19+
*/
20+
struct aws_request_response_subscriptions {
21+
struct aws_allocator *allocator;
22+
23+
/*
24+
* Map from cursor (topic filter) -> list of streaming operations using that filter
25+
*
26+
* We don't garbage collect this table over the course of normal client operation. We only clean it up
27+
* when the client is shutting down.
28+
*/
29+
struct aws_hash_table streaming_operation_subscription_lists;
30+
31+
/*
32+
* Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter
33+
*
34+
* We don't garbage collect this table over the course of normal client operation. We only clean it up
35+
* when the client is shutting down.
36+
*/
37+
struct aws_hash_table streaming_operation_wildcards_subscription_lists;
38+
39+
/*
40+
* Map from cursor (topic) -> request response path (topic, correlation token json path)
41+
*/
42+
struct aws_hash_table request_response_paths;
43+
};
44+
45+
/*
46+
* This is the (key and) value in stream subscriptions tables.
47+
*/
48+
struct aws_rr_operation_list_topic_filter_entry {
49+
struct aws_allocator *allocator;
50+
51+
struct aws_byte_cursor topic_filter_cursor;
52+
struct aws_byte_buf topic_filter;
53+
54+
struct aws_linked_list operations;
55+
};
56+
57+
/*
58+
* Value in request subscriptions table.
59+
*/
60+
struct aws_rr_response_path_entry {
61+
struct aws_allocator *allocator;
62+
63+
size_t ref_count;
64+
65+
struct aws_byte_cursor topic_cursor;
66+
struct aws_byte_buf topic;
67+
68+
struct aws_byte_buf correlation_token_json_path;
69+
};
70+
71+
/*
72+
* Callback type for matched stream subscriptions.
73+
*/
74+
typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
75+
const struct aws_linked_list *operations,
76+
const struct aws_byte_cursor *topic_filter,
77+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
78+
void *user_data);
79+
80+
/*
81+
* Callback type for matched request subscriptions.
82+
*/
83+
typedef void(aws_mqtt_request_operation_subscription_match_fn)(
84+
struct aws_rr_response_path_entry *entry,
85+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
86+
void *user_data);
87+
88+
AWS_EXTERN_C_BEGIN
89+
90+
/*
91+
* Initialize internal state of a provided request-response subscriptions structure.
92+
*/
93+
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init(
94+
struct aws_request_response_subscriptions *subscriptions,
95+
struct aws_allocator *allocator);
96+
97+
/*
98+
* Clean up internals of a provided request-response subscriptions structure.
99+
*/
100+
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up(
101+
struct aws_request_response_subscriptions *subscriptions);
102+
103+
/*
104+
* Add a subscription for stream operations.
105+
* If subscription with the same topic filter is already added, previously created
106+
* aws_rr_operation_list_topic_filter_entry instance is returned.
107+
*/
108+
AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry *
109+
aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
110+
struct aws_request_response_subscriptions *subscriptions,
111+
const struct aws_byte_cursor *topic_filter);
112+
113+
/*
114+
* Add a subscription for request operation.
115+
*/
116+
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
117+
struct aws_request_response_subscriptions *subscriptions,
118+
const struct aws_byte_cursor *topic_filter,
119+
const struct aws_byte_cursor *correlation_token_json_path);
120+
121+
/*
122+
* Remove a subscription for a given request operation.
123+
*/
124+
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
125+
struct aws_request_response_subscriptions *subscriptions,
126+
const struct aws_byte_cursor *topic_filter);
127+
128+
/*
129+
* Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event.
130+
*/
131+
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_handle_incoming_publish(
132+
const struct aws_request_response_subscriptions *subscriptions,
133+
const struct aws_mqtt_rr_incoming_publish_event *publish_event,
134+
aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
135+
aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
136+
void *user_data);
137+
138+
AWS_EXTERN_C_END
139+
140+
#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_SET_H */

0 commit comments

Comments
 (0)