diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index b6035ae13..0abac98ed 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -94,6 +94,8 @@ impl Probe for StreamCore { let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); + handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1))); + let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; @@ -111,6 +113,10 @@ impl Probe for StreamCore { if !started { // discard initial capability. progress.internals[0].update(G::Timestamp::minimum(), -1); + if let Some(shared_frontier) = shared_frontier.upgrade() { + let mut borrow = shared_frontier.borrow_mut(); + borrow.update_iter(std::iter::once((Timestamp::minimum(), -1))); + } started = true; } diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8ba131612..59689b432 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -228,11 +228,5 @@ where (self.summary.clone(), self.shared_progress.clone()) } - // initialize self.frontier antichains as indicated by hosting scope. - fn set_external_summary(&mut self) { - // should we schedule the operator here, or just await the first invocation? - self.schedule(); - } - fn notify_me(&self) -> bool { self.shape.notify } } diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 28fd10ae8..c671533f5 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -46,14 +46,6 @@ pub trait Operate : Schedule { /// any output, and no initial capabilities are held. fn get_internal_summary(&mut self) -> (Vec>>, Rc>>); - /// Signals that external frontiers have been set. - /// - /// By default this method does nothing, and leaves all changes in the `frontiers` element - /// of the shared progress state. An operator should be able to consult `frontiers` at any - /// point and read out the current frontier information, or the changes from the last time - /// that `frontiers` was drained. - fn set_external_summary(&mut self) { } - /// Indicates of whether the operator requires `push_external_progress` information or not. fn notify_me(&self) -> bool { true } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index dbe735655..8f0966034 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -581,15 +581,6 @@ where // Return summaries and shared progress information. (internal_summary, self.shared_progress.clone()) } - - fn set_external_summary(&mut self) { - self.accept_frontier(); - self.propagate_pointstamps(); // ensure propagation of input frontiers. - self.children - .iter_mut() - .flat_map(|child| child.operator.as_mut()) - .for_each(|op| op.set_external_summary()); - } } struct PerOperatorState { diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 4a5c0145b..523fc615f 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -652,7 +652,6 @@ impl Worker { } operator.get_internal_summary(); - operator.set_external_summary(); let mut temp_channel_ids = self.temp_channel_ids.borrow_mut(); let channel_ids = temp_channel_ids.drain(..).collect::>();