1use std::path::{Path, PathBuf};
2
3use flams_math_archives::{utils::path_ext::PathExt, LocalArchive, MathArchive};
4use ftml_uris::ArchiveId;
5use tokio::io::AsyncWriteExt;
6
7pub async fn unzip_from_remote(
10 id: ArchiveId,
11 url: &str,
12 cont: impl FnMut(&Path),
13) -> Result<(), ()> {
14 use futures::TryStreamExt;
15 let resp = match reqwest::get(url).await {
16 Ok(r) => r,
17 Err(e) => {
18 tracing::error!("Error contacting remote: {e}");
19 return Err(());
20 }
21 };
22 let status = resp.status().as_u16();
23 if (400..=599).contains(&status) {
24 let text = resp.text().await;
25 tracing::error!("Error response from remote: {text:?}");
26 return Err(());
27 }
28 let stream = resp.bytes_stream().map_err(std::io::Error::other);
29 let stream = tokio_util::io::StreamReader::new(stream);
30 let decomp = async_compression::tokio::bufread::GzipDecoder::new(stream);
31 let dest = crate::settings::Settings::get()
32 .temp_dir()
33 .join(flams_utils::hashstr("download", &id));
34
35 let mut tar = tokio_tar::Archive::new(decomp);
36 if let Err(e) = unpack_with_callback(&mut tar, &dest, cont).await {
37 tracing::error!("Error unpacking stream: {e}");
38 let _ = tokio::fs::remove_dir_all(dest).await;
39 return Err(());
40 }
41 let mh = crate::settings::Settings::get()
42 .mathhubs()
43 .first()
44 .expect("this is a bug");
45 let mhdest = mh.join(id.as_ref());
46 if let Err(e) = tokio::fs::create_dir_all(&mhdest).await {
47 tracing::error!("Error moving to MathHub: {e}");
48 return Err(());
49 }
50 if mhdest.exists() {
51 let _ = tokio::fs::remove_dir_all(&mhdest).await;
52 }
53 match tokio::task::spawn_blocking(move || dest.rename_safe(&mhdest)).await {
54 Ok(Ok(())) => Ok(()),
55 Err(e) => {
56 tracing::error!("Error moving to MathHub: {e}");
57 Err(())
58 }
59 Ok(Err(e)) => {
60 tracing::error!("Error moving to MathHub: {e:#}");
61 Err(())
62 }
63 }
64}
65
66async fn unpack_with_callback<R: tokio::io::AsyncRead + Unpin, P: AsRef<std::path::Path>>(
67 tar: &mut tokio_tar::Archive<R>,
68 dst: P,
69 mut cont: impl FnMut(&std::path::Path),
70) -> tokio::io::Result<()> {
71 use rustc_hash::FxHashSet;
72 use std::pin::Pin;
73 use tokio::fs;
74 use tokio_stream::StreamExt;
75 let mut entries = tar.entries()?;
76 let mut pinned = Pin::new(&mut entries);
77 let dst = dst.as_ref();
78
79 if fs::symlink_metadata(dst).await.is_err() {
80 fs::create_dir_all(&dst).await?;
81 }
82
83 let dst = fs::canonicalize(dst).await?;
84
85 let mut targets = FxHashSet::default();
86
87 let mut directories = Vec::new();
88 while let Some(entry) = pinned.next().await {
89 let mut file = entry?;
90 if file.header().entry_type() == tokio_tar::EntryType::Directory {
91 directories.push(file);
92 } else {
93 if let Ok(p) = file.path() {
94 cont(&p);
95 }
96 file.unpack_in_raw(&dst, &mut targets).await?;
97 }
98 }
99
100 directories.sort_by(|a, b| b.path_bytes().cmp(&a.path_bytes()));
101 for mut dir in directories {
102 dir.unpack_in_raw(&dst, &mut targets).await?;
103 }
104
105 Ok(())
106}
107
108pub fn zip(
109 archive: &LocalArchive,
110) -> impl futures::Stream<Item = std::io::Result<tokio_util::bytes::Bytes>> {
111 let dir_path = archive.path().to_path_buf();
112 ZipStream::new(dir_path)
113}
114
/// Byte stream of a directory packed as a gzip-compressed tar archive.
///
/// The archive is produced by a spawned task that writes into an in-memory
/// pipe; this type exposes the reading side of that pipe as a `Stream`.
pub(super) struct ZipStream {
    /// Handle of the spawned task that produces the archive; aborted on drop.
    handle: tokio::task::JoinHandle<()>,
    /// Reading side of the pipe the task writes into, as a stream of chunks.
    stream: tokio_util::io::ReaderStream<tokio::io::ReadHalf<tokio::io::SimplexStream>>,
}
119impl ZipStream {
120 pub(super) fn new(p: PathBuf) -> Self {
121 let (reader, writer) = tokio::io::simplex(1024);
122 let stream = tokio_util::io::ReaderStream::new(reader);
123 let handle = tokio::task::spawn(Self::zip(p, writer));
124 Self { handle, stream }
125 }
126 async fn zip(p: PathBuf, writer: tokio::io::WriteHalf<tokio::io::SimplexStream>) {
127 let comp = async_compression::tokio::write::GzipEncoder::with_quality(
128 writer,
129 async_compression::Level::Best,
130 );
131 let mut tar = tokio_tar::Builder::new(comp);
132 let _ = tar.append_dir_all(".", &p).await;
133 let mut comp = match tar.into_inner().await {
134 Ok(r) => r,
135 Err(e) => {
136 tracing::error!("Failed to zip: {e}");
137 return;
138 }
139 };
140 let _ = comp.shutdown().await;
142 tracing::info!("Finished zipping {}", p.display());
143 }
144}
impl Drop for ZipStream {
    /// Logs the drop and aborts the spawned archiving task.
    fn drop(&mut self) {
        tracing::info!("Dropping");
        // Cancel the producer task stored in `handle`.
        self.handle.abort();
    }
}
151impl futures::Stream for ZipStream {
152 type Item = std::io::Result<tokio_util::bytes::Bytes>;
153 #[inline]
154 fn poll_next(
155 self: std::pin::Pin<&mut Self>,
156 cx: &mut std::task::Context<'_>,
157 ) -> std::task::Poll<Option<Self::Item>> {
158 unsafe { self.map_unchecked_mut(|f| &mut f.stream).poll_next(cx) }
159 }
160 #[inline]
161 fn size_hint(&self) -> (usize, Option<usize>) {
162 self.stream.size_hint()
163 }
164}