flams_system/building/
queue_manager.rs

1use crate::backend::{AnyBackend, Backend, GlobalBackend, SandboxedBackend, SandboxedRepository};
2use flams_utils::vecmap::VecMap;
3use std::{fmt::Display, num::NonZeroU32, sync::atomic::AtomicU8};
4
5use super::queue::{Queue, QueueName, QueueState, RunningQueue};
6
/// Identifier of a build queue.
///
/// A thin wrapper around a [`NonZeroU32`]; the value `1` is the id returned
/// by [`QueueId::global`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueueId(NonZeroU32);

impl QueueId {
    /// Returns the id with numeric value `1`, used for the global queue.
    #[must_use]
    pub fn global() -> Self {
        // `NonZeroU32::MIN` is exactly 1, so no fallible construction is needed.
        Self(NonZeroU32::MIN)
    }
}

impl Display for QueueId {
    /// Formats the id as `queue <n>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "queue {}", raw)
    }
}

impl From<QueueId> for NonZeroU32 {
    /// Extracts the raw numeric id.
    #[inline]
    fn from(id: QueueId) -> Self {
        let QueueId(raw) = id;
        raw
    }
}

impl From<NonZeroU32> for QueueId {
    /// Wraps a raw numeric id.
    #[inline]
    fn from(id: NonZeroU32) -> Self {
        QueueId(id)
    }
}
32
/// Registry of all build queues, keyed by [`QueueId`].
///
/// A single instance lives in the `QUEUE_MANAGER` static; it is created by
/// [`QueueManager::initialize`] and accessed through [`QueueManager::get`].
#[derive(Debug)]
pub struct QueueManager {
    /// All currently registered queues, guarded by a read/write lock.
    inner: parking_lot::RwLock<VecMap<QueueId, Queue>>,
    /// Semaphore that is cloned and handed to a queue when it is started.
    threads: Semaphore,
}
38
/// Process-wide [`QueueManager`] singleton; set at most once by
/// `QueueManager::initialize` and read by `QueueManager::get` / `QueueManager::clear`.
static QUEUE_MANAGER: std::sync::OnceLock<QueueManager> = std::sync::OnceLock::new();
40
41impl QueueManager {
42    #[inline]
43    pub fn clear() {
44        if let Some(m) = QUEUE_MANAGER.get() {
45            m.inner.write().0.clear();
46        }
47    }
48    pub fn initialize(num_threads: u8) {
49        QUEUE_MANAGER.get_or_init(|| {
50            let init = if crate::settings::Settings::get().admin_pwd.is_some() {
51                VecMap::new()
52            } else {
53                vec![(
54                    QueueId::global(),
55                    Queue::new(
56                        QueueId::global(),
57                        QueueName::Global,
58                        GlobalBackend::get().to_any(),
59                    ),
60                )]
61                .into()
62            };
63            Self {
64                inner: parking_lot::RwLock::new(init),
65                threads: {
66                    #[cfg(feature = "tokio")]
67                    {
68                        match num_threads {
69                            0 => Semaphore::Linear,
70                            i => Semaphore::Counting {
71                                inner: std::sync::Arc::new(tokio::sync::Semaphore::new(i as usize)),
72                                _num: flams_utils::triomphe::Arc::new(AtomicU8::new(i)),
73                            },
74                        }
75                    }
76                    #[cfg(not(feature = "tokio"))]
77                    {
78                        Semaphore::Linear
79                    }
80                },
81            }
82        });
83    }
84    /// ### Panics
85    pub fn get() -> &'static Self {
86        QUEUE_MANAGER.get().expect("Queue manager not initialized")
87    }
88
89    pub fn new_queue(&self, queue_name: &str) -> QueueId {
90        super::BUILD_QUEUE_SPAN.in_scope(move || {
91      let mut inner = self.inner.write();
92      let mut count = 0;
93      loop {
94        if inner.0.iter().any(|(_,q)| matches!(q.name(),QueueName::Sandbox{name,idx} if &**name == queue_name && *idx == count)) {
95          count += 1;
96        } else {break}
97      }
98      let sbname = format!("{queue_name}_{count}");
99      tracing::info_span!("Build Queue",name = &sbname).in_scope(|| {
100        let id = QueueId(NonZeroU32::new(inner.0.iter().map(|(k,_)| k.0.get()).max().unwrap_or_default() + 1).unwrap_or_else(|| unreachable!()));
101        let backend = AnyBackend::Sandbox(SandboxedBackend::new(&sbname));
102        inner.insert(id,
103          Queue::new(id,
104            QueueName::Sandbox{name:queue_name.to_string().into(),idx:count},
105            backend
106          )
107        );
108        id
109      })
110    })
111    }
112
113    pub fn all_queues(&self) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
114        let inner = self.inner.read();
115        inner
116            .iter()
117            .map(|(k, v)| {
118                (
119                    *k,
120                    v.name().clone(),
121                    if let AnyBackend::Sandbox(sb) = v.backend() {
122                        Some(sb.get_repos())
123                    } else {
124                        None
125                    },
126                )
127            })
128            .collect()
129    }
130
131    pub fn with_all_queues<R>(&self, f: impl FnOnce(&[(QueueId, Queue)]) -> R) -> R {
132        f(&self.inner.read().0)
133    }
134
135    pub fn queues_for_user(
136        &self,
137        user_name: &str,
138    ) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
139        let inner = self.inner.read();
140        inner
141            .iter()
142            .filter_map(|(k, v)| {
143                if let QueueName::Sandbox { name, .. } = v.name() {
144                    if &**name == user_name {
145                        Some((
146                            *k,
147                            v.name().clone(),
148                            if let AnyBackend::Sandbox(sb) = v.backend() {
149                                Some(sb.get_repos())
150                            } else {
151                                None
152                            },
153                        ))
154                    } else {
155                        None
156                    }
157                } else {
158                    None
159                }
160            })
161            .collect()
162    }
163
164    pub fn with_queue<R>(&self, id: QueueId, f: impl FnOnce(Option<&Queue>) -> R) -> R {
165        let inner = self.inner.read();
166        f(inner.get(&id))
167    }
168
169    /// #### Errors
170    #[cfg(feature = "tokio")]
171    pub fn migrate<R>(
172        &self,
173        id: QueueId,
174        then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
175    ) -> eyre::Result<(R, usize)> {
176        self.migrate_i(id, then).map_err(|e| {
177            tracing::error!("Error migrating: {e:#}");
178            e
179        })
180    }
181
182    #[cfg(feature = "tokio")]
183    #[allow(clippy::significant_drop_tightening)]
184    fn migrate_i<R>(
185        &self,
186        id: QueueId,
187        then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
188    ) -> eyre::Result<(R, usize)> {
189        use eyre::eyre;
190        use flams_utils::impossible;
191
192        let mut inner = self.inner.write();
193        let r = if let Some(queue) = inner.get(&id) {
194            if !matches!(&*queue.0.state.read(), QueueState::Finished { .. }) {
195                return Err(eyre!("Queue {id} not finished"));
196            }
197            if !matches!(queue.backend(), AnyBackend::Sandbox(_)) {
198                return Err(eyre!("Global Queue can not be migrated"));
199            }
200            let AnyBackend::Sandbox(sandbox) = queue.backend() else {
201                unreachable!()
202            };
203            then(sandbox)?
204        } else {
205            return Err(eyre!("No queue {id} found"));
206        };
207        let Some(queue) = inner.remove(&id) else {
208            unreachable!()
209        };
210        let AnyBackend::Sandbox(sandbox) = queue.backend() else {
211            impossible!()
212        };
213        Ok((r, sandbox.migrate()?))
214    }
215
216    #[allow(clippy::significant_drop_tightening)]
217    pub fn delete(&self, id: QueueId) {
218        let mut inner = self.inner.write();
219        if let Some(q) = inner.remove(&id) {
220            let mut s = q.0.state.write();
221            if let QueueState::Running(RunningQueue { queue, blocked, .. }) = &mut *s {
222                queue.clear();
223                blocked.clear();
224            }
225            if matches!(q.name(), QueueName::Global) {
226                inner.insert(
227                    id,
228                    Queue::new(id, QueueName::Global, GlobalBackend::get().to_any()),
229                );
230            }
231        }
232    }
233
234    /// ### Errors
235    /// if no queue with that id exists
236    #[allow(clippy::result_unit_err)]
237    pub fn start_queue(&self, id: QueueId) -> Result<(), ()> {
238        let sem = self.threads.clone();
239        self.with_queue(id, |q| {
240            let Some(q) = q else { return Err(()) };
241            q.start(sem);
242            Ok(())
243        })
244    }
245
246    /// ### Errors
247    /// if no queue with that id exists
248    #[allow(clippy::result_unit_err)]
249    pub fn requeue_failed(&self, id: QueueId) -> Result<(), ()> {
250        self.with_queue(id, |q| {
251            q.map_or(Err(()), |q| {
252                q.requeue_failed();
253                Ok(())
254            })
255        })
256    }
257
258    pub fn with_global<R>(&self, f: impl FnOnce(&Queue) -> R) -> R {
259        let inner = self.inner.read();
260        f(inner
261            .get(&QueueId::global())
262            .unwrap_or_else(|| unreachable!()))
263    }
264}
265
/// Concurrency limiter owned by the `QueueManager`; a clone is passed to a
/// queue when it is started.
#[derive(Debug, Clone)]
pub(crate) enum Semaphore {
    /// Selected when the thread count is `0` or the `tokio` feature is
    /// disabled. NOTE(review): presumably means tasks run one at a time —
    /// confirm against the queue's start logic.
    Linear,
    /// Selected for a non-zero thread count when the `tokio` feature is enabled.
    #[cfg(feature = "tokio")]
    Counting {
        /// Shared tokio semaphore, created with one permit per configured thread.
        inner: std::sync::Arc<tokio::sync::Semaphore>,
        /// The configured thread count, stored alongside the semaphore.
        _num: flams_utils::triomphe::Arc<AtomicU8>,
    },
}