flams_system/building/
queue.rs

1use super::{
2    queue_manager::{QueueId, Semaphore},
3    BuildResult, BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
4};
5use crate::{
6    backend::{
7        archives::{source_files::SourceEntry, Archive, ArchiveOrGroup},
8        AnyBackend, Backend, GlobalBackend, SandboxedRepository,
9    },
10    formats::{BuildTargetId, FormatOrTargets},
11};
12use either::Either;
13use flams_ontology::uris::{ArchiveId, ArchiveURITrait};
14use flams_utils::time::Timestamp;
15use flams_utils::{
16    change_listener::{ChangeListener, ChangeSender},
17    prelude::{HMap, TreeLike},
18    triomphe::Arc,
19};
20use parking_lot::RwLock;
21use std::{collections::VecDeque, num::NonZeroU32};
22use tracing::{instrument, Instrument};
23
/// Bookkeeping for the tasks that have been enqueued in a [`Queue`].
#[derive(Debug)]
pub(super) struct TaskMap {
    /// All known tasks, keyed by archive id and the task's relative path.
    pub(super) map: HMap<(ArchiveId, std::sync::Arc<str>), BuildTask>,
    /// Tasks (with the relevant build target) registered per [`TaskRef`].
    // NOTE(review): presumably the steps waiting on the keyed task — confirm against `enqueue`.
    pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
    /// Source of the next [`BuildTaskId`]; starts at 1.
    pub(super) counter: NonZeroU32,
    /// Total number of build steps; used to size the progress timer when the queue starts.
    pub(super) total: usize,
}
31
32impl Default for TaskMap {
33    fn default() -> Self {
34        Self {
35            map: HMap::default(),
36            dependents: HMap::default(),
37            counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
38            total: 0,
39        }
40    }
41}
42
/// The lifecycle state of a [`Queue`].
#[derive(Debug)]
pub enum QueueState {
    /// The queue has been started; holds the bookkeeping of the current run.
    Running(RunningQueue),
    /// The queue is not running (the state of a freshly created queue).
    Idle,
    /// A run has completed; holds its done and failed tasks.
    Finished(FinishedQueue),
}
49
/// The name of a build queue.
#[derive(Debug, Clone)]
pub enum QueueName {
    /// The global queue; displayed as `global`.
    Global,
    /// A sandbox queue; displayed as `name` immediately followed by `idx`.
    Sandbox { name: std::sync::Arc<str>, idx: u16 },
}
impl std::fmt::Display for QueueName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sandbox { name, idx } => {
                // The name is written verbatim; formatter flags (width, fill, ...)
                // only apply to the index.
                f.write_str(name)
                    .and_then(|()| std::fmt::Display::fmt(idx, f))
            }
            Self::Global => f.write_str("global"),
        }
    }
}
66
/// The shared inner data of a [`Queue`].
#[derive(Debug)]
pub(super) struct QueueI {
    /// Backend handed to the build targets and used to look up archives.
    backend: AnyBackend,
    /// Display name of this queue.
    name: QueueName,
    /// Identifier of this queue.
    pub id: QueueId,
    /// Span that was current when the queue was created; used as the parent span of the
    /// instrumented queue operations.
    span: tracing::Span,
    /// The enqueued tasks.
    pub(super) map: RwLock<TaskMap>,
    /// Channel on which [`QueueMessage`]s about this queue are published.
    sender: ChangeSender<QueueMessage>,
    /// Current lifecycle state.
    pub(super) state: RwLock<QueueState>,
}
77
/// Handle to a build queue; clones share the same underlying [`QueueI`] through an `Arc`.
#[derive(Debug, Clone)]
pub struct Queue(pub(super) Arc<QueueI>);
80
81impl Queue {
82    pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
83        Self(Arc::new(QueueI {
84            id,
85            name,
86            backend,
87            span: tracing::Span::current(),
88            map: RwLock::default(),
89            sender: ChangeSender::new(32),
90            state: RwLock::new(QueueState::Idle),
91        }))
92    }
93
94    #[inline]
95    #[must_use]
96    pub fn backend(&self) -> &AnyBackend {
97        &self.0.backend
98    }
99
100    #[must_use]
101    pub fn listener(&self) -> ChangeListener<QueueMessage> {
102        self.0.sender.listener()
103    }
104
105    #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
106    pub fn state_message(&self) -> QueueMessage {
107        match &*self.0.state.read() {
108            QueueState::Running(RunningQueue {
109                running,
110                queue,
111                blocked,
112                failed,
113                done,
114                ..
115            }) => QueueMessage::Started {
116                running: running.iter().map(BuildTask::as_message).collect(),
117                queue: queue.iter().map(BuildTask::as_message).collect(),
118                blocked: blocked.iter().map(BuildTask::as_message).collect(),
119                failed: failed.iter().map(BuildTask::as_message).collect(),
120                done: done.iter().map(BuildTask::as_message).collect(),
121            },
122            QueueState::Idle => QueueMessage::Idle(
123                self.0
124                    .map
125                    .read()
126                    .map
127                    .values()
128                    .map(BuildTask::as_message)
129                    .collect(),
130            ),
131            QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
132                failed: failed.iter().map(BuildTask::as_message).collect(),
133                done: done.iter().map(BuildTask::as_message).collect(),
134            },
135        }
136    }
137
138    #[inline]
139    #[must_use]
140    pub fn name(&self) -> &QueueName {
141        &self.0.name
142    }
143
144    #[instrument(level = "info",
145    parent=&self.0.span,
146    target = "buildqueue",
147    name = "Running buildqueue",
148    skip_all
149  )]
150    pub fn start(&self, sem: Semaphore) {
151        let mut state = self.0.state.write();
152        if matches!(&*state, QueueState::Running(_)) {
153            return;
154        }
155        let map = self.0.map.read();
156        let mut running = RunningQueue::new(map.total);
157        tracing::info_span!("sorting...").in_scope(|| {
158            Self::sort(&map, &mut running);
159            tracing::info!("Done");
160        });
161        self.0.sender.lazy_send(|| QueueMessage::Started {
162            running: Vec::new(),
163            queue: running.queue.iter().map(BuildTask::as_message).collect(),
164            blocked: Vec::new(),
165            failed: Vec::new(),
166            done: Vec::new(),
167        });
168        *state = QueueState::Running(running);
169        drop(map);
170        drop(state);
171        match sem {
172            Semaphore::Linear => self.run_sync(),
173            #[cfg(feature = "tokio")]
174            Semaphore::Counting { inner: sem, .. } => {
175                tokio::task::spawn(self.clone().run_async(sem).in_current_span());
176            } //.in_current_span());}
177        }
178    }
179
180    #[inline]
181    fn run_sync(&self) {
182        while let Some((task, id)) = self.get_next() {
183            self.run_task(&task, id);
184        }
185        self.finish();
186    }
187
188    #[cfg(feature = "tokio")]
189    async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
190        loop {
191            let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
192                break;
193            };
194            let Some((task, id)) = self.get_next() else {
195                break;
196            };
197            let selfclone = self.clone();
198            let span = tracing::Span::current();
199            tokio::task::spawn_blocking(move || {
200                span.in_scope(move || selfclone.run_task_async(&task, id, permit));
201            });
202        }
203        loop {
204            if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
205            {
206                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
207            } else {
208                break;
209            }
210        }
211        self.finish();
212    }
213
214    fn finish(&self) {
215        let state = &mut *self.0.state.write();
216        let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
217            unreachable!()
218        };
219        let done = std::mem::take(done);
220        let failed = std::mem::take(failed);
221        self.0.sender.lazy_send(|| QueueMessage::Finished {
222            failed: failed.iter().map(BuildTask::as_message).collect(),
223            done: done.iter().map(BuildTask::as_message).collect(),
224        });
225        *state = QueueState::Finished(FinishedQueue { done, failed });
226    }
227
228    #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
229    pub fn requeue_failed(&self) {
230        let mut state = self.0.state.write();
231        let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
232            return;
233        };
234        let failed = std::mem::take(failed);
235        *state = QueueState::Idle;
236        drop(state);
237        if failed.is_empty() {
238            return;
239        }
240        let map = &mut *self.0.map.write();
241        map.dependents.clear();
242        map.counter = unsafe { NonZeroU32::new_unchecked(1) };
243        map.total = failed.iter().map(|t| t.0.steps.len()).sum();
244        map.map.clear();
245        for t in failed {
246            for s in &t.0.steps {
247                *s.0.state.write() = TaskState::None;
248            }
249            map.map.insert(
250                (t.archive().id().clone(), t.0.rel_path.clone()),
251                BuildTask(Arc::new(super::BuildTaskI {
252                    id: BuildTaskId(map.counter),
253                    rel_path: t.0.rel_path.clone(),
254                    archive: t.0.archive.clone(),
255                    steps: t.0.steps.clone(),
256                    source: t.0.source.clone(),
257                })),
258            );
259            map.counter = map.counter.saturating_add(1);
260        }
261        self.0.sender.lazy_send(|| {
262            QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
263        });
264    }
265
266    #[cfg(feature = "tokio")]
267    #[inline]
268    fn run_task_async(
269        &self,
270        task: &BuildTask,
271        target: BuildTargetId,
272        permit: tokio::sync::OwnedSemaphorePermit,
273    ) {
274        self.run_task(task, target);
275        drop(permit);
276    }
277
278    #[allow(clippy::cast_possible_truncation)]
279    #[allow(clippy::significant_drop_tightening)]
280    fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
281        self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
282            id: task.0.id,
283            target,
284        });
285        let BuildResult { log, result } = tracing::info_span!(target:"buildqueue","Running task",
286          archive = %task.0.archive.archive_id(),
287          rel_path = %task.0.rel_path,
288          format = %target
289        )
290        .in_scope(|| (target.run())(&self.0.backend, task));
291        let (idx, _) = task
292            .steps()
293            .iter()
294            .enumerate()
295            .find(|(_, s)| s.0.target == target)
296            .unwrap_or_else(|| unreachable!());
297        let mut lock = self.0.state.write();
298        let QueueState::Running(ref mut state) = &mut *lock else {
299            unreachable!()
300        };
301        state.running.retain(|t| t != task);
302        let eta = state.timer.update(1);
303
304        match result {
305            Err(_deps) => {
306                // TODO: handle dependencies
307                let mut found = false;
308                for s in task.steps() {
309                    if s.0.target == target {
310                        found = true;
311                    }
312                    if found {
313                        *s.0.state.write() = TaskState::Failed;
314                    }
315                }
316                state.failed.push(task.clone());
317                drop(lock);
318
319                self.0
320                    .backend
321                    .with_archive(task.archive().archive_id(), |a| {
322                        let Some(a) = a else { return };
323                        a.save(task.rel_path(), log, target, None);
324                    });
325                self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
326                    id: task.0.id,
327                    target,
328                    eta,
329                });
330            }
331            Ok(data) => {
332                let mut found = false;
333                let mut requeue = false;
334                for s in task.steps() {
335                    if s.0.target == target {
336                        found = true;
337                        *s.0.state.write() = TaskState::Done;
338                    } else if found {
339                        *s.0.state.write() = TaskState::Queued;
340                        requeue = true;
341                        break;
342                    }
343                }
344                if requeue {
345                    state.queue.push_front(task.clone());
346                } else {
347                    state.done.push(task.clone());
348                }
349                drop(lock);
350
351                self.0
352                    .backend
353                    .with_archive(task.archive().archive_id(), |a| {
354                        let Some(a) = a else { return };
355                        a.save(task.rel_path(), log, target, Some(data));
356                    });
357
358                self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
359                    id: task.0.id,
360                    target,
361                    eta,
362                });
363            }
364        }
365    }
366
367    fn maybe_restart(&self) {
368        let mut state = self.0.state.write();
369        if let QueueState::Finished(_) = &mut *state {
370            drop(state);
371            self.requeue_failed();
372        }
373    }
374
375    #[instrument(level = "info",
376    parent=&self.0.span,
377    target = "buildqueue",
378    name = "Queueing tasks",
379    skip_all
380  )]
381    pub fn enqueue_all(&self, target: FormatOrTargets, stale_only: bool, clean: bool) -> usize {
382        self.maybe_restart();
383        if let AnyBackend::Sandbox(b) = &self.0.backend {
384            b.0.repos.write().clear();
385        }
386        let mut acc = 0;
387        for a in &*GlobalBackend::get().all_archives() {
388            let Archive::Local(archive) = a else { continue };
389
390            if let AnyBackend::Sandbox(b) = &self.0.backend {
391                b.0.repos
392                    .write()
393                    .push(SandboxedRepository::Copy(archive.id().clone()));
394                b.copy_archive(archive);
395                if clean {
396                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
397                }
398            } else if clean {
399                let _ = std::fs::remove_dir_all(archive.out_dir());
400            }
401            acc += archive.with_sources(|d| {
402                let Some(d) = d.dfs() else { return 0 };
403                let map = &mut *self.0.map.write();
404                Self::enqueue(
405                    map,
406                    &self.0.backend,
407                    a,
408                    target,
409                    stale_only,
410                    d.filter_map(|e| match e {
411                        SourceEntry::Dir(_) => None,
412                        SourceEntry::File(f) => Some(f),
413                    }),
414                )
415            });
416        }
417        acc
418    }
419
420    #[instrument(level = "info",
421    parent=&self.0.span,
422    target = "buildqueue",
423    name = "Queueing tasks",
424    skip_all
425  )]
426    pub fn enqueue_group(
427        &self,
428        id: &ArchiveId,
429        target: FormatOrTargets,
430        stale_only: bool,
431        clean: bool,
432    ) -> usize {
433        self.maybe_restart();
434        if let AnyBackend::Sandbox(b) = &self.0.backend {
435            b.require(id);
436        }
437        self.0.backend.with_archive_or_group(id, |g| match g {
438            None => 0,
439            Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
440                let Some(archive) = a else { return 0 };
441                if clean {
442                    if let AnyBackend::Sandbox(b) = &self.0.backend {
443                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
444                    } else if let Archive::Local(a) = archive {
445                        let _ = std::fs::remove_dir_all(a.out_dir());
446                    }
447                }
448                archive.with_sources(|d| {
449                    let Some(d) = d.dfs() else { return 0 };
450                    let map = &mut *self.0.map.write();
451                    Self::enqueue(
452                        map,
453                        &self.0.backend,
454                        archive,
455                        target,
456                        stale_only,
457                        d.filter_map(|e| match e {
458                            SourceEntry::Dir(_) => None,
459                            SourceEntry::File(f) => Some(f),
460                        }),
461                    )
462                })
463            }),
464            Some(ArchiveOrGroup::Group(g)) => {
465                let Some(g) = g.dfs() else { return 0 };
466                let map = &mut *self.0.map.write();
467                let mut ret = 0;
468                for id in g.filter_map(|e| match e {
469                    ArchiveOrGroup::Archive(id) => Some(id),
470                    ArchiveOrGroup::Group(_) => None,
471                }) {
472                    ret += self.0.backend.with_archive(id, |a| {
473                        let Some(archive) = a else { return 0 };
474
475                        if clean {
476                            if let AnyBackend::Sandbox(b) = &self.0.backend {
477                                let _ = std::fs::remove_dir_all(
478                                    b.path_for(archive.id()).join(".flams"),
479                                );
480                            } else if let Archive::Local(a) = archive {
481                                let _ = std::fs::remove_dir_all(a.out_dir());
482                            }
483                        }
484                        archive.with_sources(|d| {
485                            let Some(d) = d.dfs() else { return 0 };
486                            Self::enqueue(
487                                map,
488                                &self.0.backend,
489                                archive,
490                                target,
491                                stale_only,
492                                d.filter_map(|e| match e {
493                                    SourceEntry::Dir(_) => None,
494                                    SourceEntry::File(f) => Some(f),
495                                }),
496                            )
497                        })
498                    });
499                }
500                ret
501            }
502        })
503    }
504
505    #[instrument(level = "info",
506    parent=&self.0.span,
507    target = "buildqueue",
508    name = "Queueing tasks",
509    skip_all
510  )]
511    pub fn enqueue_archive(
512        &self,
513        id: &ArchiveId,
514        target: FormatOrTargets,
515        stale_only: bool,
516        rel_path: Option<&str>,
517        clean: bool,
518    ) -> usize {
519        self.maybe_restart();
520        if let AnyBackend::Sandbox(b) = &self.0.backend {
521            b.require(id);
522        }
523        self.0.backend.with_archive(id, |archive| {
524            let Some(archive) = archive else { return 0 };
525            if clean {
526                if let AnyBackend::Sandbox(b) = &self.0.backend {
527                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
528                } else if let Archive::Local(a) = archive {
529                    let _ = std::fs::remove_dir_all(a.out_dir());
530                }
531            }
532            archive.with_sources(|d| match rel_path {
533                None => {
534                    let Some(d) = d.dfs() else { return 0 };
535                    let map = &mut *self.0.map.write();
536                    Self::enqueue(
537                        map,
538                        &self.0.backend,
539                        archive,
540                        target,
541                        stale_only,
542                        d.filter_map(|e| match e {
543                            SourceEntry::Dir(_) => None,
544                            SourceEntry::File(f) => Some(f),
545                        }),
546                    )
547                }
548                Some(p) => {
549                    let Some(d) = d.find(p) else { return 0 };
550                    match d {
551                        Either::Left(d) => {
552                            let Some(d) = d.dfs() else { return 0 };
553                            let map = &mut *self.0.map.write();
554                            Self::enqueue(
555                                map,
556                                &self.0.backend,
557                                archive,
558                                target,
559                                stale_only,
560                                d.filter_map(|e| match e {
561                                    SourceEntry::Dir(_) => None,
562                                    SourceEntry::File(f) => Some(f),
563                                }),
564                            )
565                        }
566                        Either::Right(f) => {
567                            let map = &mut *self.0.map.write();
568                            Self::enqueue(
569                                map,
570                                &self.0.backend,
571                                archive,
572                                target,
573                                stale_only,
574                                std::iter::once(f),
575                            )
576                        }
577                    }
578                }
579            })
580        })
581    }
582}
583
584#[derive(Debug)]
585pub struct RunningQueue {
586    pub(super) queue: VecDeque<BuildTask>,
587    pub(super) blocked: Vec<BuildTask>,
588    pub(super) done: Vec<BuildTask>,
589    pub(super) failed: Vec<BuildTask>,
590    pub(super) running: Vec<BuildTask>,
591    timer: Timer,
592}
593impl RunningQueue {
594    fn new(total: usize) -> Self {
595        Self {
596            queue: VecDeque::new(),
597            failed: Vec::new(),
598            blocked: Vec::new(),
599            done: Vec::new(),
600            running: Vec::new(),
601            timer: Timer::new(total),
602        }
603    }
604}
605
/// The result of a completed queue run.
#[derive(Debug)]
pub struct FinishedQueue {
    /// Tasks that completed successfully.
    pub(super) done: Vec<BuildTask>,
    /// Tasks with a failed step; these can be re-enqueued via `Queue::requeue_failed`.
    pub(super) failed: Vec<BuildTask>,
}
611
612#[derive(Debug)]
613struct Timer {
614    started: Timestamp,
615    steps: usize,
616    done: usize,
617}
618impl Timer {
619    fn new(total: usize) -> Self {
620        Self {
621            started: Timestamp::now(),
622            steps: total,
623            done: 0,
624        }
625    }
626    #[allow(clippy::cast_precision_loss)]
627    fn update(&mut self, dones: u8) -> Eta {
628        self.done += dones as usize;
629        let avg = self.started.since_now() * (1.0 / (self.done as f64));
630        let time_left = avg * ((self.steps - self.done) as f64);
631        Eta {
632            time_left,
633            done: self.done,
634            total: self.steps,
635        }
636    }
637}