Skip to main content

flams_system/building/
queue.rs

1use crate::{backend::backend, FlamsExtension};
2
3use super::{
4    queue_manager::{QueueId, Semaphore},
5    BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
6};
7use flams_math_archives::{
8    artifacts::FileOrString,
9    backend::{AnyBackend, LocalBackend},
10    formats::{BuildResult, BuildTargetId, FormatOrTargets},
11    manager::ArchiveOrGroup,
12    source_files::{SourceEntry, SourceEntryRef},
13    utils::path_ext::RelPath,
14    Archive, LocallyBuilt, MathArchive,
15};
16use flams_utils::{
17    change_listener::{ChangeListener, ChangeSender},
18    prelude::HMap,
19    triomphe::Arc,
20};
21use ftml_ontology::utils::{time::Timestamp, RefTree};
22use ftml_uris::{ArchiveId, UriPath, UriWithArchive};
23use parking_lot::RwLock;
24use std::{collections::VecDeque, num::NonZeroU32};
25use tracing::{instrument, Instrument};
26
27#[derive(Debug)]
28pub(super) struct TaskMap {
29    pub(super) map: HMap<(ArchiveId, UriPath), BuildTask>,
30    pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
31    pub(super) counter: NonZeroU32,
32    pub(super) total: usize,
33}
34
35impl Default for TaskMap {
36    fn default() -> Self {
37        Self {
38            map: HMap::default(),
39            dependents: HMap::default(),
40            counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
41            total: 0,
42        }
43    }
44}
45
/// Lifecycle state of a [`Queue`].
#[derive(Debug)]
pub enum QueueState {
    /// The queue has been started; carries the live per-run bookkeeping.
    Running(RunningQueue),
    /// The queue is not running; its tasks live in the queue's task map.
    Idle,
    /// A run has completed; carries the tasks that finished and those that failed.
    Finished(FinishedQueue),
}
52
/// Human-readable identity of a queue.
#[derive(Debug, Clone)]
pub enum QueueName {
    /// The global queue; displayed as `global`.
    Global,
    /// A sandbox queue; displayed as `name` immediately followed by `idx`.
    Sandbox { name: std::sync::Arc<str>, idx: u16 },
}
impl std::fmt::Display for QueueName {
    /// Writes `global`, or the sandbox name followed by its index.
    ///
    /// The name is written verbatim; only the index honours the formatter's
    /// width/fill flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::Sandbox { name, idx } = self else {
            return f.write_str("global");
        };
        f.write_str(name)?;
        std::fmt::Display::fmt(idx, f)
    }
}
69
/// Shared inner state of a [`Queue`]; always accessed through the `Arc` in `Queue`.
#[derive(Debug)]
pub(super) struct QueueI {
    /// Backend the queue's tasks are built against and whose `save` stores results.
    backend: AnyBackend,
    /// Display name of this queue.
    name: QueueName,
    /// Identifier of this queue.
    pub id: QueueId,
    /// Span that was current when the queue was created; parent of the
    /// instrumented queue operations.
    span: tracing::Span,
    /// Tasks registered with the queue.
    pub(super) map: RwLock<TaskMap>,
    /// Channel on which queue and task state changes are announced.
    pub(super) sender: ChangeSender<QueueMessage>,
    /// Current lifecycle state.
    pub(super) state: RwLock<QueueState>,
}
80
/// Handle to a build queue; cloning yields another handle to the same
/// reference-counted [`QueueI`].
#[derive(Debug, Clone)]
pub struct Queue(pub(super) Arc<QueueI>);
83
84impl Queue {
85    pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
86        Self(Arc::new(QueueI {
87            id,
88            name,
89            backend,
90            span: tracing::Span::current(),
91            map: RwLock::default(),
92            sender: ChangeSender::new(32),
93            state: RwLock::new(QueueState::Idle),
94        }))
95    }
96
97    #[inline]
98    #[must_use]
99    pub fn backend(&self) -> &AnyBackend {
100        &self.0.backend
101    }
102
103    #[must_use]
104    pub fn listener(&self) -> ChangeListener<QueueMessage> {
105        self.0.sender.listener()
106    }
107
108    #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
109    pub fn state_message(&self) -> QueueMessage {
110        match &*self.0.state.read() {
111            QueueState::Running(RunningQueue {
112                running,
113                queue,
114                blocked,
115                failed,
116                done,
117                ..
118            }) => QueueMessage::Started {
119                running: running.iter().map(BuildTask::as_message).collect(),
120                queue: queue.iter().map(BuildTask::as_message).collect(),
121                blocked: blocked.iter().map(BuildTask::as_message).collect(),
122                failed: failed.iter().map(BuildTask::as_message).collect(),
123                done: done.iter().map(BuildTask::as_message).collect(),
124            },
125            QueueState::Idle => QueueMessage::Idle(
126                self.0
127                    .map
128                    .read()
129                    .map
130                    .values()
131                    .map(BuildTask::as_message)
132                    .collect(),
133            ),
134            QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
135                failed: failed.iter().map(BuildTask::as_message).collect(),
136                done: done.iter().map(BuildTask::as_message).collect(),
137            },
138        }
139    }
140
141    #[inline]
142    #[must_use]
143    pub fn name(&self) -> &QueueName {
144        &self.0.name
145    }
146
147    #[instrument(level = "info",
148    parent=&self.0.span,
149    target = "buildqueue",
150    name = "Running buildqueue",
151    skip_all
152  )]
153    pub fn start(&self, sem: Semaphore) {
154        let mut state = self.0.state.write();
155        if matches!(&*state, QueueState::Running(_)) {
156            return;
157        }
158        let map = self.0.map.read();
159        let mut running = RunningQueue::new(map.total);
160        tracing::info_span!("sorting...").in_scope(|| {
161            Self::sort(&map, &mut running);
162            tracing::info!("Done");
163        });
164        self.0.sender.lazy_send(|| QueueMessage::Started {
165            running: Vec::new(),
166            queue: running.queue.iter().map(BuildTask::as_message).collect(),
167            blocked: Vec::new(),
168            failed: Vec::new(),
169            done: Vec::new(),
170        });
171        *state = QueueState::Running(running);
172        drop(map);
173        drop(state);
174        match sem {
175            Semaphore::Linear => self.run_sync(),
176            #[cfg(feature = "tokio")]
177            Semaphore::Counting { inner: sem, .. } => {
178                tokio::task::spawn(self.clone().run_async(sem).in_current_span());
179            } //.in_current_span());}
180        }
181    }
182
183    #[inline]
184    fn run_sync(&self) {
185        while let Some((task, id)) = self.get_next() {
186            self.run_task(&task, id);
187        }
188        self.finish();
189    }
190
191    #[cfg(feature = "tokio")]
192    async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
193        loop {
194            let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
195                break;
196            };
197            let Some((task, id)) = self.get_next_async().await else {
198                break;
199            };
200            let selfclone = self.clone();
201            let span = tracing::Span::current();
202            tokio::task::spawn_blocking(move || {
203                span.in_scope(move || selfclone.run_task_async(&task, id, permit));
204            });
205        }
206        loop {
207            if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
208            {
209                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
210            } else {
211                break;
212            }
213        }
214        self.finish();
215    }
216
217    fn finish(&self) {
218        let state = &mut *self.0.state.write();
219        let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
220            unreachable!()
221        };
222        let done = std::mem::take(done);
223        let failed = std::mem::take(failed);
224        self.0.sender.lazy_send(|| QueueMessage::Finished {
225            failed: failed.iter().map(BuildTask::as_message).collect(),
226            done: done.iter().map(BuildTask::as_message).collect(),
227        });
228        *state = QueueState::Finished(FinishedQueue { done, failed });
229    }
230
231    #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
232    pub fn requeue_failed(&self) {
233        let mut state = self.0.state.write();
234        let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
235            return;
236        };
237        let failed = std::mem::take(failed);
238        *state = QueueState::Idle;
239        drop(state);
240        if failed.is_empty() {
241            return;
242        }
243        let map = &mut *self.0.map.write();
244        map.dependents.clear();
245        map.counter = unsafe { NonZeroU32::new_unchecked(1) };
246        map.total = failed.iter().map(|t| t.0.steps.len()).sum();
247        map.map.clear();
248        for t in failed {
249            for s in &t.0.steps {
250                s.0.state.set(TaskState::None);
251            }
252            map.map.insert(
253                (t.archive().archive_id().clone(), t.0.rel_path.clone()),
254                BuildTask(Arc::new(super::BuildTaskI {
255                    id: BuildTaskId(map.counter),
256                    rel_path: t.0.rel_path.clone(),
257                    uri: t.0.uri.clone(),
258                    steps: t.0.steps.clone(),
259                    source: t.0.source.clone(),
260                })),
261            );
262            map.counter = map.counter.saturating_add(1);
263        }
264        self.0.sender.lazy_send(|| {
265            QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
266        });
267    }
268
269    #[cfg(feature = "tokio")]
270    #[inline]
271    fn run_task_async(
272        &self,
273        task: &BuildTask,
274        target: BuildTargetId,
275        permit: tokio::sync::OwnedSemaphorePermit,
276    ) {
277        self.run_task(task, target);
278        drop(permit);
279    }
280
281    #[allow(clippy::cast_possible_truncation)]
282    #[allow(clippy::significant_drop_tightening)]
283    fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
284        self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
285            id: task.0.id,
286            target,
287        });
288        let spec = task.as_build_spec(&self.0.backend);
289        //println!("Running task {target}");
290        let BuildResult { log, mut result } =
291            tracing::info_span!(target:"buildqueue","Running task",
292              archive = %task.0.uri.archive_id(),
293              rel_path = %task.0.rel_path,
294              format = %target
295            )
296            .in_scope(|| (target.run)(spec));
297        //println!("Finished running task {target}");
298        /*let (idx, _) = task
299        .steps()
300        .iter()
301        .enumerate()
302        .find(|(_, s)| s.0.target == target)
303        .unwrap_or_else(|| unreachable!());*/
304        let mut lock = self.0.state.write();
305        let QueueState::Running(ref mut state) = &mut *lock else {
306            unreachable!()
307        };
308        state.running.retain(|t| t != task);
309        let eta = state.timer.update(1);
310        if let Ok(Some(data)) = result.as_ref() {
311            for e in inventory::iter::<FlamsExtension>() {
312                //println!("Passing {} to {}", data.kind(), e.name);
313                (e.on_build_result)(
314                    &self.0.backend,
315                    task.document_uri(),
316                    task.rel_path(),
317                    &**data,
318                );
319            }
320        }
321
322        if let Err(e) = self.0.backend.save(
323            task.document_uri(),
324            Some(task.rel_path()),
325            log,
326            target,
327            result.as_mut().map_or_else(|_| None, Option::take),
328        ) {
329            result = Err(Vec::new());
330            tracing::error!("Error saving build result: {e}");
331        }
332
333        match result {
334            Err(deps) => {
335                /*
336                let mut block = false;
337                for d in deps {
338                    match d {
339                        flams_math_archives::formats::TaskDependency::Physical { task, strict } => {
340                            if state.running.iter().chain(state.blocked.iter()).any(|t| task.archive == t.archive().id && task.rel_path == *t.rel_path() && t.get_step(task.target).is_some()) {
341                                block = true;
342                            }
343                        }
344                        flams_math_archives::formats::TaskDependency::Logical { uri, strict } => {
345                            self.backend().with_local_archive(uri.archive_id(), |a| if let Some(a) = a {
346                                a.do
347                            })
348                        }
349
350                    }
351                } */
352                let mut found = false;
353                if deps.is_empty() || state.queue.is_empty() {
354                    for s in task.steps() {
355                        if s.0.target == target {
356                            found = true;
357                        }
358                        if found {
359                            s.0.state.set(TaskState::Failed);
360                        }
361                    }
362                    state.failed.push(task.clone());
363                    self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
364                        id: task.0.id,
365                        target,
366                        eta,
367                    });
368                } else {
369                    // TODO: handle dependencies
370                    let mut found = false;
371                    for s in task.steps() {
372                        if s.0.target == target {
373                            found = true;
374                        }
375                        if found {
376                            s.0.state.set(TaskState::Blocked);
377                        }
378                    }
379                    state.blocked.push(task.clone());
380                    self.0.sender.lazy_send(|| QueueMessage::TaskBlocked {
381                        id: task.0.id,
382                        target,
383                        eta,
384                    });
385                }
386                drop(lock);
387            }
388            Ok(data) => {
389                let mut found = false;
390                let mut requeue = false;
391                for s in task.steps() {
392                    if s.0.target == target {
393                        found = true;
394                        s.0.state.set(TaskState::Done);
395                    } else if found {
396                        s.0.state.set(TaskState::Queued);
397                        requeue = true;
398                        break;
399                    }
400                }
401                if requeue {
402                    state.queue.push_front(task.clone());
403                } else {
404                    state.done.push(task.clone());
405                }
406                drop(lock);
407
408                self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
409                    id: task.0.id,
410                    target,
411                    eta,
412                });
413            }
414        }
415    }
416
417    fn maybe_restart(&self) {
418        let mut state = self.0.state.write();
419        if let QueueState::Finished(_) = &mut *state {
420            drop(state);
421            self.requeue_failed();
422        }
423    }
424
    /// Registers build tasks for the source files of every local archive.
    ///
    /// A finished queue is first reset via `maybe_restart`. For a sandbox
    /// backend the sandbox is cleared, every local archive of the global backend
    /// is passed to `maybe_copy`, and the sandbox is reloaded; with `clean` set,
    /// each archive's `.flams` directory inside the sandbox is removed
    /// (best effort, errors ignored).
    ///
    /// NOTE(review): `clean` is only consulted in the sandbox branch here, unlike
    /// `enqueue_group`/`enqueue_archive`, which also clean non-sandbox local
    /// archives — confirm this is intended.
    ///
    /// Returns the sum of the values returned by `Self::enqueue` per archive
    /// (presumably the number of tasks queued — confirm against `enqueue`).
    #[instrument(level = "info",
    parent=&self.0.span,
    target = "buildqueue",
    name = "Queueing tasks",
    skip_all
  )]
    #[deprecated(note = "needs refactoring: archive need not be local, etc.")]
    pub fn enqueue_all(&self, target: FormatOrTargets<'_>, stale_only: bool, clean: bool) -> usize {
        self.maybe_restart();
        if let AnyBackend::Sandbox(b) = &self.0.backend {
            b.clear();
            backend().with_archives(|archives| {
                for a in archives {
                    // Only local archives are copied into the sandbox.
                    let Archive::Local(archive) = a else { continue };
                    b.maybe_copy(archive);
                    if clean {
                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
                    }
                }
                b.load_all();
            });
        };
        let mut acc = 0;
        self.0.backend.with_archives(|archives| {
            for a in archives {
                let Archive::Local(archive) = a else { continue };
                acc += archive.with_sources(|d| {
                    let d = d.dfs();
                    // The task map is locked per archive, inside the source callback.
                    let map = &mut *self.0.map.write();
                    Self::enqueue(
                        map,
                        &self.0.backend,
                        a,
                        target,
                        stale_only,
                        // Directories are skipped; only files become tasks.
                        d.filter_map(|e| match e {
                            SourceEntry::Dir(_) => None,
                            SourceEntry::File(f) => Some(f),
                        }),
                    )
                });
            }
        });
        acc
    }
470
471    #[instrument(level = "info",
472    parent=&self.0.span,
473    target = "buildqueue",
474    name = "Queueing tasks",
475    skip_all
476  )]
477    #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
478    pub fn enqueue_group(
479        &self,
480        id: &ArchiveId,
481        target: FormatOrTargets,
482        stale_only: bool,
483        clean: bool,
484    ) -> usize {
485        self.maybe_restart();
486        if let AnyBackend::Sandbox(b) = &self.0.backend {
487            b.require(id, false);
488        }
489        self.0.backend.with_archive_or_group(id, |g| match g {
490            None => 0,
491            Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
492                let Some(archive) = a else { return 0 };
493                if clean {
494                    if let AnyBackend::Sandbox(b) = &self.0.backend {
495                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
496                    } else if let Archive::Local(a) = archive {
497                        let _ = std::fs::remove_dir_all(a.out_dir());
498                    }
499                }
500                if let Archive::Local(a) = archive {
501                    a.with_sources(|d| {
502                        let map = &mut *self.0.map.write();
503                        Self::enqueue(
504                            map,
505                            &self.0.backend,
506                            archive,
507                            target,
508                            stale_only,
509                            d.dfs().filter_map(|e| match e {
510                                SourceEntry::Dir(_) => None,
511                                SourceEntry::File(f) => Some(f),
512                            }),
513                        )
514                    })
515                } else {
516                    0
517                }
518            }),
519            Some(ArchiveOrGroup::Group(g)) => {
520                let map = &mut *self.0.map.write();
521                let mut ret = 0;
522                for id in g.dfs().filter_map(|e| match e {
523                    ArchiveOrGroup::Archive(id) => Some(id),
524                    ArchiveOrGroup::Group(_) => None,
525                }) {
526                    ret += self.0.backend.with_archive(id, |a| {
527                        let Some(archive) = a else { return 0 };
528
529                        if clean {
530                            if let AnyBackend::Sandbox(b) = &self.0.backend {
531                                let _ = std::fs::remove_dir_all(
532                                    b.path_for(archive.id()).join(".flams"),
533                                );
534                            } else if let Archive::Local(a) = archive {
535                                let _ = std::fs::remove_dir_all(a.out_dir());
536                            }
537                        }
538                        if let Archive::Local(a) = archive {
539                            a.with_sources(|d| {
540                                Self::enqueue(
541                                    map,
542                                    &self.0.backend,
543                                    archive,
544                                    target,
545                                    stale_only,
546                                    d.dfs().filter_map(|e| match e {
547                                        SourceEntry::Dir(_) => None,
548                                        SourceEntry::File(f) => Some(f),
549                                    }),
550                                )
551                            })
552                        } else {
553                            0
554                        }
555                    });
556                }
557                ret
558            }
559        })
560    }
561
562    #[instrument(level = "info",
563    parent=&self.0.span,
564    target = "buildqueue",
565    name = "Queueing tasks",
566    skip_all
567  )]
568    #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
569    pub fn enqueue_archive(
570        &self,
571        id: &ArchiveId,
572        target: FormatOrTargets,
573        stale_only: bool,
574        rel_path: Option<RelPath<'_>>,
575        clean: bool,
576    ) -> usize {
577        self.maybe_restart();
578        if let AnyBackend::Sandbox(b) = &self.0.backend {
579            b.require(id, true);
580        }
581        self.0.backend.with_archive(id, |archive| {
582            let Some(archive) = archive else { return 0 };
583            if clean {
584                if let AnyBackend::Sandbox(b) = &self.0.backend {
585                    let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
586                } else if let Archive::Local(a) = archive {
587                    let _ = std::fs::remove_dir_all(a.out_dir());
588                }
589            }
590            if let Archive::Local(a) = archive {
591                a.with_sources(|d| match rel_path {
592                    None => {
593                        let map = &mut *self.0.map.write();
594                        Self::enqueue(
595                            map,
596                            &self.0.backend,
597                            archive,
598                            target,
599                            stale_only,
600                            d.dfs().filter_map(|e| match e {
601                                SourceEntry::Dir(_) => None,
602                                SourceEntry::File(f) => Some(f),
603                            }),
604                        )
605                    }
606                    Some(p) => {
607                        let Some(d) = d.find(p) else { return 0 };
608                        match d {
609                            SourceEntryRef::Dir(d) => {
610                                let map = &mut *self.0.map.write();
611                                Self::enqueue(
612                                    map,
613                                    &self.0.backend,
614                                    archive,
615                                    target,
616                                    stale_only,
617                                    d.dfs().filter_map(|e| match e {
618                                        SourceEntry::Dir(_) => None,
619                                        SourceEntry::File(f) => Some(f),
620                                    }),
621                                )
622                            }
623                            SourceEntryRef::File(f) => {
624                                let map = &mut *self.0.map.write();
625                                Self::enqueue(
626                                    map,
627                                    &self.0.backend,
628                                    archive,
629                                    target,
630                                    stale_only,
631                                    std::iter::once(f),
632                                )
633                            }
634                        }
635                    }
636                })
637            } else {
638                0
639            }
640        })
641    }
642}
643
644#[derive(Debug)]
645pub struct RunningQueue {
646    pub(super) queue: VecDeque<BuildTask>,
647    pub(super) blocked: Vec<BuildTask>,
648    pub(super) done: Vec<BuildTask>,
649    pub(super) failed: Vec<BuildTask>,
650    pub(super) running: Vec<BuildTask>,
651    timer: Timer,
652}
653impl RunningQueue {
654    fn new(total: usize) -> Self {
655        Self {
656            queue: VecDeque::new(),
657            failed: Vec::new(),
658            blocked: Vec::new(),
659            done: Vec::new(),
660            running: Vec::new(),
661            timer: Timer::new(total),
662        }
663    }
664}
665
/// Result of a completed queue run.
#[derive(Debug)]
pub struct FinishedQueue {
    /// Tasks that completed.
    pub(super) done: Vec<BuildTask>,
    /// Tasks that failed; these are what `Queue::requeue_failed` re-registers.
    pub(super) failed: Vec<BuildTask>,
}
671
672#[derive(Debug)]
673struct Timer {
674    started: Timestamp,
675    steps: usize,
676    done: usize,
677}
678impl Timer {
679    fn new(total: usize) -> Self {
680        Self {
681            started: Timestamp::now(),
682            steps: total,
683            done: 0,
684        }
685    }
686    #[allow(clippy::cast_precision_loss)]
687    fn update(&mut self, dones: u8) -> Eta {
688        self.done += dones as usize;
689        let avg = self.started.since_now() * (1.0 / (self.done as f64));
690        let time_left = avg * ((self.steps - self.done) as f64);
691        Eta {
692            time_left,
693            done: self.done,
694            total: self.steps,
695        }
696    }
697}