Skip to content

Commit 2f8d051

Browse files
committed
fix tests
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent af8b4ed commit 2f8d051

File tree

6 files changed

+26
-44
lines changed

6 files changed

+26
-44
lines changed

src/storage/src/render/sinks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use tracing::warn;
3333
use crate::healthcheck::HealthStatusMessage;
3434
use crate::storage_state::StorageState;
3535

36-
/// _Renders_ complete _differential_ [`Collection`]s
36+
/// _Renders_ complete _differential_ Collections
3737
/// that represent the sink and its errors as requested
3838
/// by the original `CREATE SINK` statement.
3939
pub(crate) fn render_sink<G>(

src/storage/src/render/sources.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
4545
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
4646
use crate::upsert::{UpsertKey, UpsertValue};
4747

48-
/// _Renders_ complete _differential_ [`Collection`]s
48+
/// _Renders_ complete _differential_ Collections
4949
/// that represent the final source and its errors
5050
/// as requested by the original `CREATE SOURCE` statement,
5151
/// encapsulated in the passed `SourceInstanceDesc`.
5252
///
53-
/// The first element in the returned tuple is the pair of [`Collection`]s,
53+
/// The first element in the returned tuple is the pair of Collections,
5454
/// the second is a type-erased token that will keep the source
5555
/// alive as long as it is not dropped.
5656
///

src/timely-util/src/builder_async.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,25 +211,34 @@ pub enum Event<T: Timestamp, C, D> {
211211
Progress(Antichain<T>),
212212
}
213213

214+
/// Shared part of an async output handle
214215
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
216+
/// Handle to write to the output stream.
215217
output: Output<T, CB::Container>,
218+
/// Current capability held by this output handle.
216219
capability: Option<Capability<T>>,
220+
/// Container builder to accumulate data before sending at `capability`.
217221
builder: CB,
218222
}
219223

220224
impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
225+
/// Write all pending data to the output stream.
221226
fn flush(&mut self) {
222227
while let Some(container) = self.builder.finish() {
223228
self.output
224229
.give(self.capability.as_ref().expect("must exist"), container);
225230
}
226231
}
232+
233+
/// Cease this output handle, flushing all pending data and releasing its capability.
227234
fn cease(&mut self) {
228235
self.flush();
229236
let _ = self.output.activate();
230237
self.capability = None;
231238
}
232239

240+
/// Provides data at the time specified by the capability. Flushes automatically when the
241+
/// capability time changes.
233242
fn give<D>(&mut self, cap: &Capability<T>, data: D)
234243
where
235244
CB: PushInto<D>,
@@ -238,7 +247,7 @@ impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
238247
&& cap.time() != capability.time()
239248
{
240249
self.flush();
241-
self.capability = Some(cap.clone());
250+
self.capability = None;
242251
}
243252
if self.capability.is_none() {
244253
self.capability = Some(cap.clone());

src/timely-util/src/containers.rs

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ pub mod stack;
1919

2020
pub(crate) use alloc::alloc_aligned_zeroed;
2121
pub use alloc::{enable_columnar_lgalloc, set_enable_columnar_lgalloc};
22-
pub use provided_builder::ProvidedBuilder;
2322

2423
mod alloc {
2524
use mz_ore::region::Region;
@@ -50,38 +49,3 @@ mod alloc {
5049
ENABLE_COLUMNAR_LGALLOC.set(enabled);
5150
}
5251
}
53-
54-
mod provided_builder {
55-
use timely::Container;
56-
use timely::container::ContainerBuilder;
57-
58-
/// A container builder that doesn't support pushing elements, and is only suitable for pushing
59-
/// whole containers at Timely sessions. See [`give_container`] for more information.
60-
///
61-
/// [`give_container`]: timely::dataflow::channels::pushers::buffer::Session::give_container
62-
pub struct ProvidedBuilder<C> {
63-
_marker: std::marker::PhantomData<C>,
64-
}
65-
66-
impl<C> Default for ProvidedBuilder<C> {
67-
fn default() -> Self {
68-
Self {
69-
_marker: std::marker::PhantomData,
70-
}
71-
}
72-
}
73-
74-
impl<C: Container + Clone + 'static> ContainerBuilder for ProvidedBuilder<C> {
75-
type Container = C;
76-
77-
#[inline(always)]
78-
fn extract(&mut self) -> Option<&mut Self::Container> {
79-
None
80-
}
81-
82-
#[inline(always)]
83-
fn finish(&mut self) -> Option<&mut Self::Container> {
84-
None
85-
}
86-
}
87-
}

src/timely-util/src/reclock.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -613,9 +613,12 @@ mod test {
613613
/// test provides a test logic closure which accepts four arguments:
614614
///
615615
/// * A reference to the worker that allows the test to step the computation
616-
/// * A `BindingHandle` that allows the test to manipulate the remap bindings
617-
/// * A `DataHandle` that allows the test to submit the data to be reclocked
618-
/// * A `ReclockedStream` that allows observing the result of the reclocking process
616+
/// * A [`BindingHandle`] that allows the test to manipulate the remap bindings
617+
/// * A [`DataHandle`] that allows the test to submit the data to be reclocked
618+
/// * A [`ReclockedStream`] that allows observing the result of the reclocking process
619+
///
620+
/// Note that the `DataHandle` contains a capability that should be dropped or downgraded before
621+
/// calling [`step`] to process data at the time.
619622
fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
620623
where
621624
FromTime: Timestamp + Refines<()>,
@@ -674,6 +677,7 @@ mod test {
674677
data.activate()
675678
.session(&data_cap)
676679
.give(('a', Partitioned::minimum(), Diff::ONE));
680+
drop(data_cap);
677681
step(worker);
678682
let extracted = reclocked.extract();
679683
let expected = vec![(0, vec![('a', 0, Diff::ONE)])];
@@ -966,6 +970,7 @@ mod test {
966970
Partitioned::new_singleton(0, 50),
967971
Diff::ONE,
968972
));
973+
drop(data_cap);
969974
step(worker);
970975
assert_eq!(
971976
reclocked.try_recv(),
@@ -1024,6 +1029,7 @@ mod test {
10241029
data.activate()
10251030
.session(&data_cap)
10261031
.give_iterator(source_updates1.iter().cloned());
1032+
drop(data_cap);
10271033
step(worker);
10281034
reclocked.extract()
10291035
},
@@ -1054,6 +1060,7 @@ mod test {
10541060
data.activate()
10551061
.session(&data_cap)
10561062
.give_iterator(source_updates.iter().cloned());
1063+
drop(data_cap);
10571064
step(worker);
10581065
reclocked.extract()
10591066
},

test/sqllogictest/introspection/relations.slt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,10 @@ Differential␠Logging␠Demux Consolidate␠Differential(BatcherCapacity) all
186186
Differential␠Logging␠Demux Consolidate␠Differential(BatcherRecords) alloc::vec::Vec<((usize,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
187187
Differential␠Logging␠Demux Consolidate␠Differential(BatcherSize) alloc::vec::Vec<((usize,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
188188
Differential␠Logging␠Demux Consolidate␠Differential(Sharing) alloc::vec::Vec<((usize,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
189+
FlatMapReachability Consolidate␠Timely(Reachability) mz_timely_util::columnar::Column<(((bool,␠usize,␠usize,␠usize,␠mz_repr::timestamp::Timestamp),␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
189190
Replay␠compute␠logs Compute␠Logging␠Demux mz_timely_util::columnar::Column<(core::time::Duration,␠mz_compute::logging::compute::ComputeEvent)>
190191
Replay␠differential␠logs Differential␠Logging␠Demux alloc::vec::Vec<(core::time::Duration,␠differential_dataflow::logging::DifferentialEvent)>
191-
Replay␠reachability␠logs Consolidate␠Timely(Reachability) mz_timely_util::columnar::Column<(((bool,␠usize,␠usize,␠usize,␠mz_repr::timestamp::Timestamp),␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
192+
Replay␠reachability␠logs FlatMapReachability mz_timely_util::columnar::Column<(core::time::Duration,␠(usize,␠alloc::vec::Vec<(usize,␠usize,␠bool,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>))>
192193
Replay␠timely␠logs Timely␠Logging␠Demux alloc::vec::Vec<(core::time::Duration,␠timely::logging::TimelyEvent)>
193194
Timely␠Logging␠Demux Consolidate␠Timely(Addresses) alloc::vec::Vec<((usize,␠alloc::vec::Vec<usize>),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
194195
Timely␠Logging␠Demux Consolidate␠Timely(BatchesReceived) alloc::vec::Vec<((mz_compute::logging::timely::MessageDatum,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
@@ -241,6 +242,7 @@ GROUP BY type;
241242
1 alloc::vec::Vec<alloc::vec::Vec<differential_dataflow::containers::TimelyStack<((usize,␠alloc::vec::Vec<usize>),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>>>
242243
1 mz_timely_util::columnar::Column<(((bool,␠usize,␠usize,␠usize,␠mz_repr::timestamp::Timestamp),␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
243244
1 mz_timely_util::columnar::Column<((mz_compute::logging::timely::ChannelDatum,␠()),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
245+
1 mz_timely_util::columnar::Column<(core::time::Duration,␠(usize,␠alloc::vec::Vec<(usize,␠usize,␠bool,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>))>
244246
1 mz_timely_util::columnar::Column<(core::time::Duration,␠mz_compute::logging::compute::ComputeEvent)>
245247
32 alloc::vec::Vec<alloc::rc::Rc<differential_dataflow::trace::implementations::ord_neu::val_batch::OrdValBatch<mz_compute::row_spine::spines::RowRowLayout<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>>>>
246248
32 mz_timely_util::columnar::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>

0 commit comments

Comments
 (0)