1use std::collections::BTreeMap;
2use std::io::{BufReader, Read};
3use std::process::Command;
4
5use anyhow::Result;
6use fs_err::File;
7use structopt::StructOpt;
8use walkdir::WalkDir;
9
10use abstio::{DataPacks, Entry, Manifest};
11use abstutil::{must_run_cmd, prettyprint_usize, Timer};
12
// Chunk size (in bytes) for streaming files through md5, so big files aren't slurped into memory.
const MD5_BUF_READ_SIZE: usize = 4096;
14
// The updater's subcommands. Plain `//` comments are used deliberately:
// structopt turns `///` doc comments into CLI help text, which would change
// the tool's user-facing output.
#[derive(StructOpt)]
#[structopt(
    name = "updater",
    about = "Download the latest version of per-city data"
)]
enum Task {
    // Fully sync local data to S3 under the "dev" version.
    Upload,
    // Upload only files whose checksum differs from data/MANIFEST.json, under
    // a named version. main() asserts this version isn't "dev".
    IncrementalUpload {
        #[structopt(long)]
        version: String,
    },
    // Compare local files against the manifest without transferring anything.
    DryRun {
        // If set, only compare this one path's md5sum against the manifest.
        #[structopt(long)]
        single_file: Option<String>,
    },
    // Print a DataPacks config that opts into all data.
    OptIntoAll,
    // Download data matching the local DataPacks config.
    Download {
        // Only download Montlake-related files and Seattle's city.bin.
        #[structopt(long)]
        minimal: bool,
        // Keep local files that're absent from the remote manifest.
        #[structopt(long)]
        dont_delete: bool,
        // Copy from a local mirror of the S3 bucket instead of over HTTP.
        #[structopt(long)]
        dl_from_local: bool,
        // Which version of the data to download.
        #[structopt(long, default_value = "dev")]
        version: String,
    },
}
60
// Entry point: parse the subcommand and dispatch. Must run from the git root,
// since every path in this tool is relative to the data/ directory.
#[tokio::main]
async fn main() {
    if !std::path::Path::new("data").exists() {
        panic!("Your current directory doesn't have the data/ directory. Run from the git root: cd ../; cargo run --bin updater -- args");
    }

    abstutil::logger::setup();
    match Task::from_args() {
        Task::Upload => {
            // The full upload always targets the "dev" version.
            upload("dev");
        }
        Task::IncrementalUpload { version } => {
            // "dev" is managed by the full Upload task, not incrementally.
            assert_ne!(version, "dev");
            incremental_upload(version);
        }
        Task::DryRun { single_file } => {
            if let Some(path) = single_file {
                // Compare one file's md5sum against the manifest's record.
                let local = md5sum(&path);
                let truth = Manifest::load()
                    .entries
                    .remove(&path)
                    .unwrap_or_else(|| panic!("{} not in data/MANIFEST.txt", path))
                    .checksum;
                if local != truth {
                    println!("{} has changed", path);
                }
            } else {
                // Diff everything covered by the local data packs.
                just_compare();
            }
        }
        Task::OptIntoAll => {
            println!("{}", abstutil::to_json(&DataPacks::all_data_packs()));
        }
        Task::Download {
            minimal,
            dont_delete,
            dl_from_local,
            version,
        } => {
            download_updates(version, minimal, !dont_delete, dl_from_local).await;
        }
    }
}
106
107async fn download_updates(version: String, minimal: bool, delete_local: bool, dl_from_local: bool) {
108 let data_packs = DataPacks::load_or_create();
109 let truth = Manifest::load().filter(data_packs);
110 let local = generate_manifest(&truth);
111
112 if delete_local {
114 for path in local.entries.keys() {
115 if !truth.entries.contains_key(path) {
116 rm(path);
117 }
118 }
119 }
120
121 let mut failed = Vec::new();
123 for (path, entry) in truth.entries {
124 if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
125 if minimal && !path.contains("montlake") && path != "data/system/us/seattle/city.bin" {
126 continue;
127 }
128
129 fs_err::create_dir_all(std::path::Path::new(&path).parent().unwrap()).unwrap();
130 match download_file(&version, &path, dl_from_local).await {
131 Ok(bytes) => {
132 println!(
133 "> decompress {}, which is {} bytes compressed",
134 path,
135 prettyprint_usize(bytes.len())
136 );
137 let mut decoder = flate2::read::GzDecoder::new(&bytes[..]);
138 let mut out = File::create(&path).unwrap();
139 if let Err(err) = std::io::copy(&mut decoder, &mut out) {
140 println!("{}, but continuing", err);
141 failed.push(format!("{} failed: {}", path, err));
142 }
143 }
144 Err(err) => {
145 println!("{}, but continuing", err);
146 failed.push(format!("{} failed: {}", path, err));
147 }
148 };
149 }
150 }
151 if !failed.is_empty() {
152 panic!("Failed to download stuff: {:?}", failed);
154 }
155
156 remove_empty_directories("data/input");
157 remove_empty_directories("data/system");
158}
159
160fn just_compare() {
161 let data_packs = DataPacks::load_or_create();
162 let truth = Manifest::load().filter(data_packs);
163 let local = generate_manifest(&truth);
164
165 for path in local.entries.keys() {
167 if !truth.entries.contains_key(path) {
168 println!("- Remove {}", path);
169 }
170 }
171
172 for (path, entry) in truth.entries {
174 if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
175 if let Some(old_bytes) = local.entries.get(&path).map(|x| x.uncompressed_size_bytes) {
176 if old_bytes == entry.uncompressed_size_bytes {
177 println!("- Update {}. Same size, md5sum changed", path);
178 } else {
179 println!(
180 "- Update {}. {} bytes difference",
181 path,
182 (old_bytes as isize) - (entry.uncompressed_size_bytes as isize)
183 );
184 }
185 } else {
186 println!("- Add {}", path);
187 }
188 }
189 }
190}
191
/// Fully sync all local data into a local mirror of the S3 bucket, then push
/// the mirror to S3 via `aws s3 sync --delete`. Rewrites both the remote and
/// local MANIFEST.json for `version`.
fn upload(version: &str) {
    // A local copy of the bucket's layout; aws sync pushes from here.
    // NOTE(review): hardcoded to the maintainer's machine.
    let remote_base = format!("/home/dabreegster/s3_abst_data/{}", version);

    // Load the mirror's manifest if it exists, so unchanged files can skip
    // recompression below.
    let remote: Manifest = abstio::maybe_read_json(
        format!("{}/MANIFEST.json", remote_base),
        &mut Timer::throwaway(),
    )
    .unwrap_or(Manifest {
        entries: BTreeMap::new(),
    });
    let mut local = generate_manifest(&remote);

    // Delete mirror files that're gone locally; aws sync's --delete then
    // propagates the removal to S3.
    for path in remote.entries.keys() {
        if !local.entries.contains_key(path) {
            rm(&format!("{}/{}.gz", remote_base, path));
        }
    }

    // Compress changed files in parallel. Take the entries out of `local` so
    // the map can be rebuilt with compressed_size_bytes filled in (even for
    // unchanged files, measured from the existing .gz on the mirror).
    let local_entries = std::mem::take(&mut local.entries);
    for (path, entry) in Timer::new("compress files").parallelize(
        "compress files",
        local_entries.into_iter().collect(),
        |(path, mut entry)| {
            let remote_path = format!("{}/{}.gz", remote_base, path);
            let changed = remote.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum);
            if changed {
                compress(&path, &remote_path);
            }
            entry.compressed_size_bytes = fs_err::metadata(&remote_path)
                .unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
                .len();
            (path, entry)
        },
    ) {
        local.entries.insert(path, entry);
    }

    // Write the finished manifest to both the mirror and the repo.
    abstio::write_json(format!("{}/MANIFEST.json", remote_base), &local);
    abstio::write_json("data/MANIFEST.json".to_string(), &local);

    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("sync")
            .arg("--delete")
            .arg(format!("{}/data", remote_base))
            .arg(format!("s3://abstreet/{}/data", version)),
    );
    // Upload the manifest last, after all the data it describes is in place.
    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("cp")
            .arg(format!("{}/MANIFEST.json", remote_base))
            .arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
    );
}
254
/// Compress and upload only the files that differ from data/MANIFEST.json,
/// then upload the updated manifest. Returns early (without touching S3) if
/// nothing changed.
fn incremental_upload(version: String) {
    // Changed files are staged here, synced to S3, then the directory is
    // deleted at the end.
    let remote_base = "tmp_incremental_upload";

    let mut truth = Manifest::load();
    let local = generate_manifest(&truth);

    // Compress changed files in parallel; unchanged files yield None and are
    // flattened away.
    let mut changes = false;
    for (path, entry) in Timer::new("compress files")
        .parallelize(
            "compress files",
            local.entries.into_iter().collect(),
            |(path, mut entry)| {
                if truth.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
                    let remote_path = format!("{}/{}.gz", remote_base, path);
                    compress(&path, &remote_path);
                    entry.compressed_size_bytes = fs_err::metadata(&remote_path)
                        .unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
                        .len();
                    Some((path, entry))
                } else {
                    None
                }
            },
        )
        .into_iter()
        .flatten()
    {
        // Fold the updated entries back into the manifest.
        truth.entries.insert(path, entry);
        changes = true;
    }
    if !changes {
        return;
    }

    abstio::write_json("data/MANIFEST.json".to_string(), &truth);

    // No --delete here: incremental uploads only add/overwrite files.
    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("sync")
            .arg(format!("{}/data", remote_base))
            .arg(format!("s3://abstreet/{}/data", version)),
    );
    // Upload the manifest last, after the data it describes is in place.
    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("cp")
            .arg("data/MANIFEST.json")
            .arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
    );

    // Clean up the staging directory.
    must_run_cmd(Command::new("rm").arg("-rfv").arg(remote_base));
}
314
/// Walk data/input and data/system and build a manifest of everything found,
/// skipping assets, proposals, study areas, and raw .osm files.
///
/// md5summing everything is slow, so checksums from `truth` are reused when a
/// file has the same uncompressed size AND wasn't modified in the last 12
/// hours (a same-size edit within that window is still re-hashed).
/// compressed_size_bytes is left 0 here; the upload paths fill it in.
fn generate_manifest(truth: &Manifest) -> Manifest {
    let mut paths = Vec::new();
    for entry in WalkDir::new("data/input")
        .into_iter()
        .chain(WalkDir::new("data/system").into_iter())
        .filter_map(|e| e.ok())
    {
        if entry.file_type().is_dir() {
            continue;
        }
        // Keep the native path for filesystem access, but normalize Windows
        // backslashes for the manifest key, so manifests match across OSes.
        let orig_path = entry.path().display().to_string();
        let path = orig_path.replace("\\", "/");
        if path.contains("system/assets/")
            || path.contains("system/ltn_proposals")
            || path.contains("system/proposals")
            || path.contains("system/study_areas")
        {
            continue;
        }
        if path.ends_with(".osm") {
            continue;
        }
        paths.push((orig_path, path));
    }

    let mut kv = BTreeMap::new();
    for (path, entry) in
        Timer::new("compute md5sums").parallelize("compute md5sums", paths, |(orig_path, path)| {
            let metadata = fs_err::metadata(&orig_path).unwrap();
            let uncompressed_size_bytes = metadata.len();
            // Recently-touched files always get re-hashed, even if the size
            // matches, since a same-size edit would otherwise go unnoticed.
            let recent_modtime = metadata.modified().unwrap().elapsed().unwrap()
                < std::time::Duration::from_secs(60 * 60 * 12);

            let checksum = if recent_modtime
                || truth
                    .entries
                    .get(&path)
                    .map(|entry| entry.uncompressed_size_bytes != uncompressed_size_bytes)
                    .unwrap_or(true)
            {
                md5sum(&orig_path)
            } else {
                // Size matches and the file is old; trust the known checksum.
                truth.entries[&path].checksum.clone()
            };
            (
                path,
                Entry {
                    checksum,
                    uncompressed_size_bytes,
                    // Filled in by the upload paths.
                    compressed_size_bytes: 0,
                },
            )
        })
    {
        kv.insert(path, entry);
    }

    Manifest { entries: kv }
}
378
379fn md5sum(path: &str) -> String {
380 let mut file = File::open(path).unwrap();
382 let mut buffer = [0_u8; MD5_BUF_READ_SIZE];
383 let mut context = md5::Context::new();
384 while let Ok(n) = file.read(&mut buffer) {
385 if n == 0 {
386 break;
387 }
388 context.consume(&buffer[..n]);
389 }
390 format!("{:x}", context.compute())
391}
392
393fn rm(path: &str) {
394 println!("> rm {}", path);
395 match fs_err::remove_file(path) {
396 Ok(_) => {}
397 Err(e) => match e.kind() {
398 std::io::ErrorKind::NotFound => {
399 println!("file {} does not exist, continuing", &path);
400 }
401 other_error => {
402 panic!("problem removing file: {:?}", other_error);
403 }
404 },
405 }
406}
407
/// Fetch one compressed file, either over HTTP from S3 or by copying from a
/// local mirror of the bucket. Returns the still-gzipped bytes.
async fn download_file(version: &str, path: &str, dl_from_local: bool) -> Result<Vec<u8>> {
    if dl_from_local {
        // NOTE(review): hardcoded to the maintainer's machine.
        return abstio::slurp_file(format!(
            "/home/dabreegster/s3_abst_data/{}/{}.gz",
            version, path
        ));
    }

    let url = format!("https://play.abstreet.org/{}/{}.gz", version, path);
    println!("> download {}", url);
    // Progress updates flow through this channel while the download runs.
    // NOTE(review): presumably print_download_progress spawns a task to drain
    // rx concurrently -- confirm against abstio.
    let (mut tx, rx) = futures_channel::mpsc::channel(1000);
    abstio::print_download_progress(rx);
    abstio::download_bytes(url, None, &mut tx).await
}
422
423fn remove_empty_directories(root: &str) {
428 loop {
429 let mut all_paths = Vec::new();
431 let mut all_dirs = Vec::new();
432 for entry in WalkDir::new(root).into_iter().filter_map(|e| e.ok()) {
433 let path = entry.path().display().to_string();
434 all_paths.push(path.clone());
435 if entry.file_type().is_dir() {
436 all_dirs.push(path);
437 }
438 }
439
440 all_dirs.retain(|dir| !all_paths.iter().any(|p| p != dir && p.starts_with(dir)));
442
443 if all_dirs.is_empty() {
444 break;
445 } else {
446 for x in all_dirs {
448 println!("> Removing empty directory {}", x);
449 fs_err::remove_dir(&x).unwrap();
452 }
453 }
454 }
455}
456
457fn compress(path: &str, remote_path: &str) {
458 assert!(!path.ends_with(".gz"));
459 assert!(remote_path.ends_with(".gz"));
460
461 fs_err::create_dir_all(std::path::Path::new(remote_path).parent().unwrap()).unwrap();
462 println!("> compressing {}", path);
463 let mut input = BufReader::new(File::open(path).unwrap());
464 let out = File::create(remote_path).unwrap();
465 let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::best());
466 std::io::copy(&mut input, &mut encoder).unwrap();
467 encoder.finish().unwrap();
468}