1use std::future::Future;
5use std::pin::Pin;
6
7use anyhow::Result;
8use futures_channel::{mpsc, oneshot};
9use instant::Instant;
10use serde::de::DeserializeOwned;
11#[cfg(not(target_arch = "wasm32"))]
12use tokio::runtime::Runtime;
13
14use abstutil::Timer;
15use geom::Duration;
16
17use crate::{Color, EventCtx, GfxCtx, Line, Panel, State, Text, Transition, UpdateType};
18
19#[cfg(not(target_arch = "wasm32"))]
20pub use native_loader::FileLoader;
21
22#[cfg(target_arch = "wasm32")]
23pub use wasm_loader::FileLoader;
24
/// Newtype around the raw bytes of a loaded file. The `Readable` impls in this file return the
/// bytes without deserializing them.
pub struct RawBytes(pub Vec<u8>);
27
28#[cfg(not(target_arch = "wasm32"))]
29mod native_loader {
30 use super::*;
31
32 pub trait Readable {
33 fn read_file(path: String, timer: &mut Timer) -> Result<Self>
34 where
35 Self: Sized;
36 }
37
38 pub struct FileLoader<A, T> {
40 path: String,
41 on_load:
44 Option<Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>>,
45 }
46
47 impl<A: 'static, T: 'static + Readable> FileLoader<A, T> {
48 pub fn new_state(
49 _: &mut EventCtx,
50 path: String,
51 on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>,
52 ) -> Box<dyn State<A>> {
53 Box::new(FileLoader {
54 path,
55 on_load: Some(on_load),
56 })
57 }
58 }
59
60 impl<A: 'static, T: 'static + Readable> State<A> for FileLoader<A, T> {
61 fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
62 debug!("Loading {}", self.path);
63 ctx.loading_screen(format!("load {}", self.path), |ctx, timer| {
64 let file = T::read_file(self.path.clone(), timer);
65 (self.on_load.take().unwrap())(ctx, app, timer, file)
66 })
67 }
68
69 fn draw(&self, g: &mut GfxCtx, _: &A) {
70 g.clear(Color::BLACK);
71 }
72 }
73
74 impl<T: 'static + DeserializeOwned> Readable for T {
76 fn read_file(path: String, timer: &mut Timer) -> Result<T> {
77 abstio::read_object(path, timer)
79 }
80 }
81 impl Readable for RawBytes {
82 fn read_file(path: String, _: &mut Timer) -> Result<RawBytes> {
83 abstio::slurp_file(path).map(RawBytes)
84 }
85 }
86}
87
#[cfg(target_arch = "wasm32")]
mod wasm_loader {
    use std::io::Read;

    use futures::StreamExt;
    use wasm_bindgen::{JsCast, UnwrapThrowExt};
    use wasm_bindgen_futures::JsFuture;
    use wasm_streams::ReadableStream;
    use web_sys::{Request, RequestInit, RequestMode, Response};

    use abstutil::prettyprint_usize;

    use super::*;

    /// A value that can be decoded from the body of an HTTP response.
    pub trait Readable {
        /// Decodes `resp`, the bytes fetched from `url`. Implementations in this file inspect
        /// the suffix of `url` to decide how to decode.
        fn read_url(url: String, resp: Vec<u8>) -> Result<Self>
        where
            Self: Sized;
    }

    /// A `State` that fetches one file over HTTP in a background task, shows progress while
    /// waiting, and then passes the decoded result to a one-shot callback.
    pub struct FileLoader<A, T> {
        /// Receives the complete response body (or the fetch error) from the background task.
        response: oneshot::Receiver<Result<Vec<u8>>>,
        /// Callback receiving the decoded result; taken when the response arrives.
        on_load:
            Option<Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>>,
        /// The loading screen, rebuilt on every `event` while waiting.
        panel: Panel,
        /// When the fetch was started; used for the "Time spent" line.
        started: Instant,
        /// The full URL being fetched.
        url: String,

        /// Expected body size, once (and if) the background task reports it.
        total_bytes: Option<usize>,
        /// Sum of the chunk sizes reported so far.
        read_bytes: usize,
        /// Receives the expected body size, at most once.
        got_total_bytes: oneshot::Receiver<usize>,
        /// Receives the size of each chunk as it is read.
        got_read_bytes: mpsc::Receiver<usize>,
    }

    /// Fetches `url` and returns the whole response body.
    ///
    /// The expected size is sent on `tx_total_bytes` when the response carries a parsable
    /// `Content-Length`, and the size of each chunk is sent on `tx_read_bytes`. Both are
    /// best-effort: failing to deliver them never fails the fetch.
    ///
    /// # Errors
    /// Returns an error if the request cannot be built, there is no global `window`, the fetch
    /// itself fails, the status is not OK, the response has no body, or reading the body
    /// stream fails partway through.
    async fn fetch_bytes(
        url: &str,
        tx_total_bytes: oneshot::Sender<usize>,
        tx_read_bytes: &mut mpsc::Sender<usize>,
    ) -> Result<Vec<u8>> {
        let mut opts = RequestInit::new();
        opts.method("GET");
        opts.mode(RequestMode::Cors);
        let request =
            Request::new_with_str_and_init(url, &opts).map_err(|err| anyhow!("{:?}", err))?;

        let window = web_sys::window()
            .ok_or_else(|| anyhow!("no global `window` available to fetch {}", url))?;
        let resp_value = JsFuture::from(window.fetch_with_request(&request))
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        // The value `fetch` resolves to is expected to be a `Response`.
        let resp: Response = resp_value.dyn_into().unwrap();
        if !resp.ok() {
            let status = resp.status();
            let err = resp.status_text();
            return Err(anyhow!("HTTP {}: {}", status, err));
        }

        // The total is only used for the progress display, so a missing or unparsable header
        // (e.g. with chunked transfer encoding) must not abort the download.
        let total_bytes = resp
            .headers()
            .get("Content-Length")
            .ok()
            .and_then(|value| value)
            .and_then(|raw| raw.parse::<usize>().ok());
        if let Some(total_bytes) = total_bytes {
            // Sending fails only if the loader state is already gone; nothing to report then.
            let _ = tx_total_bytes.send(total_bytes);
        }

        let raw_body = resp
            .body()
            .ok_or_else(|| anyhow!("response for {} has no body", url))?;
        let body = ReadableStream::from_raw(raw_body.dyn_into().unwrap_throw());
        let mut stream = body.into_stream();
        let mut buffer = Vec::new();
        while let Some(item) = stream.next().await {
            // A failed read means the body is incomplete; report it instead of returning
            // truncated data as if it were the whole file.
            let chunk =
                item.map_err(|err| anyhow!("reading body of {} failed: {:?}", url, err))?;
            let array = js_sys::Uint8Array::new(&chunk);
            if let Err(err) = tx_read_bytes.try_send(array.byte_length() as usize) {
                warn!("Couldn't send update on bytes: {}", err);
            }
            buffer.extend(array.to_vec());
        }
        Ok(buffer)
    }

    impl<A: 'static, T: 'static + Readable> FileLoader<A, T> {
        /// Starts fetching `path` (relative to the configured assets base URL, with `.gz`
        /// appended when assets are gzipped) and returns the state that waits for it.
        ///
        /// # Panics
        /// Panics if no assets base URL has been configured.
        pub fn new_state(
            ctx: &mut EventCtx,
            path: String,
            on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>,
        ) -> Box<dyn State<A>> {
            let base_url = ctx
                .prerender
                .assets_base_url()
                .expect("assets_base_url must be specified for wasm builds via `Settings`");

            let url = if ctx.prerender.assets_are_gzipped() {
                format!("{}/{}.gz", base_url, path)
            } else {
                format!("{}/{}", base_url, path)
            };

            let (tx, rx) = oneshot::channel();
            let (tx_total_bytes, got_total_bytes) = oneshot::channel();
            let (mut tx_read_bytes, got_read_bytes) = mpsc::channel(10);
            let url_copy = url.clone();
            debug!("Loading {}", url_copy);
            wasm_bindgen_futures::spawn_local(async move {
                // `tx_read_bytes` is only borrowed so that it stays alive until after the
                // final result has been sent.
                let result = fetch_bytes(&url_copy, tx_total_bytes, &mut tx_read_bytes).await;
                if tx.send(result).is_err() {
                    // The loader state was dropped while the fetch was in flight.
                    warn!(
                        "Nobody is waiting for {} anymore; dropping the response",
                        url_copy
                    );
                }
            });

            Box::new(FileLoader {
                response: rx,
                on_load: Some(on_load),
                panel: ctx.make_loading_screen(Text::from(format!("Loading {}...", url))),
                started: Instant::now(),
                url,
                total_bytes: None,
                read_bytes: 0,
                got_total_bytes,
                got_read_bytes,
            })
        }
    }

    impl<A: 'static, T: 'static + Readable> State<A> for FileLoader<A, T> {
        /// Polls the background fetch. While it is pending, refreshes the progress panel and
        /// asks for another update; once it finishes (or its channel is cancelled), decodes the
        /// body and returns the callback's transition.
        ///
        /// # Panics
        /// Panics if the callback was already consumed by an earlier call.
        fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
            if self.total_bytes.is_none() {
                if let Ok(Some(total)) = self.got_total_bytes.try_recv() {
                    self.total_bytes = Some(total);
                }
            }
            // Drain every pending update. Taking only one per event lets the bounded channel
            // fill up, and the sender then drops byte counts.
            while let Ok(Some(read)) = self.got_read_bytes.try_next() {
                self.read_bytes += read;
            }

            let maybe_resp = match self.response.try_recv() {
                Ok(Some(maybe_resp)) => Some(maybe_resp),
                Ok(None) => None,
                Err(e) => {
                    // The background task went away without sending anything. Report that
                    // through the callback rather than panicking.
                    error!("channel failed: {:?}", e);
                    Some(Err(anyhow!("channel canceled")))
                }
            };
            if let Some(maybe_resp) = maybe_resp {
                let mut timer = Timer::new(format!("Loading {}...", self.url));
                let result = maybe_resp.and_then(|resp| T::read_url(self.url.clone(), resp));
                return (self.on_load.take().unwrap())(ctx, app, &mut timer, result);
            }

            let mut lines = vec![
                Line(format!("Loading {}...", self.url)),
                Line(format!(
                    "Time spent: {}",
                    Duration::realtime_elapsed(self.started)
                )),
            ];
            if let Some(total) = self.total_bytes {
                lines.push(Line(format!(
                    "Read {} / {} bytes",
                    prettyprint_usize(self.read_bytes),
                    prettyprint_usize(total)
                )));
            }
            self.panel = ctx.make_loading_screen(Text::from_multiline(lines));

            // Keep polling until the response arrives.
            ctx.request_update(UpdateType::Game);
            Transition::Keep
        }

        /// Paints a black background with the progress panel on top.
        fn draw(&self, g: &mut GfxCtx, _: &A) {
            g.clear(Color::BLACK);
            self.panel.draw(g);
        }
    }

    /// Deserializable values are decoded according to the URL suffix: `.bin.gz` and other
    /// `.gz` URLs are gunzipped and read as binary or JSON respectively; `.bin` is read as
    /// binary; anything else as JSON.
    impl<T: 'static + DeserializeOwned> Readable for T {
        fn read_url(url: String, resp: Vec<u8>) -> Result<T> {
            if url.ends_with(".gz") {
                let decoder = flate2::read::GzDecoder::new(&resp[..]);
                if url.ends_with(".bin.gz") {
                    abstutil::from_binary_reader(decoder)
                } else {
                    abstutil::from_json_reader(decoder)
                }
            } else if url.ends_with(".bin") {
                abstutil::from_binary(&resp)
            } else {
                abstutil::from_json(&resp)
            }
        }
    }

    /// Raw bytes are returned as fetched, gunzipped first when the URL ends with `.gz`.
    impl Readable for RawBytes {
        fn read_url(url: String, resp: Vec<u8>) -> Result<RawBytes> {
            if url.ends_with(".gz") {
                let mut decoder = flate2::read::GzDecoder::new(&resp[..]);
                let mut buffer: Vec<u8> = Vec::new();
                decoder.read_to_end(&mut buffer)?;
                Ok(RawBytes(buffer))
            } else {
                Ok(RawBytes(resp))
            }
        }
    }
}
295
/// A loading-screen `State` that waits for a spawned future to finish and then passes its result
/// to a one-shot callback. While waiting, it shows the title, the elapsed time, and the latest
/// message from each of two progress channels.
pub struct FutureLoader<A, T> {
    /// First line of the loading screen.
    loading_title: String,
    /// When the loader was created; used for the "Time spent" line.
    started: Instant,
    /// The loading screen, rebuilt on every `event` while the future is pending.
    panel: Panel,
    /// Receives the future's output: either an error, or a builder closure that `event` calls
    /// with `&A` to produce the final `T`.
    receiver: oneshot::Receiver<Result<Box<dyn Send + FnOnce(&A) -> T>>>,
    /// Callback invoked with the final result; taken when the result (or a channel failure)
    /// is observed.
    on_load: Option<Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>>,
    /// Source of "outer" progress messages; reset to `None` once the channel reports closed.
    outer_progress_receiver: Option<mpsc::Receiver<String>>,
    /// Source of "inner" progress messages; reset to `None` once the channel reports closed.
    inner_progress_receiver: Option<mpsc::Receiver<String>>,
    /// Most recent message received from `outer_progress_receiver`.
    last_outer_progress: String,
    /// Most recent message received from `inner_progress_receiver`.
    last_inner_progress: String,

    /// The runtime the future was spawned on. Never read (hence the `allow`).
    // NOTE(review): presumably held so the runtime outlives the spawned task -- confirm.
    #[cfg(not(target_arch = "wasm32"))]
    #[allow(dead_code)]
    runtime: Runtime,
}
315
316impl<A, T> FutureLoader<A, T>
317where
318 A: 'static,
319 T: 'static,
320{
321 #[cfg(target_arch = "wasm32")]
322 pub fn new_state(
323 ctx: &mut EventCtx,
324 future: Pin<Box<dyn Future<Output = Result<Box<dyn Send + FnOnce(&A) -> T>>>>>,
325 outer_progress_receiver: mpsc::Receiver<String>,
326 inner_progress_receiver: mpsc::Receiver<String>,
327 loading_title: &str,
328 on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>,
329 ) -> Box<dyn State<A>> {
330 let (tx, receiver) = oneshot::channel();
331 wasm_bindgen_futures::spawn_local(async move {
332 tx.send(future.await).ok().unwrap();
333 });
334 Box::new(FutureLoader {
335 loading_title: loading_title.to_string(),
336 started: Instant::now(),
337 panel: ctx.make_loading_screen(Text::from(loading_title)),
338 receiver,
339 on_load: Some(on_load),
340 outer_progress_receiver: Some(outer_progress_receiver),
341 inner_progress_receiver: Some(inner_progress_receiver),
342 last_outer_progress: String::new(),
343 last_inner_progress: String::new(),
344 })
345 }
346
347 #[cfg(not(target_arch = "wasm32"))]
348 pub fn new_state(
349 ctx: &mut EventCtx,
350 future: Pin<Box<dyn Send + Future<Output = Result<Box<dyn Send + FnOnce(&A) -> T>>>>>,
351 outer_progress_receiver: mpsc::Receiver<String>,
352 inner_progress_receiver: mpsc::Receiver<String>,
353 loading_title: &str,
354 on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>,
355 ) -> Box<dyn State<A>> {
356 let runtime = Runtime::new().unwrap();
357 let (tx, receiver) = oneshot::channel();
358 runtime.spawn(async move {
359 tx.send(future.await).ok().unwrap();
360 });
361
362 Box::new(FutureLoader {
363 loading_title: loading_title.to_string(),
364 started: Instant::now(),
365 panel: ctx.make_loading_screen(Text::from(loading_title)),
366 receiver,
367 on_load: Some(on_load),
368 runtime,
369 outer_progress_receiver: Some(outer_progress_receiver),
370 inner_progress_receiver: Some(inner_progress_receiver),
371 last_outer_progress: String::new(),
372 last_inner_progress: String::new(),
373 })
374 }
375}
376
377impl<A, T> State<A> for FutureLoader<A, T>
378where
379 A: 'static,
380 T: 'static,
381{
382 fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
383 match self.receiver.try_recv() {
384 Err(e) => {
385 error!("channel failed: {:?}", e);
386 let on_load = self.on_load.take().unwrap();
387 on_load(ctx, app, Err(anyhow!("channel canceled")))
388 }
389 Ok(None) => {
390 if let Some(ref mut rx) = self.outer_progress_receiver {
391 loop {
393 match rx.try_next() {
394 Ok(Some(msg)) => {
395 self.last_outer_progress = msg;
396 }
397 Ok(None) => {
398 self.outer_progress_receiver = None;
399 break;
400 }
401 Err(_) => {
402 break;
404 }
405 }
406 }
407 }
408 if let Some(ref mut rx) = self.inner_progress_receiver {
409 loop {
410 match rx.try_next() {
411 Ok(Some(msg)) => {
412 self.last_inner_progress = msg;
413 }
414 Ok(None) => {
415 self.inner_progress_receiver = None;
416 break;
417 }
418 Err(_) => {
419 break;
421 }
422 }
423 }
424 }
425
426 self.panel = ctx.make_loading_screen(Text::from_multiline(vec![
427 Line(&self.loading_title),
428 Line(format!(
429 "Time spent: {}",
430 Duration::realtime_elapsed(self.started)
431 )),
432 Line(&self.last_outer_progress),
433 Line(&self.last_inner_progress),
434 ]));
435
436 ctx.request_update(UpdateType::Game);
439 Transition::Keep
440 }
441 Ok(Some(Err(e))) => {
442 error!("error in fetching data");
443 let on_load = self.on_load.take().unwrap();
444 on_load(ctx, app, Err(e))
445 }
446 Ok(Some(Ok(builder))) => {
447 debug!("future complete");
448 let t = builder(app);
449 let on_load = self.on_load.take().unwrap();
450 on_load(ctx, app, Ok(t))
451 }
452 }
453 }
454
455 fn draw(&self, g: &mut GfxCtx, _: &A) {
456 g.clear(Color::BLACK);
457 self.panel.draw(g);
458 }
459}