flams_system/building/
queue_manager.rs

1use flams_math_archives::backend::{AnyBackend, SandboxedBackend, SandboxedRepository};
2use flams_utils::vecmap::VecMap;
3use std::{fmt::Display, num::NonZeroU32, sync::atomic::AtomicU8};
4
5use crate::settings::Settings;
6
7use super::queue::{Queue, QueueName, QueueState, RunningQueue};
8
/// Identifier of a build queue, backed by a non-zero `u32`.
///
/// The id `1` is reserved for the global queue (see [`QueueId::global`]).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueueId(NonZeroU32);

impl QueueId {
    /// Returns the id of the global queue, which is always `1`.
    #[must_use]
    pub fn global() -> Self {
        // `NonZeroU32::MIN` is exactly 1, so no fallible construction is needed.
        Self(NonZeroU32::MIN)
    }
}

impl Display for QueueId {
    /// Renders the id as `queue <number>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "queue {raw}")
    }
}

impl From<QueueId> for NonZeroU32 {
    /// Unwraps the id into its underlying number.
    #[inline]
    fn from(QueueId(raw): QueueId) -> Self {
        raw
    }
}

impl From<NonZeroU32> for QueueId {
    /// Wraps a raw non-zero number as a queue id.
    #[inline]
    fn from(raw: NonZeroU32) -> Self {
        Self(raw)
    }
}
34
/// Registry of all build queues, keyed by [`QueueId`].
///
/// A single process-wide instance lives in `QUEUE_MANAGER`; it is created by
/// [`QueueManager::initialize`] and accessed through [`QueueManager::get`].
#[derive(Debug)]
pub struct QueueManager {
    /// All currently registered queues, guarded by a read/write lock.
    inner: parking_lot::RwLock<VecMap<QueueId, Queue>>,
    /// Semaphore handle; a clone of it is passed to a queue when it is started.
    threads: Semaphore,
}
40
/// Process-wide [`QueueManager`] instance; set at most once (by
/// `QueueManager::initialize`) and read by `QueueManager::get` / `QueueManager::clear`.
static QUEUE_MANAGER: std::sync::OnceLock<QueueManager> = std::sync::OnceLock::new();
42
43impl QueueManager {
44    #[inline]
45    pub fn clear() {
46        if let Some(m) = QUEUE_MANAGER.get() {
47            m.inner.write().0.clear();
48        }
49    }
50    pub fn initialize(num_threads: u8) {
51        QUEUE_MANAGER.get_or_init(|| {
52            let init = if crate::settings::Settings::get().admin_pwd.is_some() {
53                VecMap::new()
54            } else {
55                vec![(
56                    QueueId::global(),
57                    Queue::new(QueueId::global(), QueueName::Global, AnyBackend::Global),
58                )]
59                .into()
60            };
61            Self {
62                inner: parking_lot::RwLock::new(init),
63                threads: {
64                    #[cfg(feature = "tokio")]
65                    {
66                        match num_threads {
67                            0 => Semaphore::Linear,
68                            i => Semaphore::Counting {
69                                inner: std::sync::Arc::new(tokio::sync::Semaphore::new(i as usize)),
70                                _num: flams_utils::triomphe::Arc::new(AtomicU8::new(i)),
71                            },
72                        }
73                    }
74                    #[cfg(not(feature = "tokio"))]
75                    {
76                        Semaphore::Linear
77                    }
78                },
79            }
80        });
81    }
82    /// ### Panics
83    pub fn get() -> &'static Self {
84        QUEUE_MANAGER.get().expect("Queue manager not initialized")
85    }
86
87    pub fn new_queue(&self, queue_name: &str) -> QueueId {
88        super::BUILD_QUEUE_SPAN.in_scope(move || {
89      let mut inner = self.inner.write();
90      let mut count = 0;
91      loop {
92        if inner.0.iter().any(|(_,q)| matches!(q.name(),QueueName::Sandbox{name,idx} if &**name == queue_name && *idx == count)) {
93          count += 1;
94        } else {break}
95      }
96      let sbname = format!("{queue_name}_{count}");
97      tracing::info_span!("Build Queue",name = &sbname).in_scope(|| {
98        let id = QueueId(NonZeroU32::new(inner.0.iter().map(|(k,_)| k.0.get()).max().unwrap_or_default() + 1).unwrap_or_else(|| unreachable!()));
99        let backend = AnyBackend::Sandbox(SandboxedBackend::new(&sbname,&Settings::get().temp_dir()));
100        inner.insert(id,
101          Queue::new(id,
102            QueueName::Sandbox{name:queue_name.to_string().into(),idx:count},
103            backend
104          )
105        );
106        id
107      })
108    })
109    }
110
111    pub fn all_queues(&self) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
112        let inner = self.inner.read();
113        inner
114            .iter()
115            .map(|(k, v)| {
116                (
117                    *k,
118                    v.name().clone(),
119                    if let AnyBackend::Sandbox(sb) = v.backend() {
120                        Some(sb.get_repos())
121                    } else {
122                        None
123                    },
124                )
125            })
126            .collect()
127    }
128
129    pub fn with_all_queues<R>(&self, f: impl FnOnce(&[(QueueId, Queue)]) -> R) -> R {
130        f(&self.inner.read().0)
131    }
132
133    pub fn queues_for_user(
134        &self,
135        user_name: &str,
136    ) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
137        let inner = self.inner.read();
138        inner
139            .iter()
140            .filter_map(|(k, v)| {
141                if let QueueName::Sandbox { name, .. } = v.name() {
142                    if &**name == user_name {
143                        Some((
144                            *k,
145                            v.name().clone(),
146                            if let AnyBackend::Sandbox(sb) = v.backend() {
147                                Some(sb.get_repos())
148                            } else {
149                                None
150                            },
151                        ))
152                    } else {
153                        None
154                    }
155                } else {
156                    None
157                }
158            })
159            .collect()
160    }
161
162    pub fn with_queue<R>(&self, id: QueueId, f: impl FnOnce(Option<&Queue>) -> R) -> R {
163        let inner = self.inner.read();
164        f(inner.get(&id))
165    }
166
167    /// #### Errors
168    #[cfg(feature = "tokio")]
169    pub fn migrate<R>(
170        &self,
171        id: QueueId,
172        then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
173    ) -> eyre::Result<(R, usize)> {
174        self.migrate_i(id, then).map_err(|e| {
175            tracing::error!("Error migrating: {e:#}");
176            e
177        })
178    }
179
180    #[cfg(feature = "tokio")]
181    #[allow(clippy::significant_drop_tightening)]
182    fn migrate_i<R>(
183        &self,
184        id: QueueId,
185        then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
186    ) -> eyre::Result<(R, usize)> {
187        use eyre::eyre;
188        use flams_math_archives::utils::SyncEngine;
189        use flams_utils::impossible;
190
191        let mut inner = self.inner.write();
192        let r = if let Some(queue) = inner.get(&id) {
193            if !matches!(&*queue.0.state.read(), QueueState::Finished { .. }) {
194                return Err(eyre!("Queue {id} not finished"));
195            }
196            if !matches!(queue.backend(), AnyBackend::Sandbox(_)) {
197                return Err(eyre!("Global Queue can not be migrated"));
198            }
199            let AnyBackend::Sandbox(sandbox) = queue.backend() else {
200                unreachable!()
201            };
202            then(sandbox)?
203        } else {
204            return Err(eyre!("No queue {id} found"));
205        };
206        let Some(queue) = inner.remove(&id) else {
207            unreachable!()
208        };
209        let AnyBackend::Sandbox(sandbox) = queue.backend() else {
210            impossible!()
211        };
212        Ok((r, sandbox.migrate::<SyncEngine>()?))
213    }
214
215    #[allow(clippy::significant_drop_tightening)]
216    pub fn delete(&self, id: QueueId) {
217        let mut inner = self.inner.write();
218        if let Some(q) = inner.remove(&id) {
219            let mut s = q.0.state.write();
220            if let QueueState::Running(RunningQueue { queue, blocked, .. }) = &mut *s {
221                queue.clear();
222                blocked.clear();
223            }
224            if matches!(q.name(), QueueName::Global) {
225                inner.insert(id, Queue::new(id, QueueName::Global, AnyBackend::Global));
226            }
227        }
228    }
229
230    /// ### Errors
231    /// if no queue with that id exists
232    #[allow(clippy::result_unit_err)]
233    pub fn start_queue(&self, id: QueueId) -> Result<(), ()> {
234        let sem = self.threads.clone();
235        self.with_queue(id, |q| {
236            let Some(q) = q else { return Err(()) };
237            q.start(sem);
238            Ok(())
239        })
240    }
241
242    /// ### Errors
243    /// if no queue with that id exists
244    #[allow(clippy::result_unit_err)]
245    pub fn requeue_failed(&self, id: QueueId) -> Result<(), ()> {
246        self.with_queue(id, |q| {
247            q.map_or(Err(()), |q| {
248                q.requeue_failed();
249                Ok(())
250            })
251        })
252    }
253
254    pub fn with_global<R>(&self, f: impl FnOnce(&Queue) -> R) -> R {
255        let inner = self.inner.read();
256        f(inner
257            .get(&QueueId::global())
258            .unwrap_or_else(|| unreachable!()))
259    }
260}
261
/// Concurrency limiter owned by the `QueueManager`; a clone is handed to a
/// queue when it is started.
#[derive(Debug, Clone)]
pub(crate) enum Semaphore {
    /// No permit counting. Selected when the manager is initialized with
    /// `num_threads == 0` or when the `tokio` feature is disabled.
    // NOTE(review): presumably means tasks are processed one after another —
    // confirm against the queue's `start` implementation.
    Linear,
    /// Permit-counting variant backed by a tokio semaphore.
    #[cfg(feature = "tokio")]
    Counting {
        /// Shared tokio semaphore, created with `num_threads` permits.
        inner: std::sync::Arc<tokio::sync::Semaphore>,
        /// The thread count the semaphore was created with; not read anywhere
        /// in the visible part of this file.
        _num: flams_utils::triomphe::Arc<AtomicU8>,
    },
}