@@ -198,10 +198,10 @@ where
198198 trace2. cursor_through ( acknowledged2. borrow ( ) ) . unwrap ( ) ;
199199 let batch1_cursor = batch1. cursor ( ) ;
200200 todo1. push_back ( Deferred :: new (
201- trace2_cursor,
202- trace2_storage,
203201 batch1_cursor,
204202 batch1. clone ( ) ,
203+ trace2_cursor,
204+ trace2_storage,
205205 capability. clone ( ) ,
206206 ) ) ;
207207 }
@@ -273,27 +273,13 @@ where
273273 // which results in unintentionally quadratic processing time (each batch of either
274274 // input must scan all batches from the other input).
275275
276- let mut work_result = |k : & Tr1 :: Key ,
277- v1 : & Tr1 :: Val ,
278- v2 : & Tr2 :: Val ,
279- t : & G :: Timestamp ,
280- r1 : & Tr1 :: R ,
281- r2 : & Tr2 :: R | {
282- let t = t. clone ( ) ;
283- let r = r1. clone ( ) . multiply ( r2) ;
284- result ( k, v1, v2)
285- . into_iter ( )
286- . map ( move |d| ( d, t. clone ( ) , r. clone ( ) ) )
287- } ;
288-
289276 // Perform some amount of outstanding work.
290277 let mut fuel = 1_000_000 ;
291278 while !todo1. is_empty ( ) && fuel > 0 {
292- todo1. front_mut ( ) . unwrap ( ) . work (
293- output,
294- |k, v2, v1, t, r2, r1| work_result ( k, v1, v2, t, r1, r2) ,
295- & mut fuel,
296- ) ;
279+ todo1
280+ . front_mut ( )
281+ . unwrap ( )
282+ . work ( output, & mut result, & mut fuel) ;
297283 if !todo1. front ( ) . unwrap ( ) . work_remains ( ) {
298284 todo1. pop_front ( ) ;
299285 }
@@ -305,7 +291,7 @@ where
305291 todo2
306292 . front_mut ( )
307293 . unwrap ( )
308- . work ( output, & mut work_result , & mut fuel) ;
294+ . work ( output, & mut result , & mut fuel) ;
309295 if !todo2. front ( ) . unwrap ( ) . work_remains ( ) {
310296 todo2. pop_front ( ) ;
311297 }
@@ -373,10 +359,10 @@ where
373359 C1 : Cursor < Key = Row , Val = Row , Time = T , R = Diff > ,
374360 C2 : Cursor < Key = Row , Val = Row , Time = T , R = Diff > ,
375361{
376- trace : C1 ,
377- trace_storage : C1 :: Storage ,
378- batch : C2 ,
379- batch_storage : C2 :: Storage ,
362+ cursor1 : C1 ,
363+ storage1 : C1 :: Storage ,
364+ cursor2 : C2 ,
365+ storage2 : C2 :: Storage ,
380366 capability : Capability < T > ,
381367 done : bool ,
382368 temp : Vec < ( D , T , Diff ) > ,
@@ -390,17 +376,17 @@ where
390376 D : Data ,
391377{
392378 fn new (
393- trace : C1 ,
394- trace_storage : C1 :: Storage ,
395- batch : C2 ,
396- batch_storage : C2 :: Storage ,
379+ cursor1 : C1 ,
380+ storage1 : C1 :: Storage ,
381+ cursor2 : C2 ,
382+ storage2 : C2 :: Storage ,
397383 capability : Capability < T > ,
398384 ) -> Self {
399385 Deferred {
400- trace ,
401- trace_storage ,
402- batch ,
403- batch_storage ,
386+ cursor1 ,
387+ storage1 ,
388+ cursor2 ,
389+ storage2 ,
404390 capability,
405391 done : false ,
406392 temp : Vec :: new ( ) ,
@@ -415,46 +401,50 @@ where
415401 fn work < L , I > (
416402 & mut self ,
417403 output : & mut OutputHandle < T , ( D , T , Diff ) , Tee < T , ( D , T , Diff ) > > ,
418- mut logic : L ,
404+ mut result : L ,
419405 fuel : & mut usize ,
420406 ) where
421- I : IntoIterator < Item = ( D , T , Diff ) > ,
422- L : FnMut ( & C1 :: Key , & C1 :: Val , & C2 :: Val , & T , & C1 :: R , & C2 :: R ) -> I ,
407+ I : IntoIterator < Item = D > ,
408+ L : FnMut ( & C1 :: Key , & C1 :: Val , & C2 :: Val ) -> I ,
423409 {
424410 let meet = self . capability . time ( ) ;
425411
426412 let mut session = output. session ( & self . capability ) ;
427413
428- let trace_storage = & self . trace_storage ;
429- let batch_storage = & self . batch_storage ;
414+ let storage1 = & self . storage1 ;
415+ let storage2 = & self . storage2 ;
430416
431- let trace = & mut self . trace ;
432- let batch = & mut self . batch ;
417+ let cursor1 = & mut self . cursor1 ;
418+ let cursor2 = & mut self . cursor2 ;
433419
434420 let temp = & mut self . temp ;
435421
436- while batch . key_valid ( batch_storage ) && trace . key_valid ( trace_storage ) {
437- match trace . key ( trace_storage ) . cmp ( batch . key ( batch_storage ) ) {
438- Ordering :: Less => trace . seek_key ( trace_storage , batch . key ( batch_storage ) ) ,
439- Ordering :: Greater => batch . seek_key ( batch_storage , trace . key ( trace_storage ) ) ,
422+ while cursor1 . key_valid ( storage1 ) && cursor2 . key_valid ( storage2 ) {
423+ match cursor1 . key ( storage1 ) . cmp ( cursor2 . key ( storage2 ) ) {
424+ Ordering :: Less => cursor1 . seek_key ( storage1 , cursor2 . key ( storage2 ) ) ,
425+ Ordering :: Greater => cursor2 . seek_key ( storage2 , cursor1 . key ( storage1 ) ) ,
440426 Ordering :: Equal => {
441427 assert_eq ! ( temp. len( ) , 0 ) ;
442428
443429 // Populate `temp` with the results, as long as fuel remains.
444- let key = batch . key ( batch_storage ) ;
445- while let Some ( val1) = trace . get_val ( trace_storage ) {
446- while let Some ( val2) = batch . get_val ( batch_storage ) {
447- trace . map_times ( trace_storage , |time1, diff1| {
430+ let key = cursor2 . key ( storage2 ) ;
431+ while let Some ( val1) = cursor1 . get_val ( storage1 ) {
432+ while let Some ( val2) = cursor2 . get_val ( storage2 ) {
433+ cursor1 . map_times ( storage1 , |time1, diff1| {
448434 let time1 = time1. join ( meet) ;
449- batch . map_times ( batch_storage , |time2, diff2| {
435+ cursor2 . map_times ( storage2 , |time2, diff2| {
450436 let time = time1. join ( time2) ;
451- temp. extend ( logic ( key, val1, val2, & time, diff1, diff2) )
437+ let diff = diff1. multiply ( diff2) ;
438+ let results = result ( key, val1, val2)
439+ . into_iter ( )
440+ . map ( |d| ( d, time. clone ( ) , diff. clone ( ) ) ) ;
441+ temp. extend ( results) ;
452442 } ) ;
453443 } ) ;
454- batch . step_val ( batch_storage ) ;
444+ cursor2 . step_val ( storage2 ) ;
455445 }
456- batch . rewind_vals ( batch_storage ) ;
457- trace . step_val ( trace_storage ) ;
446+ cursor1 . step_val ( storage1 ) ;
447+ cursor2 . rewind_vals ( storage2 ) ;
458448
459449 // TODO: This consolidation is optional, and it may not be very
460450 // helpful. We might try harder to understand whether we
@@ -472,8 +462,8 @@ where
472462 }
473463 }
474464
475- batch . step_key ( batch_storage ) ;
476- trace . step_key ( trace_storage ) ;
465+ cursor1 . step_key ( storage1 ) ;
466+ cursor2 . step_key ( storage2 ) ;
477467 }
478468 }
479469 }
0 commit comments