1use std::collections::BTreeMap;
2use std::io::{stdout, BufReader, ErrorKind, Read, Write};
3
4use anyhow::{Context, Result};
5use fs_err::File;
6use instant::Instant;
7
8use crate::{prettyprint_usize, PROGRESS_FREQUENCY_SECONDS};
9
/// Returns how many seconds have passed since `since`, as a floating-point
/// value that includes the sub-second (nanosecond) part.
pub fn elapsed_seconds(since: Instant) -> f64 {
    let elapsed = since.elapsed();
    let whole_seconds = elapsed.as_secs() as f64;
    let fraction = f64::from(elapsed.subsec_nanos()) * 1e-9;
    whole_seconds + fraction
}
14
15#[derive(Debug)]
16struct Progress {
17 label: String,
18 processed_items: usize,
19 total_items: usize,
20 started_at: Instant,
21 last_printed_at: Instant,
22 first_update: bool,
23}
24
25impl Progress {
26 fn new(label: String, total_items: usize) -> Progress {
27 Progress {
28 label,
29 processed_items: 0,
30 total_items,
31 started_at: Instant::now(),
32 last_printed_at: Instant::now(),
33 first_update: true,
34 }
35 }
36
37 fn next<'a>(
39 &mut self,
40 maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>,
41 ) -> Option<(f64, String)> {
42 self.processed_items += 1;
43 if self.processed_items > self.total_items {
44 panic!(
45 "{} is too few items for {} progress",
46 prettyprint_usize(self.total_items),
47 self.label
48 );
49 }
50
51 if self.processed_items == self.total_items {
52 let elapsed = elapsed_seconds(self.started_at);
53 let line = format!(
54 "{} ({})... {}",
55 self.label,
56 prettyprint_usize(self.total_items),
57 prettyprint_time(elapsed)
58 );
59 if self.total_items == 1 {
60 temporary_println(maybe_sink, line.clone());
61 } else {
62 clear_current_line();
63 println!("{}", line);
64 if let Some(ref mut sink) = maybe_sink {
65 sink.reprintln(line.clone());
66 }
67 }
68 return Some((elapsed, line));
69 } else if elapsed_seconds(self.last_printed_at) >= PROGRESS_FREQUENCY_SECONDS {
70 self.last_printed_at = Instant::now();
71 let line = format!(
72 "{}: {}/{}... {}",
73 self.label,
74 prettyprint_usize(self.processed_items),
75 prettyprint_usize(self.total_items),
76 prettyprint_time(elapsed_seconds(self.started_at))
77 );
78 clear_current_line();
79 print!("{}", line);
80 stdout().flush().unwrap();
81
82 if let Some(ref mut sink) = maybe_sink {
83 if self.first_update {
84 sink.println(line);
85 self.first_update = false;
86 } else {
87 sink.reprintln(line);
88 }
89 }
90 }
91 None
92 }
93
94 fn cancel_iter_early(&mut self) -> f64 {
95 elapsed_seconds(self.started_at)
96 }
97}
98
99enum StackEntry {
100 TimerSpan(TimerSpan),
101 Progress(Progress),
102 File(TimedFileReader),
103}
104
/// Receives a copy of the lines a `Timer` prints.
pub trait TimerSink {
    /// Called with a new line of output.
    fn println(&mut self, line: String);
    /// Called at the points where the timer rewrites the current terminal line
    /// (progress updates after the first, and most final progress summaries).
    /// NOTE(review): implementors are presumably expected to replace their most
    /// recent line rather than append -- confirm against the implementations.
    fn reprintln(&mut self, line: String);
}
109
110pub struct Timer<'a> {
112 results: Vec<String>,
113 stack: Vec<StackEntry>,
114
115 outermost_name: String,
116
117 sink: Option<Box<dyn TimerSink + 'a>>,
118}
119
/// A named span that is currently open on a `Timer`'s stack.
struct TimerSpan {
    /// Name passed to `Timer::start`; `Timer::stop` asserts it is given the
    /// same name.
    name: String,
    /// When the span was opened.
    started_at: Instant,
    /// Result lines contributed by work that finished inside this span.
    nested_results: Vec<String>,
    /// Sum of the elapsed seconds reported by work that finished inside this
    /// span.
    nested_time: f64,
}
126
127impl<'a> Timer<'a> {
128 pub fn new<S: Into<String>>(raw_name: S) -> Timer<'a> {
129 let name = raw_name.into();
130 let mut t = Timer {
131 results: Vec::new(),
132 stack: Vec::new(),
133 outermost_name: name.clone(),
134 sink: None,
135 };
136 t.start(name);
137 t
138 }
139
140 pub fn new_with_sink(name: &str, sink: Box<dyn TimerSink + 'a>) -> Timer<'a> {
141 let mut t = Timer::new(name);
142 t.sink = Some(sink);
143 t
144 }
145
146 pub fn throwaway() -> Timer<'a> {
148 Timer::new("throwaway")
149 }
150
151 fn temporary_println(&mut self, line: String) {
152 temporary_println(&mut self.sink, line);
153 }
154
155 pub fn done(self) {}
157
158 pub fn start<S: Into<String>>(&mut self, raw_name: S) {
159 if self.outermost_name == "throwaway" {
160 return;
161 }
162
163 let name = raw_name.into();
164 self.temporary_println(format!("{}...", name));
165 self.stack.push(StackEntry::TimerSpan(TimerSpan {
166 name,
167 started_at: Instant::now(),
168 nested_results: Vec::new(),
169 nested_time: 0.0,
170 }));
171 }
172
173 pub fn stop<S: Into<String>>(&mut self, raw_name: S) {
174 if self.outermost_name == "throwaway" {
175 return;
176 }
177 let name = raw_name.into();
178
179 let span = match self.stack.pop().unwrap() {
180 StackEntry::TimerSpan(s) => s,
181 StackEntry::Progress(p) => panic!("stop() during unfinished start_iter(): {:?}", p),
182 StackEntry::File(f) => panic!("stop() while reading file {}", f.path),
183 };
184 assert_eq!(span.name, name);
185 let elapsed = elapsed_seconds(span.started_at);
186 let line = format!("{} took {}", name, prettyprint_time(elapsed));
187
188 let padding = " ".repeat(self.stack.len());
189 match self.stack.last_mut() {
190 Some(StackEntry::TimerSpan(ref mut s)) => {
191 s.nested_results.push(format!("{}- {}", padding, line));
192 s.nested_results.extend(span.nested_results);
193 if span.nested_time != 0.0 {
194 temporary_println(
195 &mut self.sink,
196 format!(
197 "{}... plus {}",
198 name,
199 prettyprint_time(elapsed - span.nested_time)
200 ),
201 );
202 s.nested_results.push(format!(
203 " {}- ... plus {}",
204 padding,
205 prettyprint_time(elapsed - span.nested_time)
206 ));
207 }
208 s.nested_time += elapsed;
209 }
210 Some(_) => unreachable!(),
211 None => {
212 self.results.push(format!("{}- {}", padding, line));
213 self.results.extend(span.nested_results);
214 if span.nested_time != 0.0 {
215 self.temporary_println(format!(
216 "{}... plus {}",
217 name,
218 prettyprint_time(elapsed - span.nested_time)
219 ));
220 self.results.push(format!(
221 " - ... plus {}",
222 prettyprint_time(elapsed - span.nested_time)
223 ));
224 }
225 }
227 }
228
229 self.temporary_println(line);
230 }
231
232 pub fn start_iter<S: Into<String>>(&mut self, raw_name: S, total_items: usize) {
233 if self.outermost_name == "throwaway" {
234 return;
235 }
236 if total_items == 0 {
237 return;
238 }
239 let name = raw_name.into();
240 self.stack
243 .push(StackEntry::Progress(Progress::new(name, total_items)));
244 }
245
246 pub fn next(&mut self) {
247 if self.outermost_name == "throwaway" {
248 return;
249 }
250 let maybe_result =
251 if let Some(StackEntry::Progress(ref mut progress)) = self.stack.last_mut() {
252 progress.next(&mut self.sink)
253 } else {
254 panic!("Can't next() while a TimerSpan is top of the stack");
255 };
256 if let Some((elapsed, result)) = maybe_result {
257 self.stack.pop();
258 self.add_result(elapsed, result);
259 }
260 }
261
262 pub fn cancel_iter_early(&mut self) {
263 if self.outermost_name == "throwaway" {
264 return;
265 }
266 let elapsed = if let Some(StackEntry::Progress(ref mut progress)) = self.stack.last_mut() {
267 progress.cancel_iter_early()
268 } else {
269 panic!("Can't cancel_iter_early() while a TimerSpan is top of the stack");
270 };
271 self.stack.pop();
272 self.add_result(elapsed, "cancelled early".to_string());
273 }
274
275 pub fn add_result(&mut self, elapsed: f64, line: String) {
276 let padding = " ".repeat(self.stack.len());
277 match self.stack.last_mut() {
278 Some(StackEntry::TimerSpan(ref mut s)) => {
279 s.nested_results.push(format!("{}- {}", padding, line));
280 s.nested_time += elapsed;
281 }
282 Some(StackEntry::Progress(_)) => {}
283 Some(_) => unreachable!(),
284 None => {
285 self.results.push(format!("{}- {}", padding, line));
286 }
288 }
289 }
290
291 pub fn parallelize<I, O, F: Fn(I) -> O>(
294 &mut self,
295 timer_name: &str,
296 requests: Vec<I>,
297 cb: F,
298 ) -> Vec<O>
299 where
300 I: Send,
301 O: Send,
302 F: Send + Clone + Copy,
303 {
304 self.inner_parallelize(timer_name, requests, cb, num_cpus::get().max(1) as u32)
305 }
306
307 pub fn parallelize_polite<I, O, F: Fn(I) -> O>(
309 &mut self,
310 timer_name: &str,
311 requests: Vec<I>,
312 cb: F,
313 ) -> Vec<O>
314 where
315 I: Send,
316 O: Send,
317 F: Send + Clone + Copy,
318 {
319 self.inner_parallelize(
320 timer_name,
321 requests,
322 cb,
323 (num_cpus::get() - 1).max(1) as u32,
324 )
325 }
326
327 fn inner_parallelize<I, O, F: Fn(I) -> O>(
328 &mut self,
329 timer_name: &str,
330 requests: Vec<I>,
331 cb: F,
332 num_cpus: u32,
333 ) -> Vec<O>
334 where
335 I: Send,
336 O: Send,
337 F: Send + Clone + Copy,
338 {
339 #[cfg(target_arch = "wasm32")]
342 {
343 let _ = num_cpus;
345
346 let mut results: Vec<O> = Vec::new();
347 self.start_iter(timer_name, requests.len());
348 for req in requests {
349 self.next();
350 results.push(cb(req));
351 }
352 return results;
353 }
354
355 #[cfg(not(target_arch = "wasm32"))]
356 {
357 scoped_threadpool::Pool::new(num_cpus).scoped(|scope| {
358 let (tx, rx) = std::sync::mpsc::channel();
359 let mut results: Vec<Option<O>> = std::iter::repeat_with(|| None)
360 .take(requests.len())
361 .collect();
362 for (idx, req) in requests.into_iter().enumerate() {
363 let tx = tx.clone();
364 scope.execute(move || {
365 tx.send((idx, cb(req))).unwrap();
368 });
369 }
370 drop(tx);
371
372 self.start_iter(timer_name, results.len());
373 for (idx, result) in rx.iter() {
374 self.next();
375 results[idx] = Some(result);
376 }
377 results.into_iter().map(|x| x.unwrap()).collect()
378 })
379 }
380 }
381
382 pub fn retain_parallelized<K, V, F: Fn(&V) -> bool>(
384 &mut self,
385 timer_name: &str,
386 input: BTreeMap<K, V>,
387 keep: F,
388 ) -> BTreeMap<K, V>
389 where
390 K: Send + Ord,
391 V: Send,
392 F: Send + Sync + Clone + Copy,
393 {
394 self.parallelize(timer_name, input.into_iter().collect(), |(k, v)| {
395 if keep(&v) {
396 Some((k, v))
397 } else {
398 None
399 }
400 })
401 .into_iter()
402 .flatten()
403 .collect()
404 }
405
406 pub fn read_file(&mut self, path: &str) -> Result<()> {
408 self.stack
409 .push(StackEntry::File(TimedFileReader::new(path)?));
410 Ok(())
411 }
412}
413
414impl<'a> std::ops::Drop for Timer<'a> {
415 fn drop(&mut self) {
416 if self.outermost_name == "throwaway" {
417 return;
418 }
419
420 let stop_name = self.outermost_name.clone();
421
422 match self.stack.last() {
424 Some(StackEntry::TimerSpan(ref s)) => {
425 if s.name != stop_name {
426 error!("dropping Timer during {}, due to panic?", s.name);
427 return;
428 }
429 }
430 Some(StackEntry::File(ref r)) => {
431 error!("dropping Timer while reading {}, due to panic?", r.path);
432 return;
433 }
434 Some(StackEntry::Progress(ref p)) => {
435 error!(
436 "dropping Timer while doing progress {}, due to panic?",
437 p.label
438 );
439 return;
440 }
441 None => unreachable!(),
442 }
443
444 self.stop(&stop_name);
445 assert!(self.stack.is_empty());
446 for line in &self.results {
447 finalized_println(&mut self.sink, line.to_string());
448 }
449
450 if std::thread::panicking() {
451 error!("");
452 error!("");
453 error!("");
454 error!("");
455 error!("");
456 error!("!!! The program panicked, look above for the stack trace !!!");
457 }
458 }
459}
460
/// Formats a duration in seconds with four decimal places and an `s` suffix,
/// e.g. `1.5` becomes `"1.5000s"`.
pub fn prettyprint_time(seconds: f64) -> String {
    let mut text = format!("{:.4}", seconds);
    text.push('s');
    text
}
464
465#[cfg(unix)]
466pub fn clear_current_line() {
467 if let Ok((terminal_width, _)) = termion::terminal_size() {
469 print!(
470 "{}{}",
471 termion::clear::CurrentLine,
472 termion::cursor::Left(terminal_width)
473 );
474 } else {
475 print!("\r");
476 }
477}
478
/// Non-unix variant: prints a carriage return so that subsequent output starts
/// at the beginning of the current line. Stdout is not flushed here.
#[cfg(not(unix))]
pub fn clear_current_line() {
    const CARRIAGE_RETURN: &str = "\r";
    print!("{}", CARRIAGE_RETURN);
}
483
484struct TimedFileReader {
485 inner: BufReader<File>,
486
487 path: String,
488 processed_bytes: usize,
489 total_bytes: usize,
490 started_at: Instant,
491 last_printed_at: Option<Instant>,
492}
493
494impl TimedFileReader {
495 fn new(path: &str) -> Result<TimedFileReader> {
496 || -> Result<TimedFileReader> {
497 let file = File::open(path)?;
498 let total_bytes = file.metadata()?.len() as usize;
499 Ok(TimedFileReader {
500 inner: BufReader::new(file),
501 path: path.to_string(),
502 processed_bytes: 0,
503 total_bytes,
504 started_at: Instant::now(),
505 last_printed_at: None,
506 })
507 }()
508 .with_context(|| path.to_string())
509 }
510}
511
512impl<'a> Read for Timer<'a> {
513 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
514 let file = match self.stack.last_mut() {
515 Some(StackEntry::File(ref mut f)) => f,
516 _ => {
517 return Err(std::io::Error::new(
518 ErrorKind::Other,
519 "trying to read when Timer doesn't have file on the stack?!",
520 ));
521 }
522 };
523
524 let bytes = file.inner.read(buf)?;
525 file.processed_bytes += bytes;
526 if file.processed_bytes > file.total_bytes {
527 panic!(
528 "{} is too many bytes read from {}",
529 prettyprint_usize(file.processed_bytes),
530 file.path
531 );
532 }
533
534 if file.processed_bytes == file.total_bytes {
535 let elapsed = elapsed_seconds(file.started_at);
536 let line = format!(
537 "Read {} ({})... {}",
538 file.path,
539 prettyprint_usize(file.total_bytes / 1024 / 1024),
540 prettyprint_time(elapsed)
541 );
542 if self.outermost_name != "throwaway" {
543 if file.last_printed_at.is_none() {
544 self.temporary_println(line.clone());
545 } else {
546 clear_current_line();
547 println!("{}", line);
548 if let Some(ref mut sink) = self.sink {
549 sink.reprintln(line.clone());
550 }
551 }
552 }
553 self.stack.pop();
554 self.add_result(elapsed, line);
555 } else if file.last_printed_at.is_none()
556 || elapsed_seconds(file.last_printed_at.unwrap()) >= PROGRESS_FREQUENCY_SECONDS
557 {
558 if self.outermost_name != "throwaway" {
559 let line = format!(
560 "Reading {}: {}/{} MB... {}",
561 file.path,
562 prettyprint_usize(file.processed_bytes / 1024 / 1024),
563 prettyprint_usize(file.total_bytes / 1024 / 1024),
564 prettyprint_time(elapsed_seconds(file.started_at))
565 );
566 clear_current_line();
568 print!("{}", line);
569 stdout().flush().unwrap();
570
571 if let Some(ref mut sink) = self.sink {
572 if file.last_printed_at.is_none() {
573 sink.println(line);
574 } else {
575 sink.reprintln(line);
576 }
577 }
578 }
579
580 file.last_printed_at = Some(Instant::now());
581 }
582
583 Ok(bytes)
584 }
585}
586
587fn temporary_println<'a>(maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>, line: String) {
589 #[cfg(not(target_arch = "wasm32"))]
590 {
591 println!("{}", line);
592 }
593 #[cfg(target_arch = "wasm32")]
594 {
595 debug!("{}", line);
596 }
597 if let Some(ref mut sink) = maybe_sink {
598 sink.println(line);
599 }
600}
601
602fn finalized_println<'a>(maybe_sink: &mut Option<Box<dyn TimerSink + 'a>>, line: String) {
604 info!("{}", line);
605 if let Some(ref mut sink) = maybe_sink {
606 sink.println(line);
607 }
608}