Skip to content

Commit 8df3dad

Browse files
committed
Support SQL filter expressions for streams
## What? This commit allows AMQP 1.0 clients to define SQL-like filter expressions when consuming from streams, enabling server-side message filtering. RabbitMQ will only dispatch messages that match the provided filter expression, reducing network traffic and client-side processing overhead. SQL filter expressions are a more powerful alternative to the [AMQP Property Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) introduced in RabbitMQ 4.1. SQL filter expressions are based on the [JMS message selector syntax](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax) and support: * Comparison operators (`=`, `<>`, `>`, `<`, `>=`, `<=`) * Logical operators (`AND`, `OR`, `NOT`) * Arithmetic operators (`+`, `-`, `*`, `/`) * Special operators (`BETWEEN`, `LIKE`, `IN`, `IS NULL`) * Access to the properties and application-properties sections **Examples** Simple expression: ```sql header.priority > 4 ``` Complex expression: ```sql order_type IN ('premium', 'express') AND total_amount BETWEEN 100 AND 5000 AND (customer_region LIKE 'EU-%' OR customer_region = 'US-CA') AND properties.creation-time >= 1750772279000 AND NOT cancelled ``` Like AMQP property filter expressions, SQL filter expressions can be combined with Bloom filters. Combining both allows for highly customisable expressions (SQL) and extremely fast evaluation (Bloom filter) if only a subset of the chunks need to be read from disk. ## Why? Compared to AMQP property filter expressions, SQL filter expressions provide the following advantage: * High expressiveness and flexibility in defining the filter Like for AMQP property filter expressions, the following advantages apply: * No false positives (as is the case for Bloom filters) * Multiple concurrent clients can attach to the same stream each consuming only a specific subset of messages while preserving message order. * Low network overhead as only messages that match the filter are transferred to the client * Likewise, lower resource usage (CPU and memory) on clients since they don't need to deserialise messages that they are not interested in. * If the SQL expression is simple, even the broker will save resources because it doesn't need to serialse and send messages that the client isn't interested in. ## How? ### JMS Message Selector Syntax vs. AMQP Extension Spec The AMQP Filter Expressions Version 1.0 extension Working Draft 09 defines SQL Filter Expressions in Section 6. This spec differs from the JMS message selector spec. Neither is a subset of the other. We can choose to follow either. However, I think it makes most sense to follow the JMS spec because: * The JMS spec is better defined * The JMS spec is far more widespread than the AMQP Working Draft spec. (A slight variation of the AMQP Working Draft is used by Azure Service Bus: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter) * The JMS spec is mostly simpler (partly because matching on only simple types) * This will allow for a single SQL parser in RabbitMQ for both AMQP clients consuming from a stream and possibly in future for JMS clients consuming from queues or topics. <details> <summary>AMQP extension spec vs JMS spec</summary> AMQP != is synonym for <> JMS defines only <> Conclusion <> is sufficient AMQP Strings can be tested for “greater than” “both operands are of type string or of type symbol (any combination is permitted) and the lexicographical rank of the left operand is greater than the lexicographical rank of the right operand” JMS “String and Boolean comparison is restricted to = and <>.” Conclusion The JMS behaviour is sufficient. AMQP IN <set-expression> set-expression can contain non-string literals JMS: set-expression can contain only string literals Conclusion The JMS behaviour is sufficient. AMQP EXISTS predicate to check for composite types JMS Only simple types Conclusion We want to match only for simple types, i.e. allowing matching only against values in the application-properties, properties sections and priority field of the header section. AMQP: Modulo operator % Conclusion JMS doesn't define the modulo operator. Let's start without it. We can decide in future to add support since it can actually be useful, for example for two receivers who want to process every other message. AMQP: The ‘+’ operator can concatenate string and symbol values Conclusion Such string concatenation isn't defined in JMS. We don't need it. AMQP: Define NAN and INF JMS: “Approximate literals use the Java floating-point literal syntax.” Examples include "7." Conclusion We can go with the JMS spec given that needs to be implemented anyway for JMS support. Scientific notations are supported in both the AMQP spec and JMS spec. AMQP String literals can be surrounded by single or double quotation marks JMS A string literal is enclosed in single quotes Conclusion Supporting single quotes is good enough. AMQP “A binary constant is a string of pairs of hexadecimal digits prefixed by ‘0x’ that are not enclosed in quotation marks” Conclusion JMS doesn't support binary constants. We can start without binary constants. Matching against binary values are still supported if these binary values can be expressed as UTF-8 strings. AMQP Functions DATE, UTC, SUBSTRING, LOWER, UPPER, LEFT, RIGHT Vendor specific functions Conclusion JMS doesn't define such functions. We can start without those functions. AMQP <field><array_element_reference> <field>‘.’<composite_type_reference> to access map and array elements Conclusion Same as above: We want to match only for simple types, i.e. allowing matching only against values in the application-properties, properties sections and priority field of the header section. AMQP allows for delimited identifiers JMS Java identifier part characters Conclusion We can go with the Java identifiers extending the allowed characters by `.` and `-` to reference field names such as `properties.group-id`. JMS: BETWEEN operator Conclusion The BETWEEN operator isn't supported in the AMQP spec. Let's support it as convenience since it's already available in JMS. </details> ### Filter Name The client provides a filter with name `sql-filter` instead of name `jms-selector` to allow to differentiate between JMS clients and other native AMQP 1.0 clients using SQL expressions. This way, we can also optionally extend the SQL grammar in future. ### Identifiers JMS message selectors allow identifiers to contain some well known JMS headers that match to well known AMQP fields, for example: ```erl jms_header_to_amqp_field_name(<<"JMSDeliveryMode">>) -> durable; jms_header_to_amqp_field_name(<<"JMSPriority">>) -> priority; jms_header_to_amqp_field_name(<<"JMSMessageID">>) -> message_id; jms_header_to_amqp_field_name(<<"JMSTimestamp">>) -> creation_time; jms_header_to_amqp_field_name(<<"JMSCorrelationID">>) -> correlation_id; jms_header_to_amqp_field_name(<<"JMSType">>) -> subject; %% amqp-bindmap-jms-v1.0-wd10 § 3.2.2 JMS-defined ’JMSX’ Properties jms_header_to_amqp_field_name(<<"JMSXUserID">>) -> user_id; jms_header_to_amqp_field_name(<<"JMSXGroupID">>) -> group_id; jms_header_to_amqp_field_name(<<"JMSXGroupSeq">>) -> group_sequence; ``` This commit does a similar matching for `header.` and `properties.` prefixed identifiers to field names in the AMQP property section. The only field that is supported to filter on in the AMQP header section is `priority`, that is identifier `header.priority`. By default, as described in the AMQP extension spec, if an identifier is not prefixed, it refers to a key in the application-properties section. Hence, all identifiers prefixed with `header.`, and `properties.` have special meanings and MUST be avoided by applications unless they want to refer to those specific fields. Azure Service Bus uses the `sys.` and `user.` prefixes for well known field names and arbitrary application-provided keys, respectively. ### SQL lexer, parser and evaluator This commit implements the SQL lexer and parser in files rabbit_jms_selector_lexer.xrl and rabbit_jms_selector_parser.yrl, respectively. Advantages: * Both the definitions in the lexer and the grammar in the parser are defined **declaratively**. * In total, the entire SQL syntax and grammar is defined in only 240 lines. * Therefore, lexer and parser are simple to maintain. The idea of this commit is to use the same lexer and parser for native AMQP clients consumings from streams (this commit) as for JMS clients (in the future). All native AMQP client vs JMS client bits are then manipulated after the Abstract Syntax Tree (AST) has been created by the parser. For example, this commit transforms the AST specifically for native AMQP clients by mapping `properties.` prefixed identifiers (field names) to atoms. A JMS client's mapping from `JMS` prefixed headers can transform the AST differently. Likewise, this commit transforms the AST to compile a regex for complex LIKE expressions when consuming from a stream while a future version might not want to compile a regex when consuming from quorum queues. Module `rabbit_jms_ast` provides such AST helper methods. The lexer and parser are not performance critical as this work happens upon receivers attaching to the stream. The evaluator however is performance critical as message evaluation happens on the hot path. ### LIKE expressions The evaluator has been optimised to only compile a regex when necessary. If the LIKE expression-value contains no wildcard or only a single `%` wildcard, Erlang pattern matching is used as it's more efficient. Since `_` can match any UTF-8 character, a regex will be compiled with the `[unicode]` options. ### Filter errors Any errors upon a receiver attaching to a stream causes the filter to not become active. RabbitMQ will log a warning describing the reason and will omit the named filter in its attach reply frame. The client lib is responsible for detaching the link as explained in the AMQP spec: > The receiving endpoint sets its desired filter, the sending endpoint sets the filter actually in place (including any filters defaulted at the node). The receiving endpoint MUST check that the filter in place meets its needs and take responsibility for detaching if it does not. This applies to lexer and parser errors. Errors during message evaluation will result in an unknown value. Conditional operators on unknown are described in the JMS spec. If the entire selector condition is unknown, the message does not match, and will therefore not be delivered to the client. ## Clients Support for passing the SQL expression from app to broker is provided by the Java client in rabbitmq/rabbitmq-amqp-java-client#216
1 parent 7543523 commit 8df3dad

28 files changed

+6108
-151
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-record(filter, {
9+
descriptor :: binary() | non_neg_integer(),
10+
value :: term()
11+
}).

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(amqp10_client).
99

10-
-include("amqp10_client.hrl").
10+
-include("amqp10_client_internal.hrl").
1111
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1212

1313
-export([open_connection/1,

deps/amqp10_client/src/amqp10_client_connection.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-behaviour(gen_statem).
1111

12-
-include("amqp10_client.hrl").
12+
-include("amqp10_client_internal.hrl").
1313
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1414
-include_lib("amqp10_common/include/amqp10_types.hrl").
1515

deps/amqp10_client/src/amqp10_client_frame_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
-behaviour(gen_statem).
1010

11-
-include("amqp10_client.hrl").
11+
-include("amqp10_client_internal.hrl").
1212
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1313

1414
-ifdef(TEST).

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-behaviour(gen_statem).
1010

1111
-include("amqp10_client.hrl").
12+
-include("amqp10_client_internal.hrl").
1213
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1314
-include_lib("amqp10_common/include/amqp10_types.hrl").
1415

@@ -86,7 +87,7 @@
8687
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8788

8889
% http://www.amqp.org/specification/1.0/filters
89-
-type filter() :: #{binary() => binary() | map() | list(binary())}.
90+
-type filter() :: #{binary() => #filter{} | binary() | map() | list(binary())}.
9091
-type max_message_size() :: undefined | non_neg_integer().
9192
-type footer_opt() :: crc32 | adler32.
9293

@@ -781,29 +782,39 @@ translate_filters(Filters)
781782
when map_size(Filters) =:= 0 ->
782783
undefined;
783784
translate_filters(Filters) ->
784-
{map,
785-
maps:fold(
786-
fun
787-
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
788-
%% special case conversion
789-
Key = sym(K),
790-
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
791-
(K, V, Acc) when is_binary(K) ->
792-
%% try treat any filter value generically
793-
Key = sym(K),
794-
Value = filter_value_type(V),
795-
[{Key, {described, Key, Value}} | Acc]
796-
end, [], Filters)}.
797-
798-
filter_value_type(V) when is_binary(V) ->
785+
{map, lists:map(
786+
fun({Name, #filter{descriptor = Desc,
787+
value = V}})
788+
when is_binary(Name) ->
789+
Descriptor = if is_binary(Desc) -> {symbol, Desc};
790+
is_integer(Desc) -> {ulong, Desc}
791+
end,
792+
{{symbol, Name}, {described, Descriptor, V}};
793+
({<<"apache.org:legacy-amqp-headers-binding:map">> = K, V})
794+
when is_map(V) ->
795+
%% special case conversion
796+
Key = sym(K),
797+
Val = translate_legacy_amqp_headers_binding(V),
798+
{Key, {described, Key, Val}};
799+
({K, V})
800+
when is_binary(K) ->
801+
Key = {symbol, K},
802+
Val = filter_value_type(V),
803+
{Key, {described, Key, Val}}
804+
end, maps:to_list(Filters))}.
805+
806+
filter_value_type(V)
807+
when is_binary(V) ->
799808
%% this is clearly not always correct
800809
{utf8, V};
801810
filter_value_type(V)
802811
when is_integer(V) andalso V >= 0 ->
803812
{uint, V};
804-
filter_value_type(VList) when is_list(VList) ->
813+
filter_value_type(VList)
814+
when is_list(VList) ->
805815
{list, [filter_value_type(V) || V <- VList]};
806-
filter_value_type({T, _} = V) when is_atom(T) ->
816+
filter_value_type({T, _} = V)
817+
when is_atom(T) ->
807818
%% looks like an already tagged type, just pass it through
808819
V.
809820

@@ -1507,16 +1518,17 @@ translate_filters_selector_filter_test() ->
15071518
} = translate_filters(#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}).
15081519

15091520
translate_filters_multiple_filters_test() ->
1510-
{map,
1511-
[
1512-
{{symbol, <<"apache.org:selector-filter:string">>},
1513-
{described, {symbol, <<"apache.org:selector-filter:string">>},
1514-
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
1515-
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
1516-
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}
1517-
]
1518-
} = translate_filters(#{
1519-
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
1520-
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
1521-
}).
1521+
{map, Actual} = translate_filters(
1522+
#{
1523+
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
1524+
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
1525+
}),
1526+
Expected = [{{symbol, <<"apache.org:selector-filter:string">>},
1527+
{described, {symbol, <<"apache.org:selector-filter:string">>},
1528+
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
1529+
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
1530+
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}],
1531+
ActualSorted = lists:sort(Actual),
1532+
ExpectedSorted = lists:sort(Expected),
1533+
ExpectedSorted = ActualSorted.
15221534
-endif.

deps/amqp10_client/test/mock_server.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
recv_amqp_header_step/1
1717
]).
1818

19-
-include("src/amqp10_client.hrl").
19+
-include("src/amqp10_client_internal.hrl").
2020

2121
start(Port) ->
2222
{ok, LSock} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, false}]),
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
%% A filter with this name contains a JMS message selector.
8+
%% We use the same name as sent by the Qpid JMS client in
9+
%% https://github.com/apache/qpid-jms/blob/2.7.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
10+
-define(FILTER_NAME_JMS, <<"jms-selector">>).
11+
12+
%% A filter with this name contains an SQL expression.
13+
%% In the current version, such a filter must comply with the JMS message selector syntax.
14+
%% However, we use a name other than "jms-selector" in case we want to extend the allowed syntax
15+
%% in the future, for example allowing for some of the extended grammar described in
16+
%% §6 "SQL Filter Expressions" of
17+
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
18+
-define(FILTER_NAME_SQL, <<"sql-filter">>).
19+
20+
%% SQL-based filtering syntax
21+
%% These descriptors are defined in
22+
%% https://www.amqp.org/specification/1.0/filters
23+
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
24+
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).
25+
26+
%% AMQP Filter Expressions Version 1.0 Working Draft 09
27+
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
28+
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
29+
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
30+
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
31+
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).

deps/amqp10_common/include/amqp10_filtex.hrl

Lines changed: 0 additions & 15 deletions
This file was deleted.

deps/rabbit/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ define ct_master.erl
258258
endef
259259

260260
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
261-
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261+
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_jms_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
262262
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
263263
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
264264

@@ -363,6 +363,9 @@ ifdef TRACE_SUPERVISOR2
363363
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
364364
endif
365365

366+
# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
367+
export ERL_COMPILER_OPTIONS := deterministic
368+
366369
# --------------------------------------------------------------------
367370
# Documentation.
368371
# --------------------------------------------------------------------

deps/rabbit/src/mc.erl

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
-type str() :: atom() | string() | binary().
4646
-type internal_ann_key() :: atom().
4747
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
48-
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
48+
-type x_ann_value() :: str() | number() | TaggedValue :: tuple() | [x_ann_value()].
4949
-type protocol() :: module().
5050
-type annotations() :: #{internal_ann_key() => term(),
5151
x_ann_key() => x_ann_value()}.
@@ -76,8 +76,7 @@
7676
-type property_value() :: undefined |
7777
string() |
7878
binary() |
79-
integer() |
80-
float() |
79+
number() |
8180
boolean().
8281
-type tagged_value() :: {uuid, binary()} |
8382
{utf8, binary()} |
@@ -155,9 +154,9 @@ init(Proto, Data, Anns) ->
155154
-spec init(protocol(), term(), annotations(), environment()) -> state().
156155
init(Proto, Data, Anns0, Env) ->
157156
{ProtoData, ProtoAnns} = Proto:init(Data),
158-
Anns1 = case map_size(Env) =:= 0 of
159-
true -> Anns0;
160-
false -> Anns0#{env => Env}
157+
Anns1 = case map_size(Env) of
158+
0 -> Anns0;
159+
_ -> Anns0#{env => Env}
161160
end,
162161
Anns2 = maps:merge(ProtoAnns, Anns1),
163162
Anns = ensure_received_at_timestamp(Anns2),
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-module(rabbit_amqp_filter).
8+
9+
-export([eval/2]).
10+
11+
-type expression() :: undefined |
12+
{property, rabbit_amqp_filter_prop:parsed_expressions()} |
13+
{jms, rabbit_amqp_filter_jms:parsed_expression()}.
14+
15+
-export_type([expression/0]).
16+
17+
-spec eval(expression(), mc:state()) -> boolean().
18+
eval(undefined, _Mc) ->
19+
%% A receiver without filter wants all messages.
20+
true;
21+
eval({property, Expr}, Mc) ->
22+
rabbit_amqp_filter_prop:eval(Expr, Mc);
23+
eval({jms, Expr}, Mc) ->
24+
rabbit_amqp_filter_jms:eval(Expr, Mc).

0 commit comments

Comments
 (0)