Skip to content

Commit ae70ad1

Browse files
committed
Pluggable subgraph builders
Demo to show that we could plug the subgraph builder to allow custom wrappers. An example shows how it could be used to compare wall-clock time to scheduling time. Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent 0d2b21f commit ae70ad1

File tree

8 files changed

+555
-97
lines changed

8 files changed

+555
-97
lines changed
+333
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
use std::cell::RefCell;
2+
use std::rc::Rc;
3+
use std::time::Duration;
4+
use timely::dataflow::operators::{Input, Map, Probe};
5+
use timely::logging::{TimelyLogger, TimelySummaryLogger};
6+
use timely::progress::{Antichain, ChangeBatch, Operate, Source, SubgraphBuilder, Target, Timestamp};
7+
use timely::progress::operate::SharedProgress;
8+
use timely::progress::subgraph::SubgraphBuilderT;
9+
use timely::progress::timestamp::Refines;
10+
use timely::scheduling::Schedule;
11+
use timely::worker::AsWorker;
12+
13+
struct ThreadStatSubgraphBuilder<SG> {
14+
inner: SG,
15+
}
16+
17+
impl<S: Schedule> Schedule for ThreadStatSubgraphBuilder<S> {
18+
fn name(&self) -> &str {
19+
self.inner.name()
20+
}
21+
22+
fn path(&self) -> &[usize] {
23+
self.inner.path()
24+
}
25+
26+
fn schedule(&mut self) -> bool {
27+
let start = std::time::Instant::now();
28+
let stats = stats::Stats::from_self();
29+
let done = self.inner.schedule();
30+
let elapsed = start.elapsed();
31+
if elapsed >= Duration::from_millis(10) {
32+
let stats_after = stats::Stats::from_self();
33+
if let (Ok(stats), Ok(stats_after)) = (stats, stats_after) {
34+
println!("schedule delta utime {}\tdelta stime {}\telapsed {elapsed:?}",
35+
stats_after.utime - stats.utime,
36+
stats_after.stime - stats.stime);
37+
}
38+
}
39+
done
40+
}
41+
}
42+
43+
impl<TOuter: Timestamp, OP: Operate<TOuter>> Operate<TOuter> for ThreadStatSubgraphBuilder<OP> {
44+
fn local(&self) -> bool {
45+
self.inner.local()
46+
}
47+
48+
fn inputs(&self) -> usize {
49+
self.inner.inputs()
50+
}
51+
52+
fn outputs(&self) -> usize {
53+
self.inner.outputs()
54+
}
55+
56+
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
57+
self.inner.get_internal_summary()
58+
}
59+
60+
fn set_external_summary(&mut self) {
61+
self.inner.set_external_summary();
62+
}
63+
64+
fn notify_me(&self) -> bool {
65+
self.inner.notify_me()
66+
}
67+
}
68+
69+
impl<TOuter, TInner, SG> SubgraphBuilderT<TOuter, TInner> for ThreadStatSubgraphBuilder<SG>
70+
where
71+
TOuter: Timestamp,
72+
TInner: Timestamp,
73+
SG: SubgraphBuilderT<TOuter, TInner>,
74+
{
75+
type Subgraph = ThreadStatSubgraphBuilder<SG::Subgraph>;
76+
77+
fn new_from(path: Rc<[usize]>, identifier: usize, logging: Option<TimelyLogger>, summary_logging: Option<TimelySummaryLogger<TInner::Summary>>, name: &str) -> Self {
78+
Self { inner: SG::new_from(path, identifier, logging, summary_logging, name)}
79+
}
80+
81+
fn build<A: AsWorker>(self, worker: &mut A) -> Self::Subgraph {
82+
ThreadStatSubgraphBuilder{ inner: self.inner.build(worker) }
83+
}
84+
85+
fn name(&self) -> &str {
86+
self.inner.name()
87+
}
88+
89+
fn path(&self) -> Rc<[usize]> {
90+
self.inner.path()
91+
}
92+
93+
fn connect(&mut self, source: Source, target: Target) {
94+
self.inner.connect(source, target)
95+
}
96+
97+
fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
98+
self.inner.add_child(child, index, identifier)
99+
}
100+
101+
fn allocate_child_id(&mut self) -> usize {
102+
self.inner.allocate_child_id()
103+
}
104+
105+
fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target
106+
where
107+
TInner: Refines<TOuter>
108+
{
109+
self.inner.new_input(shared_counts)
110+
}
111+
112+
fn new_output(&mut self) -> Source
113+
where
114+
TInner: Refines<TOuter>
115+
{
116+
self.inner.new_output()
117+
}
118+
}
119+
120+
pub mod stats {
121+
use std::str::FromStr;
122+
123+
/// based on https://elixir.bootlin.com/linux/v5.19.17/source/fs/proc/array.c#L567
124+
#[derive(Debug)]
125+
pub struct Stats {
126+
pub pid: usize,
127+
pub name: String,
128+
pub state: char,
129+
pub ppid: isize,
130+
pub pgid: isize,
131+
pub psid: isize,
132+
pub tty_nr: isize,
133+
pub tty_grp: isize,
134+
pub flags: usize,
135+
pub min_flt: usize,
136+
pub cmin_flt: usize,
137+
pub maj_flt: usize,
138+
pub cmaj_flt: usize,
139+
pub utime: usize,
140+
pub stime: usize,
141+
pub cutime: isize,
142+
pub cstime: isize,
143+
pub priority: isize,
144+
pub nice: isize,
145+
pub num_threads: isize,
146+
pub _zero0: usize,
147+
pub start_time: usize,
148+
pub vsize: usize,
149+
pub rss: usize,
150+
pub rsslim: usize,
151+
pub start_code: usize,
152+
pub end_code: usize,
153+
pub start_stack: usize,
154+
pub esp: usize,
155+
pub eip: usize,
156+
pub pending: usize,
157+
pub blocked: usize,
158+
pub sigign: usize,
159+
pub sigcatch: usize,
160+
pub wchan: usize,
161+
pub _zero1: usize,
162+
pub _zero2: usize,
163+
pub exit_signal: isize,
164+
pub task_cpu: isize,
165+
pub rt_priority: isize,
166+
pub policy: isize,
167+
pub blkio_ticks: usize,
168+
pub gtime: usize,
169+
pub cgtime: isize,
170+
pub start_data: usize,
171+
pub end_data: usize,
172+
pub start_brk: usize,
173+
pub arg_start: usize,
174+
pub arg_end: usize,
175+
pub env_start: usize,
176+
pub env_end: usize,
177+
pub exit_code: isize,
178+
}
179+
180+
pub enum Error {
181+
Underflow,
182+
ParseIntError(std::num::ParseIntError),
183+
IOError(std::io::Error),
184+
}
185+
186+
impl From<Option<&str>> for Error {
187+
fn from(_: Option<&str>) -> Self {
188+
Error::Underflow
189+
}
190+
}
191+
192+
impl From<std::num::ParseIntError> for Error {
193+
fn from(e: std::num::ParseIntError) -> Self {
194+
Error::ParseIntError(e)
195+
}
196+
}
197+
198+
impl From<std::io::Error> for Error {
199+
fn from(value: std::io::Error) -> Self {
200+
Error::IOError(value)
201+
}
202+
}
203+
204+
impl FromStr for Stats {
205+
type Err = Error;
206+
207+
fn from_str(s: &str) -> Result<Self, Self::Err> {
208+
let mut split = s.split_whitespace();
209+
Ok(Self {
210+
pid: split.next().ok_or(Error::Underflow)?.parse()?,
211+
name: split.next().ok_or(Error::Underflow)?.to_string(),
212+
state: split.next().ok_or(Error::Underflow)?.chars().next().ok_or(Error::Underflow)?,
213+
ppid: split.next().ok_or(Error::Underflow)?.parse()?,
214+
pgid: split.next().ok_or(Error::Underflow)?.parse()?,
215+
psid: split.next().ok_or(Error::Underflow)?.parse()?,
216+
tty_nr: split.next().ok_or(Error::Underflow)?.parse()?,
217+
tty_grp: split.next().ok_or(Error::Underflow)?.parse()?,
218+
flags: split.next().ok_or(Error::Underflow)?.parse()?,
219+
min_flt: split.next().ok_or(Error::Underflow)?.parse()?,
220+
cmin_flt: split.next().ok_or(Error::Underflow)?.parse()?,
221+
maj_flt: split.next().ok_or(Error::Underflow)?.parse()?,
222+
cmaj_flt: split.next().ok_or(Error::Underflow)?.parse()?,
223+
utime: split.next().ok_or(Error::Underflow)?.parse()?,
224+
stime: split.next().ok_or(Error::Underflow)?.parse()?,
225+
cutime: split.next().ok_or(Error::Underflow)?.parse()?,
226+
cstime: split.next().ok_or(Error::Underflow)?.parse()?,
227+
priority: split.next().ok_or(Error::Underflow)?.parse()?,
228+
nice: split.next().ok_or(Error::Underflow)?.parse()?,
229+
num_threads: split.next().ok_or(Error::Underflow)?.parse()?,
230+
_zero0: split.next().ok_or(Error::Underflow)?.parse()?,
231+
// constant 0,
232+
start_time: split.next().ok_or(Error::Underflow)?.parse()?,
233+
vsize: split.next().ok_or(Error::Underflow)?.parse()?,
234+
rss: split.next().ok_or(Error::Underflow)?.parse()?,
235+
rsslim: split.next().ok_or(Error::Underflow)?.parse()?,
236+
start_code: split.next().ok_or(Error::Underflow)?.parse()?,
237+
end_code: split.next().ok_or(Error::Underflow)?.parse()?,
238+
start_stack: split.next().ok_or(Error::Underflow)?.parse()?,
239+
esp: split.next().ok_or(Error::Underflow)?.parse()?,
240+
eip: split.next().ok_or(Error::Underflow)?.parse()?,
241+
pending: split.next().ok_or(Error::Underflow)?.parse()?,
242+
blocked: split.next().ok_or(Error::Underflow)?.parse()?,
243+
sigign: split.next().ok_or(Error::Underflow)?.parse()?,
244+
sigcatch: split.next().ok_or(Error::Underflow)?.parse()?,
245+
wchan: split.next().ok_or(Error::Underflow)?.parse()?,
246+
_zero1: split.next().ok_or(Error::Underflow)?.parse()?,
247+
// constant 0,
248+
_zero2: split.next().ok_or(Error::Underflow)?.parse()?,
249+
// constant 0,
250+
exit_signal: split.next().ok_or(Error::Underflow)?.parse()?,
251+
task_cpu: split.next().ok_or(Error::Underflow)?.parse()?,
252+
rt_priority: split.next().ok_or(Error::Underflow)?.parse()?,
253+
policy: split.next().ok_or(Error::Underflow)?.parse()?,
254+
blkio_ticks: split.next().ok_or(Error::Underflow)?.parse()?,
255+
gtime: split.next().ok_or(Error::Underflow)?.parse()?,
256+
cgtime: split.next().ok_or(Error::Underflow)?.parse()?,
257+
start_data: split.next().ok_or(Error::Underflow)?.parse()?,
258+
end_data: split.next().ok_or(Error::Underflow)?.parse()?,
259+
start_brk: split.next().ok_or(Error::Underflow)?.parse()?,
260+
arg_start: split.next().ok_or(Error::Underflow)?.parse()?,
261+
arg_end: split.next().ok_or(Error::Underflow)?.parse()?,
262+
env_start: split.next().ok_or(Error::Underflow)?.parse()?,
263+
env_end: split.next().ok_or(Error::Underflow)?.parse()?,
264+
exit_code: split.next().ok_or(Error::Underflow)?.parse()?,
265+
})
266+
}
267+
}
268+
269+
impl Stats {
270+
pub fn from_self() -> Result<Self, Error> {
271+
let mut buffer = String::new();
272+
use std::io::Read;
273+
std::fs::File::open("/proc/thread-self/stat")?.read_to_string(&mut buffer)?;
274+
buffer.parse()
275+
}
276+
}
277+
}
278+
279+
fn main() {
280+
// initializes and runs a timely dataflow.
281+
timely::execute_from_args(std::env::args(), |worker| {
282+
283+
let timer = std::time::Instant::now();
284+
285+
let mut args = std::env::args();
286+
args.next();
287+
288+
let dataflows = args.next().unwrap().parse::<usize>().unwrap();
289+
let length = args.next().unwrap().parse::<usize>().unwrap();
290+
let record = args.next() == Some("record".to_string());
291+
292+
let mut inputs = Vec::new();
293+
let mut probes = Vec::new();
294+
295+
// create a new input, exchange data, and inspect its output
296+
for _dataflow in 0 .. dataflows {
297+
let logging = worker.log_register().get("timely").map(Into::into);
298+
worker.dataflow_subgraph::<_, _, _, _, ThreadStatSubgraphBuilder<SubgraphBuilder<_, _>>>("Dataflow", logging, (), |(), scope| {
299+
let (input, mut stream) = scope.new_input();
300+
for _step in 0 .. length {
301+
stream = stream.map(|x: ()| {
302+
// Simluate CPU intensive task
303+
for i in 0..1_000_000 {
304+
std::hint::black_box(i);
305+
}
306+
// If we were to sleep here, `utime` would not increase.
307+
x
308+
});
309+
}
310+
let probe = stream.probe();
311+
inputs.push(input);
312+
probes.push(probe);
313+
});
314+
}
315+
316+
println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
317+
318+
for round in 0 .. 10_000 {
319+
let dataflow = round % dataflows;
320+
if record {
321+
inputs[dataflow].send(());
322+
}
323+
inputs[dataflow].advance_to(round);
324+
let mut steps = 0;
325+
while probes[dataflow].less_than(&round) {
326+
worker.step();
327+
steps += 1;
328+
}
329+
println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
330+
}
331+
332+
}).unwrap();
333+
}

timely/src/dataflow/operators/core/enterleave.rs

+24-5
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,16 @@ use crate::dataflow::channels::Message;
3333
use crate::worker::AsWorker;
3434
use crate::dataflow::{StreamCore, Scope};
3535
use crate::dataflow::scopes::Child;
36+
use crate::progress::subgraph::SubgraphBuilderT;
3637

3738
/// Extension trait to move a `Stream` into a child of its current `Scope`.
38-
pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
39+
pub trait Enter<G, T, C, SG>
40+
where
41+
G: Scope,
42+
T: Timestamp + Refines<G::Timestamp>,
43+
C: Container,
44+
SG: SubgraphBuilderT<G::Timestamp, T>,
45+
{
3946
/// Moves the `Stream` argument into a child of its current `Scope`.
4047
///
4148
/// # Examples
@@ -50,11 +57,17 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
5057
/// });
5158
/// });
5259
/// ```
53-
fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
60+
fn enter<'a>(&self, _: &Child<'a, G, T, SG>) -> StreamCore<Child<'a, G, T, SG>, C>;
5461
}
5562

56-
impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
57-
fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
63+
impl<G, T, C, SG> Enter<G, T, C, SG> for StreamCore<G, C>
64+
where
65+
G: Scope,
66+
T: Timestamp + Refines<G::Timestamp>,
67+
C: Data + Container,
68+
SG: SubgraphBuilderT<G::Timestamp, T>,
69+
{
70+
fn enter<'a>(&self, scope: &Child<'a, G, T, SG>) -> StreamCore<Child<'a, G, T, SG>, C> {
5871

5972
use crate::scheduling::Scheduler;
6073

@@ -103,7 +116,13 @@ pub trait Leave<G: Scope, C: Container> {
103116
fn leave(&self) -> StreamCore<G, C>;
104117
}
105118

106-
impl<G: Scope, C: Container + Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
119+
impl<G, C, T, SG> Leave<G, C> for StreamCore<Child<'_, G, T, SG>, C>
120+
where
121+
G: Scope,
122+
C: Container + Data,
123+
T: Timestamp + Refines<G::Timestamp>,
124+
SG: SubgraphBuilderT<G::Timestamp, T>,
125+
{
107126
fn leave(&self) -> StreamCore<G, C> {
108127

109128
let scope = self.scope();

0 commit comments

Comments
 (0)