abstutil/
time.rs

1use std::collections::BTreeMap;
2use std::io::{stdout, BufReader, ErrorKind, Read, Write};
3
4use anyhow::{Context, Result};
5use fs_err::File;
6use instant::Instant;
7
8use crate::{prettyprint_usize, PROGRESS_FREQUENCY_SECONDS};
9
/// Returns the time elapsed since `since`, as fractional seconds.
pub fn elapsed_seconds(since: Instant) -> f64 {
    let elapsed = since.elapsed();
    let whole_seconds = elapsed.as_secs() as f64;
    let fractional_seconds = f64::from(elapsed.subsec_nanos()) * 1e-9;
    whole_seconds + fractional_seconds
}
14
15#[derive(Debug)]
16struct Progress {
17    label: String,
18    processed_items: usize,
19    total_items: usize,
20    started_at: Instant,
21    last_printed_at: Instant,
22    first_update: bool,
23}
24
25impl Progress {
26    fn new(label: String, total_items: usize) -> Progress {
27        Progress {
28            label,
29            processed_items: 0,
30            total_items,
31            started_at: Instant::now(),
32            last_printed_at: Instant::now(),
33            first_update: true,
34        }
35    }
36
37    // Returns when done
38    fn next<'a>(
39        &mut self,
40        maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>,
41    ) -> Option<(f64, String)> {
42        self.processed_items += 1;
43        if self.processed_items > self.total_items {
44            panic!(
45                "{} is too few items for {} progress",
46                prettyprint_usize(self.total_items),
47                self.label
48            );
49        }
50
51        if self.processed_items == self.total_items {
52            let elapsed = elapsed_seconds(self.started_at);
53            let line = format!(
54                "{} ({})... {}",
55                self.label,
56                prettyprint_usize(self.total_items),
57                prettyprint_time(elapsed)
58            );
59            if self.total_items == 1 {
60                temporary_println(maybe_sink, line.clone());
61            } else {
62                clear_current_line();
63                println!("{}", line);
64                if let Some(ref mut sink) = maybe_sink {
65                    sink.reprintln(line.clone());
66                }
67            }
68            return Some((elapsed, line));
69        } else if elapsed_seconds(self.last_printed_at) >= PROGRESS_FREQUENCY_SECONDS {
70            self.last_printed_at = Instant::now();
71            let line = format!(
72                "{}: {}/{}... {}",
73                self.label,
74                prettyprint_usize(self.processed_items),
75                prettyprint_usize(self.total_items),
76                prettyprint_time(elapsed_seconds(self.started_at))
77            );
78            clear_current_line();
79            print!("{}", line);
80            stdout().flush().unwrap();
81
82            if let Some(ref mut sink) = maybe_sink {
83                if self.first_update {
84                    sink.println(line);
85                    self.first_update = false;
86                } else {
87                    sink.reprintln(line);
88                }
89            }
90        }
91        None
92    }
93
94    fn cancel_iter_early(&mut self) -> f64 {
95        elapsed_seconds(self.started_at)
96    }
97}
98
99enum StackEntry {
100    TimerSpan(TimerSpan),
101    Progress(Progress),
102    File(TimedFileReader),
103}
104
/// An extra destination for the lines a `Timer` prints.
pub trait TimerSink {
    /// Receives a new line.
    fn println(&mut self, line: String);
    /// Receives a line that the timer emits as an update of a line it sent earlier (used for
    /// progress updates).
    fn reprintln(&mut self, line: String);
}
109
110/// Hierarchial magic
111pub struct Timer<'a> {
112    results: Vec<String>,
113    stack: Vec<StackEntry>,
114
115    outermost_name: String,
116
117    sink: Option<Box<dyn TimerSink + 'a>>,
118}
119
/// A named span of work that is currently open on a `Timer`'s stack.
struct TimerSpan {
    /// Name passed to `Timer::start`; `Timer::stop` must be called with the same name.
    name: String,
    started_at: Instant,
    /// Summary lines of everything that finished while this span was open.
    nested_results: Vec<String>,
    /// Total seconds accounted for by things that finished while this span was open.
    nested_time: f64,
}
126
127impl<'a> Timer<'a> {
128    pub fn new<S: Into<String>>(raw_name: S) -> Timer<'a> {
129        let name = raw_name.into();
130        let mut t = Timer {
131            results: Vec::new(),
132            stack: Vec::new(),
133            outermost_name: name.clone(),
134            sink: None,
135        };
136        t.start(name);
137        t
138    }
139
140    pub fn new_with_sink(name: &str, sink: Box<dyn TimerSink + 'a>) -> Timer<'a> {
141        let mut t = Timer::new(name);
142        t.sink = Some(sink);
143        t
144    }
145
146    // TODO Shouldn't use this much.
147    pub fn throwaway() -> Timer<'a> {
148        Timer::new("throwaway")
149    }
150
151    fn temporary_println(&mut self, line: String) {
152        temporary_println(&mut self.sink, line);
153    }
154
155    /// Used to end the scope of a timer early.
156    pub fn done(self) {}
157
158    pub fn start<S: Into<String>>(&mut self, raw_name: S) {
159        if self.outermost_name == "throwaway" {
160            return;
161        }
162
163        let name = raw_name.into();
164        self.temporary_println(format!("{}...", name));
165        self.stack.push(StackEntry::TimerSpan(TimerSpan {
166            name,
167            started_at: Instant::now(),
168            nested_results: Vec::new(),
169            nested_time: 0.0,
170        }));
171    }
172
173    pub fn stop<S: Into<String>>(&mut self, raw_name: S) {
174        if self.outermost_name == "throwaway" {
175            return;
176        }
177        let name = raw_name.into();
178
179        let span = match self.stack.pop().unwrap() {
180            StackEntry::TimerSpan(s) => s,
181            StackEntry::Progress(p) => panic!("stop() during unfinished start_iter(): {:?}", p),
182            StackEntry::File(f) => panic!("stop() while reading file {}", f.path),
183        };
184        assert_eq!(span.name, name);
185        let elapsed = elapsed_seconds(span.started_at);
186        let line = format!("{} took {}", name, prettyprint_time(elapsed));
187
188        let padding = "  ".repeat(self.stack.len());
189        match self.stack.last_mut() {
190            Some(StackEntry::TimerSpan(ref mut s)) => {
191                s.nested_results.push(format!("{}- {}", padding, line));
192                s.nested_results.extend(span.nested_results);
193                if span.nested_time != 0.0 {
194                    temporary_println(
195                        &mut self.sink,
196                        format!(
197                            "{}... plus {}",
198                            name,
199                            prettyprint_time(elapsed - span.nested_time)
200                        ),
201                    );
202                    s.nested_results.push(format!(
203                        "  {}- ... plus {}",
204                        padding,
205                        prettyprint_time(elapsed - span.nested_time)
206                    ));
207                }
208                s.nested_time += elapsed;
209            }
210            Some(_) => unreachable!(),
211            None => {
212                self.results.push(format!("{}- {}", padding, line));
213                self.results.extend(span.nested_results);
214                if span.nested_time != 0.0 {
215                    self.temporary_println(format!(
216                        "{}... plus {}",
217                        name,
218                        prettyprint_time(elapsed - span.nested_time)
219                    ));
220                    self.results.push(format!(
221                        "  - ... plus {}",
222                        prettyprint_time(elapsed - span.nested_time)
223                    ));
224                }
225                // Don't bother tracking excess time that the Timer has existed but had no spans
226            }
227        }
228
229        self.temporary_println(line);
230    }
231
232    pub fn start_iter<S: Into<String>>(&mut self, raw_name: S, total_items: usize) {
233        if self.outermost_name == "throwaway" {
234            return;
235        }
236        if total_items == 0 {
237            return;
238        }
239        let name = raw_name.into();
240        // Note we may have two StackEntry::Progress entries nested
241
242        self.stack
243            .push(StackEntry::Progress(Progress::new(name, total_items)));
244    }
245
246    pub fn next(&mut self) {
247        if self.outermost_name == "throwaway" {
248            return;
249        }
250        let maybe_result =
251            if let Some(StackEntry::Progress(ref mut progress)) = self.stack.last_mut() {
252                progress.next(&mut self.sink)
253            } else {
254                panic!("Can't next() while a TimerSpan is top of the stack");
255            };
256        if let Some((elapsed, result)) = maybe_result {
257            self.stack.pop();
258            self.add_result(elapsed, result);
259        }
260    }
261
262    pub fn cancel_iter_early(&mut self) {
263        if self.outermost_name == "throwaway" {
264            return;
265        }
266        let elapsed = if let Some(StackEntry::Progress(ref mut progress)) = self.stack.last_mut() {
267            progress.cancel_iter_early()
268        } else {
269            panic!("Can't cancel_iter_early() while a TimerSpan is top of the stack");
270        };
271        self.stack.pop();
272        self.add_result(elapsed, "cancelled early".to_string());
273    }
274
275    pub fn add_result(&mut self, elapsed: f64, line: String) {
276        let padding = "  ".repeat(self.stack.len());
277        match self.stack.last_mut() {
278            Some(StackEntry::TimerSpan(ref mut s)) => {
279                s.nested_results.push(format!("{}- {}", padding, line));
280                s.nested_time += elapsed;
281            }
282            Some(StackEntry::Progress(_)) => {}
283            Some(_) => unreachable!(),
284            None => {
285                self.results.push(format!("{}- {}", padding, line));
286                // Don't bother tracking excess time that the Timer has existed but had no spans
287            }
288        }
289    }
290
291    /// Execute the callback over all requests, using all CPUs available. The order of the result
292    /// is deterministic and matches the input.
293    pub fn parallelize<I, O, F: Fn(I) -> O>(
294        &mut self,
295        timer_name: &str,
296        requests: Vec<I>,
297        cb: F,
298    ) -> Vec<O>
299    where
300        I: Send,
301        O: Send,
302        F: Send + Clone + Copy,
303    {
304        self.inner_parallelize(timer_name, requests, cb, num_cpus::get().max(1) as u32)
305    }
306
307    /// Like `parallelize`, but leave one CPU free, to avoid thrashing the user's system.
308    pub fn parallelize_polite<I, O, F: Fn(I) -> O>(
309        &mut self,
310        timer_name: &str,
311        requests: Vec<I>,
312        cb: F,
313    ) -> Vec<O>
314    where
315        I: Send,
316        O: Send,
317        F: Send + Clone + Copy,
318    {
319        self.inner_parallelize(
320            timer_name,
321            requests,
322            cb,
323            (num_cpus::get() - 1).max(1) as u32,
324        )
325    }
326
327    fn inner_parallelize<I, O, F: Fn(I) -> O>(
328        &mut self,
329        timer_name: &str,
330        requests: Vec<I>,
331        cb: F,
332        num_cpus: u32,
333    ) -> Vec<O>
334    where
335        I: Send,
336        O: Send,
337        F: Send + Clone + Copy,
338    {
339        // Here's the sequential equivalent, to conveniently compare times. Also gotta use this in
340        // wasm; no threads.
341        #[cfg(target_arch = "wasm32")]
342        {
343            // Silence a warning
344            let _ = num_cpus;
345
346            let mut results: Vec<O> = Vec::new();
347            self.start_iter(timer_name, requests.len());
348            for req in requests {
349                self.next();
350                results.push(cb(req));
351            }
352            return results;
353        }
354
355        #[cfg(not(target_arch = "wasm32"))]
356        {
357            scoped_threadpool::Pool::new(num_cpus).scoped(|scope| {
358                let (tx, rx) = std::sync::mpsc::channel();
359                let mut results: Vec<Option<O>> = std::iter::repeat_with(|| None)
360                    .take(requests.len())
361                    .collect();
362                for (idx, req) in requests.into_iter().enumerate() {
363                    let tx = tx.clone();
364                    scope.execute(move || {
365                        // TODO Can we catch panics here, dump a better stacktrace? widgetry runner
366                        // does this
367                        tx.send((idx, cb(req))).unwrap();
368                    });
369                }
370                drop(tx);
371
372                self.start_iter(timer_name, results.len());
373                for (idx, result) in rx.iter() {
374                    self.next();
375                    results[idx] = Some(result);
376                }
377                results.into_iter().map(|x| x.unwrap()).collect()
378            })
379        }
380    }
381
382    /// Like BTreeMap::retain, but parallelized
383    pub fn retain_parallelized<K, V, F: Fn(&V) -> bool>(
384        &mut self,
385        timer_name: &str,
386        input: BTreeMap<K, V>,
387        keep: F,
388    ) -> BTreeMap<K, V>
389    where
390        K: Send + Ord,
391        V: Send,
392        F: Send + Sync + Clone + Copy,
393    {
394        self.parallelize(timer_name, input.into_iter().collect(), |(k, v)| {
395            if keep(&v) {
396                Some((k, v))
397            } else {
398                None
399            }
400        })
401        .into_iter()
402        .flatten()
403        .collect()
404    }
405
406    /// Then the caller passes this in as a reader
407    pub fn read_file(&mut self, path: &str) -> Result<()> {
408        self.stack
409            .push(StackEntry::File(TimedFileReader::new(path)?));
410        Ok(())
411    }
412}
413
414impl<'a> std::ops::Drop for Timer<'a> {
415    fn drop(&mut self) {
416        if self.outermost_name == "throwaway" {
417            return;
418        }
419
420        let stop_name = self.outermost_name.clone();
421
422        // If we're in the middle of unwinding a panic, don't further blow up.
423        match self.stack.last() {
424            Some(StackEntry::TimerSpan(ref s)) => {
425                if s.name != stop_name {
426                    error!("dropping Timer during {}, due to panic?", s.name);
427                    return;
428                }
429            }
430            Some(StackEntry::File(ref r)) => {
431                error!("dropping Timer while reading {}, due to panic?", r.path);
432                return;
433            }
434            Some(StackEntry::Progress(ref p)) => {
435                error!(
436                    "dropping Timer while doing progress {}, due to panic?",
437                    p.label
438                );
439                return;
440            }
441            None => unreachable!(),
442        }
443
444        self.stop(&stop_name);
445        assert!(self.stack.is_empty());
446        for line in &self.results {
447            finalized_println(&mut self.sink, line.to_string());
448        }
449
450        if std::thread::panicking() {
451            error!("");
452            error!("");
453            error!("");
454            error!("");
455            error!("");
456            error!("!!! The program panicked, look above for the stack trace !!!");
457        }
458    }
459}
460
/// Formats a duration in seconds with four decimal places and an "s" suffix.
pub fn prettyprint_time(seconds: f64) -> String {
    let mut text = format!("{:.4}", seconds);
    text.push('s');
    text
}
464
465#[cfg(unix)]
466pub fn clear_current_line() {
467    // Fails in the test runner.
468    if let Ok((terminal_width, _)) = termion::terminal_size() {
469        print!(
470            "{}{}",
471            termion::clear::CurrentLine,
472            termion::cursor::Left(terminal_width)
473        );
474    } else {
475        print!("\r");
476    }
477}
478
479#[cfg(not(unix))]
480pub fn clear_current_line() {
481    print!("\r");
482}
483
484struct TimedFileReader {
485    inner: BufReader<File>,
486
487    path: String,
488    processed_bytes: usize,
489    total_bytes: usize,
490    started_at: Instant,
491    last_printed_at: Option<Instant>,
492}
493
494impl TimedFileReader {
495    fn new(path: &str) -> Result<TimedFileReader> {
496        || -> Result<TimedFileReader> {
497            let file = File::open(path)?;
498            let total_bytes = file.metadata()?.len() as usize;
499            Ok(TimedFileReader {
500                inner: BufReader::new(file),
501                path: path.to_string(),
502                processed_bytes: 0,
503                total_bytes,
504                started_at: Instant::now(),
505                last_printed_at: None,
506            })
507        }()
508        .with_context(|| path.to_string())
509    }
510}
511
512impl<'a> Read for Timer<'a> {
513    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
514        let file = match self.stack.last_mut() {
515            Some(StackEntry::File(ref mut f)) => f,
516            _ => {
517                return Err(std::io::Error::new(
518                    ErrorKind::Other,
519                    "trying to read when Timer doesn't have file on the stack?!",
520                ));
521            }
522        };
523
524        let bytes = file.inner.read(buf)?;
525        file.processed_bytes += bytes;
526        if file.processed_bytes > file.total_bytes {
527            panic!(
528                "{} is too many bytes read from {}",
529                prettyprint_usize(file.processed_bytes),
530                file.path
531            );
532        }
533
534        if file.processed_bytes == file.total_bytes {
535            let elapsed = elapsed_seconds(file.started_at);
536            let line = format!(
537                "Read {} ({})... {}",
538                file.path,
539                prettyprint_usize(file.total_bytes / 1024 / 1024),
540                prettyprint_time(elapsed)
541            );
542            if self.outermost_name != "throwaway" {
543                if file.last_printed_at.is_none() {
544                    self.temporary_println(line.clone());
545                } else {
546                    clear_current_line();
547                    println!("{}", line);
548                    if let Some(ref mut sink) = self.sink {
549                        sink.reprintln(line.clone());
550                    }
551                }
552            }
553            self.stack.pop();
554            self.add_result(elapsed, line);
555        } else if file.last_printed_at.is_none()
556            || elapsed_seconds(file.last_printed_at.unwrap()) >= PROGRESS_FREQUENCY_SECONDS
557        {
558            if self.outermost_name != "throwaway" {
559                let line = format!(
560                    "Reading {}: {}/{} MB... {}",
561                    file.path,
562                    prettyprint_usize(file.processed_bytes / 1024 / 1024),
563                    prettyprint_usize(file.total_bytes / 1024 / 1024),
564                    prettyprint_time(elapsed_seconds(file.started_at))
565                );
566                // TODO Refactor this pattern...
567                clear_current_line();
568                print!("{}", line);
569                stdout().flush().unwrap();
570
571                if let Some(ref mut sink) = self.sink {
572                    if file.last_printed_at.is_none() {
573                        sink.println(line);
574                    } else {
575                        sink.reprintln(line);
576                    }
577                }
578            }
579
580            file.last_printed_at = Some(Instant::now());
581        }
582
583        Ok(bytes)
584    }
585}
586
587// Print progress info while a Timer is still active. Invisible on web by default.
588fn temporary_println<'a>(maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>, line: String) {
589    #[cfg(not(target_arch = "wasm32"))]
590    {
591        println!("{}", line);
592    }
593    #[cfg(target_arch = "wasm32")]
594    {
595        debug!("{}", line);
596    }
597    if let Some(ref mut sink) = maybe_sink {
598        sink.println(line);
599    }
600}
601
602// Print info about a completed Timer. Always uses info logs, so works on native and web.
603fn finalized_println<'a>(maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>, line: String) {
604    info!("{}", line);
605    if let Some(ref mut sink) = maybe_sink {
606        sink.println(line);
607    }
608}