flams_system/
zip.rs

1use std::path::{Path, PathBuf};
2
3use flams_math_archives::{utils::path_ext::PathExt, LocalArchive, MathArchive};
4use ftml_uris::ArchiveId;
5use tokio::io::AsyncWriteExt;
6
7/// # Errors
8/// # Panics
9pub async fn unzip_from_remote(
10    id: ArchiveId,
11    url: &str,
12    cont: impl FnMut(&Path),
13) -> Result<(), ()> {
14    use futures::TryStreamExt;
15    let resp = match reqwest::get(url).await {
16        Ok(r) => r,
17        Err(e) => {
18            tracing::error!("Error contacting remote: {e}");
19            return Err(());
20        }
21    };
22    let status = resp.status().as_u16();
23    if (400..=599).contains(&status) {
24        let text = resp.text().await;
25        tracing::error!("Error response from remote: {text:?}");
26        return Err(());
27    }
28    let stream = resp.bytes_stream().map_err(std::io::Error::other);
29    let stream = tokio_util::io::StreamReader::new(stream);
30    let decomp = async_compression::tokio::bufread::GzipDecoder::new(stream);
31    let dest = crate::settings::Settings::get()
32        .temp_dir()
33        .join(flams_utils::hashstr("download", &id));
34
35    let mut tar = tokio_tar::Archive::new(decomp);
36    if let Err(e) = unpack_with_callback(&mut tar, &dest, cont).await {
37        tracing::error!("Error unpacking stream: {e}");
38        let _ = tokio::fs::remove_dir_all(dest).await;
39        return Err(());
40    }
41    let mh = crate::settings::Settings::get()
42        .mathhubs()
43        .first()
44        .expect("this is a bug");
45    let mhdest = mh.join(id.as_ref());
46    if let Err(e) = tokio::fs::create_dir_all(&mhdest).await {
47        tracing::error!("Error moving to MathHub: {e}");
48        return Err(());
49    }
50    if mhdest.exists() {
51        let _ = tokio::fs::remove_dir_all(&mhdest).await;
52    }
53    match tokio::task::spawn_blocking(move || dest.rename_safe(&mhdest)).await {
54        Ok(Ok(())) => Ok(()),
55        Err(e) => {
56            tracing::error!("Error moving to MathHub: {e}");
57            Err(())
58        }
59        Ok(Err(e)) => {
60            tracing::error!("Error moving to MathHub: {e:#}");
61            Err(())
62        }
63    }
64}
65
66async fn unpack_with_callback<R: tokio::io::AsyncRead + Unpin, P: AsRef<std::path::Path>>(
67    tar: &mut tokio_tar::Archive<R>,
68    dst: P,
69    mut cont: impl FnMut(&std::path::Path),
70) -> tokio::io::Result<()> {
71    use rustc_hash::FxHashSet;
72    use std::pin::Pin;
73    use tokio::fs;
74    use tokio_stream::StreamExt;
75    let mut entries = tar.entries()?;
76    let mut pinned = Pin::new(&mut entries);
77    let dst = dst.as_ref();
78
79    if fs::symlink_metadata(dst).await.is_err() {
80        fs::create_dir_all(&dst).await?;
81    }
82
83    let dst = fs::canonicalize(dst).await?;
84
85    let mut targets = FxHashSet::default();
86
87    let mut directories = Vec::new();
88    while let Some(entry) = pinned.next().await {
89        let mut file = entry?;
90        if file.header().entry_type() == tokio_tar::EntryType::Directory {
91            directories.push(file);
92        } else {
93            if let Ok(p) = file.path() {
94                cont(&p);
95            }
96            file.unpack_in_raw(&dst, &mut targets).await?;
97        }
98    }
99
100    directories.sort_by(|a, b| b.path_bytes().cmp(&a.path_bytes()));
101    for mut dir in directories {
102        dir.unpack_in_raw(&dst, &mut targets).await?;
103    }
104
105    Ok(())
106}
107
108pub fn zip(
109    archive: &LocalArchive,
110) -> impl futures::Stream<Item = std::io::Result<tokio_util::bytes::Bytes>> {
111    let dir_path = archive.path().to_path_buf();
112    ZipStream::new(dir_path)
113}
114
115pub(super) struct ZipStream {
116    handle: tokio::task::JoinHandle<()>,
117    stream: tokio_util::io::ReaderStream<tokio::io::ReadHalf<tokio::io::SimplexStream>>,
118}
119impl ZipStream {
120    pub(super) fn new(p: PathBuf) -> Self {
121        let (reader, writer) = tokio::io::simplex(1024);
122        let stream = tokio_util::io::ReaderStream::new(reader);
123        let handle = tokio::task::spawn(Self::zip(p, writer));
124        Self { handle, stream }
125    }
126    async fn zip(p: PathBuf, writer: tokio::io::WriteHalf<tokio::io::SimplexStream>) {
127        let comp = async_compression::tokio::write::GzipEncoder::with_quality(
128            writer,
129            async_compression::Level::Best,
130        );
131        let mut tar = tokio_tar::Builder::new(comp);
132        let _ = tar.append_dir_all(".", &p).await;
133        let mut comp = match tar.into_inner().await {
134            Ok(r) => r,
135            Err(e) => {
136                tracing::error!("Failed to zip: {e}");
137                return;
138            }
139        };
140        //let _ = comp.flush().await;
141        let _ = comp.shutdown().await;
142        tracing::info!("Finished zipping {}", p.display());
143    }
144}
145impl Drop for ZipStream {
146    fn drop(&mut self) {
147        tracing::info!("Dropping");
148        self.handle.abort();
149    }
150}
151impl futures::Stream for ZipStream {
152    type Item = std::io::Result<tokio_util::bytes::Bytes>;
153    #[inline]
154    fn poll_next(
155        self: std::pin::Pin<&mut Self>,
156        cx: &mut std::task::Context<'_>,
157    ) -> std::task::Poll<Option<Self::Item>> {
158        unsafe { self.map_unchecked_mut(|f| &mut f.stream).poll_next(cx) }
159    }
160    #[inline]
161    fn size_hint(&self) -> (usize, Option<usize>) {
162        self.stream.size_hint()
163    }
164}