Skip to content

Commit

Permalink
Remove client from subscription module
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jan 30, 2025
1 parent e8838a0 commit 4ade5cd
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
void *user_data);

typedef void(aws_mqtt_request_operation_subscription_match_fn)(
struct aws_mqtt_request_response_client *rr_client,
struct aws_rr_response_path_entry *entry,
const struct aws_protocol_adapter_incoming_publish_event *publish_event);
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

AWS_EXTERN_C_BEGIN

Expand Down
20 changes: 14 additions & 6 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ struct aws_mqtt_request_response_client {
*/
struct aws_priority_queue operations_by_timeout;

/*
* Structure to handle subscriptions: add/remove subscriptions, match incoming messages.
* TODO Add normal description.
*/
struct aws_request_response_subscriptions subscriptions;

/*
Expand Down Expand Up @@ -782,6 +786,8 @@ static void s_apply_publish_to_streaming_operation_list(
const struct aws_byte_cursor *topic_filter,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data) {
(void)user_data;

AWS_FATAL_ASSERT(operations != NULL);

struct aws_linked_list_node *node = aws_linked_list_begin(operations);
Expand All @@ -804,8 +810,8 @@ static void s_apply_publish_to_streaming_operation_list(
continue;
}

void *user_data = operation->storage.streaming_storage.options.user_data;
(*incoming_publish_callback)(publish_event->payload, publish_event->topic, user_data);
void *operation_user_data = operation->storage.streaming_storage.options.user_data;
(*incoming_publish_callback)(publish_event->payload, publish_event->topic, operation_user_data);

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
Expand Down Expand Up @@ -870,9 +876,11 @@ static void s_complete_operation_with_correlation_token(
}

static void s_apply_publish_to_response_path_entry(
struct aws_mqtt_request_response_client *rr_client,
struct aws_rr_response_path_entry *entry,
const struct aws_protocol_adapter_incoming_publish_event *publish_event) {
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data) {

struct aws_mqtt_request_response_client *rr_client = user_data;

struct aws_json_value *json_payload = NULL;

Expand Down Expand Up @@ -967,7 +975,7 @@ static void s_aws_rr_client_protocol_adapter_incoming_publish_callback(
publish_event,
s_apply_publish_to_streaming_operation_list,
s_apply_publish_to_response_path_entry,
NULL);
rr_client);
}

static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) {
Expand Down Expand Up @@ -1057,7 +1065,7 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
sizeof(struct aws_mqtt_rr_client_operation *),
s_compare_rr_operation_timeouts);

aws_mqtt_request_response_client_subscriptions_init(&rr_client->subscriptions, rr_client, allocator);
aws_mqtt_request_response_client_subscriptions_init(&rr_client->subscriptions, allocator);

aws_hash_table_init(
&rr_client->operations_by_correlation_tokens,
Expand Down
19 changes: 7 additions & 12 deletions source/request-response/request_response_subscription_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
if (aws_hash_table_find(&subscriptions->request_response_paths, &path.topic, &element) || element == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: internal state error removing reference to response path for topic " PRInSTR,
(void *)subscriptions->client,
"internal state error removing reference to response path for topic " PRInSTR,
AWS_BYTE_CURSOR_PRI(path.topic));
continue;
}
Expand All @@ -199,15 +198,13 @@ void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
if (entry->ref_count == 0) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: removing last reference to response path for topic " PRInSTR,
(void *)subscriptions->client,
"removing last reference to response path for topic " PRInSTR,
AWS_BYTE_CURSOR_PRI(path.topic));
aws_hash_table_remove(&subscriptions->request_response_paths, &path.topic, NULL, NULL);
} else {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: removing reference to response path for topic " PRInSTR ", %zu references remain",
(void *)subscriptions->client,
"removing reference to response path for topic " PRInSTR ", %zu references remain",
AWS_BYTE_CURSOR_PRI(path.topic),
entry->ref_count);
}
Expand All @@ -222,7 +219,7 @@ static void s_match_wildcard_stream_subscriptions(

AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"= Looking subscription for topic '" PRInSTR "'",
"= Looking for subscription for topic '" PRInSTR "'",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter);
Expand Down Expand Up @@ -308,8 +305,7 @@ void aws_mqtt_request_response_client_subscriptions_match(
subscription_filter_element != NULL) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client incoming publish on topic '" PRInSTR "' matches streaming topic",
(void *)subscriptions->client,
"request-response client incoming publish on topic '" PRInSTR "' matches streaming topic",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

struct aws_rr_operation_list_topic_filter_entry *entry = subscription_filter_element->value;
Expand All @@ -330,10 +326,9 @@ void aws_mqtt_request_response_client_subscriptions_match(
response_path_element != NULL) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path",
(void *)subscriptions->client,
"request-response client incoming publish on topic '" PRInSTR "' matches response path",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

on_request_operation_subscription_match(subscriptions->client, response_path_element->value, publish_event);
on_request_operation_subscription_match(response_path_element->value, publish_event, user_data);
}
}
12 changes: 4 additions & 8 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -3359,20 +3359,18 @@ static void s_rrs_fixture_on_stream_operation_subscription_match(
}

static void s_rrs_fixture_on_request_operation_subscription_match(
struct aws_mqtt_request_response_client *rr_client,
struct aws_rr_response_path_entry *entry,
const struct aws_protocol_adapter_incoming_publish_event *publish_event) {
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data) {
fprintf(stderr, "====== on req called\n");
}

static int s_rrs_match_subscription_with_single_level_wildcards_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

void *client = (void *)0x08;

struct aws_request_response_subscriptions subscriptions;

aws_mqtt_request_response_client_subscriptions_init(&subscriptions, client, allocator);
aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator);

struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/abc");
struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/123/+");
Expand Down Expand Up @@ -3425,11 +3423,9 @@ AWS_TEST_CASE(
static int s_rrs_match_subscription_with_multi_level_wildcards_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

void *client = (void *)0x08;

struct aws_request_response_subscriptions subscriptions;

aws_mqtt_request_response_client_subscriptions_init(&subscriptions, client, allocator);
aws_mqtt_request_response_client_subscriptions_init(&subscriptions, allocator);

struct aws_byte_cursor topic_filter1 = aws_byte_cursor_from_c_str("topic/123/abc");
struct aws_byte_cursor topic_filter2 = aws_byte_cursor_from_c_str("topic/123/#");
Expand Down

0 comments on commit 4ade5cd

Please sign in to comment.