@@ -172,6 +172,115 @@ async Task SendTo(string state)
172172        await  SystemUtils . CleanUpStreamSystem ( system ,  stream ) . ConfigureAwait ( false ) ; 
173173    } 
174174
175+     [ SkippableFact ] 
176+     public  async  void  FilterShouldReturnOnlyOneChunkWithDeduplication ( ) 
177+     { 
178+         SystemUtils . InitStreamSystemWithRandomStream ( out  var  system ,  out  var  stream ) ; 
179+         if  ( ! AvailableFeaturesSingleton . Instance . PublishFilter ) 
180+         { 
181+             throw  new  SkipException ( "broker does not support filter" ) ; 
182+         } 
183+ 
184+         var  deduplicatingProducer  =  await  DeduplicatingProducer . Create ( 
185+             new  DeduplicatingProducerConfig ( system ,  stream ,  "my_ref" ) 
186+             { 
187+                 Filter  =  new  ProducerFilter ( ) 
188+                 { 
189+                     // define the producer filter  
190+                     FilterValue  =  message =>  message . ApplicationProperties [ "state" ] . ToString ( ) , 
191+                 } 
192+             } 
193+         ) ; 
194+ 
195+         const  int  ToSend  =  50 ; 
196+ 
197+         async  Task  SendTo ( string  state ,  ulong  start ) 
198+         { 
199+             for  ( var  i  =  ( 0  +  start ) ;  i  <  ToSend  +  start ;  i ++ ) 
200+             { 
201+                 var  message  =  new  Message ( Encoding . UTF8 . GetBytes ( $ "Message: { i } .  State: { state } ") ) 
202+                 { 
203+                     ApplicationProperties  =  new  ApplicationProperties ( )  {  [ "state" ]  =  state  } , 
204+                     Properties  =  new  Properties ( )  {  GroupId  =  $ "group_{ i } "  } 
205+                 } ; 
206+                 await  deduplicatingProducer . Send ( i ,  message ) . ConfigureAwait ( false ) ; 
207+             } 
208+         } 
209+ 
210+         await  SendTo ( "Alabama" ,  0 ) ; 
211+         await  Task . Delay ( TimeSpan . FromSeconds ( 2 ) ) . ConfigureAwait ( false ) ; 
212+         await  SendTo ( "New York" ,  ToSend ) ; 
213+         await  Task . Delay ( TimeSpan . FromSeconds ( 2 ) ) . ConfigureAwait ( false ) ; 
214+ 
215+         var  testPassedAlabama  =  new  TaskCompletionSource < int > ( ) ; 
216+         var  consumedAlabama  =  new  List < Message > ( ) ; 
217+         var  consumerAlabama  =  await  Consumer . Create ( new  ConsumerConfig ( system ,  stream ) 
218+         { 
219+             OffsetSpec  =  new  OffsetTypeFirst ( ) , 
220+ 
221+             // This is mandatory for enabling the filter 
222+             Filter  =  new  ConsumerFilter ( ) 
223+             { 
224+                 Values  =  new  List < string > ( )  {  "Alabama"  } , 
225+                 PostFilter  = 
226+                     _ => 
227+                         true ,  // we don't apply any post filter here to be sure that the server is doing the filtering  
228+                 MatchUnfiltered  =  true 
229+             } , 
230+             MessageHandler  =  ( _ ,  _ ,  _ ,  message )  => 
231+             { 
232+                 consumedAlabama . Add ( message ) ; 
233+                 if  ( consumedAlabama . Count  ==  ToSend ) 
234+                 { 
235+                     testPassedAlabama . SetResult ( ToSend ) ; 
236+                 } 
237+ 
238+                 return  Task . CompletedTask ; 
239+             } 
240+         } ) . ConfigureAwait ( false ) ; 
241+         Assert . True ( testPassedAlabama . Task . Wait ( TimeSpan . FromSeconds ( 5 ) ) ) ; 
242+ 
243+         Assert . Equal ( ToSend ,  consumedAlabama . Count ) ; 
244+ 
245+         // check that only the messages from Alabama were 
246+         consumedAlabama . Where ( m =>  m . ApplicationProperties [ "state" ] . Equals ( "Alabama" ) ) . ToList ( ) . ForEach ( m => 
247+         { 
248+             Assert . Equal ( "Alabama" ,  m . ApplicationProperties [ "state" ] ) ; 
249+         } ) ; 
250+ 
251+         await  consumerAlabama . Close ( ) . ConfigureAwait ( false ) ; 
252+         // let's reset  
253+         var  consumedNY  =  new  List < Message > ( ) ; 
254+ 
255+         var  consumerNY  =  await  Consumer . Create ( new  ConsumerConfig ( system ,  stream ) 
256+         { 
257+             OffsetSpec  =  new  OffsetTypeFirst ( ) , 
258+ 
259+             // This is mandatory for enabling the filter 
260+             Filter  =  new  ConsumerFilter ( ) 
261+             { 
262+                 Values  =  new  List < string > ( )  {  "New York"  } , 
263+                 PostFilter  = 
264+                     message =>  message . Properties . GroupId . ToString ( ) ! 
265+                         . Equals ( "group_55" ) ,  // we only want the message with  group_55 ignoring the rest 
266+                 // this filter is client side. We should have two messages with group_55 
267+                 // One for the standard send and one for the batch send 
268+                 MatchUnfiltered  =  true 
269+             } , 
270+             MessageHandler  =  ( _ ,  _ ,  _ ,  message )  => 
271+             { 
272+                 consumedNY . Add ( message ) ; 
273+                 return  Task . CompletedTask ; 
274+             } 
275+         } ) . ConfigureAwait ( false ) ; 
276+ 
277+         SystemUtils . Wait ( TimeSpan . FromSeconds ( 2 ) ) ; 
278+         Assert . Single ( consumedNY ) ; 
279+         Assert . Equal ( "group_55" ,  consumedNY [ 0 ] . Properties . GroupId ! ) ; 
280+         await  consumerNY . Close ( ) . ConfigureAwait ( false ) ; 
281+         await  SystemUtils . CleanUpStreamSystem ( system ,  stream ) . ConfigureAwait ( false ) ; 
282+     } 
283+ 
175284    // This test is to test when there are errors on the filter functions 
176285    // producer side and consumer side.  
177286    // FilterValue and PostFilter are user's functions and can throw exceptions 
@@ -249,7 +358,7 @@ await producer.Send(new Message(Encoding.UTF8.GetBytes("Message: " + i))
249358            OffsetSpec  =  new  OffsetTypeFirst ( ) , 
250359            Filter  =  new  ConsumerFilter ( ) 
251360            { 
252-                 Values  =  new  List < string > ( )  {  "my_filter"  } , // at this level we don't care about the filter value 
361+                 Values  =  new  List < string > ( )  {  "my_filter"  } ,   // at this level we don't care about the filter value 
253362                PostFilter  = 
254363                    message => 
255364                    { 
0 commit comments