1use std::collections::hash_map::Entry;
2use std::collections::{BinaryHeap, HashMap};
3
4use serde::{Deserialize, Serialize};
5
6use abstutil::{Counter, PriorityQueueItem};
7use geom::{Duration, Histogram, Time};
8use map_model::{IntersectionID, TransitRouteID};
9
10use crate::{
11 pandemic, AgentID, CarID, CreateCar, CreatePedestrian, PedestrianID, StartTripArgs, TripID,
12};
13
14#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
15pub(crate) enum Command {
16 SpawnCar(CreateCar, bool),
18 SpawnPed(CreatePedestrian),
19 StartTrip(TripID, StartTripArgs),
20 UpdateCar(CarID),
21 UpdateLaggyHead(CarID),
23 UpdatePed(PedestrianID),
24 UpdateIntersection(IntersectionID),
25 Callback(Duration),
26 Pandemic(pandemic::Cmd),
27 StartBus(TransitRouteID, Time),
29}
30
31impl Command {
32 pub fn update_agent(id: AgentID) -> Command {
33 match id {
34 AgentID::Car(c) => Command::UpdateCar(c),
35 AgentID::Pedestrian(p) => Command::UpdatePed(p),
36 AgentID::BusPassenger(_, _) => unreachable!(),
37 }
38 }
39
40 fn to_type(&self) -> CommandType {
41 match self {
42 Command::SpawnCar(ref create, _) => CommandType::Car(create.vehicle.id),
43 Command::SpawnPed(ref create) => CommandType::Ped(create.id),
44 Command::StartTrip(id, _) => CommandType::StartTrip(*id),
45 Command::UpdateCar(id) => CommandType::Car(*id),
46 Command::UpdateLaggyHead(id) => CommandType::CarLaggyHead(*id),
47 Command::UpdatePed(id) => CommandType::Ped(*id),
48 Command::UpdateIntersection(id) => CommandType::Intersection(*id),
49 Command::Callback(_) => CommandType::Callback,
50 Command::Pandemic(ref p) => CommandType::Pandemic(p.clone()),
51 Command::StartBus(r, t) => CommandType::StartBus(*r, *t),
52 }
53 }
54
55 fn to_simple_type(&self) -> SimpleCommandType {
56 match self {
57 Command::SpawnCar(_, _) => SimpleCommandType::Car,
58 Command::SpawnPed(_) => SimpleCommandType::Ped,
59 Command::StartTrip(_, _) => SimpleCommandType::StartTrip,
60 Command::UpdateCar(_) => SimpleCommandType::Car,
61 Command::UpdateLaggyHead(_) => SimpleCommandType::CarLaggyHead,
62 Command::UpdatePed(_) => SimpleCommandType::Ped,
63 Command::UpdateIntersection(_) => SimpleCommandType::Intersection,
64 Command::Callback(_) => SimpleCommandType::Callback,
65 Command::Pandemic(_) => SimpleCommandType::Pandemic,
66 Command::StartBus(_, _) => SimpleCommandType::StartBus,
67 }
68 }
69}
70
71#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Debug)]
74enum CommandType {
75 StartTrip(TripID),
76 Car(CarID),
77 CarLaggyHead(CarID),
78 Ped(PedestrianID),
79 Intersection(IntersectionID),
80 Callback,
81 Pandemic(pandemic::Cmd),
82 StartBus(TransitRouteID, Time),
83}
84
85#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Debug)]
87enum SimpleCommandType {
88 StartTrip,
89 Car,
90 CarLaggyHead,
91 Ped,
92 Intersection,
93 Callback,
94 Pandemic,
95 StartBus,
96}
97
98#[derive(Clone, Serialize, Deserialize)]
102pub(crate) struct Scheduler {
103 items: BinaryHeap<PriorityQueueItem<Time, CommandType>>,
104 queued_commands: HashMap<CommandType, (Command, Time)>,
105
106 latest_time: Time,
107 last_time: Time,
108 #[serde(skip_serializing, skip_deserializing)]
109 delta_times: Histogram<Duration>,
110 #[serde(skip_serializing, skip_deserializing)]
111 cmd_type_counts: Counter<SimpleCommandType>,
112}
113
114impl Scheduler {
115 pub fn new() -> Scheduler {
116 Scheduler {
117 items: BinaryHeap::new(),
118 queued_commands: HashMap::new(),
119 latest_time: Time::START_OF_DAY,
120 last_time: Time::START_OF_DAY,
121 delta_times: Histogram::new(),
122 cmd_type_counts: Counter::new(),
123 }
124 }
125
126 pub fn push(&mut self, time: Time, cmd: Command) {
127 if time < self.latest_time {
128 panic!(
129 "It's at least {}, so can't schedule a command for {}",
130 self.latest_time, time
131 );
132 }
133 self.last_time = self.last_time.max(time);
134 self.delta_times.add(time - self.latest_time);
135 self.cmd_type_counts.inc(cmd.to_simple_type());
136
137 let cmd_type = cmd.to_type();
138
139 match self.queued_commands.entry(cmd_type.clone()) {
140 Entry::Vacant(vacant) => {
141 vacant.insert((cmd, time));
142 self.items.push(PriorityQueueItem {
143 cost: time,
144 value: cmd_type,
145 });
146 }
147 Entry::Occupied(occupied) => {
148 let (existing_cmd, existing_time) = occupied.get();
149 panic!(
150 "Can't push({}, {:?}) because ({}, {:?}) already queued",
151 time, cmd, existing_time, existing_cmd
152 );
153 }
154 }
155 }
156
157 pub fn update(&mut self, new_time: Time, cmd: Command) {
158 if new_time < self.latest_time {
159 panic!(
160 "It's at least {}, so can't schedule a command for {}",
161 self.latest_time, new_time
162 );
163 }
164 self.last_time = self.last_time.max(new_time);
165
166 let cmd_type = cmd.to_type();
167
168 if let Some((existing_cmd, _)) = self.queued_commands.get(&cmd_type) {
170 assert_eq!(cmd, *existing_cmd);
171 }
172 self.queued_commands
173 .insert(cmd_type.clone(), (cmd, new_time));
174 self.items.push(PriorityQueueItem {
175 cost: new_time,
176 value: cmd_type,
177 });
178 }
179
180 pub fn cancel(&mut self, cmd: Command) {
181 self.queued_commands.remove(&cmd.to_type());
183 }
184
185 pub fn peek_next_time(&self) -> Option<Time> {
188 self.items.peek().as_ref().map(|cmd| cmd.cost)
189 }
190
191 pub fn get_last_time(&self) -> Time {
192 self.last_time
193 }
194
195 pub fn get_next(&mut self) -> Option<Command> {
202 let item = self.items.pop().unwrap();
203 self.latest_time = item.cost;
204 match self.queued_commands.entry(item.value) {
205 Entry::Vacant(_) => {
206 None
208 }
209 Entry::Occupied(occupied) => {
210 if occupied.get().1 > item.cost {
212 return None;
213 }
214 Some(occupied.remove().0)
215 }
216 }
217 }
218
219 pub fn describe_stats(&self) -> Vec<String> {
220 let mut stats = vec![
221 format!("delta times for events: {}", self.delta_times.describe()),
222 "count for each command type:".to_string(),
223 ];
224 for (cmd, cnt) in self.cmd_type_counts.borrow() {
225 stats.push(format!("{:?}: {}", cmd, abstutil::prettyprint_usize(*cnt)));
226 }
227 stats
228 }
229}