Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support wildcards streams in request-response stream client #381

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0bfd64e
Outline
sfodagain Jan 16, 2025
fd912fa
Extract search into separate source
sfodagain Jan 17, 2025
ba11a74
Move subscriptions match
sfodagain Jan 17, 2025
af0bb88
Move init
sfodagain Jan 17, 2025
d2f5325
Move add_stream_subscription
sfodagain Jan 17, 2025
7263993
Move add_request_subscription
sfodagain Jan 17, 2025
f093b1d
Move remove_request_subscription
sfodagain Jan 17, 2025
8fdc4fc
Style fixes
sfodagain Jan 17, 2025
4757825
Call on_publish cb for wildcarded topics
sfodagain Jan 17, 2025
8d85817
Change stream cb
sfodagain Jan 20, 2025
26b9d2e
Tests fixup
sfodagain Jan 21, 2025
ec659f1
test fixup
sfodagain Jan 21, 2025
49fde1f
Support multi level wildcards
sfodagain Jan 21, 2025
9da0526
temp
sfodagain Jan 29, 2025
e8838a0
Merge branch 'main' into support-wildcards-streams
sfodagain Jan 30, 2025
4ade5cd
Remove client from subscription module
sfodagain Jan 30, 2025
afb4028
Extract matching
sfodagain Jan 30, 2025
01fa22f
Fix log
sfodagain Jan 30, 2025
42e85f6
Revert max_streaming_subscriptions in tests
sfodagain Jan 30, 2025
e7f4a79
Add comments and test
sfodagain Jan 31, 2025
98911da
Fix tests naming
sfodagain Jan 31, 2025
2390554
Simplify adding and removing req op
sfodagain Feb 3, 2025
25fcb4a
More tests
sfodagain Feb 3, 2025
c6495be
Test for both subscriptions
sfodagain Feb 3, 2025
4c621ca
fixup
sfodagain Feb 3, 2025
41d5067
Deal with logs
sfodagain Feb 3, 2025
386e991
Fix comments and style
sfodagain Feb 3, 2025
af67f9c
More tests
sfodagain Feb 3, 2025
1bf1b8b
format
sfodagain Feb 3, 2025
061731f
fixup
sfodagain Feb 3, 2025
9208605
Merge branch 'main' into support-wildcards-streams
sfod Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to see the "request_response" prefix removed somehow.

#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/common/byte_buf.h>
#include <aws/common/hash_table.h>
#include <aws/common/linked_list.h>
#include <aws/mqtt/private/request-response/protocol_adapter.h>

/*
* Handles subscriptions for request-response client.
* Lifetime of this struct is bound to request-response client.
*/
struct aws_request_response_subscriptions {
struct aws_allocator *allocator;

/*
* Map from cursor (topic filter) -> list of streaming operations using that filter
*
* We don't garbage collect this table over the course of normal client operation. We only clean it up
* when the client is shutting down.
*/
struct aws_hash_table streaming_operation_subscription_lists;

/*
* Map from cursor (topic filter with wildcards) -> list of streaming operations using that filter
*
* We don't garbage collect this table over the course of normal client operation. We only clean it up
* when the client is shutting down.
*/
struct aws_hash_table streaming_operation_wildcards_subscription_lists;

/*
* Map from cursor (topic) -> request response path (topic, correlation token json path)
*/
struct aws_hash_table request_response_paths;
};

/*
* This is the (key and) value in stream subscriptions tables.
*/
struct aws_rr_operation_list_topic_filter_entry {
struct aws_allocator *allocator;

struct aws_byte_cursor topic_filter_cursor;
struct aws_byte_buf topic_filter;

struct aws_linked_list operations;
};

/*
* Value in request subscriptions table.
*/
struct aws_rr_response_path_entry {
struct aws_allocator *allocator;

size_t ref_count;

struct aws_byte_cursor topic_cursor;
struct aws_byte_buf topic;

struct aws_byte_buf correlation_token_json_path;
};

/*
* Callback type for matched stream subscriptions.
*/
typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
const struct aws_linked_list *operations,
const struct aws_byte_cursor *topic_filter,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

/*
* Callback type for matched request subscriptions.
*/
typedef void(aws_mqtt_request_operation_subscription_match_fn)(
struct aws_rr_response_path_entry *entry,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

AWS_EXTERN_C_BEGIN

/*
* Initialize internal state of a provided request-response subscriptions structure.
*/
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_init(
struct aws_request_response_subscriptions *subscriptions,
struct aws_allocator *allocator);

/*
* Clean up internals of a provided request-response subscriptions structure.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_clean_up(
struct aws_request_response_subscriptions *subscriptions);

/*
* Add a subscription for stream operations.
* If subscription with the same topic filter is already added, previously created
* aws_rr_operation_list_topic_filter_entry instance is returned.
*/
AWS_MQTT_API struct aws_rr_operation_list_topic_filter_entry *
aws_mqtt_request_response_client_subscriptions_add_stream_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter);

/*
* Add a subscription for request operation.
*/
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter,
const struct aws_byte_cursor *correlation_token_json_path);

/*
* Remove a subscription for a given request operation.
*/
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter);

/*
* Call specified callbacks for all stream and request operations with subscriptions matching a provided publish event.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_match(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rename this to something ala 'handle_incoming_publish' or similar

const struct aws_request_response_subscriptions *subscriptions,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
aws_mqtt_request_operation_subscription_match_fn *on_request_operation_subscription_match,
void *user_data);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_SUBSCRIPTION_SET_H */
Loading