1use flams_math_archives::backend::{AnyBackend, SandboxedBackend, SandboxedRepository};
2use flams_utils::vecmap::VecMap;
3use std::{fmt::Display, num::NonZeroU32, sync::atomic::AtomicU8};
4
5use crate::settings::Settings;
6
7use super::queue::{Queue, QueueName, QueueState, RunningQueue};
8
/// Identifier of a build queue.
///
/// Backed by a [`NonZeroU32`] so `Option<QueueId>` needs no extra space
/// (niche optimization). The id `1` is reserved for the global queue.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueueId(NonZeroU32);
impl QueueId {
    /// Returns the id of the global build queue (always `1`).
    #[must_use]
    pub fn global() -> Self {
        // `NonZeroU32::MIN` is the constant 1 — no fallible construction
        // and no `unreachable!()` escape hatch needed.
        Self(NonZeroU32::MIN)
    }
}
impl Display for QueueId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "queue {}", self.0)
    }
}
impl From<QueueId> for NonZeroU32 {
    #[inline]
    fn from(id: QueueId) -> Self {
        id.0
    }
}
impl From<NonZeroU32> for QueueId {
    #[inline]
    fn from(id: NonZeroU32) -> Self {
        Self(id)
    }
}
34
/// Manages all build queues and the shared concurrency limit for running them.
///
/// Accessed through a process-wide singleton created by `QueueManager::initialize`.
#[derive(Debug)]
pub struct QueueManager {
    // All known queues, keyed by id; RwLock allows concurrent read-only access.
    inner: parking_lot::RwLock<VecMap<QueueId, Queue>>,
    // Gates how many build jobs may run at once (see `Semaphore`).
    threads: Semaphore,
}
40
/// Process-wide singleton, set exactly once by `QueueManager::initialize`.
static QUEUE_MANAGER: std::sync::OnceLock<QueueManager> = std::sync::OnceLock::new();
42
43impl QueueManager {
44 #[inline]
45 pub fn clear() {
46 if let Some(m) = QUEUE_MANAGER.get() {
47 m.inner.write().0.clear();
48 }
49 }
50 pub fn initialize(num_threads: u8) {
51 QUEUE_MANAGER.get_or_init(|| {
52 let init = if crate::settings::Settings::get().admin_pwd.is_some() {
53 VecMap::new()
54 } else {
55 vec![(
56 QueueId::global(),
57 Queue::new(QueueId::global(), QueueName::Global, AnyBackend::Global),
58 )]
59 .into()
60 };
61 Self {
62 inner: parking_lot::RwLock::new(init),
63 threads: {
64 #[cfg(feature = "tokio")]
65 {
66 match num_threads {
67 0 => Semaphore::Linear,
68 i => Semaphore::Counting {
69 inner: std::sync::Arc::new(tokio::sync::Semaphore::new(i as usize)),
70 _num: flams_utils::triomphe::Arc::new(AtomicU8::new(i)),
71 },
72 }
73 }
74 #[cfg(not(feature = "tokio"))]
75 {
76 Semaphore::Linear
77 }
78 },
79 }
80 });
81 }
82 pub fn get() -> &'static Self {
84 QUEUE_MANAGER.get().expect("Queue manager not initialized")
85 }
86
87 pub fn new_queue(&self, queue_name: &str) -> QueueId {
88 super::BUILD_QUEUE_SPAN.in_scope(move || {
89 let mut inner = self.inner.write();
90 let mut count = 0;
91 loop {
92 if inner.0.iter().any(|(_,q)| matches!(q.name(),QueueName::Sandbox{name,idx} if &**name == queue_name && *idx == count)) {
93 count += 1;
94 } else {break}
95 }
96 let sbname = format!("{queue_name}_{count}");
97 tracing::info_span!("Build Queue",name = &sbname).in_scope(|| {
98 let id = QueueId(NonZeroU32::new(inner.0.iter().map(|(k,_)| k.0.get()).max().unwrap_or_default() + 1).unwrap_or_else(|| unreachable!()));
99 let backend = AnyBackend::Sandbox(SandboxedBackend::new(&sbname,&Settings::get().temp_dir()));
100 inner.insert(id,
101 Queue::new(id,
102 QueueName::Sandbox{name:queue_name.to_string().into(),idx:count},
103 backend
104 )
105 );
106 id
107 })
108 })
109 }
110
111 pub fn all_queues(&self) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
112 let inner = self.inner.read();
113 inner
114 .iter()
115 .map(|(k, v)| {
116 (
117 *k,
118 v.name().clone(),
119 if let AnyBackend::Sandbox(sb) = v.backend() {
120 Some(sb.get_repos())
121 } else {
122 None
123 },
124 )
125 })
126 .collect()
127 }
128
129 pub fn with_all_queues<R>(&self, f: impl FnOnce(&[(QueueId, Queue)]) -> R) -> R {
130 f(&self.inner.read().0)
131 }
132
133 pub fn queues_for_user(
134 &self,
135 user_name: &str,
136 ) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
137 let inner = self.inner.read();
138 inner
139 .iter()
140 .filter_map(|(k, v)| {
141 if let QueueName::Sandbox { name, .. } = v.name() {
142 if &**name == user_name {
143 Some((
144 *k,
145 v.name().clone(),
146 if let AnyBackend::Sandbox(sb) = v.backend() {
147 Some(sb.get_repos())
148 } else {
149 None
150 },
151 ))
152 } else {
153 None
154 }
155 } else {
156 None
157 }
158 })
159 .collect()
160 }
161
162 pub fn with_queue<R>(&self, id: QueueId, f: impl FnOnce(Option<&Queue>) -> R) -> R {
163 let inner = self.inner.read();
164 f(inner.get(&id))
165 }
166
167 #[cfg(feature = "tokio")]
169 pub fn migrate<R>(
170 &self,
171 id: QueueId,
172 then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
173 ) -> eyre::Result<(R, usize)> {
174 self.migrate_i(id, then).map_err(|e| {
175 tracing::error!("Error migrating: {e:#}");
176 e
177 })
178 }
179
180 #[cfg(feature = "tokio")]
181 #[allow(clippy::significant_drop_tightening)]
182 fn migrate_i<R>(
183 &self,
184 id: QueueId,
185 then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
186 ) -> eyre::Result<(R, usize)> {
187 use eyre::eyre;
188 use flams_math_archives::utils::SyncEngine;
189 use flams_utils::impossible;
190
191 let mut inner = self.inner.write();
192 let r = if let Some(queue) = inner.get(&id) {
193 if !matches!(&*queue.0.state.read(), QueueState::Finished { .. }) {
194 return Err(eyre!("Queue {id} not finished"));
195 }
196 if !matches!(queue.backend(), AnyBackend::Sandbox(_)) {
197 return Err(eyre!("Global Queue can not be migrated"));
198 }
199 let AnyBackend::Sandbox(sandbox) = queue.backend() else {
200 unreachable!()
201 };
202 then(sandbox)?
203 } else {
204 return Err(eyre!("No queue {id} found"));
205 };
206 let Some(queue) = inner.remove(&id) else {
207 unreachable!()
208 };
209 let AnyBackend::Sandbox(sandbox) = queue.backend() else {
210 impossible!()
211 };
212 Ok((r, sandbox.migrate::<SyncEngine>()?))
213 }
214
215 #[allow(clippy::significant_drop_tightening)]
216 pub fn delete(&self, id: QueueId) {
217 let mut inner = self.inner.write();
218 if let Some(q) = inner.remove(&id) {
219 let mut s = q.0.state.write();
220 if let QueueState::Running(RunningQueue { queue, blocked, .. }) = &mut *s {
221 queue.clear();
222 blocked.clear();
223 }
224 if matches!(q.name(), QueueName::Global) {
225 inner.insert(id, Queue::new(id, QueueName::Global, AnyBackend::Global));
226 }
227 }
228 }
229
230 #[allow(clippy::result_unit_err)]
233 pub fn start_queue(&self, id: QueueId) -> Result<(), ()> {
234 let sem = self.threads.clone();
235 self.with_queue(id, |q| {
236 let Some(q) = q else { return Err(()) };
237 q.start(sem);
238 Ok(())
239 })
240 }
241
242 #[allow(clippy::result_unit_err)]
245 pub fn requeue_failed(&self, id: QueueId) -> Result<(), ()> {
246 self.with_queue(id, |q| {
247 q.map_or(Err(()), |q| {
248 q.requeue_failed();
249 Ok(())
250 })
251 })
252 }
253
254 pub fn with_global<R>(&self, f: impl FnOnce(&Queue) -> R) -> R {
255 let inner = self.inner.read();
256 f(inner
257 .get(&QueueId::global())
258 .unwrap_or_else(|| unreachable!()))
259 }
260}
261
/// Concurrency strategy for running build jobs.
#[derive(Debug, Clone)]
pub(crate) enum Semaphore {
    /// Run jobs strictly one after another (no parallelism).
    Linear,
    /// Run up to a fixed number of jobs concurrently, gated by a tokio
    /// semaphore.
    #[cfg(feature = "tokio")]
    Counting {
        // Permits bounding the number of concurrently running jobs.
        inner: std::sync::Arc<tokio::sync::Semaphore>,
        // Configured permit count; leading underscore: not read here —
        // presumably kept for introspection elsewhere (TODO confirm).
        _num: flams_utils::triomphe::Arc<AtomicU8>,
    },
}