22// Roland Pheasant licenses this file to you under the MIT license.
33// See the LICENSE file in the project root for full license information.
44
5- using System . Reactive . Disposables ;
65using System . Reactive . Linq ;
76using DynamicData . Internal ;
87
@@ -11,53 +10,68 @@ namespace DynamicData.Cache.Internal;
1110/// <summary>
1211/// Operator that is similiar to MergeMany but intelligently handles Cache ChangeSets.
1312/// </summary>
14- internal sealed class MergeManyCacheChangeSets < TObject , TKey , TDestination , TDestinationKey > ( IObservable < IChangeSet < TObject , TKey > > source , Func < TObject , TKey , IObservable < IChangeSet < TDestination , TDestinationKey > > > selector , IEqualityComparer < TDestination > ? equalityComparer , IComparer < TDestination > ? comparer )
13+ internal sealed class MergeManyCacheChangeSets < TObject , TKey , TDestination , TDestinationKey > ( IObservable < IChangeSet < TObject , TKey > > source , Func < TObject , TKey , IObservable < IChangeSet < TDestination , TDestinationKey > > > changeSetSelector , IEqualityComparer < TDestination > ? equalityComparer , IComparer < TDestination > ? comparer )
1514 where TObject : notnull
1615 where TKey : notnull
1716 where TDestination : notnull
1817 where TDestinationKey : notnull
1918{
2019 public IObservable < IChangeSet < TDestination , TDestinationKey > > Run ( ) => Observable . Create < IChangeSet < TDestination , TDestinationKey > > (
21- observer =>
20+ observer => new Subscription ( source , changeSetSelector , observer , equalityComparer , comparer ) ) ;
21+
22+ // Maintains state for a single subscription
23+ private sealed class Subscription : CacheParentSubscription < ChangeSetCache < TDestination , TDestinationKey > , TKey , IChangeSet < TDestination , TDestinationKey > , IChangeSet < TDestination , TDestinationKey > >
24+ {
25+ private readonly Cache < ChangeSetCache < TDestination , TDestinationKey > , TKey > _cache = new ( ) ;
26+ private readonly ChangeSetMergeTracker < TDestination , TDestinationKey > _changeSetMergeTracker ;
27+
28+ public Subscription (
29+ IObservable < IChangeSet < TObject , TKey > > source ,
30+ Func < TObject , TKey , IObservable < IChangeSet < TDestination , TDestinationKey > > > changeSetSelector ,
31+ IObserver < IChangeSet < TDestination , TDestinationKey > > observer ,
32+ IEqualityComparer < TDestination > ? equalityComparer ,
33+ IComparer < TDestination > ? comparer )
34+ : base ( observer )
35+ {
36+ _changeSetMergeTracker = new ( ( ) => _cache . Items , comparer , equalityComparer ) ;
37+
38+ // Child Observable has to go into the ChangeSetCache so the locking protects it
39+ CreateParentSubscription ( source . Transform ( ( obj , key ) =>
40+ new ChangeSetCache < TDestination , TDestinationKey > ( MakeChildObservable ( changeSetSelector ( obj , key ) . IgnoreSameReferenceUpdate ( ) ) ) ) ) ;
41+ }
42+
43+ protected override void ParentOnNext ( IChangeSet < ChangeSetCache < TDestination , TDestinationKey > , TKey > changes )
2244 {
23- var locker = InternalEx . NewLock ( ) ;
24- var cache = new Cache < ChangeSetCache < TDestination , TDestinationKey > , TKey > ( ) ;
25- var parentUpdate = false ;
26-
27- // This is manages all of the changes
28- var changeTracker = new ChangeSetMergeTracker < TDestination , TDestinationKey > ( ( ) => cache . Items , comparer , equalityComparer ) ;
29-
30- // Transform to a cache changeset of child caches, synchronize, update the local copy, and publish.
31- var shared = source
32- . Transform ( ( obj , key ) => new ChangeSetCache < TDestination , TDestinationKey > ( selector ( obj , key ) . Synchronize ( locker ) ) )
33- . Synchronize ( locker )
34- . Do ( changes =>
45+ // Process all the changes at once to preserve the changeset order
46+ foreach ( var change in changes . ToConcreteType ( ) )
47+ {
48+ switch ( change . Reason )
3549 {
36- cache . Clone ( changes ) ;
37- parentUpdate = true ;
38- } )
39- . Publish ( ) ;
40-
41- // Merge the child changeset changes together and apply to the tracker
42- var subMergeMany = shared
43- . MergeMany ( cacheChangeSet => cacheChangeSet . Source )
44- . SubscribeSafe (
45- changes => changeTracker . ProcessChangeSet ( changes , ! parentUpdate ? observer : null ) ,
46- observer . OnError ,
47- observer . OnCompleted ) ;
48-
49- // When a source item is removed, all of its sub-items need to be removed
50- var subRemove = shared
51- . OnItemRemoved ( changeSetCache => changeTracker . RemoveItems ( changeSetCache . Cache . KeyValues ) , invokeOnUnsubscribe : false )
52- . OnItemUpdated ( ( _ , prev ) => changeTracker . RemoveItems ( prev . Cache . KeyValues ) )
53- . SubscribeSafe (
54- _ =>
55- {
56- changeTracker . EmitChanges ( observer ) ;
57- parentUpdate = false ;
58- } ,
59- observer . OnError ) ;
60-
61- return new CompositeDisposable ( shared . Connect ( ) , subMergeMany , subRemove ) ;
62- } ) ;
50+ // Shutdown existing sub (if any) and create a new one that
51+ // Will update the cache and emit the changes
52+ case ChangeReason . Add or ChangeReason . Update :
53+ _cache . AddOrUpdate ( change . Current , change . Key ) ;
54+ AddChildSubscription ( change . Current . Source , change . Key ) ;
55+ if ( change . Previous . HasValue )
56+ {
57+ _changeSetMergeTracker . RemoveItems ( change . Previous . Value . Cache . KeyValues ) ;
58+ }
59+ break ;
60+
61+ // Shutdown the existing subscription and remove from the cache
62+ case ChangeReason . Remove :
63+ _cache . Remove ( change . Key ) ;
64+ RemoveChildSubscription ( change . Key ) ;
65+ _changeSetMergeTracker . RemoveItems ( change . Current . Cache . KeyValues ) ;
66+ break ;
67+ }
68+ }
69+ }
70+
71+ protected override void ChildOnNext ( IChangeSet < TDestination , TDestinationKey > changes , TKey parentKey ) =>
72+ _changeSetMergeTracker . ProcessChangeSet ( changes , null ) ;
73+
74+ protected override void EmitChanges ( IObserver < IChangeSet < TDestination , TDestinationKey > > observer ) =>
75+ _changeSetMergeTracker . EmitChanges ( observer ) ;
76+ }
6377}
0 commit comments