@@ -201,23 +201,27 @@ pub trait AsWorker : Scheduler {
201
201
/// Allocates a new worker-unique identifier.
202
202
fn new_identifier ( & mut self ) -> usize ;
203
203
/// Provides access to named logging streams.
204
- fn log_register ( & self ) -> :: std:: cell:: RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > ;
204
+ fn log_register ( & self ) -> Option < :: std:: cell:: RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > > ;
205
205
/// Provides access to the timely logging stream.
206
- fn logging ( & self ) -> Option < crate :: logging:: TimelyLogger > { self . log_register ( ) . get ( "timely" ) }
206
+ fn logging ( & self ) -> Option < crate :: logging:: TimelyLogger > { self . log_register ( ) . and_then ( |l| l . get ( "timely" ) ) }
207
207
}
208
208
209
209
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
210
210
/// and has a list of dataflows that it manages.
211
211
pub struct Worker < A : Allocate > {
212
212
config : Config ,
213
- timer : Instant ,
213
+ /// An optional instant from which the start of the computation should be reckoned.
214
+ ///
215
+ /// If this is set to none, system time-based functionality will be unavailable or work badly.
216
+ /// For example, logging will be unavailable, and activation after a delay will be unavailable.
217
+ timer : Option < Instant > ,
214
218
paths : Rc < RefCell < HashMap < usize , Vec < usize > > > > ,
215
219
allocator : Rc < RefCell < A > > ,
216
220
identifiers : Rc < RefCell < usize > > ,
217
221
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
218
222
dataflows : Rc < RefCell < HashMap < usize , Wrapper > > > ,
219
223
dataflow_counter : Rc < RefCell < usize > > ,
220
- logging : Rc < RefCell < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > > ,
224
+ logging : Option < Rc < RefCell < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > > > ,
221
225
222
226
activations : Rc < RefCell < Activations > > ,
223
227
active_dataflows : Vec < usize > ,
@@ -247,7 +251,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
247
251
}
248
252
249
253
fn new_identifier ( & mut self ) -> usize { self . new_identifier ( ) }
250
- fn log_register ( & self ) -> RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > {
254
+ fn log_register ( & self ) -> Option < RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > > {
251
255
self . log_register ( )
252
256
}
253
257
}
@@ -260,8 +264,7 @@ impl<A: Allocate> Scheduler for Worker<A> {
260
264
261
265
impl < A : Allocate > Worker < A > {
262
266
/// Allocates a new `Worker` bound to a channel allocator.
263
- pub fn new ( config : Config , c : A ) -> Worker < A > {
264
- let now = Instant :: now ( ) ;
267
+ pub fn new ( config : Config , c : A , now : Option < std:: time:: Instant > ) -> Worker < A > {
265
268
let index = c. index ( ) ;
266
269
Worker {
267
270
config,
@@ -271,7 +274,7 @@ impl<A: Allocate> Worker<A> {
271
274
identifiers : Default :: default ( ) ,
272
275
dataflows : Default :: default ( ) ,
273
276
dataflow_counter : Default :: default ( ) ,
274
- logging : Rc :: new ( RefCell :: new ( crate :: logging_core:: Registry :: new ( now, index) ) ) ,
277
+ logging : now . map ( |now| Rc :: new ( RefCell :: new ( crate :: logging_core:: Registry :: new ( now, index) ) ) ) ,
275
278
activations : Rc :: new ( RefCell :: new ( Activations :: new ( now) ) ) ,
276
279
active_dataflows : Default :: default ( ) ,
277
280
temp_channel_ids : Default :: default ( ) ,
@@ -407,7 +410,7 @@ impl<A: Allocate> Worker<A> {
407
410
}
408
411
409
412
// Clean up, indicate if dataflows remain.
410
- self . logging . borrow_mut ( ) . flush ( ) ;
413
+ self . logging . as_ref ( ) . map ( |l| l . borrow_mut ( ) . flush ( ) ) ;
411
414
self . allocator . borrow_mut ( ) . release ( ) ;
412
415
!self . dataflows . borrow ( ) . is_empty ( )
413
416
}
@@ -478,7 +481,7 @@ impl<A: Allocate> Worker<A> {
478
481
///
479
482
/// let index = worker.index();
480
483
/// let peers = worker.peers();
481
- /// let timer = worker.timer();
484
+ /// let timer = worker.timer().unwrap() ;
482
485
///
483
486
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
484
487
///
@@ -493,7 +496,7 @@ impl<A: Allocate> Worker<A> {
493
496
///
494
497
/// let index = worker.index();
495
498
/// let peers = worker.peers();
496
- /// let timer = worker.timer();
499
+ /// let timer = worker.timer().unwrap() ;
497
500
///
498
501
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
499
502
///
@@ -509,13 +512,13 @@ impl<A: Allocate> Worker<A> {
509
512
///
510
513
/// let index = worker.index();
511
514
/// let peers = worker.peers();
512
- /// let timer = worker.timer();
515
+ /// let timer = worker.timer().unwrap() ;
513
516
///
514
517
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
515
518
///
516
519
/// });
517
520
/// ```
518
- pub fn timer ( & self ) -> Instant { self . timer }
521
+ pub fn timer ( & self ) -> Option < Instant > { self . timer }
519
522
520
523
/// Allocate a new worker-unique identifier.
521
524
///
@@ -534,13 +537,14 @@ impl<A: Allocate> Worker<A> {
534
537
/// timely::execute_from_args(::std::env::args(), |worker| {
535
538
///
536
539
/// worker.log_register()
540
+ /// .unwrap()
537
541
/// .insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
538
542
/// println!("{:?}\t{:?}", time, data)
539
543
/// );
540
544
/// });
541
545
/// ```
542
- pub fn log_register ( & self ) -> :: std:: cell:: RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > {
543
- self . logging . borrow_mut ( )
546
+ pub fn log_register ( & self ) -> Option < :: std:: cell:: RefMut < crate :: logging_core:: Registry < crate :: logging:: WorkerIdentifier > > > {
547
+ self . logging . as_ref ( ) . map ( |l| l . borrow_mut ( ) )
544
548
}
545
549
546
550
/// Construct a new dataflow.
@@ -563,7 +567,7 @@ impl<A: Allocate> Worker<A> {
563
567
T : Refines < ( ) > ,
564
568
F : FnOnce ( & mut Child < Self , T > ) ->R ,
565
569
{
566
- let logging = self . logging . borrow_mut ( ) . get ( "timely" ) ;
570
+ let logging = self . logging . as_ref ( ) . map ( |l| l . borrow_mut ( ) ) . and_then ( |l| l . get ( "timely" ) ) ;
567
571
self . dataflow_core ( "Dataflow" , logging, Box :: new ( ( ) ) , |_, child| func ( child) )
568
572
}
569
573
@@ -587,7 +591,7 @@ impl<A: Allocate> Worker<A> {
587
591
T : Refines < ( ) > ,
588
592
F : FnOnce ( & mut Child < Self , T > ) ->R ,
589
593
{
590
- let logging = self . logging . borrow_mut ( ) . get ( "timely" ) ;
594
+ let logging = self . logging . as_ref ( ) . map ( |l| l . borrow_mut ( ) ) . and_then ( |l| l . get ( "timely" ) ) ;
591
595
self . dataflow_core ( name, logging, Box :: new ( ( ) ) , |_, child| func ( child) )
592
596
}
593
597
@@ -626,7 +630,7 @@ impl<A: Allocate> Worker<A> {
626
630
let dataflow_index = self . allocate_dataflow_index ( ) ;
627
631
let identifier = self . new_identifier ( ) ;
628
632
629
- let progress_logging = self . logging . borrow_mut ( ) . get ( "timely/progress" ) ;
633
+ let progress_logging = self . logging . as_ref ( ) . map ( |l| l . borrow_mut ( ) ) . and_then ( |l| l . get ( "timely/progress" ) ) ;
630
634
let subscope = SubgraphBuilder :: new_from ( dataflow_index, addr, logging. clone ( ) , progress_logging. clone ( ) , name) ;
631
635
let subscope = RefCell :: new ( subscope) ;
632
636
0 commit comments