Skip to content

Commit 037b748

Browse files
authored
Merge pull request #236 from ReactiveX/time-fix
Fixing time based observable
2 parents e94d354 + fe66c9f commit 037b748

File tree

2 files changed

+227
-125
lines changed

2 files changed

+227
-125
lines changed

observable_operator.go

+169-55
Original file line numberDiff line numberDiff line change
@@ -451,39 +451,64 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
451451
}
452452

453453
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
454-
defer close(next)
455454
observe := o.Observe(opts...)
456455
buffer := make([]interface{}, 0)
456+
stop := make(chan struct{})
457+
mutex := sync.Mutex{}
458+
459+
checkBuffer := func() {
460+
mutex.Lock()
461+
if len(buffer) != 0 {
462+
if !Of(buffer).SendContext(ctx, next) {
463+
mutex.Unlock()
464+
return
465+
}
466+
buffer = make([]interface{}, 0)
467+
}
468+
mutex.Unlock()
469+
}
470+
471+
go func() {
472+
defer close(next)
473+
duration := timespan.duration()
474+
for {
475+
select {
476+
case <-stop:
477+
checkBuffer()
478+
return
479+
case <-ctx.Done():
480+
return
481+
case <-time.After(duration):
482+
checkBuffer()
483+
}
484+
}
485+
}()
457486

458487
for {
459488
select {
460489
case <-ctx.Done():
490+
close(stop)
461491
return
462492
case item, ok := <-observe:
463493
if !ok {
464-
if len(buffer) != 0 {
465-
Of(buffer).SendContext(ctx, next)
466-
}
494+
close(stop)
467495
return
468496
}
469497
if item.Error() {
470498
item.SendContext(ctx, next)
471499
if option.getErrorStrategy() == StopOnError {
500+
close(stop)
472501
return
473502
}
474503
} else {
504+
mutex.Lock()
475505
buffer = append(buffer, item.V)
476-
}
477-
case <-time.After(timespan.duration()):
478-
if len(buffer) != 0 {
479-
if !Of(buffer).SendContext(ctx, next) {
480-
return
481-
}
482-
buffer = make([]interface{}, 0)
506+
mutex.Unlock()
483507
}
484508
}
485509
}
486510
}
511+
487512
return customObservableOperator(f, opts...)
488513
}
489514

@@ -498,42 +523,69 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
498523
}
499524

500525
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
501-
defer close(next)
502526
observe := o.Observe(opts...)
503527
buffer := make([]interface{}, 0)
528+
stop := make(chan struct{})
529+
send := make(chan struct{})
530+
mutex := sync.Mutex{}
531+
532+
checkBuffer := func() {
533+
mutex.Lock()
534+
if len(buffer) != 0 {
535+
if !Of(buffer).SendContext(ctx, next) {
536+
mutex.Unlock()
537+
return
538+
}
539+
buffer = make([]interface{}, 0)
540+
}
541+
mutex.Unlock()
542+
}
543+
544+
go func() {
545+
defer close(next)
546+
duration := timespan.duration()
547+
for {
548+
select {
549+
case <-send:
550+
checkBuffer()
551+
case <-stop:
552+
checkBuffer()
553+
return
554+
case <-ctx.Done():
555+
return
556+
case <-time.After(duration):
557+
checkBuffer()
558+
}
559+
}
560+
}()
504561

505562
for {
506563
select {
507564
case <-ctx.Done():
508565
return
509566
case item, ok := <-observe:
510567
if !ok {
511-
if len(buffer) != 0 {
512-
Of(buffer).SendContext(ctx, next)
513-
}
568+
close(stop)
569+
close(send)
514570
return
515571
}
516572
if item.Error() {
517573
item.SendContext(ctx, next)
518574
if option.getErrorStrategy() == StopOnError {
575+
close(stop)
576+
close(send)
519577
return
520578
}
521579
} else {
580+
mutex.Lock()
522581
buffer = append(buffer, item.V)
523582
if len(buffer) == count {
524-
if !Of(buffer).SendContext(ctx, next) {
525-
return
526-
}
527-
buffer = make([]interface{}, 0)
583+
mutex.Unlock()
584+
send <- struct{}{}
585+
} else {
586+
mutex.Unlock()
528587
}
529588
}
530-
case <-time.After(timespan.duration()):
531-
if len(buffer) != 0 {
532-
if !Of(buffer).SendContext(ctx, next) {
533-
return
534-
}
535-
buffer = make([]interface{}, 0)
536-
}
537589
}
538590
}
539591
}
@@ -2676,47 +2728,77 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
26762728
}
26772729

26782730
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
2679-
defer close(next)
26802731
observe := o.Observe(opts...)
26812732
ch := option.buildChannel()
2733+
done := make(chan struct{})
26822734
empty := true
2735+
mutex := sync.Mutex{}
26832736
if !Of(FromChannel(ch)).SendContext(ctx, next) {
26842737
return
26852738
}
26862739

2740+
go func() {
2741+
defer func() {
2742+
mutex.Lock()
2743+
close(ch)
2744+
mutex.Unlock()
2745+
}()
2746+
defer close(next)
2747+
for {
2748+
select {
2749+
case <-ctx.Done():
2750+
return
2751+
case <-done:
2752+
return
2753+
case <-time.After(timespan.duration()):
2754+
mutex.Lock()
2755+
if empty {
2756+
mutex.Unlock()
2757+
continue
2758+
}
2759+
close(ch)
2760+
empty = true
2761+
ch = option.buildChannel()
2762+
if !Of(FromChannel(ch)).SendContext(ctx, next) {
2763+
close(done)
2764+
return
2765+
}
2766+
mutex.Unlock()
2767+
}
2768+
}
2769+
}()
2770+
26872771
for {
26882772
select {
26892773
case <-ctx.Done():
2690-
close(ch)
2774+
return
2775+
case <-done:
26912776
return
26922777
case item, ok := <-observe:
26932778
if !ok {
2694-
close(ch)
2779+
close(done)
26952780
return
26962781
}
26972782
if item.Error() {
2783+
mutex.Lock()
26982784
if !item.SendContext(ctx, ch) {
2785+
mutex.Unlock()
2786+
close(done)
26992787
return
27002788
}
2789+
mutex.Unlock()
27012790
if option.getErrorStrategy() == StopOnError {
2702-
close(ch)
2791+
close(done)
27032792
return
27042793
}
27052794
}
2795+
mutex.Lock()
27062796
if !item.SendContext(ctx, ch) {
2797+
mutex.Unlock()
27072798
return
27082799
}
27092800
empty = false
2710-
case <-time.After(timespan.duration()):
2711-
if empty {
2712-
continue
2713-
}
2714-
close(ch)
2715-
ch = option.buildChannel()
2716-
empty = true
2717-
if !Of(FromChannel(ch)).SendContext(ctx, next) {
2718-
return
2719-
}
2801+
mutex.Unlock()
27202802
}
27212803
}
27222804
}
@@ -2735,55 +2817,87 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
27352817
}
27362818

27372819
f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
2738-
defer close(next)
27392820
observe := o.Observe(opts...)
27402821
ch := option.buildChannel()
2822+
done := make(chan struct{})
2823+
mutex := sync.Mutex{}
27412824
iCount := 0
27422825
if !Of(FromChannel(ch)).SendContext(ctx, next) {
27432826
return
27442827
}
27452828

2829+
go func() {
2830+
defer func() {
2831+
mutex.Lock()
2832+
close(ch)
2833+
mutex.Unlock()
2834+
}()
2835+
defer close(next)
2836+
for {
2837+
select {
2838+
case <-ctx.Done():
2839+
return
2840+
case <-done:
2841+
return
2842+
case <-time.After(timespan.duration()):
2843+
mutex.Lock()
2844+
if iCount == 0 {
2845+
mutex.Unlock()
2846+
continue
2847+
}
2848+
close(ch)
2849+
iCount = 0
2850+
ch = option.buildChannel()
2851+
if !Of(FromChannel(ch)).SendContext(ctx, next) {
2852+
close(done)
2853+
return
2854+
}
2855+
mutex.Unlock()
2856+
}
2857+
}
2858+
}()
2859+
27462860
for {
27472861
select {
27482862
case <-ctx.Done():
2749-
close(ch)
2863+
return
2864+
case <-done:
27502865
return
27512866
case item, ok := <-observe:
27522867
if !ok {
2753-
close(ch)
2868+
close(done)
27542869
return
27552870
}
27562871
if item.Error() {
2872+
mutex.Lock()
27572873
if !item.SendContext(ctx, ch) {
2874+
mutex.Unlock()
2875+
close(done)
27582876
return
27592877
}
2878+
mutex.Unlock()
27602879
if option.getErrorStrategy() == StopOnError {
2761-
close(ch)
2880+
close(done)
27622881
return
27632882
}
27642883
}
2884+
mutex.Lock()
27652885
if !item.SendContext(ctx, ch) {
2886+
mutex.Unlock()
27662887
return
27672888
}
27682889
iCount++
27692890
if iCount == count {
27702891
close(ch)
2771-
ch = option.buildChannel()
27722892
iCount = 0
2893+
ch = option.buildChannel()
27732894
if !Of(FromChannel(ch)).SendContext(ctx, next) {
2895+
mutex.Unlock()
2896+
close(done)
27742897
return
27752898
}
27762899
}
2777-
case <-time.After(timespan.duration()):
2778-
if iCount == 0 {
2779-
continue
2780-
}
2781-
close(ch)
2782-
ch = option.buildChannel()
2783-
iCount = 0
2784-
if !Of(FromChannel(ch)).SendContext(ctx, next) {
2785-
return
2786-
}
2900+
mutex.Unlock()
27872901
}
27882902
}
27892903
}

0 commit comments

Comments
 (0)