updater/
main.rs
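//! Keeps the local `data/` directory in sync with the source-of-truth in S3: players download the
//! latest per-city data, and maintainers upload new versions.
//!
//! Example invocations, run from the git root (subcommand names assume structopt's default
//! kebab-case renaming):
//!
//!   cargo run --bin updater -- dry-run
//!   cargo run --bin updater -- download --minimal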

use std::collections::BTreeMap;
use std::io::{BufReader, Read};
use std::process::Command;

use anyhow::Result;
use fs_err::File;
use structopt::StructOpt;
use walkdir::WalkDir;

use abstio::{DataPacks, Entry, Manifest};
use abstutil::{must_run_cmd, prettyprint_usize, Timer};

const MD5_BUF_READ_SIZE: usize = 4096;

#[derive(StructOpt)]
#[structopt(
    name = "updater",
    about = "Download the latest version of per-city data"
)]
enum Task {
    /// Synchronize the source-of-truth in S3 with data in the local directory. Based on current
    /// permissions, only Dustin runs this.
    Upload,
    /// This uploads to S3 from cloud VMs that import maps. This never deletes files from S3, only
    /// updates or creates new ones.
    IncrementalUpload {
        /// Upload data to a temporary version managed by the cloud scripts.
        #[structopt(long)]
        version: String,
    },
    /// Just compare data in the current directory with the manifest, and describe any new,
    /// deleted, or modified files.
    DryRun {
        /// Just check if one file has changed.
        #[structopt(long)]
        single_file: Option<String>,
    },
    /// Print the JSON list of all possible city data packs to download. You can write this output
    /// to `data/player/data.json`, then download everything.
    OptIntoAll,
    /// Synchronize the local `data` directory with the source-of-truth in S3.
    Download {
        /// The GitHub Actions build uses this to include only a few files for the release to be
        /// usable. People can use the UI to open another map and download more data.
        #[structopt(long)]
        minimal: bool,
        /// Only update files from the manifest. Leave extra files alone.
        #[structopt(long)]
        dont_delete: bool,
        /// Only useful for Dustin. "Download" from my local S3 source-of-truth, not from the
        /// network.
        #[structopt(long)]
        dl_from_local: bool,
        /// Download data tied to a named release. See
        /// https://a-b-street.github.io/docs/tech/dev/data.html.
        #[structopt(long, default_value = "dev")]
        version: String,
    },
}

#[tokio::main]
async fn main() {
    if !std::path::Path::new("data").exists() {
        panic!("Your current directory doesn't have the data/ directory. Run from the git root: cd ../; cargo run --bin updater -- args");
    }

    abstutil::logger::setup();
    match Task::from_args() {
        Task::Upload => {
            upload("dev");
        }
        Task::IncrementalUpload { version } => {
            // We DON'T want to overwrite the main data immediately from the batch Docker jobs. If
            // running locally, this assertion can be temporarily disabled.
            assert_ne!(version, "dev");
            incremental_upload(version);
        }
        Task::DryRun { single_file } => {
            if let Some(path) = single_file {
                let local = md5sum(&path);
                let truth = Manifest::load()
                    .entries
                    .remove(&path)
                    .unwrap_or_else(|| panic!("{} not in data/MANIFEST.json", path))
                    .checksum;
                if local != truth {
                    println!("{} has changed", path);
                }
            } else {
                just_compare();
            }
        }
        Task::OptIntoAll => {
            println!("{}", abstio::to_json(&DataPacks::all_data_packs()));
        }
        Task::Download {
            minimal,
            dont_delete,
            dl_from_local,
            version,
        } => {
            download_updates(version, minimal, !dont_delete, dl_from_local).await;
        }
    }
}

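/// Downloads any files in the manifest (filtered by the player's opted-in data packs) whose
/// checksums don't match the local copies, and optionally deletes local files no longer in the
/// manifest.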
async fn download_updates(version: String, minimal: bool, delete_local: bool, dl_from_local: bool) {
    let data_packs = DataPacks::load_or_create();
    let truth = Manifest::load().filter(data_packs);
    let local = generate_manifest(&truth);

    // Does anything local need deleting?
    if delete_local {
        for path in local.entries.keys() {
            if !truth.entries.contains_key(path) {
                rm(path);
            }
        }
    }

    // Anything missing or needing updating?
    let mut failed = Vec::new();
    for (path, entry) in truth.entries {
        if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
            if minimal && !path.contains("montlake") && path != "data/system/us/seattle/city.bin" {
                continue;
            }

            fs_err::create_dir_all(std::path::Path::new(&path).parent().unwrap()).unwrap();
            match download_file(&version, &path, dl_from_local).await {
                Ok(bytes) => {
                    println!(
                        "> decompress {}, which is {} bytes compressed",
                        path,
                        prettyprint_usize(bytes.len())
                    );
                    let mut decoder = flate2::read::GzDecoder::new(&bytes[..]);
                    let mut out = File::create(&path).unwrap();
                    if let Err(err) = std::io::copy(&mut decoder, &mut out) {
                        println!("{}, but continuing", err);
                        failed.push(format!("{} failed: {}", path, err));
                    }
                }
                Err(err) => {
                    println!("{}, but continuing", err);
                    failed.push(format!("{} failed: {}", path, err));
                }
            }
        }
    }
    if !failed.is_empty() {
        // Fail the build.
        panic!("Failed to download stuff: {:?}", failed);
    }

    remove_empty_directories("data/input");
    remove_empty_directories("data/system");
}

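/// Compares the local files against the manifest and prints which files would be added, updated,
/// or removed, without changing anything.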
fn just_compare() {
    let data_packs = DataPacks::load_or_create();
    let truth = Manifest::load().filter(data_packs);
    let local = generate_manifest(&truth);

    // Does anything local need deleting?
    for path in local.entries.keys() {
        if !truth.entries.contains_key(path) {
            println!("- Remove {}", path);
        }
    }

    // Anything missing or needing updating?
    for (path, entry) in truth.entries {
        if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
            if let Some(old_bytes) = local.entries.get(&path).map(|x| x.uncompressed_size_bytes) {
                if old_bytes == entry.uncompressed_size_bytes {
                    println!("- Update {}. Same size, md5sum changed", path);
                } else {
                    println!(
                        "- Update {}. {} bytes difference",
                        path,
                        (old_bytes as isize) - (entry.uncompressed_size_bytes as isize)
                    );
                }
            } else {
                println!("- Add {}", path);
            }
        }
    }
}

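/// Synchronizes the local `data` directory with the local S3 mirror, compressing any changed
/// files, then syncs the mirror and the updated manifest to S3.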
fn upload(version: &str) {
    let remote_base = format!("/home/dabreegster/s3_abst_data/{}", version);

    let remote: Manifest = abstio::maybe_read_json(
        format!("{}/MANIFEST.json", remote_base),
        &mut Timer::throwaway(),
    )
    .unwrap_or(Manifest {
        entries: BTreeMap::new(),
    });
    let mut local = generate_manifest(&remote);

    // Does anything remote need deleting?
    for path in remote.entries.keys() {
        if !local.entries.contains_key(path) {
            rm(&format!("{}/{}.gz", remote_base, path));
        }
    }

    // Anything missing or needing updating?
    let local_entries = std::mem::take(&mut local.entries);
    for (path, entry) in Timer::new("compress files").parallelize(
        "compress files",
        local_entries.into_iter().collect(),
        |(path, mut entry)| {
            let remote_path = format!("{}/{}.gz", remote_base, path);
            let changed = remote.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum);
            if changed {
                compress(&path, &remote_path);
            }
            // Always do this -- even if nothing changed, compressed_size_bytes isn't filled out by
            // generate_manifest.
            entry.compressed_size_bytes = fs_err::metadata(&remote_path)
                .unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
                .len();
            (path, entry)
        },
    ) {
        local.entries.insert(path, entry);
    }

    abstio::write_json(format!("{}/MANIFEST.json", remote_base), &local);
    abstio::write_json("data/MANIFEST.json".to_string(), &local);

    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("sync")
            .arg("--delete")
            .arg(format!("{}/data", remote_base))
            .arg(format!("s3://abstreet/{}/data", version)),
    );
    // Because of the directory structure, do this one separately, without --delete. The wasm files
    // also live in /dev/.
    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("cp")
            .arg(format!("{}/MANIFEST.json", remote_base))
            .arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
    );
}

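/// Compresses and uploads only the files that differ from the manifest, then uploads the updated
/// manifest itself. This never deletes anything from S3.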
fn incremental_upload(version: String) {
    let remote_base = "tmp_incremental_upload";

    // Assume the local copy of the manifest from git is the current source of truth.
    let mut truth = Manifest::load();
    let local = generate_manifest(&truth);

    // Anything missing or needing updating?
    let mut changes = false;
    for (path, entry) in Timer::new("compress files")
        .parallelize(
            "compress files",
            local.entries.into_iter().collect(),
            |(path, mut entry)| {
                if truth.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
                    let remote_path = format!("{}/{}.gz", remote_base, path);
                    compress(&path, &remote_path);
                    entry.compressed_size_bytes = fs_err::metadata(&remote_path)
                        .unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
                        .len();
                    Some((path, entry))
                } else {
                    None
                }
            },
        )
        .into_iter()
        .flatten()
    {
        truth.entries.insert(path, entry);
        changes = true;
    }
    if !changes {
        return;
    }

    // TODO /home/dabreegster/s3_abst_data/{version}/MANIFEST.json will get out of sync...
    abstio::write_json("data/MANIFEST.json".to_string(), &truth);

    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("sync")
            .arg(format!("{}/data", remote_base))
            .arg(format!("s3://abstreet/{}/data", version)),
    );
    // Upload the new manifest file to S3.
    // TODO This won't work from AWS Batch; the workers will stomp over each other.
    must_run_cmd(
        Command::new("aws")
            .arg("s3")
            .arg("cp")
            .arg("data/MANIFEST.json")
            .arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
    );

    // Nuke the temporary workspace.
    must_run_cmd(Command::new("rm").arg("-rfv").arg(remote_base));
}

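/// Walks `data/input` and `data/system` and builds a manifest describing the local files,
/// skipping assets and a few other directories that aren't synced. Checksums from `truth` are
/// reused when a file appears unchanged, to avoid recomputing md5sums.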
fn generate_manifest(truth: &Manifest) -> Manifest {
    let mut paths = Vec::new();
    for entry in WalkDir::new("data/input")
        .into_iter()
        .chain(WalkDir::new("data/system").into_iter())
        .filter_map(|e| e.ok())
    {
        if entry.file_type().is_dir() {
            continue;
        }
        let orig_path = entry.path().display().to_string();
        let path = orig_path.replace("\\", "/");
        if path.contains("system/assets/")
            || path.contains("system/ltn_proposals")
            || path.contains("system/proposals")
            || path.contains("system/study_areas")
        {
            continue;
        }
        if path.ends_with(".osm") {
            continue;
        }
        paths.push((orig_path, path));
    }

    let mut kv = BTreeMap::new();
    for (path, entry) in
        Timer::new("compute md5sums").parallelize("compute md5sums", paths, |(orig_path, path)| {
            // If the file was modified within the last 12 hours or the uncompressed size has
            // changed, calculate the md5sum. Otherwise assume no change. This heuristic saves
            // lots of time and doesn't stress my poor SSD as much.
            let metadata = fs_err::metadata(&orig_path).unwrap();
            let uncompressed_size_bytes = metadata.len();
            let recent_modtime = metadata.modified().unwrap().elapsed().unwrap()
                < std::time::Duration::from_secs(60 * 60 * 12);

            let checksum = if recent_modtime
                || truth
                    .entries
                    .get(&path)
                    .map(|entry| entry.uncompressed_size_bytes != uncompressed_size_bytes)
                    .unwrap_or(true)
            {
                md5sum(&orig_path)
            } else {
                truth.entries[&path].checksum.clone()
            };
            (
                path,
                Entry {
                    checksum,
                    uncompressed_size_bytes,
                    // Will calculate later
                    compressed_size_bytes: 0,
                },
            )
        })
    {
        kv.insert(path, entry);
    }

    Manifest { entries: kv }
}

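/// Computes the md5sum of a file, reading it in fixed-size chunks so very large files don't have
/// to fit in memory.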
fn md5sum(path: &str) -> String {
    let mut file = File::open(path).unwrap();
    let mut buffer = [0_u8; MD5_BUF_READ_SIZE];
    let mut context = md5::Context::new();
    while let Ok(n) = file.read(&mut buffer) {
        if n == 0 {
            break;
        }
        context.consume(&buffer[..n]);
    }
    format!("{:x}", context.compute())
}

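/// Deletes a file, tolerating the case where it doesn't exist, and panicking on any other error.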
fn rm(path: &str) {
    println!("> rm {}", path);
    match fs_err::remove_file(path) {
        Ok(_) => {}
        Err(e) => match e.kind() {
            std::io::ErrorKind::NotFound => {
                println!("file {} does not exist, continuing", path);
            }
            other_error => {
                panic!("problem removing file: {:?}", other_error);
            }
        },
    }
}

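/// Returns the gzipped bytes of one file, fetched either from the local S3 mirror or over HTTP.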
async fn download_file(version: &str, path: &str, dl_from_local: bool) -> Result<Vec<u8>> {
    if dl_from_local {
        return abstio::slurp_file(format!(
            "/home/dabreegster/s3_abst_data/{}/{}.gz",
            version, path
        ));
    }

    let url = format!("https://play.abstreet.org/{}/{}.gz", version, path);
    println!("> download {}", url);
    let (mut tx, rx) = futures_channel::mpsc::channel(1000);
    abstio::print_download_progress(rx);
    abstio::download_bytes(url, None, &mut tx).await
}

// download_updates() will remove stray files, but leave empty directories around. Since some
// runtime code discovers lists of countries, cities, etc. from the filesystem, this can get
// confusing.
//
// I'm sure there's a simpler way to do this, but I haven't found it.
fn remove_empty_directories(root: &str) {
    loop {
        // First just find all directories and files.
        let mut all_paths = Vec::new();
        let mut all_dirs = Vec::new();
        for entry in WalkDir::new(root).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path().display().to_string();
            all_paths.push(path.clone());
            if entry.file_type().is_dir() {
                all_dirs.push(path);
            }
        }

        // Now filter out directories that are a prefix of some other path.
        all_dirs.retain(|dir| !all_paths.iter().any(|p| p != dir && p.starts_with(dir)));

        if all_dirs.is_empty() {
            break;
        } else {
            // Remove them! Then repeat, since we might have nested/empty/directories/.
            for x in all_dirs {
                println!("> Removing empty directory {}", x);
                // This fails if the directory isn't empty, which is a good sanity check. If
                // something weird happened, just bail.
                fs_err::remove_dir(&x).unwrap();
            }
        }
    }
}

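/// Gzips `path` into `remote_path`, creating any needed parent directories.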
fn compress(path: &str, remote_path: &str) {
    assert!(!path.ends_with(".gz"));
    assert!(remote_path.ends_with(".gz"));

    fs_err::create_dir_all(std::path::Path::new(remote_path).parent().unwrap()).unwrap();
    println!("> compressing {}", path);
    let mut input = BufReader::new(File::open(path).unwrap());
    let out = File::create(remote_path).unwrap();
    let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::best());
    std::io::copy(&mut input, &mut encoder).unwrap();
    encoder.finish().unwrap();
}