flams_system/building/
queue.rs

1use crate::{backend::backend, FlamsExtension};
2
3use super::{
4    queue_manager::{QueueId, Semaphore},
5    BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
6};
7use flams_math_archives::{
8    backend::{AnyBackend, LocalBackend},
9    formats::{BuildResult, BuildTargetId, FormatOrTargets},
10    manager::ArchiveOrGroup,
11    source_files::{SourceEntry, SourceEntryRef},
12    utils::path_ext::RelPath,
13    Archive, LocallyBuilt, MathArchive,
14};
15use flams_utils::{
16    change_listener::{ChangeListener, ChangeSender},
17    prelude::HMap,
18    triomphe::Arc,
19};
20use ftml_ontology::utils::{time::Timestamp, RefTree};
21use ftml_uris::{ArchiveId, UriPath, UriWithArchive};
22use parking_lot::RwLock;
23use std::{collections::VecDeque, num::NonZeroU32};
24use tracing::{instrument, Instrument};
25
26#[derive(Debug)]
27pub(super) struct TaskMap {
28    pub(super) map: HMap<(ArchiveId, UriPath), BuildTask>,
29    pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
30    pub(super) counter: NonZeroU32,
31    pub(super) total: usize,
32}
33
34impl Default for TaskMap {
35    fn default() -> Self {
36        Self {
37            map: HMap::default(),
38            dependents: HMap::default(),
39            counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
40            total: 0,
41        }
42    }
43}
44
45#[derive(Debug)]
46pub enum QueueState {
47    Running(RunningQueue),
48    Idle,
49    Finished(FinishedQueue),
50}
51
52#[derive(Debug, Clone)]
53pub enum QueueName {
54    Global,
55    Sandbox { name: std::sync::Arc<str>, idx: u16 },
56}
57impl std::fmt::Display for QueueName {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            Self::Global => f.write_str("global"),
61            Self::Sandbox { name, idx } => {
62                f.write_str(name)?;
63                idx.fmt(f)
64            }
65        }
66    }
67}
68
69#[derive(Debug)]
70pub(super) struct QueueI {
71    backend: AnyBackend,
72    name: QueueName,
73    pub id: QueueId,
74    span: tracing::Span,
75    pub(super) map: RwLock<TaskMap>,
76    sender: ChangeSender<QueueMessage>,
77    pub(super) state: RwLock<QueueState>,
78}
79
80#[derive(Debug, Clone)]
81pub struct Queue(pub(super) Arc<QueueI>);
82
83impl Queue {
84    pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
85        Self(Arc::new(QueueI {
86            id,
87            name,
88            backend,
89            span: tracing::Span::current(),
90            map: RwLock::default(),
91            sender: ChangeSender::new(32),
92            state: RwLock::new(QueueState::Idle),
93        }))
94    }
95
96    #[inline]
97    #[must_use]
98    pub fn backend(&self) -> &AnyBackend {
99        &self.0.backend
100    }
101
102    #[must_use]
103    pub fn listener(&self) -> ChangeListener<QueueMessage> {
104        self.0.sender.listener()
105    }
106
107    #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
108    pub fn state_message(&self) -> QueueMessage {
109        match &*self.0.state.read() {
110            QueueState::Running(RunningQueue {
111                running,
112                queue,
113                blocked,
114                failed,
115                done,
116                ..
117            }) => QueueMessage::Started {
118                running: running.iter().map(BuildTask::as_message).collect(),
119                queue: queue.iter().map(BuildTask::as_message).collect(),
120                blocked: blocked.iter().map(BuildTask::as_message).collect(),
121                failed: failed.iter().map(BuildTask::as_message).collect(),
122                done: done.iter().map(BuildTask::as_message).collect(),
123            },
124            QueueState::Idle => QueueMessage::Idle(
125                self.0
126                    .map
127                    .read()
128                    .map
129                    .values()
130                    .map(BuildTask::as_message)
131                    .collect(),
132            ),
133            QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
134                failed: failed.iter().map(BuildTask::as_message).collect(),
135                done: done.iter().map(BuildTask::as_message).collect(),
136            },
137        }
138    }
139
140    #[inline]
141    #[must_use]
142    pub fn name(&self) -> &QueueName {
143        &self.0.name
144    }
145
146    #[instrument(level = "info",
147    parent=&self.0.span,
148    target = "buildqueue",
149    name = "Running buildqueue",
150    skip_all
151  )]
152    pub fn start(&self, sem: Semaphore) {
153        let mut state = self.0.state.write();
154        if matches!(&*state, QueueState::Running(_)) {
155            return;
156        }
157        let map = self.0.map.read();
158        let mut running = RunningQueue::new(map.total);
159        tracing::info_span!("sorting...").in_scope(|| {
160            Self::sort(&map, &mut running);
161            tracing::info!("Done");
162        });
163        self.0.sender.lazy_send(|| QueueMessage::Started {
164            running: Vec::new(),
165            queue: running.queue.iter().map(BuildTask::as_message).collect(),
166            blocked: Vec::new(),
167            failed: Vec::new(),
168            done: Vec::new(),
169        });
170        *state = QueueState::Running(running);
171        drop(map);
172        drop(state);
173        match sem {
174            Semaphore::Linear => self.run_sync(),
175            #[cfg(feature = "tokio")]
176            Semaphore::Counting { inner: sem, .. } => {
177                tokio::task::spawn(self.clone().run_async(sem).in_current_span());
178            } //.in_current_span());}
179        }
180    }
181
182    #[inline]
183    fn run_sync(&self) {
184        while let Some((task, id)) = self.get_next() {
185            self.run_task(&task, id);
186        }
187        self.finish();
188    }
189
190    #[cfg(feature = "tokio")]
191    async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
192        loop {
193            let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
194                break;
195            };
196            let Some((task, id)) = self.get_next() else {
197                break;
198            };
199            let selfclone = self.clone();
200            let span = tracing::Span::current();
201            tokio::task::spawn_blocking(move || {
202                span.in_scope(move || selfclone.run_task_async(&task, id, permit));
203            });
204        }
205        loop {
206            if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
207            {
208                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
209            } else {
210                break;
211            }
212        }
213        self.finish();
214    }
215
216    fn finish(&self) {
217        let state = &mut *self.0.state.write();
218        let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
219            unreachable!()
220        };
221        let done = std::mem::take(done);
222        let failed = std::mem::take(failed);
223        self.0.sender.lazy_send(|| QueueMessage::Finished {
224            failed: failed.iter().map(BuildTask::as_message).collect(),
225            done: done.iter().map(BuildTask::as_message).collect(),
226        });
227        *state = QueueState::Finished(FinishedQueue { done, failed });
228    }
229
230    #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
231    pub fn requeue_failed(&self) {
232        let mut state = self.0.state.write();
233        let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
234            return;
235        };
236        let failed = std::mem::take(failed);
237        *state = QueueState::Idle;
238        drop(state);
239        if failed.is_empty() {
240            return;
241        }
242        let map = &mut *self.0.map.write();
243        map.dependents.clear();
244        map.counter = unsafe { NonZeroU32::new_unchecked(1) };
245        map.total = failed.iter().map(|t| t.0.steps.len()).sum();
246        map.map.clear();
247        for t in failed {
248            for s in &t.0.steps {
249                *s.0.state.write() = TaskState::None;
250            }
251            map.map.insert(
252                (t.archive().archive_id().clone(), t.0.rel_path.clone()),
253                BuildTask(Arc::new(super::BuildTaskI {
254                    id: BuildTaskId(map.counter),
255                    rel_path: t.0.rel_path.clone(),
256                    uri: t.0.uri.clone(),
257                    steps: t.0.steps.clone(),
258                    source: t.0.source.clone(),
259                })),
260            );
261            map.counter = map.counter.saturating_add(1);
262        }
263        self.0.sender.lazy_send(|| {
264            QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
265        });
266    }
267
268    #[cfg(feature = "tokio")]
269    #[inline]
270    fn run_task_async(
271        &self,
272        task: &BuildTask,
273        target: BuildTargetId,
274        permit: tokio::sync::OwnedSemaphorePermit,
275    ) {
276        self.run_task(task, target);
277        drop(permit);
278    }
279
280    #[allow(clippy::cast_possible_truncation)]
281    #[allow(clippy::significant_drop_tightening)]
282    fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
283        self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
284            id: task.0.id,
285            target,
286        });
287        let spec = task.as_build_spec(&self.0.backend);
288        let BuildResult { log, result } = tracing::info_span!(target:"buildqueue","Running task",
289          archive = %task.0.uri.archive_id(),
290          rel_path = %task.0.rel_path,
291          format = %target
292        )
293        .in_scope(|| (target.run)(spec));
294        let (idx, _) = task
295            .steps()
296            .iter()
297            .enumerate()
298            .find(|(_, s)| s.0.target == target)
299            .unwrap_or_else(|| unreachable!());
300        let mut lock = self.0.state.write();
301        let QueueState::Running(ref mut state) = &mut *lock else {
302            unreachable!()
303        };
304        state.running.retain(|t| t != task);
305        let eta = state.timer.update(1);
306
307        match result {
308            Err(_deps) => {
309                // TODO: handle dependencies
310                let mut found = false;
311                for s in task.steps() {
312                    if s.0.target == target {
313                        found = true;
314                    }
315                    if found {
316                        *s.0.state.write() = TaskState::Failed;
317                    }
318                }
319                state.failed.push(task.clone());
320                drop(lock);
321
322                let _ = self.0.backend.save(
323                    task.document_uri(),
324                    Some(task.rel_path()),
325                    log,
326                    target,
327                    None,
328                );
329                self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
330                    id: task.0.id,
331                    target,
332                    eta,
333                });
334            }
335            Ok(data) => {
336                let mut found = false;
337                let mut requeue = false;
338                for s in task.steps() {
339                    if s.0.target == target {
340                        found = true;
341                        *s.0.state.write() = TaskState::Done;
342                    } else if found {
343                        *s.0.state.write() = TaskState::Queued;
344                        requeue = true;
345                        break;
346                    }
347                }
348                if requeue {
349                    state.queue.push_front(task.clone());
350                } else {
351                    state.done.push(task.clone());
352                }
353                drop(lock);
354                if let Some(data) = data.as_ref() {
355                    for e in inventory::iter::<FlamsExtension>() {
356                        (e.on_build_result)(
357                            &self.0.backend,
358                            task.document_uri(),
359                            task.rel_path(),
360                            &**data,
361                        );
362                    }
363                }
364
365                let _ = self.0.backend.save(
366                    task.document_uri(),
367                    Some(task.rel_path()),
368                    log,
369                    target,
370                    data,
371                );
372
373                self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
374                    id: task.0.id,
375                    target,
376                    eta,
377                });
378            }
379        }
380    }
381
382    fn maybe_restart(&self) {
383        let mut state = self.0.state.write();
384        if let QueueState::Finished(_) = &mut *state {
385            drop(state);
386            self.requeue_failed();
387        }
388    }
389
390    #[instrument(level = "info",
391    parent=&self.0.span,
392    target = "buildqueue",
393    name = "Queueing tasks",
394    skip_all
395  )]
396    #[deprecated(note = "needs refactoring: archive need not be local, etc.")]
397    pub fn enqueue_all(&self, target: FormatOrTargets<'_>, stale_only: bool, clean: bool) -> usize {
398        self.maybe_restart();
399        if let AnyBackend::Sandbox(b) = &self.0.backend {
400            b.clear();
401            backend().with_archives(|archives| {
402                for a in archives {
403                    let Archive::Local(archive) = a else { continue };
404                    b.maybe_copy(archive);
405                    if clean {
406                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
407                    }
408                }
409                b.load_all();
410            });
411        };
412        let mut acc = 0;
413        self.0.backend.with_archives(|archives| {
414            for a in archives {
415                let Archive::Local(archive) = a else { continue };
416                acc += archive.with_sources(|d| {
417                    let d = d.dfs();
418                    let map = &mut *self.0.map.write();
419                    Self::enqueue(
420                        map,
421                        &self.0.backend,
422                        a,
423                        target,
424                        stale_only,
425                        d.filter_map(|e| match e {
426                            SourceEntry::Dir(_) => None,
427                            SourceEntry::File(f) => Some(f),
428                        }),
429                    )
430                });
431            }
432        });
433        acc
434    }
435
436    #[instrument(level = "info",
437    parent=&self.0.span,
438    target = "buildqueue",
439    name = "Queueing tasks",
440    skip_all
441  )]
442    #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
443    pub fn enqueue_group(
444        &self,
445        id: &ArchiveId,
446        target: FormatOrTargets,
447        stale_only: bool,
448        clean: bool,
449    ) -> usize {
450        self.maybe_restart();
451        if let AnyBackend::Sandbox(b) = &self.0.backend {
452            b.require(id);
453        }
454        self.0.backend.with_archive_or_group(id, |g| match g {
455            None => 0,
456            Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
457                let Some(archive) = a else { return 0 };
458                if clean {
459                    if let AnyBackend::Sandbox(b) = &self.0.backend {
460                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
461                    } else if let Archive::Local(a) = archive {
462                        let _ = std::fs::remove_dir_all(a.out_dir());
463                    }
464                }
465                if let Archive::Local(a) = archive {
466                    a.with_sources(|d| {
467                        let map = &mut *self.0.map.write();
468                        Self::enqueue(
469                            map,
470                            &self.0.backend,
471                            archive,
472                            target,
473                            stale_only,
474                            d.dfs().filter_map(|e| match e {
475                                SourceEntry::Dir(_) => None,
476                                SourceEntry::File(f) => Some(f),
477                            }),
478                        )
479                    })
480                } else {
481                    0
482                }
483            }),
484            Some(ArchiveOrGroup::Group(g)) => {
485                let map = &mut *self.0.map.write();
486                let mut ret = 0;
487                for id in g.dfs().filter_map(|e| match e {
488                    ArchiveOrGroup::Archive(id) => Some(id),
489                    ArchiveOrGroup::Group(_) => None,
490                }) {
491                    ret += self.0.backend.with_archive(id, |a| {
492                        let Some(archive) = a else { return 0 };
493
494                        if clean {
495                            if let AnyBackend::Sandbox(b) = &self.0.backend {
496                                let _ = std::fs::remove_dir_all(
497                                    b.path_for(archive.id()).join(".flams"),
498                                );
499                            } else if let Archive::Local(a) = archive {
500                                let _ = std::fs::remove_dir_all(a.out_dir());
501                            }
502                        }
503                        if let Archive::Local(a) = archive {
504                            a.with_sources(|d| {
505                                Self::enqueue(
506                                    map,
507                                    &self.0.backend,
508                                    archive,
509                                    target,
510                                    stale_only,
511                                    d.dfs().filter_map(|e| match e {
512                                        SourceEntry::Dir(_) => None,
513                                        SourceEntry::File(f) => Some(f),
514                                    }),
515                                )
516                            })
517                        } else {
518                            0
519                        }
520                    });
521                }
522                ret
523            }
524        })
525    }
526
527    #[instrument(level = "info",
528    parent=&self.0.span,
529    target = "buildqueue",
530    name = "Queueing tasks",
531    skip_all
532  )]
533    #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
534    pub fn enqueue_archive(
535        &self,
536        id: &ArchiveId,
537        target: FormatOrTargets,
538        stale_only: bool,
539        rel_path: Option<RelPath<'_>>,
540        clean: bool,
541    ) -> usize {
542        self.maybe_restart();
543        if let AnyBackend::Sandbox(b) = &self.0.backend {
544            b.require(id);
545        }
546        self.0.backend.with_archive(id, |archive| {
547            let Some(archive) = archive else { return 0 };
548            if clean {
549                if let AnyBackend::Sandbox(b) = &self.0.backend {
550                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
551                } else if let Archive::Local(a) = archive {
552                    let _ = std::fs::remove_dir_all(a.out_dir());
553                }
554            }
555            if let Archive::Local(a) = archive {
556                a.with_sources(|d| match rel_path {
557                    None => {
558                        let map = &mut *self.0.map.write();
559                        Self::enqueue(
560                            map,
561                            &self.0.backend,
562                            archive,
563                            target,
564                            stale_only,
565                            d.dfs().filter_map(|e| match e {
566                                SourceEntry::Dir(_) => None,
567                                SourceEntry::File(f) => Some(f),
568                            }),
569                        )
570                    }
571                    Some(p) => {
572                        let Some(d) = d.find(p) else { return 0 };
573                        match d {
574                            SourceEntryRef::Dir(d) => {
575                                let map = &mut *self.0.map.write();
576                                Self::enqueue(
577                                    map,
578                                    &self.0.backend,
579                                    archive,
580                                    target,
581                                    stale_only,
582                                    d.dfs().filter_map(|e| match e {
583                                        SourceEntry::Dir(_) => None,
584                                        SourceEntry::File(f) => Some(f),
585                                    }),
586                                )
587                            }
588                            SourceEntryRef::File(f) => {
589                                let map = &mut *self.0.map.write();
590                                Self::enqueue(
591                                    map,
592                                    &self.0.backend,
593                                    archive,
594                                    target,
595                                    stale_only,
596                                    std::iter::once(f),
597                                )
598                            }
599                        }
600                    }
601                })
602            } else {
603                0
604            }
605        })
606    }
607}
608
609#[derive(Debug)]
610pub struct RunningQueue {
611    pub(super) queue: VecDeque<BuildTask>,
612    pub(super) blocked: Vec<BuildTask>,
613    pub(super) done: Vec<BuildTask>,
614    pub(super) failed: Vec<BuildTask>,
615    pub(super) running: Vec<BuildTask>,
616    timer: Timer,
617}
618impl RunningQueue {
619    fn new(total: usize) -> Self {
620        Self {
621            queue: VecDeque::new(),
622            failed: Vec::new(),
623            blocked: Vec::new(),
624            done: Vec::new(),
625            running: Vec::new(),
626            timer: Timer::new(total),
627        }
628    }
629}
630
631#[derive(Debug)]
632pub struct FinishedQueue {
633    pub(super) done: Vec<BuildTask>,
634    pub(super) failed: Vec<BuildTask>,
635}
636
637#[derive(Debug)]
638struct Timer {
639    started: Timestamp,
640    steps: usize,
641    done: usize,
642}
643impl Timer {
644    fn new(total: usize) -> Self {
645        Self {
646            started: Timestamp::now(),
647            steps: total,
648            done: 0,
649        }
650    }
651    #[allow(clippy::cast_precision_loss)]
652    fn update(&mut self, dones: u8) -> Eta {
653        self.done += dones as usize;
654        let avg = self.started.since_now() * (1.0 / (self.done as f64));
655        let time_left = avg * ((self.steps - self.done) as f64);
656        Eta {
657            time_left,
658            done: self.done,
659            total: self.steps,
660        }
661    }
662}