widgetry/tools/
load.rs

1//! Loading large resources requires different strategies on native and web. Both cases are wrapped
2//! up as a State that runs a callback when done.
3
4use std::future::Future;
5use std::pin::Pin;
6
7use anyhow::Result;
8use futures_channel::{mpsc, oneshot};
9use instant::Instant;
10use serde::de::DeserializeOwned;
11#[cfg(not(target_arch = "wasm32"))]
12use tokio::runtime::Runtime;
13
14use abstutil::Timer;
15use geom::Duration;
16
17use crate::{Color, EventCtx, GfxCtx, Line, Panel, State, Text, Transition, UpdateType};
18
19#[cfg(not(target_arch = "wasm32"))]
20pub use native_loader::FileLoader;
21
22#[cfg(target_arch = "wasm32")]
23pub use wasm_loader::FileLoader;
24
/// Use this as the `T` of a `FileLoader` to get the file's raw bytes back without any
/// deserialization. In the wasm loader, a URL ending in `.gz` is still gunzipped first; the
/// native loader returns whatever `abstio::slurp_file` produces.
pub struct RawBytes(pub Vec<u8>);
27
28#[cfg(not(target_arch = "wasm32"))]
29mod native_loader {
30    use super::*;
31
32    pub trait Readable {
33        fn read_file(path: String, timer: &mut Timer) -> Result<Self>
34        where
35            Self: Sized;
36    }
37
38    /// Loads a JSON, bincoded, or raw file, then deserializes it
39    pub struct FileLoader<A, T> {
40        path: String,
41        // Wrapped in an Option just to make calling from event() work. Technically this is unsafe
42        // if a caller fails to pop the FileLoader state in their transitions!
43        on_load:
44            Option<Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>>,
45    }
46
47    impl<A: 'static, T: 'static + Readable> FileLoader<A, T> {
48        pub fn new_state(
49            _: &mut EventCtx,
50            path: String,
51            on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>,
52        ) -> Box<dyn State<A>> {
53            Box::new(FileLoader {
54                path,
55                on_load: Some(on_load),
56            })
57        }
58    }
59
60    impl<A: 'static, T: 'static + Readable> State<A> for FileLoader<A, T> {
61        fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
62            debug!("Loading {}", self.path);
63            ctx.loading_screen(format!("load {}", self.path), |ctx, timer| {
64                let file = T::read_file(self.path.clone(), timer);
65                (self.on_load.take().unwrap())(ctx, app, timer, file)
66            })
67        }
68
69        fn draw(&self, g: &mut GfxCtx, _: &A) {
70            g.clear(Color::BLACK);
71        }
72    }
73
74    // Two implementations for reading the file, using serde or just raw bytes
75    impl<T: 'static + DeserializeOwned> Readable for T {
76        fn read_file(path: String, timer: &mut Timer) -> Result<T> {
77            // TODO We'll eventually want to remove the dependency on abstio
78            abstio::read_object(path, timer)
79        }
80    }
81    impl Readable for RawBytes {
82        fn read_file(path: String, _: &mut Timer) -> Result<RawBytes> {
83            abstio::slurp_file(path).map(RawBytes)
84        }
85    }
86}
87
#[cfg(target_arch = "wasm32")]
mod wasm_loader {
    use std::io::Read;

    use futures::StreamExt;
    use wasm_bindgen::{JsCast, UnwrapThrowExt};
    use wasm_bindgen_futures::JsFuture;
    use wasm_streams::ReadableStream;
    use web_sys::{Request, RequestInit, RequestMode, Response};

    use abstutil::prettyprint_usize;

    use super::*;

    /// A type that `FileLoader` knows how to produce from the body of an HTTP response.
    pub trait Readable {
        /// Builds `Self` from `resp`, the raw response body fetched from `url`. The URL's suffix
        /// decides how the bytes are interpreted (gzip, bincode, JSON).
        fn read_url(url: String, resp: Vec<u8>) -> Result<Self>
        where
            Self: Sized;
    }

    /// Loads a JSON, bincoded, or raw file, then deserializes it
    ///
    /// Instead of blockingly reading a file within ctx.loading_screen, on the web have to
    /// asynchronously make an HTTP request and keep "polling" for completion in a way that's
    /// compatible with winit's event loop.
    pub struct FileLoader<A, T> {
        // Receives the complete response body (or the error) from the spawned fetch task.
        response: oneshot::Receiver<Result<Vec<u8>>>,
        // Wrapped in an Option so event() can move the FnOnce out. event() panics if it is
        // called again after the callback has been consumed.
        on_load:
            Option<Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>>,
        panel: Panel,
        started: Instant,
        url: String,

        // Progress reporting. `total_bytes` stays None when the server doesn't provide a usable
        // Content-Length header.
        total_bytes: Option<usize>,
        read_bytes: usize,
        got_total_bytes: oneshot::Receiver<usize>,
        got_read_bytes: mpsc::Receiver<usize>,
    }

    /// Fetches `url` and returns the complete response body.
    ///
    /// Progress is reported on a best-effort basis: the Content-Length (when the response has a
    /// parsable one) is sent once through `tx_total_bytes`, and the number of newly received
    /// bytes is sent through `tx_read_bytes` as chunks arrive. Failures to deliver progress never
    /// fail the download.
    ///
    /// Returns an error if the request can't be built or sent, the HTTP status isn't OK, the
    /// response has no body, or reading any chunk of the body fails.
    async fn fetch_bytes(
        url: String,
        tx_total_bytes: oneshot::Sender<usize>,
        mut tx_read_bytes: mpsc::Sender<usize>,
    ) -> Result<Vec<u8>> {
        let mut opts = RequestInit::new();
        opts.method("GET");
        opts.mode(RequestMode::Cors);
        let request =
            Request::new_with_str_and_init(&url, &opts).map_err(|err| anyhow!("{:?}", err))?;

        let window = web_sys::window().ok_or_else(|| anyhow!("no global `window` exists"))?;
        let resp_value = JsFuture::from(window.fetch_with_request(&request))
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        let resp: Response = resp_value.dyn_into().unwrap();
        if !resp.ok() {
            let status = resp.status();
            let err = resp.status_text();
            return Err(anyhow!("HTTP {}: {}", status, err));
        }

        // The total size is only used for the progress display, so a missing or malformed
        // Content-Length header must not abort the download.
        let total_bytes = resp
            .headers()
            .get("Content-Length")
            .ok()
            .flatten()
            .and_then(|len| len.parse::<usize>().ok());
        if let Some(total) = total_bytes {
            // Sending fails only if the FileLoader was already dropped; nobody's watching then.
            let _ = tx_total_bytes.send(total);
        }

        let raw_body = resp
            .body()
            .ok_or_else(|| anyhow!("HTTP response for {} has no body", url))?;
        let body = ReadableStream::from_raw(raw_body.dyn_into().unwrap_throw());
        let mut stream = body.into_stream();
        let mut buffer = Vec::new();
        // Bytes received but not yet successfully reported through the bounded channel.
        let mut unreported_bytes = 0;
        while let Some(chunk) = stream.next().await {
            // A failed chunk means the body is incomplete. Report that, rather than handing
            // truncated bytes to the deserializer as if the download succeeded.
            let chunk =
                chunk.map_err(|err| anyhow!("reading body of {} failed: {:?}", url, err))?;
            let array = js_sys::Uint8Array::new(&chunk);
            unreported_bytes += array.byte_length() as usize;
            // If the channel is full (or the receiver is gone), carry the count over to the next
            // chunk instead of dropping it, so the displayed total doesn't undercount.
            if tx_read_bytes.try_send(unreported_bytes).is_ok() {
                unreported_bytes = 0;
            }
            // TODO Can we avoid this clone?
            buffer.extend(array.to_vec());
        }
        Ok(buffer)
    }

    impl<A: 'static, T: 'static + Readable> FileLoader<A, T> {
        /// Starts fetching `path` (relative to the configured assets base URL) in the background
        /// and returns a state that shows a loading screen until the response arrives. `on_load`
        /// is then called with the deserialized result or the error.
        ///
        /// Panics if no assets base URL was configured.
        pub fn new_state(
            ctx: &mut EventCtx,
            path: String,
            on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, &mut Timer, Result<T>) -> Transition<A>>,
        ) -> Box<dyn State<A>> {
            let base_url = ctx
                .prerender
                .assets_base_url()
                .expect("assets_base_url must be specified for wasm builds via `Settings`");

            // Note that files are gzipped on S3 and other deployments. When running locally, we
            // just symlink the data/ directory, where files aren't compressed.
            let url = if ctx.prerender.assets_are_gzipped() {
                format!("{}/{}.gz", base_url, path)
            } else {
                format!("{}/{}", base_url, path)
            };

            // Make the HTTP request nonblockingly. When the response is received, send it through
            // the channel.
            let (tx, rx) = oneshot::channel();
            let (tx_total_bytes, got_total_bytes) = oneshot::channel();
            let (tx_read_bytes, got_read_bytes) = mpsc::channel(10);
            let url_copy = url.clone();
            debug!("Loading {}", url_copy);
            wasm_bindgen_futures::spawn_local(async move {
                let result = fetch_bytes(url_copy, tx_total_bytes, tx_read_bytes).await;
                // If the FileLoader state was dropped while the request was in flight, the
                // receiver is gone. That's not an error worth panicking over.
                let _ = tx.send(result);
            });

            Box::new(FileLoader {
                response: rx,
                on_load: Some(on_load),
                panel: ctx.make_loading_screen(Text::from(format!("Loading {}...", url))),
                started: Instant::now(),
                url,
                total_bytes: None,
                read_bytes: 0,
                got_total_bytes,
                got_read_bytes,
            })
        }
    }

    impl<A: 'static, T: 'static + Readable> State<A> for FileLoader<A, T> {
        /// Polls the fetch task. While it's still running, refreshes the loading screen and asks
        /// for another update; once it's done, deserializes the response and returns the
        /// callback's transition.
        fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
            if self.total_bytes.is_none() {
                if let Ok(Some(total)) = self.got_total_bytes.try_recv() {
                    self.total_bytes = Some(total);
                }
            }
            // Drain everything that's arrived since the last event; several chunks can show up
            // between two calls.
            while let Ok(Some(read)) = self.got_read_bytes.try_next() {
                self.read_bytes += read;
            }

            let maybe_resp = match self.response.try_recv() {
                Ok(finished) => finished,
                // The fetch task went away without sending anything. Surface that to the
                // callback instead of panicking.
                Err(_) => Some(Err(anyhow!(
                    "Loading {} was canceled before a response arrived",
                    self.url
                ))),
            };
            if let Some(maybe_resp) = maybe_resp {
                // TODO We stop drawing and start blocking at this point. It can take a
                // while. Any way to make it still be nonblockingish? Maybe put some of the work
                // inside that spawn_local?
                let mut timer = Timer::new(format!("Loading {}...", self.url));
                let result = maybe_resp.and_then(|resp| T::read_url(self.url.clone(), resp));
                return (self.on_load.take().unwrap())(ctx, app, &mut timer, result);
            }

            let mut lines = vec![
                Line(format!("Loading {}...", self.url)),
                Line(format!(
                    "Time spent: {}",
                    Duration::realtime_elapsed(self.started)
                )),
            ];
            if let Some(total) = self.total_bytes {
                lines.push(Line(format!(
                    "Read {} / {} bytes",
                    prettyprint_usize(self.read_bytes),
                    prettyprint_usize(total)
                )));
            }
            self.panel = ctx.make_loading_screen(Text::from_multiline(lines));

            // Until the response is received, just ask winit to regularly call event(), so we can
            // keep polling the channel.
            ctx.request_update(UpdateType::Game);
            Transition::Keep
        }

        /// Draws the most recently built loading panel on a black background.
        fn draw(&self, g: &mut GfxCtx, _: &A) {
            // TODO Progress bar for bytes received
            g.clear(Color::BLACK);
            self.panel.draw(g);
        }
    }

    // Two implementations for reading the file, using serde or just raw bytes

    /// Picks the decoder from the URL suffix: `.bin.gz` and `.bin` are bincode, anything else is
    /// JSON; a trailing `.gz` means the body is gunzipped while decoding.
    impl<T: 'static + DeserializeOwned> Readable for T {
        fn read_url(url: String, resp: Vec<u8>) -> Result<T> {
            if url.ends_with(".gz") {
                let decoder = flate2::read::GzDecoder::new(&resp[..]);
                if url.ends_with(".bin.gz") {
                    abstutil::from_binary_reader(decoder)
                } else {
                    abstutil::from_json_reader(decoder)
                }
            } else if url.ends_with(".bin") {
                abstutil::from_binary(&&resp)
            } else {
                abstutil::from_json(&&resp)
            }
        }
    }

    /// Returns the body unchanged, except that a URL ending in `.gz` is gunzipped first.
    impl Readable for RawBytes {
        fn read_url(url: String, resp: Vec<u8>) -> Result<RawBytes> {
            if url.ends_with(".gz") {
                let mut decoder = flate2::read::GzDecoder::new(&resp[..]);
                let mut buffer: Vec<u8> = Vec::new();
                decoder.read_to_end(&mut buffer)?;
                Ok(RawBytes(buffer))
            } else {
                Ok(RawBytes(resp))
            }
        }
    }
}
295
/// A `State` that waits for a future running in the background. While waiting, it shows a
/// loading screen with a title, the time spent so far, and the latest message from each of two
/// progress channels. When the future resolves, the builder it produced is run against the app
/// and the result (or the error) is passed to `on_load`.
pub struct FutureLoader<A, T> {
    // First line of the loading screen.
    loading_title: String,
    // When the loader was created; used for the "Time spent" line.
    started: Instant,
    // The loading screen, rebuilt on every event while waiting.
    panel: Panel,
    // Receives the future's output: a builder that needs `&A` to produce the final `T`.
    receiver: oneshot::Receiver<Result<Box<dyn Send + FnOnce(&A) -> T>>>,
    // Wrapped in an Option so event() can move the FnOnce out and call it.
    on_load: Option<Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>>,
    // These're just two different types of progress updates that callers can provide. Each
    // receiver is set to None once its channel is closed.
    outer_progress_receiver: Option<mpsc::Receiver<String>>,
    inner_progress_receiver: Option<mpsc::Receiver<String>>,
    // The most recent message read from each progress channel.
    last_outer_progress: String,
    last_inner_progress: String,

    // If Runtime is dropped, any active tasks will be canceled, so we retain it here even
    // though we never access it. It might make more sense for Runtime to live on App if we're
    // going to be doing more background spawning.
    #[cfg(not(target_arch = "wasm32"))]
    #[allow(dead_code)]
    runtime: Runtime,
}
315
316impl<A, T> FutureLoader<A, T>
317where
318    A: 'static,
319    T: 'static,
320{
321    #[cfg(target_arch = "wasm32")]
322    pub fn new_state(
323        ctx: &mut EventCtx,
324        future: Pin<Box<dyn Future<Output = Result<Box<dyn Send + FnOnce(&A) -> T>>>>>,
325        outer_progress_receiver: mpsc::Receiver<String>,
326        inner_progress_receiver: mpsc::Receiver<String>,
327        loading_title: &str,
328        on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>,
329    ) -> Box<dyn State<A>> {
330        let (tx, receiver) = oneshot::channel();
331        wasm_bindgen_futures::spawn_local(async move {
332            tx.send(future.await).ok().unwrap();
333        });
334        Box::new(FutureLoader {
335            loading_title: loading_title.to_string(),
336            started: Instant::now(),
337            panel: ctx.make_loading_screen(Text::from(loading_title)),
338            receiver,
339            on_load: Some(on_load),
340            outer_progress_receiver: Some(outer_progress_receiver),
341            inner_progress_receiver: Some(inner_progress_receiver),
342            last_outer_progress: String::new(),
343            last_inner_progress: String::new(),
344        })
345    }
346
347    #[cfg(not(target_arch = "wasm32"))]
348    pub fn new_state(
349        ctx: &mut EventCtx,
350        future: Pin<Box<dyn Send + Future<Output = Result<Box<dyn Send + FnOnce(&A) -> T>>>>>,
351        outer_progress_receiver: mpsc::Receiver<String>,
352        inner_progress_receiver: mpsc::Receiver<String>,
353        loading_title: &str,
354        on_load: Box<dyn FnOnce(&mut EventCtx, &mut A, Result<T>) -> Transition<A>>,
355    ) -> Box<dyn State<A>> {
356        let runtime = Runtime::new().unwrap();
357        let (tx, receiver) = oneshot::channel();
358        runtime.spawn(async move {
359            tx.send(future.await).ok().unwrap();
360        });
361
362        Box::new(FutureLoader {
363            loading_title: loading_title.to_string(),
364            started: Instant::now(),
365            panel: ctx.make_loading_screen(Text::from(loading_title)),
366            receiver,
367            on_load: Some(on_load),
368            runtime,
369            outer_progress_receiver: Some(outer_progress_receiver),
370            inner_progress_receiver: Some(inner_progress_receiver),
371            last_outer_progress: String::new(),
372            last_inner_progress: String::new(),
373        })
374    }
375}
376
377impl<A, T> State<A> for FutureLoader<A, T>
378where
379    A: 'static,
380    T: 'static,
381{
382    fn event(&mut self, ctx: &mut EventCtx, app: &mut A) -> Transition<A> {
383        match self.receiver.try_recv() {
384            Err(e) => {
385                error!("channel failed: {:?}", e);
386                let on_load = self.on_load.take().unwrap();
387                on_load(ctx, app, Err(anyhow!("channel canceled")))
388            }
389            Ok(None) => {
390                if let Some(ref mut rx) = self.outer_progress_receiver {
391                    // Read all of the progress that's happened
392                    loop {
393                        match rx.try_next() {
394                            Ok(Some(msg)) => {
395                                self.last_outer_progress = msg;
396                            }
397                            Ok(None) => {
398                                self.outer_progress_receiver = None;
399                                break;
400                            }
401                            Err(_) => {
402                                // No messages
403                                break;
404                            }
405                        }
406                    }
407                }
408                if let Some(ref mut rx) = self.inner_progress_receiver {
409                    loop {
410                        match rx.try_next() {
411                            Ok(Some(msg)) => {
412                                self.last_inner_progress = msg;
413                            }
414                            Ok(None) => {
415                                self.inner_progress_receiver = None;
416                                break;
417                            }
418                            Err(_) => {
419                                // No messages
420                                break;
421                            }
422                        }
423                    }
424                }
425
426                self.panel = ctx.make_loading_screen(Text::from_multiline(vec![
427                    Line(&self.loading_title),
428                    Line(format!(
429                        "Time spent: {}",
430                        Duration::realtime_elapsed(self.started)
431                    )),
432                    Line(&self.last_outer_progress),
433                    Line(&self.last_inner_progress),
434                ]));
435
436                // Until the response is received, just ask winit to regularly call event(), so we
437                // can keep polling the channel.
438                ctx.request_update(UpdateType::Game);
439                Transition::Keep
440            }
441            Ok(Some(Err(e))) => {
442                error!("error in fetching data");
443                let on_load = self.on_load.take().unwrap();
444                on_load(ctx, app, Err(e))
445            }
446            Ok(Some(Ok(builder))) => {
447                debug!("future complete");
448                let t = builder(app);
449                let on_load = self.on_load.take().unwrap();
450                on_load(ctx, app, Ok(t))
451            }
452        }
453    }
454
455    fn draw(&self, g: &mut GfxCtx, _: &A) {
456        g.clear(Color::BLACK);
457        self.panel.draw(g);
458    }
459}