use super::{
    queue_manager::{QueueId, Semaphore},
    BuildResult, BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
};
use crate::{
    backend::{
        archives::{source_files::SourceEntry, Archive, ArchiveOrGroup},
        AnyBackend, Backend, GlobalBackend, SandboxedRepository,
    },
    formats::{BuildTargetId, FormatOrTargets},
};
use either::Either;
use flams_ontology::uris::{ArchiveId, ArchiveURITrait};
use flams_utils::time::Timestamp;
use flams_utils::{
    change_listener::{ChangeListener, ChangeSender},
    prelude::{HMap, TreeLike},
    triomphe::Arc,
};
use parking_lot::RwLock;
use std::{collections::VecDeque, num::NonZeroU32};
use tracing::{instrument, Instrument};

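/// Bookkeeping shared by a [`Queue`] independent of its run state: all known
/// tasks keyed by `(archive id, rel_path)`, a reverse map from task references
/// to the tasks (and targets) that depend on them, the id counter for freshly
/// created tasks, and the total number of build steps (used for ETA reporting).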
#[derive(Debug)]
pub(super) struct TaskMap {
    pub(super) map: HMap<(ArchiveId, std::sync::Arc<str>), BuildTask>,
    pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
    pub(super) counter: NonZeroU32,
    pub(super) total: usize,
}

impl Default for TaskMap {
    fn default() -> Self {
        Self {
            map: HMap::default(),
            dependents: HMap::default(),
            counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
            total: 0,
        }
    }
}

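/// The lifecycle of a [`Queue`]: tasks are collected while [`Idle`](Self::Idle),
/// processed while [`Running`](Self::Running), and summarized as
/// [`Finished`](Self::Finished) once the queue has drained.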
#[derive(Debug)]
pub enum QueueState {
    Running(RunningQueue),
    Idle,
    Finished(FinishedQueue),
}

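/// Identifies a queue: either the single global queue or a named, numbered
/// sandbox queue. The [`Display`](std::fmt::Display) form is `global` or
/// `<name><idx>`, respectively.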
#[derive(Debug, Clone)]
pub enum QueueName {
    Global,
    Sandbox { name: std::sync::Arc<str>, idx: u16 },
}
impl std::fmt::Display for QueueName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Global => f.write_str("global"),
            Self::Sandbox { name, idx } => {
                f.write_str(name)?;
                idx.fmt(f)
            }
        }
    }
}

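/// The shared inner state of a [`Queue`]: the backend to build against, the
/// queue's name and id, the tracing span it was created in, the task map, a
/// change sender broadcasting [`QueueMessage`]s to listeners, and the current
/// [`QueueState`].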
#[derive(Debug)]
pub(super) struct QueueI {
    backend: AnyBackend,
    name: QueueName,
    pub id: QueueId,
    span: tracing::Span,
    pub(super) map: RwLock<TaskMap>,
    sender: ChangeSender<QueueMessage>,
    pub(super) state: RwLock<QueueState>,
}

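/// A build queue: a cheaply clonable handle around [`QueueI`]. Tasks are
/// enqueued via the `enqueue_*` methods and processed after a call to
/// [`start`](Self::start), either synchronously or on the tokio runtime.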
#[derive(Debug, Clone)]
pub struct Queue(pub(super) Arc<QueueI>);

impl Queue {
    /// Creates a new, idle queue for the given backend, capturing the current
    /// tracing span for later instrumentation.
    pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
        Self(Arc::new(QueueI {
            id,
            name,
            backend,
            span: tracing::Span::current(),
            map: RwLock::default(),
            sender: ChangeSender::new(32),
            state: RwLock::new(QueueState::Idle),
        }))
    }

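    /// The backend this queue builds against.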
    #[inline]
    #[must_use]
    pub fn backend(&self) -> &AnyBackend {
        &self.0.backend
    }

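    /// Returns a new listener for this queue's [`QueueMessage`] broadcasts.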
    #[must_use]
    pub fn listener(&self) -> ChangeListener<QueueMessage> {
        self.0.sender.listener()
    }

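    /// Snapshots the current state as a [`QueueMessage`], suitable for
    /// initializing a freshly connected listener.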
    #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
    pub fn state_message(&self) -> QueueMessage {
        match &*self.0.state.read() {
            QueueState::Running(RunningQueue {
                running,
                queue,
                blocked,
                failed,
                done,
                ..
            }) => QueueMessage::Started {
                running: running.iter().map(BuildTask::as_message).collect(),
                queue: queue.iter().map(BuildTask::as_message).collect(),
                blocked: blocked.iter().map(BuildTask::as_message).collect(),
                failed: failed.iter().map(BuildTask::as_message).collect(),
                done: done.iter().map(BuildTask::as_message).collect(),
            },
            QueueState::Idle => QueueMessage::Idle(
                self.0
                    .map
                    .read()
                    .map
                    .values()
                    .map(BuildTask::as_message)
                    .collect(),
            ),
            QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
                failed: failed.iter().map(BuildTask::as_message).collect(),
                done: done.iter().map(BuildTask::as_message).collect(),
            },
        }
    }

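    /// The name of this queue.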
    #[inline]
    #[must_use]
    pub fn name(&self) -> &QueueName {
        &self.0.name
    }

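    /// Starts processing the queued tasks: sorts the task map into a
    /// [`RunningQueue`], broadcasts the initial state, and then runs tasks
    /// either on the current thread ([`Semaphore::Linear`]) or concurrently on
    /// the tokio runtime ([`Semaphore::Counting`]). No-op if already running.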
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Running buildqueue",
        skip_all
    )]
    pub fn start(&self, sem: Semaphore) {
        let mut state = self.0.state.write();
        if matches!(&*state, QueueState::Running(_)) {
            return;
        }
        let map = self.0.map.read();
        let mut running = RunningQueue::new(map.total);
        tracing::info_span!("sorting...").in_scope(|| {
            Self::sort(&map, &mut running);
            tracing::info!("Done");
        });
        self.0.sender.lazy_send(|| QueueMessage::Started {
            running: Vec::new(),
            queue: running.queue.iter().map(BuildTask::as_message).collect(),
            blocked: Vec::new(),
            failed: Vec::new(),
            done: Vec::new(),
        });
        *state = QueueState::Running(running);
        // Release both locks before running, so workers can take them.
        drop(map);
        drop(state);
        match sem {
            Semaphore::Linear => self.run_sync(),
            #[cfg(feature = "tokio")]
            Semaphore::Counting { inner: sem, .. } => {
                tokio::task::spawn(self.clone().run_async(sem).in_current_span());
            }
        }
    }

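    /// Runs all tasks to completion on the current thread, one at a time.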
    #[inline]
    fn run_sync(&self) {
        while let Some((task, id)) = self.get_next() {
            self.run_task(&task, id);
        }
        self.finish();
    }

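    /// Runs tasks concurrently: each task is moved to a blocking thread once a
    /// semaphore permit is available; when the queue is drained, waits for the
    /// in-flight tasks to finish before wrapping up.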
    #[cfg(feature = "tokio")]
    async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
        loop {
            let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
                break;
            };
            let Some((task, id)) = self.get_next() else {
                break;
            };
            let selfclone = self.clone();
            let span = tracing::Span::current();
            tokio::task::spawn_blocking(move || {
                span.in_scope(move || selfclone.run_task_async(&task, id, permit));
            });
        }
        // The queue is empty, but spawned tasks may still be running; poll
        // until they have all finished.
        loop {
            if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
            {
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            } else {
                break;
            }
        }
        self.finish();
    }

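    /// Transitions the queue from [`QueueState::Running`] to
    /// [`QueueState::Finished`] and broadcasts the final results.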
    fn finish(&self) {
        let state = &mut *self.0.state.write();
        let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
            unreachable!()
        };
        let done = std::mem::take(done);
        let failed = std::mem::take(failed);
        self.0.sender.lazy_send(|| QueueMessage::Finished {
            failed: failed.iter().map(BuildTask::as_message).collect(),
            done: done.iter().map(BuildTask::as_message).collect(),
        });
        *state = QueueState::Finished(FinishedQueue { done, failed });
    }

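    /// Moves all failed tasks of a finished queue back into the task map as
    /// fresh tasks (with reset step states and new ids) and returns the queue
    /// to [`QueueState::Idle`], so they can be scheduled again.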
    #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
    pub fn requeue_failed(&self) {
        let mut state = self.0.state.write();
        let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
            return;
        };
        let failed = std::mem::take(failed);
        *state = QueueState::Idle;
        drop(state);
        if failed.is_empty() {
            return;
        }
        let map = &mut *self.0.map.write();
        map.dependents.clear();
        map.counter = NonZeroU32::new(1).unwrap_or_else(|| unreachable!());
        map.total = failed.iter().map(|t| t.0.steps.len()).sum();
        map.map.clear();
        for t in failed {
            // Reset every step, then reinsert the task under a fresh id.
            for s in &t.0.steps {
                *s.0.state.write() = TaskState::None;
            }
            map.map.insert(
                (t.archive().id().clone(), t.0.rel_path.clone()),
                BuildTask(Arc::new(super::BuildTaskI {
                    id: BuildTaskId(map.counter),
                    rel_path: t.0.rel_path.clone(),
                    archive: t.0.archive.clone(),
                    steps: t.0.steps.clone(),
                    source: t.0.source.clone(),
                })),
            );
            map.counter = map.counter.saturating_add(1);
        }
        self.0.sender.lazy_send(|| {
            QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
        });
    }

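    /// Like [`run_task`](Self::run_task), but holds a semaphore permit for the
    /// duration of the task, releasing it when done.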
    #[cfg(feature = "tokio")]
    #[inline]
    fn run_task_async(
        &self,
        task: &BuildTask,
        target: BuildTargetId,
        permit: tokio::sync::OwnedSemaphorePermit,
    ) {
        self.run_task(task, target);
        drop(permit);
    }

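    /// Executes a single build step of a task, updates the step states and the
    /// running queue accordingly, persists the result via the backend, and
    /// broadcasts the outcome (including an updated ETA).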
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::significant_drop_tightening)]
    fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
        self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
            id: task.0.id,
            target,
        });
        let BuildResult { log, result } = tracing::info_span!(target:"buildqueue","Running task",
            archive = %task.0.archive.archive_id(),
            rel_path = %task.0.rel_path,
            format = %target
        )
        .in_scope(|| (target.run())(&self.0.backend, task));
        // Sanity check: the executed target must be one of this task's steps.
        let (_idx, _) = task
            .steps()
            .iter()
            .enumerate()
            .find(|(_, s)| s.0.target == target)
            .unwrap_or_else(|| unreachable!());
        let mut lock = self.0.state.write();
        let QueueState::Running(ref mut state) = &mut *lock else {
            unreachable!()
        };
        state.running.retain(|t| t != task);
        let eta = state.timer.update(1);

        match result {
            Err(_deps) => {
                // Mark the failed step and every step after it as failed.
                let mut found = false;
                for s in task.steps() {
                    if s.0.target == target {
                        found = true;
                    }
                    if found {
                        *s.0.state.write() = TaskState::Failed;
                    }
                }
                state.failed.push(task.clone());
                drop(lock);

                self.0
                    .backend
                    .with_archive(task.archive().archive_id(), |a| {
                        let Some(a) = a else { return };
                        a.save(task.rel_path(), log, target, None);
                    });
                self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
                    id: task.0.id,
                    target,
                    eta,
                });
            }
            Ok(data) => {
                // Mark this step as done; if a later step remains, mark it as
                // queued and push the task back to the front of the queue.
                let mut found = false;
                let mut requeue = false;
                for s in task.steps() {
                    if s.0.target == target {
                        found = true;
                        *s.0.state.write() = TaskState::Done;
                    } else if found {
                        *s.0.state.write() = TaskState::Queued;
                        requeue = true;
                        break;
                    }
                }
                if requeue {
                    state.queue.push_front(task.clone());
                } else {
                    state.done.push(task.clone());
                }
                drop(lock);

                self.0
                    .backend
                    .with_archive(task.archive().archive_id(), |a| {
                        let Some(a) = a else { return };
                        a.save(task.rel_path(), log, target, Some(data));
                    });

                self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
                    id: task.0.id,
                    target,
                    eta,
                });
            }
        }
    }

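    /// If the queue has finished, requeues its failed tasks and returns it to
    /// idle, so that new tasks can be enqueued.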
    fn maybe_restart(&self) {
        let mut state = self.0.state.write();
        if let QueueState::Finished(_) = &mut *state {
            drop(state);
            self.requeue_failed();
        }
    }

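    /// Enqueues build tasks for every local archive known to the global
    /// backend. For sandboxed backends, each archive is copied into the
    /// sandbox first. Returns the number of tasks enqueued.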
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Queueing tasks",
        skip_all
    )]
    pub fn enqueue_all(&self, target: FormatOrTargets, stale_only: bool, clean: bool) -> usize {
        self.maybe_restart();
        if let AnyBackend::Sandbox(b) = &self.0.backend {
            b.0.repos.write().clear();
        }
        let mut acc = 0;
        for a in &*GlobalBackend::get().all_archives() {
            let Archive::Local(archive) = a else { continue };

            if let AnyBackend::Sandbox(b) = &self.0.backend {
                // Register the archive as a sandbox copy and clone it over.
                b.0.repos
                    .write()
                    .push(SandboxedRepository::Copy(archive.id().clone()));
                b.copy_archive(archive);
                if clean {
                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
                }
            } else if clean {
                let _ = std::fs::remove_dir_all(archive.out_dir());
            }
            acc += archive.with_sources(|d| {
                let Some(d) = d.dfs() else { return 0 };
                let map = &mut *self.0.map.write();
                Self::enqueue(
                    map,
                    &self.0.backend,
                    a,
                    target,
                    stale_only,
                    d.filter_map(|e| match e {
                        SourceEntry::Dir(_) => None,
                        SourceEntry::File(f) => Some(f),
                    }),
                )
            });
        }
        acc
    }

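    /// Enqueues build tasks for a single archive, or for every archive in a
    /// group, identified by `id`. Returns the number of tasks enqueued.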
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Queueing tasks",
        skip_all
    )]
    pub fn enqueue_group(
        &self,
        id: &ArchiveId,
        target: FormatOrTargets,
        stale_only: bool,
        clean: bool,
    ) -> usize {
        self.maybe_restart();
        if let AnyBackend::Sandbox(b) = &self.0.backend {
            b.require(id);
        }
        self.0.backend.with_archive_or_group(id, |g| match g {
            None => 0,
            Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
                let Some(archive) = a else { return 0 };
                if clean {
                    if let AnyBackend::Sandbox(b) = &self.0.backend {
                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
                    } else if let Archive::Local(a) = archive {
                        let _ = std::fs::remove_dir_all(a.out_dir());
                    }
                }
                archive.with_sources(|d| {
                    let Some(d) = d.dfs() else { return 0 };
                    let map = &mut *self.0.map.write();
                    Self::enqueue(
                        map,
                        &self.0.backend,
                        archive,
                        target,
                        stale_only,
                        d.filter_map(|e| match e {
                            SourceEntry::Dir(_) => None,
                            SourceEntry::File(f) => Some(f),
                        }),
                    )
                })
            }),
            Some(ArchiveOrGroup::Group(g)) => {
                let Some(g) = g.dfs() else { return 0 };
                let map = &mut *self.0.map.write();
                let mut ret = 0;
                for id in g.filter_map(|e| match e {
                    ArchiveOrGroup::Archive(id) => Some(id),
                    ArchiveOrGroup::Group(_) => None,
                }) {
                    ret += self.0.backend.with_archive(id, |a| {
                        let Some(archive) = a else { return 0 };

                        if clean {
                            if let AnyBackend::Sandbox(b) = &self.0.backend {
                                let _ = std::fs::remove_dir_all(
                                    b.path_for(archive.id()).join(".flams"),
                                );
                            } else if let Archive::Local(a) = archive {
                                let _ = std::fs::remove_dir_all(a.out_dir());
                            }
                        }
                        archive.with_sources(|d| {
                            let Some(d) = d.dfs() else { return 0 };
                            Self::enqueue(
                                map,
                                &self.0.backend,
                                archive,
                                target,
                                stale_only,
                                d.filter_map(|e| match e {
                                    SourceEntry::Dir(_) => None,
                                    SourceEntry::File(f) => Some(f),
                                }),
                            )
                        })
                    });
                }
                ret
            }
        })
    }

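    /// Enqueues build tasks for a single archive, optionally restricted to a
    /// source directory or single file via `rel_path`. Returns the number of
    /// tasks enqueued.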
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Queueing tasks",
        skip_all
    )]
    pub fn enqueue_archive(
        &self,
        id: &ArchiveId,
        target: FormatOrTargets,
        stale_only: bool,
        rel_path: Option<&str>,
        clean: bool,
    ) -> usize {
        self.maybe_restart();
        if let AnyBackend::Sandbox(b) = &self.0.backend {
            b.require(id);
        }
        self.0.backend.with_archive(id, |archive| {
            let Some(archive) = archive else { return 0 };
            if clean {
                if let AnyBackend::Sandbox(b) = &self.0.backend {
                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
                } else if let Archive::Local(a) = archive {
                    let _ = std::fs::remove_dir_all(a.out_dir());
                }
            }
            archive.with_sources(|d| match rel_path {
                None => {
                    let Some(d) = d.dfs() else { return 0 };
                    let map = &mut *self.0.map.write();
                    Self::enqueue(
                        map,
                        &self.0.backend,
                        archive,
                        target,
                        stale_only,
                        d.filter_map(|e| match e {
                            SourceEntry::Dir(_) => None,
                            SourceEntry::File(f) => Some(f),
                        }),
                    )
                }
                Some(p) => {
                    let Some(d) = d.find(p) else { return 0 };
                    match d {
                        Either::Left(d) => {
                            let Some(d) = d.dfs() else { return 0 };
                            let map = &mut *self.0.map.write();
                            Self::enqueue(
                                map,
                                &self.0.backend,
                                archive,
                                target,
                                stale_only,
                                d.filter_map(|e| match e {
                                    SourceEntry::Dir(_) => None,
                                    SourceEntry::File(f) => Some(f),
                                }),
                            )
                        }
                        Either::Right(f) => {
                            let map = &mut *self.0.map.write();
                            Self::enqueue(
                                map,
                                &self.0.backend,
                                archive,
                                target,
                                stale_only,
                                std::iter::once(f),
                            )
                        }
                    }
                }
            })
        })
    }
}

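/// The live state of a running queue: tasks currently executing, waiting,
/// blocked on dependencies, already done, or failed, plus a [`Timer`] for ETA
/// estimates.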
#[derive(Debug)]
pub struct RunningQueue {
    pub(super) queue: VecDeque<BuildTask>,
    pub(super) blocked: Vec<BuildTask>,
    pub(super) done: Vec<BuildTask>,
    pub(super) failed: Vec<BuildTask>,
    pub(super) running: Vec<BuildTask>,
    timer: Timer,
}
impl RunningQueue {
    fn new(total: usize) -> Self {
        Self {
            queue: VecDeque::new(),
            failed: Vec::new(),
            blocked: Vec::new(),
            done: Vec::new(),
            running: Vec::new(),
            timer: Timer::new(total),
        }
    }
}

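/// The results of a queue that has run to completion.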
#[derive(Debug)]
pub struct FinishedQueue {
    pub(super) done: Vec<BuildTask>,
    pub(super) failed: Vec<BuildTask>,
}

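/// Tracks progress for ETA estimation: the start time, the total number of
/// steps, and how many are done so far.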
#[derive(Debug)]
struct Timer {
    started: Timestamp,
    steps: usize,
    done: usize,
}
impl Timer {
    fn new(total: usize) -> Self {
        Self {
            started: Timestamp::now(),
            steps: total,
            done: 0,
        }
    }
    #[allow(clippy::cast_precision_loss)]
    fn update(&mut self, dones: u8) -> Eta {
        self.done += dones as usize;
        // Extrapolate: average time per finished step, times the steps left.
        let avg = self.started.since_now() * (1.0 / (self.done as f64));
        let time_left = avg * ((self.steps - self.done) as f64);
        Eta {
            time_left,
            done: self.done,
            total: self.steps,
        }
    }
}