1use crate::backend::{AnyBackend, Backend, GlobalBackend, SandboxedBackend, SandboxedRepository};
2use flams_utils::vecmap::VecMap;
3use std::{fmt::Display, num::NonZeroU32, sync::atomic::AtomicU8};
4
5use super::queue::{Queue, QueueName, QueueState, RunningQueue};
6
7#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
8pub struct QueueId(NonZeroU32);
9impl QueueId {
10 #[must_use]
11 pub fn global() -> Self {
12 Self(NonZeroU32::new(1).unwrap_or_else(|| unreachable!()))
13 }
14}
15impl Display for QueueId {
16 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
17 write!(f, "queue {}", self.0)
18 }
19}
20impl From<QueueId> for NonZeroU32 {
21 #[inline]
22 fn from(id: QueueId) -> Self {
23 id.0
24 }
25}
26impl From<NonZeroU32> for QueueId {
27 #[inline]
28 fn from(id: NonZeroU32) -> Self {
29 Self(id)
30 }
31}
32
33#[derive(Debug)]
34pub struct QueueManager {
35 inner: parking_lot::RwLock<VecMap<QueueId, Queue>>,
36 threads: Semaphore,
37}
38
39static QUEUE_MANAGER: std::sync::OnceLock<QueueManager> = std::sync::OnceLock::new();
40
41impl QueueManager {
42 #[inline]
43 pub fn clear() {
44 if let Some(m) = QUEUE_MANAGER.get() {
45 m.inner.write().0.clear();
46 }
47 }
48 pub fn initialize(num_threads: u8) {
49 QUEUE_MANAGER.get_or_init(|| {
50 let init = if crate::settings::Settings::get().admin_pwd.is_some() {
51 VecMap::new()
52 } else {
53 vec![(
54 QueueId::global(),
55 Queue::new(
56 QueueId::global(),
57 QueueName::Global,
58 GlobalBackend::get().to_any(),
59 ),
60 )]
61 .into()
62 };
63 Self {
64 inner: parking_lot::RwLock::new(init),
65 threads: {
66 #[cfg(feature = "tokio")]
67 {
68 match num_threads {
69 0 => Semaphore::Linear,
70 i => Semaphore::Counting {
71 inner: std::sync::Arc::new(tokio::sync::Semaphore::new(i as usize)),
72 _num: flams_utils::triomphe::Arc::new(AtomicU8::new(i)),
73 },
74 }
75 }
76 #[cfg(not(feature = "tokio"))]
77 {
78 Semaphore::Linear
79 }
80 },
81 }
82 });
83 }
84 pub fn get() -> &'static Self {
86 QUEUE_MANAGER.get().expect("Queue manager not initialized")
87 }
88
89 pub fn new_queue(&self, queue_name: &str) -> QueueId {
90 super::BUILD_QUEUE_SPAN.in_scope(move || {
91 let mut inner = self.inner.write();
92 let mut count = 0;
93 loop {
94 if inner.0.iter().any(|(_,q)| matches!(q.name(),QueueName::Sandbox{name,idx} if &**name == queue_name && *idx == count)) {
95 count += 1;
96 } else {break}
97 }
98 let sbname = format!("{queue_name}_{count}");
99 tracing::info_span!("Build Queue",name = &sbname).in_scope(|| {
100 let id = QueueId(NonZeroU32::new(inner.0.iter().map(|(k,_)| k.0.get()).max().unwrap_or_default() + 1).unwrap_or_else(|| unreachable!()));
101 let backend = AnyBackend::Sandbox(SandboxedBackend::new(&sbname));
102 inner.insert(id,
103 Queue::new(id,
104 QueueName::Sandbox{name:queue_name.to_string().into(),idx:count},
105 backend
106 )
107 );
108 id
109 })
110 })
111 }
112
113 pub fn all_queues(&self) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
114 let inner = self.inner.read();
115 inner
116 .iter()
117 .map(|(k, v)| {
118 (
119 *k,
120 v.name().clone(),
121 if let AnyBackend::Sandbox(sb) = v.backend() {
122 Some(sb.get_repos())
123 } else {
124 None
125 },
126 )
127 })
128 .collect()
129 }
130
131 pub fn with_all_queues<R>(&self, f: impl FnOnce(&[(QueueId, Queue)]) -> R) -> R {
132 f(&self.inner.read().0)
133 }
134
135 pub fn queues_for_user(
136 &self,
137 user_name: &str,
138 ) -> Vec<(QueueId, QueueName, Option<Vec<SandboxedRepository>>)> {
139 let inner = self.inner.read();
140 inner
141 .iter()
142 .filter_map(|(k, v)| {
143 if let QueueName::Sandbox { name, .. } = v.name() {
144 if &**name == user_name {
145 Some((
146 *k,
147 v.name().clone(),
148 if let AnyBackend::Sandbox(sb) = v.backend() {
149 Some(sb.get_repos())
150 } else {
151 None
152 },
153 ))
154 } else {
155 None
156 }
157 } else {
158 None
159 }
160 })
161 .collect()
162 }
163
164 pub fn with_queue<R>(&self, id: QueueId, f: impl FnOnce(Option<&Queue>) -> R) -> R {
165 let inner = self.inner.read();
166 f(inner.get(&id))
167 }
168
169 #[cfg(feature = "tokio")]
171 pub fn migrate<R>(
172 &self,
173 id: QueueId,
174 then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
175 ) -> eyre::Result<(R, usize)> {
176 self.migrate_i(id, then).map_err(|e| {
177 tracing::error!("Error migrating: {e:#}");
178 e
179 })
180 }
181
182 #[cfg(feature = "tokio")]
183 #[allow(clippy::significant_drop_tightening)]
184 fn migrate_i<R>(
185 &self,
186 id: QueueId,
187 then: impl FnOnce(&SandboxedBackend) -> eyre::Result<R>,
188 ) -> eyre::Result<(R, usize)> {
189 use eyre::eyre;
190 use flams_utils::impossible;
191
192 let mut inner = self.inner.write();
193 let r = if let Some(queue) = inner.get(&id) {
194 if !matches!(&*queue.0.state.read(), QueueState::Finished { .. }) {
195 return Err(eyre!("Queue {id} not finished"));
196 }
197 if !matches!(queue.backend(), AnyBackend::Sandbox(_)) {
198 return Err(eyre!("Global Queue can not be migrated"));
199 }
200 let AnyBackend::Sandbox(sandbox) = queue.backend() else {
201 unreachable!()
202 };
203 then(sandbox)?
204 } else {
205 return Err(eyre!("No queue {id} found"));
206 };
207 let Some(queue) = inner.remove(&id) else {
208 unreachable!()
209 };
210 let AnyBackend::Sandbox(sandbox) = queue.backend() else {
211 impossible!()
212 };
213 Ok((r, sandbox.migrate()?))
214 }
215
216 #[allow(clippy::significant_drop_tightening)]
217 pub fn delete(&self, id: QueueId) {
218 let mut inner = self.inner.write();
219 if let Some(q) = inner.remove(&id) {
220 let mut s = q.0.state.write();
221 if let QueueState::Running(RunningQueue { queue, blocked, .. }) = &mut *s {
222 queue.clear();
223 blocked.clear();
224 }
225 if matches!(q.name(), QueueName::Global) {
226 inner.insert(
227 id,
228 Queue::new(id, QueueName::Global, GlobalBackend::get().to_any()),
229 );
230 }
231 }
232 }
233
234 #[allow(clippy::result_unit_err)]
237 pub fn start_queue(&self, id: QueueId) -> Result<(), ()> {
238 let sem = self.threads.clone();
239 self.with_queue(id, |q| {
240 let Some(q) = q else { return Err(()) };
241 q.start(sem);
242 Ok(())
243 })
244 }
245
246 #[allow(clippy::result_unit_err)]
249 pub fn requeue_failed(&self, id: QueueId) -> Result<(), ()> {
250 self.with_queue(id, |q| {
251 q.map_or(Err(()), |q| {
252 q.requeue_failed();
253 Ok(())
254 })
255 })
256 }
257
258 pub fn with_global<R>(&self, f: impl FnOnce(&Queue) -> R) -> R {
259 let inner = self.inner.read();
260 f(inner
261 .get(&QueueId::global())
262 .unwrap_or_else(|| unreachable!()))
263 }
264}
265
266#[derive(Debug, Clone)]
267pub(crate) enum Semaphore {
268 Linear,
269 #[cfg(feature = "tokio")]
270 Counting {
271 inner: std::sync::Arc<tokio::sync::Semaphore>,
272 _num: flams_utils::triomphe::Arc<AtomicU8>,
273 },
274}