sim/
scheduler.rs

1use std::collections::hash_map::Entry;
2use std::collections::{BinaryHeap, HashMap};
3
4use serde::{Deserialize, Serialize};
5
6use abstutil::{Counter, PriorityQueueItem};
7use geom::{Duration, Histogram, Time};
8use map_model::{IntersectionID, TransitRouteID};
9
10use crate::{
11    pandemic, AgentID, CarID, CreateCar, CreatePedestrian, PedestrianID, StartTripArgs, TripID,
12};
13
14#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
15pub(crate) enum Command {
16    /// If true, retry when there's no room to spawn somewhere
17    SpawnCar(CreateCar, bool),
18    SpawnPed(CreatePedestrian),
19    StartTrip(TripID, StartTripArgs),
20    UpdateCar(CarID),
21    /// Distinguish this from UpdateCar to avoid confusing things
22    UpdateLaggyHead(CarID),
23    UpdatePed(PedestrianID),
24    UpdateIntersection(IntersectionID),
25    Callback(Duration),
26    Pandemic(pandemic::Cmd),
27    /// The Time is redundant, just used to dedupe commands
28    StartBus(TransitRouteID, Time),
29}
30
31impl Command {
32    pub fn update_agent(id: AgentID) -> Command {
33        match id {
34            AgentID::Car(c) => Command::UpdateCar(c),
35            AgentID::Pedestrian(p) => Command::UpdatePed(p),
36            AgentID::BusPassenger(_, _) => unreachable!(),
37        }
38    }
39
40    fn to_type(&self) -> CommandType {
41        match self {
42            Command::SpawnCar(ref create, _) => CommandType::Car(create.vehicle.id),
43            Command::SpawnPed(ref create) => CommandType::Ped(create.id),
44            Command::StartTrip(id, _) => CommandType::StartTrip(*id),
45            Command::UpdateCar(id) => CommandType::Car(*id),
46            Command::UpdateLaggyHead(id) => CommandType::CarLaggyHead(*id),
47            Command::UpdatePed(id) => CommandType::Ped(*id),
48            Command::UpdateIntersection(id) => CommandType::Intersection(*id),
49            Command::Callback(_) => CommandType::Callback,
50            Command::Pandemic(ref p) => CommandType::Pandemic(p.clone()),
51            Command::StartBus(r, t) => CommandType::StartBus(*r, *t),
52        }
53    }
54
55    fn to_simple_type(&self) -> SimpleCommandType {
56        match self {
57            Command::SpawnCar(_, _) => SimpleCommandType::Car,
58            Command::SpawnPed(_) => SimpleCommandType::Ped,
59            Command::StartTrip(_, _) => SimpleCommandType::StartTrip,
60            Command::UpdateCar(_) => SimpleCommandType::Car,
61            Command::UpdateLaggyHead(_) => SimpleCommandType::CarLaggyHead,
62            Command::UpdatePed(_) => SimpleCommandType::Ped,
63            Command::UpdateIntersection(_) => SimpleCommandType::Intersection,
64            Command::Callback(_) => SimpleCommandType::Callback,
65            Command::Pandemic(_) => SimpleCommandType::Pandemic,
66            Command::StartBus(_, _) => SimpleCommandType::StartBus,
67        }
68    }
69}
70
71/// A smaller version of Command that satisfies many more properties. Only one Command per
72/// CommandType may exist at a time.
73#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Debug)]
74enum CommandType {
75    StartTrip(TripID),
76    Car(CarID),
77    CarLaggyHead(CarID),
78    Ped(PedestrianID),
79    Intersection(IntersectionID),
80    Callback,
81    Pandemic(pandemic::Cmd),
82    StartBus(TransitRouteID, Time),
83}
84
85/// A more compressed form of CommandType, just used for keeping stats on event processing.
86#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Debug)]
87enum SimpleCommandType {
88    StartTrip,
89    Car,
90    CarLaggyHead,
91    Ped,
92    Intersection,
93    Callback,
94    Pandemic,
95    StartBus,
96}
97
98/// The priority queue driving the discrete event simulation. Different pieces of the simulation
99/// schedule Commands to happen at a specific time, and the Scheduler hands out the commands in
100/// order.
101#[derive(Clone, Serialize, Deserialize)]
102pub(crate) struct Scheduler {
103    items: BinaryHeap<PriorityQueueItem<Time, CommandType>>,
104    queued_commands: HashMap<CommandType, (Command, Time)>,
105
106    latest_time: Time,
107    last_time: Time,
108    #[serde(skip_serializing, skip_deserializing)]
109    delta_times: Histogram<Duration>,
110    #[serde(skip_serializing, skip_deserializing)]
111    cmd_type_counts: Counter<SimpleCommandType>,
112}
113
114impl Scheduler {
115    pub fn new() -> Scheduler {
116        Scheduler {
117            items: BinaryHeap::new(),
118            queued_commands: HashMap::new(),
119            latest_time: Time::START_OF_DAY,
120            last_time: Time::START_OF_DAY,
121            delta_times: Histogram::new(),
122            cmd_type_counts: Counter::new(),
123        }
124    }
125
126    pub fn push(&mut self, time: Time, cmd: Command) {
127        if time < self.latest_time {
128            panic!(
129                "It's at least {}, so can't schedule a command for {}",
130                self.latest_time, time
131            );
132        }
133        self.last_time = self.last_time.max(time);
134        self.delta_times.add(time - self.latest_time);
135        self.cmd_type_counts.inc(cmd.to_simple_type());
136
137        let cmd_type = cmd.to_type();
138
139        match self.queued_commands.entry(cmd_type.clone()) {
140            Entry::Vacant(vacant) => {
141                vacant.insert((cmd, time));
142                self.items.push(PriorityQueueItem {
143                    cost: time,
144                    value: cmd_type,
145                });
146            }
147            Entry::Occupied(occupied) => {
148                let (existing_cmd, existing_time) = occupied.get();
149                panic!(
150                    "Can't push({}, {:?}) because ({}, {:?}) already queued",
151                    time, cmd, existing_time, existing_cmd
152                );
153            }
154        }
155    }
156
157    pub fn update(&mut self, new_time: Time, cmd: Command) {
158        if new_time < self.latest_time {
159            panic!(
160                "It's at least {}, so can't schedule a command for {}",
161                self.latest_time, new_time
162            );
163        }
164        self.last_time = self.last_time.max(new_time);
165
166        let cmd_type = cmd.to_type();
167
168        // It's fine if a previous command hasn't actually been scheduled.
169        if let Some((existing_cmd, _)) = self.queued_commands.get(&cmd_type) {
170            assert_eq!(cmd, *existing_cmd);
171        }
172        self.queued_commands
173            .insert(cmd_type.clone(), (cmd, new_time));
174        self.items.push(PriorityQueueItem {
175            cost: new_time,
176            value: cmd_type,
177        });
178    }
179
180    pub fn cancel(&mut self, cmd: Command) {
181        // It's fine if a previous command hasn't actually been scheduled.
182        self.queued_commands.remove(&cmd.to_type());
183    }
184
185    /// This next command might've actually been rescheduled to a later time; the caller won't know
186    /// that here.
187    pub fn peek_next_time(&self) -> Option<Time> {
188        self.items.peek().as_ref().map(|cmd| cmd.cost)
189    }
190
191    pub fn get_last_time(&self) -> Time {
192        self.last_time
193    }
194
195    /// This API is safer than handing out a batch of items at a time, because while processing one
196    /// item, we might change the priority of other items or add new items. Don't make the caller
197    /// reconcile those changes -- just keep pulling items from here, one at a time.
198    //
199    // TODO Above description is a little vague. This should be used with peek_next_time in a
200    // particular way...
201    pub fn get_next(&mut self) -> Option<Command> {
202        let item = self.items.pop().unwrap();
203        self.latest_time = item.cost;
204        match self.queued_commands.entry(item.value) {
205            Entry::Vacant(_) => {
206                // Command was cancelled
207                None
208            }
209            Entry::Occupied(occupied) => {
210                // Command was re-scheduled for later.
211                if occupied.get().1 > item.cost {
212                    return None;
213                }
214                Some(occupied.remove().0)
215            }
216        }
217    }
218
219    pub fn describe_stats(&self) -> Vec<String> {
220        let mut stats = vec![
221            format!("delta times for events: {}", self.delta_times.describe()),
222            "count for each command type:".to_string(),
223        ];
224        for (cmd, cnt) in self.cmd_type_counts.borrow() {
225            stats.push(format!("{:?}: {}", cmd, abstutil::prettyprint_usize(*cnt)));
226        }
227        stats
228    }
229}