Skip to content

Commit

Permalink
Add comments and test
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jan 31, 2025
1 parent 42e85f6 commit e7f4a79
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@
#include <aws/common/byte_buf.h>
#include <aws/common/hash_table.h>
#include <aws/common/linked_list.h>
#include <aws/mqtt/exports.h>
#include <aws/mqtt/private/request-response/protocol_adapter.h>

struct aws_mqtt_request_response_client;

/* Holds subscriptions for request-response client. */
/* Handles subscriptions for request-response client. */
struct aws_request_response_subscriptions {
struct aws_allocator *allocator;

Expand Down Expand Up @@ -63,39 +60,65 @@ struct aws_rr_response_path_entry {
struct aws_byte_buf correlation_token_json_path;
};

/*
* Callback type for matched stream subscriptions.
*/
typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
const struct aws_linked_list *operations,
const struct aws_byte_cursor *topic_filter, // TODO Do we need this for anything other than tests?
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

/*
* Callback type for matched request subscriptions.
*/
typedef void(aws_mqtt_request_operation_subscription_match_fn)(
struct aws_rr_response_path_entry *entry,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

AWS_EXTERN_C_BEGIN

AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_init(
/*
* Initialize internal state of a provided request-response subscriptions structure.
*/
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init(
struct aws_request_response_subscriptions *subscriptions,
struct aws_allocator *allocator);

AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_cleanup(
/*
* Clean up internals of a provided request-response subscriptions structure.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up(
struct aws_request_response_subscriptions *subscriptions);

/*
* Add a subscription for stream operations.
* If subscription with the same topic filter is already added, previously created
* aws_rr_operation_list_topic_filter_entry instance is returned.
*/
AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry *
aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter);

AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
/*
* Add subscriptions for request operations for topics specified in paths list.
*/
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscriptions(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_array_list *paths);

/*
* Remove subscriptions for request operations for topics specified in paths list.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_array_list *paths);

/*
* Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_match(
const struct aws_request_response_subscriptions *subscriptions,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
Expand Down
4 changes: 2 additions & 2 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request

aws_priority_queue_clean_up(&client->operations_by_timeout);

aws_mqtt_request_response_client_subscriptions_cleanup(&client->subscriptions);
aws_mqtt_request_response_client_subscriptions_clean_up(&client->subscriptions);
aws_hash_table_clean_up(&client->operations_by_correlation_tokens);

aws_mem_release(client->allocator, client);
Expand Down Expand Up @@ -1196,7 +1196,7 @@ static int s_add_request_operation_to_response_path_table(
struct aws_mqtt_rr_client_operation *operation) {

struct aws_array_list *paths = &operation->storage.request_storage.operation_response_paths;
return aws_mqtt_request_response_client_subscriptions_add_request_subscription(&client->subscriptions, paths);
return aws_mqtt_request_response_client_subscriptions_add_request_subscriptions(&client->subscriptions, paths);
}

static int s_add_request_operation_to_correlation_token_table(
Expand Down
125 changes: 77 additions & 48 deletions source/request-response/request_response_subscription_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,6 @@
#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50
#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50

static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new(
struct aws_allocator *allocator,
struct aws_byte_cursor topic_filter) {
struct aws_rr_operation_list_topic_filter_entry *entry =
aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry));

entry->allocator = allocator;
aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter);
entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter);

aws_linked_list_init(&entry->operations);

return entry;
}

static void s_aws_rr_operation_list_topic_filter_entry_destroy(struct aws_rr_operation_list_topic_filter_entry *entry) {
if (entry == NULL) {
return;
Expand Down Expand Up @@ -72,51 +57,88 @@ static void s_aws_rr_response_path_table_hash_element_destroy(void *value) {
s_aws_rr_response_path_entry_destroy(value);
}

void aws_mqtt_request_response_client_subscriptions_init(
int aws_mqtt_request_response_client_subscriptions_init(
struct aws_request_response_subscriptions *subscriptions,
struct aws_allocator *allocator) {

AWS_FATAL_ASSERT(subscriptions);

subscriptions->allocator = allocator;

aws_hash_table_init(
&subscriptions->streaming_operation_subscription_lists,
allocator,
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy);

aws_hash_table_init(
&subscriptions->streaming_operation_wildcards_subscription_lists,
allocator,
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy);

aws_hash_table_init(
&subscriptions->request_response_paths,
allocator,
MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_response_path_table_hash_element_destroy);
if (aws_hash_table_init(
&subscriptions->streaming_operation_subscription_lists,
allocator,
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) {
goto clean_up;
}

if (aws_hash_table_init(
&subscriptions->streaming_operation_wildcards_subscription_lists,
allocator,
MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_operation_list_topic_filter_entry_hash_element_destroy)) {
goto clean_up;
}

if (aws_hash_table_init(
&subscriptions->request_response_paths,
allocator,
MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE,
aws_hash_byte_cursor_ptr,
aws_mqtt_byte_cursor_hash_equality,
NULL,
s_aws_rr_response_path_table_hash_element_destroy)) {
goto clean_up;
}

return AWS_OP_SUCCESS;

clean_up:
aws_mqtt_request_response_client_subscriptions_clean_up(subscriptions);
return AWS_OP_ERR;
}

void aws_mqtt_request_response_client_subscriptions_cleanup(struct aws_request_response_subscriptions *subscriptions) {
aws_hash_table_clean_up(&subscriptions->streaming_operation_subscription_lists);
aws_hash_table_clean_up(&subscriptions->streaming_operation_wildcards_subscription_lists);
aws_hash_table_clean_up(&subscriptions->request_response_paths);
void aws_mqtt_request_response_client_subscriptions_clean_up(struct aws_request_response_subscriptions *subscriptions) {
if (subscriptions == NULL) {
return;
}

if (aws_hash_table_is_valid(&subscriptions->streaming_operation_subscription_lists)) {
aws_hash_table_clean_up(&subscriptions->streaming_operation_subscription_lists);
}
if (aws_hash_table_is_valid(&subscriptions->streaming_operation_wildcards_subscription_lists)) {
aws_hash_table_clean_up(&subscriptions->streaming_operation_wildcards_subscription_lists);
}
if (aws_hash_table_is_valid(&subscriptions->request_response_paths)) {
aws_hash_table_clean_up(&subscriptions->request_response_paths);
}
}

static struct aws_rr_operation_list_topic_filter_entry *s_aws_rr_operation_list_topic_filter_entry_new(
struct aws_allocator *allocator,
struct aws_byte_cursor topic_filter) {
struct aws_rr_operation_list_topic_filter_entry *entry =
aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_operation_list_topic_filter_entry));

entry->allocator = allocator;
aws_byte_buf_init_copy_from_cursor(&entry->topic_filter, allocator, topic_filter);
entry->topic_filter_cursor = aws_byte_cursor_from_buf(&entry->topic_filter);

aws_linked_list_init(&entry->operations);

return entry;
}

struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter) {
AWS_FATAL_ASSERT(subscriptions);

bool is_topic_with_wildcard =
(memchr(topic_filter->ptr, '+', topic_filter->len) || memchr(topic_filter->ptr, '#', topic_filter->len));
Expand Down Expand Up @@ -144,9 +166,11 @@ struct aws_rr_operation_list_topic_filter_entry *aws_mqtt_request_response_clien
return entry;
}

int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
int aws_mqtt_request_response_client_subscriptions_add_request_subscriptions(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_array_list *paths) {
AWS_FATAL_ASSERT(subscriptions);
AWS_FATAL_ASSERT(paths);

size_t path_count = aws_array_list_length(paths);
for (size_t i = 0; i < path_count; ++i) {
Expand Down Expand Up @@ -178,6 +202,9 @@ int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_array_list *paths) {
AWS_FATAL_ASSERT(subscriptions);
AWS_FATAL_ASSERT(paths);

size_t path_count = aws_array_list_length(paths);
for (size_t i = 0; i < path_count; ++i) {
struct aws_mqtt_request_operation_response_path path;
Expand Down Expand Up @@ -329,7 +356,9 @@ void aws_mqtt_request_response_client_subscriptions_match(
aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
void *user_data) {

AWS_FATAL_PRECONDITION(subscriptions);
AWS_FATAL_PRECONDITION(publish_event);
// TODO ? Allow NULLs?
AWS_FATAL_PRECONDITION(on_stream_operation_subscription_match);
AWS_FATAL_PRECONDITION(on_request_operation_subscription_match);

Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ add_test_case(rrc_request_response_failure_invalid_correlation_token_type)
add_test_case(rrc_request_response_failure_non_matching_correlation_token)
add_test_case(rrc_request_response_multi_operation_sequence)

# "rrs" = request-response subscriptions
add_test_case(rrs_init_cleanup)
add_test_case(rrs_match_subscription_with_single_level_wildcards)
add_test_case(rrs_match_subscription_with_multi_level_wildcards)

Expand Down
14 changes: 12 additions & 2 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -3368,6 +3368,16 @@ static void s_rrs_fixture_on_request_operation_subscription_match(
fprintf(stderr, "====== on req called\n");
}

static int s_rrs_init_cleanup_fn(struct aws_allocator *allocator, void *ctx) {
struct aws_request_response_subscriptions subscriptions;

ASSERT_SUCCESS(aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator));
aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions);
return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(rrs_init_cleanup, s_rrs_init_cleanup_fn)

static int s_rrs_match_subscription_with_single_level_wildcards_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down Expand Up @@ -3414,7 +3424,7 @@ static int s_rrs_match_subscription_with_single_level_wildcards_fn(struct aws_al

s_aws_rr_client_fixture_streaming_subscriptions_record_delete(record);

aws_mqtt_request_response_client_subscriptions_cleanup(&subscriptions);
aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions);

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -3467,7 +3477,7 @@ static int s_rrs_match_subscription_with_multi_level_wildcards_fn(struct aws_all

s_aws_rr_client_fixture_streaming_subscriptions_record_delete(record);

aws_mqtt_request_response_client_subscriptions_cleanup(&subscriptions);
aws_mqtt_request_response_client_subscriptions_clean_up(&subscriptions);

return AWS_OP_SUCCESS;
}
Expand Down

0 comments on commit e7f4a79

Please sign in to comment.