From 0aec3d5f52d5eba96e927cc0eacacd7d025ce80d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 15 Oct 2024 14:54:26 +0200 Subject: [PATCH] Remove set_external_summary Remove the `set_external_summary` function because it is not strictly required and forces the operator to be scheduled during dataflow construction. This surfaced an inconsistency in probe which would report incorrect data until scheduled the first time. The fix is to initialize it correctly and retract the initial information when scheduled the first time. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/core/probe.rs | 6 ++++++ timely/src/dataflow/operators/generic/builder_raw.rs | 6 ------ timely/src/progress/operate.rs | 8 -------- timely/src/progress/subgraph.rs | 9 --------- timely/src/worker.rs | 1 - 5 files changed, 6 insertions(+), 24 deletions(-) diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index b6035ae13b..0abac98ed6 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -94,6 +94,8 @@ impl Probe for StreamCore { let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); + handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1))); + let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; @@ -111,6 +113,10 @@ impl Probe for StreamCore { if !started { // discard initial capability. progress.internals[0].update(G::Timestamp::minimum(), -1); + if let Some(shared_frontier) = shared_frontier.upgrade() { + let mut borrow = shared_frontier.borrow_mut(); + borrow.update_iter(std::iter::once((Timestamp::minimum(), -1))); + } started = true; } diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8ba131612a..59689b432f 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -228,11 +228,5 @@ where (self.summary.clone(), self.shared_progress.clone()) } - // initialize self.frontier antichains as indicated by hosting scope. - fn set_external_summary(&mut self) { - // should we schedule the operator here, or just await the first invocation? - self.schedule(); - } - fn notify_me(&self) -> bool { self.shape.notify } } diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 28fd10ae8c..c671533f58 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -46,14 +46,6 @@ pub trait Operate : Schedule { /// any output, and no initial capabilities are held. fn get_internal_summary(&mut self) -> (Vec>>, Rc>>); - /// Signals that external frontiers have been set. - /// - /// By default this method does nothing, and leaves all changes in the `frontiers` element - /// of the shared progress state. An operator should be able to consult `frontiers` at any - /// point and read out the current frontier information, or the changes from the last time - /// that `frontiers` was drained. - fn set_external_summary(&mut self) { } - /// Indicates of whether the operator requires `push_external_progress` information or not. fn notify_me(&self) -> bool { true } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index dbe7356553..8f09660343 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -581,15 +581,6 @@ where // Return summaries and shared progress information. (internal_summary, self.shared_progress.clone()) } - - fn set_external_summary(&mut self) { - self.accept_frontier(); - self.propagate_pointstamps(); // ensure propagation of input frontiers. - self.children - .iter_mut() - .flat_map(|child| child.operator.as_mut()) - .for_each(|op| op.set_external_summary()); - } } struct PerOperatorState { diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 4a5c0145b2..523fc615fe 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -652,7 +652,6 @@ impl Worker { } operator.get_internal_summary(); - operator.set_external_summary(); let mut temp_channel_ids = self.temp_channel_ids.borrow_mut(); let channel_ids = temp_channel_ids.drain(..).collect::>();