use std::collections::BTreeMap;
use std::io::{BufReader, Read};
use std::process::Command;

use anyhow::Result;
use fs_err::File;
use structopt::StructOpt;
use walkdir::WalkDir;

use abstio::{DataPacks, Entry, Manifest};
use abstutil::{must_run_cmd, prettyprint_usize, Timer};
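
// Compute md5sums by reading files in chunks of this size, since some of the files are large.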
const MD5_BUF_READ_SIZE: usize = 4096;
#[derive(StructOpt)]
#[structopt(
name = "updater",
about = "Download the latest version of per-city data"
)]
enum Task {
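    /// Compress the local data/ directory, sync it to S3 as the "dev" version, and update the
    /// manifest.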
Upload,
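    /// Compress and upload only the files that have changed, to a named (non-"dev") version.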
IncrementalUpload {
#[structopt(long)]
version: String,
},
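    /// Just print the difference between local data and the manifest; don't upload or download
    /// anything.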
DryRun {
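        /// Only check whether this one file has changed.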
#[structopt(long)]
single_file: Option<String>,
},
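    /// Print a JSON config that opts into all available data packs.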
OptIntoAll,
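    /// Synchronize the local data/ directory with the data packs the user has opted into.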
Download {
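        /// Only download files for Seattle's Montlake map (and the city index), for a minimal
        /// working setup.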
#[structopt(long)]
minimal: bool,
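        /// Keep local files, even if the manifest no longer lists them.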
#[structopt(long)]
dont_delete: bool,
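        /// Copy files from a local S3 mirror instead of downloading from play.abstreet.org.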
#[structopt(long)]
dl_from_local: bool,
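        /// The version of the data to download.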
#[structopt(long, default_value = "dev")]
version: String,
},
}
#[tokio::main]
async fn main() {
if !std::path::Path::new("data").exists() {
panic!("Your current directory doesn't have the data/ directory. Run from the git root: cd ../; cargo run --bin updater -- args");
}
abstutil::logger::setup();
match Task::from_args() {
Task::Upload => {
upload("dev");
}
Task::IncrementalUpload { version } => {
assert_ne!(version, "dev");
incremental_upload(version);
}
Task::DryRun { single_file } => {
if let Some(path) = single_file {
let local = md5sum(&path);
let truth = Manifest::load()
.entries
.remove(&path)
                    .unwrap_or_else(|| panic!("{} not in data/MANIFEST.json", path))
.checksum;
if local != truth {
println!("{} has changed", path);
}
} else {
just_compare();
}
}
Task::OptIntoAll => {
println!("{}", abstutil::to_json(&DataPacks::all_data_packs()));
}
Task::Download {
minimal,
dont_delete,
dl_from_local,
version,
} => {
download_updates(version, minimal, !dont_delete, dl_from_local).await;
}
}
}
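
/// Downloads every file that's missing locally or whose checksum doesn't match the manifest.
/// Unless deletion is disabled, also removes local files the manifest no longer lists.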
async fn download_updates(version: String, minimal: bool, delete_local: bool, dl_from_local: bool) {
let data_packs = DataPacks::load_or_create();
let truth = Manifest::load().filter(data_packs);
let local = generate_manifest(&truth);
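    // Remove local files that the (filtered) manifest no longer lists.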
if delete_local {
for path in local.entries.keys() {
if !truth.entries.contains_key(path) {
rm(path);
}
}
}
let mut failed = Vec::new();
for (path, entry) in truth.entries {
if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
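            // In --minimal mode, only download Seattle's Montlake map and the city index.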
if minimal && !path.contains("montlake") && path != "data/system/us/seattle/city.bin" {
continue;
}
fs_err::create_dir_all(std::path::Path::new(&path).parent().unwrap()).unwrap();
match download_file(&version, &path, dl_from_local).await {
Ok(bytes) => {
println!(
"> decompress {}, which is {} bytes compressed",
path,
prettyprint_usize(bytes.len())
);
let mut decoder = flate2::read::GzDecoder::new(&bytes[..]);
let mut out = File::create(&path).unwrap();
if let Err(err) = std::io::copy(&mut decoder, &mut out) {
println!("{}, but continuing", err);
failed.push(format!("{} failed: {}", path, err));
}
}
Err(err) => {
println!("{}, but continuing", err);
failed.push(format!("{} failed: {}", path, err));
}
            }
}
}
if !failed.is_empty() {
panic!("Failed to download stuff: {:?}", failed);
}
remove_empty_directories("data/input");
remove_empty_directories("data/system");
}
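
/// Prints the difference between local files and the manifest, without changing anything.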
fn just_compare() {
let data_packs = DataPacks::load_or_create();
let truth = Manifest::load().filter(data_packs);
let local = generate_manifest(&truth);
for path in local.entries.keys() {
if !truth.entries.contains_key(path) {
println!("- Remove {}", path);
}
}
for (path, entry) in truth.entries {
if local.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
if let Some(old_bytes) = local.entries.get(&path).map(|x| x.uncompressed_size_bytes) {
if old_bytes == entry.uncompressed_size_bytes {
println!("- Update {}. Same size, md5sum changed", path);
} else {
println!(
"- Update {}. {} bytes difference",
path,
(old_bytes as isize) - (entry.uncompressed_size_bytes as isize)
);
}
} else {
println!("- Add {}", path);
}
}
}
}
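
/// Compresses changed files into a local mirror of the S3 bucket, writes an updated manifest,
/// and syncs both to s3://abstreet. The mirror lives at a hard-coded path, so in practice only
/// the maintainer runs this.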
fn upload(version: &str) {
let remote_base = format!("/home/dabreegster/s3_abst_data/{}", version);
let remote: Manifest = abstio::maybe_read_json(
format!("{}/MANIFEST.json", remote_base),
&mut Timer::throwaway(),
)
.unwrap_or(Manifest {
entries: BTreeMap::new(),
});
let mut local = generate_manifest(&remote);
for path in remote.entries.keys() {
if !local.entries.contains_key(path) {
rm(&format!("{}/{}.gz", remote_base, path));
}
}
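    // Compress the files that changed, and record the compressed size of everything.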
let local_entries = std::mem::take(&mut local.entries);
for (path, entry) in Timer::new("compress files").parallelize(
"compress files",
local_entries.into_iter().collect(),
|(path, mut entry)| {
let remote_path = format!("{}/{}.gz", remote_base, path);
let changed = remote.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum);
if changed {
compress(&path, &remote_path);
}
entry.compressed_size_bytes = fs_err::metadata(&remote_path)
.unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
.len();
(path, entry)
},
) {
local.entries.insert(path, entry);
}
abstio::write_json(format!("{}/MANIFEST.json", remote_base), &local);
abstio::write_json("data/MANIFEST.json".to_string(), &local);
must_run_cmd(
Command::new("aws")
.arg("s3")
.arg("sync")
.arg("--delete")
.arg(format!("{}/data", remote_base))
.arg(format!("s3://abstreet/{}/data", version)),
);
must_run_cmd(
Command::new("aws")
.arg("s3")
.arg("cp")
.arg(format!("{}/MANIFEST.json", remote_base))
.arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
);
}
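
/// Like Upload, but only compresses and uploads files whose checksums differ from the current
/// manifest, staging them in a temporary directory first.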
fn incremental_upload(version: String) {
let remote_base = "tmp_incremental_upload";
let mut truth = Manifest::load();
let local = generate_manifest(&truth);
let mut changes = false;
for (path, entry) in Timer::new("compress files")
.parallelize(
"compress files",
local.entries.into_iter().collect(),
|(path, mut entry)| {
if truth.entries.get(&path).map(|x| &x.checksum) != Some(&entry.checksum) {
let remote_path = format!("{}/{}.gz", remote_base, path);
compress(&path, &remote_path);
entry.compressed_size_bytes = fs_err::metadata(&remote_path)
.unwrap_or_else(|_| panic!("Compressed {} not there?", remote_path))
.len();
Some((path, entry))
} else {
None
}
},
)
.into_iter()
.flatten()
{
truth.entries.insert(path, entry);
changes = true;
}
if !changes {
return;
}
abstio::write_json("data/MANIFEST.json".to_string(), &truth);
must_run_cmd(
Command::new("aws")
.arg("s3")
.arg("sync")
.arg(format!("{}/data", remote_base))
.arg(format!("s3://abstreet/{}/data", version)),
);
must_run_cmd(
Command::new("aws")
.arg("s3")
.arg("cp")
.arg("data/MANIFEST.json")
.arg(format!("s3://abstreet/{}/MANIFEST.json", version)),
);
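    // Clean up the temporary staging directory.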
must_run_cmd(Command::new("rm").arg("-rfv").arg(remote_base));
}
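
/// Walks data/input and data/system and builds a manifest of everything found, reusing
/// checksums from `truth` when a file appears unchanged.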
fn generate_manifest(truth: &Manifest) -> Manifest {
let mut paths = Vec::new();
for entry in WalkDir::new("data/input")
.into_iter()
.chain(WalkDir::new("data/system").into_iter())
.filter_map(|e| e.ok())
{
if entry.file_type().is_dir() {
continue;
}
let orig_path = entry.path().display().to_string();
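        // Normalize Windows backslashes, so the manifest is consistent across platforms.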
let path = orig_path.replace("\\", "/");
if path.contains("system/assets/")
|| path.contains("system/ltn_proposals")
|| path.contains("system/proposals")
|| path.contains("system/study_areas")
{
continue;
}
if path.ends_with(".osm") {
continue;
}
paths.push((orig_path, path));
}
let mut kv = BTreeMap::new();
for (path, entry) in
Timer::new("compute md5sums").parallelize("compute md5sums", paths, |(orig_path, path)| {
let metadata = fs_err::metadata(&orig_path).unwrap();
let uncompressed_size_bytes = metadata.len();
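            // Reuse the manifest's checksum unless the file was modified in the last 12 hours,
            // its size doesn't match, or it's not in the manifest at all.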
let recent_modtime = metadata.modified().unwrap().elapsed().unwrap()
< std::time::Duration::from_secs(60 * 60 * 12);
let checksum = if recent_modtime
|| truth
.entries
.get(&path)
.map(|entry| entry.uncompressed_size_bytes != uncompressed_size_bytes)
.unwrap_or(true)
{
md5sum(&orig_path)
} else {
truth.entries[&path].checksum.clone()
};
(
path,
Entry {
checksum,
uncompressed_size_bytes,
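                    // Only known after compressing; the upload tasks fill this in.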
compressed_size_bytes: 0,
},
)
})
{
kv.insert(path, entry);
}
Manifest { entries: kv }
}
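
/// Computes the md5sum of a file, reading it in fixed-size chunks, since some files are large.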
fn md5sum(path: &str) -> String {
let mut file = File::open(path).unwrap();
let mut buffer = [0_u8; MD5_BUF_READ_SIZE];
let mut context = md5::Context::new();
    loop {
        // Don't silently swallow a mid-file read error; that would yield the checksum of a
        // truncated file.
        let n = file.read(&mut buffer).unwrap();
        if n == 0 {
            break;
        }
        context.consume(&buffer[..n]);
    }
format!("{:x}", context.compute())
}
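
/// Removes a file, but doesn't complain if it's already gone.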
fn rm(path: &str) {
println!("> rm {}", path);
match fs_err::remove_file(path) {
Ok(_) => {}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
println!("file {} does not exist, continuing", &path);
}
other_error => {
panic!("problem removing file: {:?}", other_error);
}
},
}
}
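
/// Fetches the gzipped form of one file, either from play.abstreet.org or from a local S3
/// mirror.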
async fn download_file(version: &str, path: &str, dl_from_local: bool) -> Result<Vec<u8>> {
if dl_from_local {
return abstio::slurp_file(format!(
"/home/dabreegster/s3_abst_data/{}/{}.gz",
version, path
));
}
let url = format!("https://play.abstreet.org/{}/{}.gz", version, path);
println!("> download {}", url);
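    // Print progress while the download streams in.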
let (mut tx, rx) = futures_channel::mpsc::channel(1000);
abstio::print_download_progress(rx);
abstio::download_bytes(url, None, &mut tx).await
}
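
/// Deletes all empty directories under `root`. Loops, because removing a leaf directory can
/// leave its parent empty.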
fn remove_empty_directories(root: &str) {
loop {
let mut all_paths = Vec::new();
let mut all_dirs = Vec::new();
for entry in WalkDir::new(root).into_iter().filter_map(|e| e.ok()) {
let path = entry.path().display().to_string();
all_paths.push(path.clone());
if entry.file_type().is_dir() {
all_dirs.push(path);
}
}
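        // Keep only directories with nothing nested beneath them.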
all_dirs.retain(|dir| !all_paths.iter().any(|p| p != dir && p.starts_with(dir)));
if all_dirs.is_empty() {
break;
} else {
for x in all_dirs {
println!("> Removing empty directory {}", x);
fs_err::remove_dir(&x).unwrap();
}
}
}
}
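
/// Gzips one file to the given path, creating parent directories as needed.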
fn compress(path: &str, remote_path: &str) {
assert!(!path.ends_with(".gz"));
assert!(remote_path.ends_with(".gz"));
fs_err::create_dir_all(std::path::Path::new(remote_path).parent().unwrap()).unwrap();
println!("> compressing {}", path);
let mut input = BufReader::new(File::open(path).unwrap());
let out = File::create(remote_path).unwrap();
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::best());
std::io::copy(&mut input, &mut encoder).unwrap();
encoder.finish().unwrap();
}