Skip to content

Commit 2b8ff0d

Browse files
committed
Move parking from communication into scheduler
1 parent e0d98d3 commit 2b8ff0d

File tree

8 files changed

+44
-67
lines changed

8 files changed

+44
-67
lines changed

communication/src/allocator/generic.rs

-8
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,6 @@ impl Allocate for Generic {
9393
fn receive(&mut self) { self.receive(); }
9494
fn release(&mut self) { self.release(); }
9595
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
96-
fn await_events(&self, _duration: Option<std::time::Duration>) {
97-
match self {
98-
Generic::Thread(t) => t.await_events(_duration),
99-
Generic::Process(p) => p.await_events(_duration),
100-
Generic::ProcessBinary(pb) => pb.await_events(_duration),
101-
Generic::ZeroCopy(z) => z.await_events(_duration),
102-
}
103-
}
10496
}
10597

10698

communication/src/allocator/mod.rs

-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65

76
pub use self::thread::Thread;
87
pub use self::process::Process;
@@ -51,14 +50,6 @@ pub trait Allocate {
5150
/// into a performance problem.
5251
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
5352

54-
/// Awaits communication events.
55-
///
56-
/// This method may park the current thread, for at most `duration`,
57-
/// until new events arrive.
58-
/// The method is not guaranteed to wait for any amount of time, but
59-
/// good implementations should use this as a hint to park the thread.
60-
fn await_events(&self, _duration: Option<Duration>) { }
61-
6253
/// Ensure that received messages are surfaced in each channel.
6354
///
6455
/// This method should be called to ensure that received messages are

communication/src/allocator/process.rs

-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::sync::{Arc, Mutex};
66
use std::any::Any;
7-
use std::time::Duration;
87
use std::collections::{HashMap};
98
use crossbeam_channel::{Sender, Receiver};
109

@@ -178,10 +177,6 @@ impl Allocate for Process {
178177
self.inner.events()
179178
}
180179

181-
fn await_events(&self, duration: Option<Duration>) {
182-
self.inner.await_events(duration);
183-
}
184-
185180
fn receive(&mut self) {
186181
let mut events = self.inner.events().borrow_mut();
187182
while let Ok(index) = self.counters_recv.try_recv() {

communication/src/allocator/thread.rs

-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65
use std::collections::VecDeque;
76

87
use crate::allocator::{Allocate, AllocateBuilder};
@@ -35,16 +34,6 @@ impl Allocate for Thread {
3534
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
3635
&self.events
3736
}
38-
fn await_events(&self, duration: Option<Duration>) {
39-
if self.events.borrow().is_empty() {
40-
if let Some(duration) = duration {
41-
std::thread::park_timeout(duration);
42-
}
43-
else {
44-
std::thread::park();
45-
}
46-
}
47-
}
4837
}
4938

5039
/// Thread-local counting channel push endpoint.

communication/src/allocator/zero_copy/allocator.rs

-3
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
271271
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
272272
self.inner.events()
273273
}
274-
fn await_events(&self, duration: Option<std::time::Duration>) {
275-
self.inner.await_events(duration);
276-
}
277274
}

communication/src/allocator/zero_copy/allocator_process.rs

-10
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator {
240240
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
241241
&self.events
242242
}
243-
fn await_events(&self, duration: Option<std::time::Duration>) {
244-
if self.events.borrow().is_empty() {
245-
if let Some(duration) = duration {
246-
std::thread::park_timeout(duration);
247-
}
248-
else {
249-
std::thread::park();
250-
}
251-
}
252-
}
253243
}

timely/src/scheduling/activate.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl Activations {
177177
/// This method should be used before putting a worker thread to sleep, as it
178178
/// indicates the amount of time before the thread should be unparked for the
179179
/// next scheduled activation.
180-
pub fn empty_for(&self) -> Option<Duration> {
180+
fn empty_for(&self) -> Option<Duration> {
181181
if !self.bounds.is_empty() || self.timer.is_none() {
182182
Some(Duration::new(0,0))
183183
}
@@ -189,6 +189,34 @@ impl Activations {
189189
})
190190
}
191191
}
192+
193+
/// Indicates that there is nothing to do for `timeout`, and that the scheduler
194+
/// can allow the thread to sleep until then.
195+
///
196+
/// The method does not *need* to park the thread, and indeed it may elect to
197+
/// unpark earlier if there are deferred activations.
198+
pub fn park_timeout(&self, timeout: Option<Duration>) {
199+
let empty_for = self.empty_for();
200+
let timeout = match (timeout, empty_for) {
201+
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
202+
(x, y) => x.or(y),
203+
};
204+
205+
if let Some(timeout) = timeout {
206+
std::thread::park_timeout(timeout);
207+
}
208+
else {
209+
std::thread::park();
210+
}
211+
}
212+
213+
/// True iff there are no immediate activations.
214+
///
215+
/// Used by others to guard work done in anticipation of potentially parking.
216+
/// An alternate method name could be `would_park`.
217+
pub fn is_idle(&self) -> bool {
218+
self.bounds.is_empty() || self.timer.is_none()
219+
}
192220
}
193221

194222
/// A thread-safe handle to an `Activations`.

timely/src/worker.rs

+15-20
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ impl<A: Allocate> Worker<A> {
333333
/// worker.step_or_park(Some(Duration::from_secs(1)));
334334
/// });
335335
/// ```
336-
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
336+
pub fn step_or_park(&mut self, timeout: Option<Duration>) -> bool {
337337

338338
{ // Process channel events. Activate responders.
339339
let mut allocator = self.allocator.borrow_mut();
@@ -362,28 +362,23 @@ impl<A: Allocate> Worker<A> {
362362
.borrow_mut()
363363
.advance();
364364

365-
// Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
366-
let empty_for = self.activations.borrow().empty_for();
367-
// Determine the minimum park duration, where `None` are an absence of a constraint.
368-
let delay = match (duration, empty_for) {
369-
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
370-
(x, y) => x.or(y),
371-
};
365+
if self.activations.borrow().is_idle() {
366+
// If the timeout is zero, don't bother trying to park.
367+
// More generally, we could put some threshold in here.
368+
if timeout != Some(Duration::new(0, 0)) {
369+
// Log parking and flush log.
370+
if let Some(l) = self.logging().as_mut() {
371+
l.log(crate::logging::ParkEvent::park(timeout));
372+
l.flush();
373+
}
372374

373-
if delay != Some(Duration::new(0,0)) {
375+
// We have just drained `allocator.events()` up above;
376+
// otherwise we should first check it for emptiness.
377+
self.activations.borrow().park_timeout(timeout);
374378

375-
// Log parking and flush log.
376-
if let Some(l) = self.logging().as_mut() {
377-
l.log(crate::logging::ParkEvent::park(delay));
378-
l.flush();
379+
// Log return from unpark.
380+
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
379381
}
380-
381-
self.allocator
382-
.borrow()
383-
.await_events(delay);
384-
385-
// Log return from unpark.
386-
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
387382
}
388383
else { // Schedule active dataflows.
389384

0 commit comments

Comments
 (0)