Skip to content

Remove set_external_summary #592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions timely/src/dataflow/operators/core/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
let (tee, stream) = builder.new_output();
let mut output = PushBuffer::new(PushCounter::new(tee));

handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1)));

let shared_frontier = Rc::downgrade(&handle.frontier);
let mut started = false;

Expand All @@ -111,6 +113,10 @@ impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
if !started {
// discard initial capability.
progress.internals[0].update(G::Timestamp::minimum(), -1);
if let Some(shared_frontier) = shared_frontier.upgrade() {
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(std::iter::once((Timestamp::minimum(), -1)));
}
started = true;
}

Expand Down
6 changes: 0 additions & 6 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,5 @@ where
(self.summary.clone(), self.shared_progress.clone())
}

// initialize self.frontier antichains as indicated by hosting scope.
fn set_external_summary(&mut self) {
// should we schedule the operator here, or just await the first invocation?
self.schedule();
}

fn notify_me(&self) -> bool { self.shape.notify }
}
8 changes: 0 additions & 8 deletions timely/src/progress/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ pub trait Operate<T: Timestamp> : Schedule {
/// any output, and no initial capabilities are held.
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);

/// Signals that external frontiers have been set.
///
/// By default this method does nothing, and leaves all changes in the `frontiers` element
/// of the shared progress state. An operator should be able to consult `frontiers` at any
/// point and read out the current frontier information, or the changes from the last time
/// that `frontiers` was drained.
fn set_external_summary(&mut self) { }

/// Indicates of whether the operator requires `push_external_progress` information or not.
fn notify_me(&self) -> bool { true }
}
Expand Down
9 changes: 0 additions & 9 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,15 +581,6 @@ where
// Return summaries and shared progress information.
(internal_summary, self.shared_progress.clone())
}

fn set_external_summary(&mut self) {
self.accept_frontier();
self.propagate_pointstamps(); // ensure propagation of input frontiers.
self.children
.iter_mut()
.flat_map(|child| child.operator.as_mut())
.for_each(|op| op.set_external_summary());
}
}

struct PerOperatorState<T: Timestamp> {
Expand Down
1 change: 0 additions & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,6 @@ impl<A: Allocate> Worker<A> {
}

operator.get_internal_summary();
operator.set_external_summary();

let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();
Expand Down