1use crate::{backend::backend, FlamsExtension};
2
3use super::{
4 queue_manager::{QueueId, Semaphore},
5 BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
6};
7use flams_math_archives::{
8 artifacts::FileOrString,
9 backend::{AnyBackend, LocalBackend},
10 formats::{BuildResult, BuildTargetId, FormatOrTargets},
11 manager::ArchiveOrGroup,
12 source_files::{SourceEntry, SourceEntryRef},
13 utils::path_ext::RelPath,
14 Archive, LocallyBuilt, MathArchive,
15};
16use flams_utils::{
17 change_listener::{ChangeListener, ChangeSender},
18 prelude::HMap,
19 triomphe::Arc,
20};
21use ftml_ontology::utils::{time::Timestamp, RefTree};
22use ftml_uris::{ArchiveId, UriPath, UriWithArchive};
23use parking_lot::RwLock;
24use std::{collections::VecDeque, num::NonZeroU32};
25use tracing::{instrument, Instrument};
26
27#[derive(Debug)]
28pub(super) struct TaskMap {
29 pub(super) map: HMap<(ArchiveId, UriPath), BuildTask>,
30 pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
31 pub(super) counter: NonZeroU32,
32 pub(super) total: usize,
33}
34
35impl Default for TaskMap {
36 fn default() -> Self {
37 Self {
38 map: HMap::default(),
39 dependents: HMap::default(),
40 counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
41 total: 0,
42 }
43 }
44}
45
46#[derive(Debug)]
47pub enum QueueState {
48 Running(RunningQueue),
49 Idle,
50 Finished(FinishedQueue),
51}
52
/// Human-readable identity of a build queue.
#[derive(Debug, Clone)]
pub enum QueueName {
    /// The global queue; displayed as `global`.
    Global,
    /// A sandbox queue; displayed as `name` immediately followed by `idx`.
    Sandbox { name: std::sync::Arc<str>, idx: u16 },
}

impl std::fmt::Display for QueueName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Global => f.write_str("global"),
            Self::Sandbox { name, idx } => {
                f.write_str(name)?;
                // Fully qualified so this does not depend on a formatting trait
                // being imported; `f` is forwarded so its flags apply to `idx`.
                std::fmt::Display::fmt(idx, f)
            }
        }
    }
}
69
70#[derive(Debug)]
71pub(super) struct QueueI {
72 backend: AnyBackend,
73 name: QueueName,
74 pub id: QueueId,
75 span: tracing::Span,
76 pub(super) map: RwLock<TaskMap>,
77 pub(super) sender: ChangeSender<QueueMessage>,
78 pub(super) state: RwLock<QueueState>,
79}
80
81#[derive(Debug, Clone)]
82pub struct Queue(pub(super) Arc<QueueI>);
83
84impl Queue {
85 pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
86 Self(Arc::new(QueueI {
87 id,
88 name,
89 backend,
90 span: tracing::Span::current(),
91 map: RwLock::default(),
92 sender: ChangeSender::new(32),
93 state: RwLock::new(QueueState::Idle),
94 }))
95 }
96
97 #[inline]
98 #[must_use]
99 pub fn backend(&self) -> &AnyBackend {
100 &self.0.backend
101 }
102
103 #[must_use]
104 pub fn listener(&self) -> ChangeListener<QueueMessage> {
105 self.0.sender.listener()
106 }
107
108 #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
109 pub fn state_message(&self) -> QueueMessage {
110 match &*self.0.state.read() {
111 QueueState::Running(RunningQueue {
112 running,
113 queue,
114 blocked,
115 failed,
116 done,
117 ..
118 }) => QueueMessage::Started {
119 running: running.iter().map(BuildTask::as_message).collect(),
120 queue: queue.iter().map(BuildTask::as_message).collect(),
121 blocked: blocked.iter().map(BuildTask::as_message).collect(),
122 failed: failed.iter().map(BuildTask::as_message).collect(),
123 done: done.iter().map(BuildTask::as_message).collect(),
124 },
125 QueueState::Idle => QueueMessage::Idle(
126 self.0
127 .map
128 .read()
129 .map
130 .values()
131 .map(BuildTask::as_message)
132 .collect(),
133 ),
134 QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
135 failed: failed.iter().map(BuildTask::as_message).collect(),
136 done: done.iter().map(BuildTask::as_message).collect(),
137 },
138 }
139 }
140
141 #[inline]
142 #[must_use]
143 pub fn name(&self) -> &QueueName {
144 &self.0.name
145 }
146
147 #[instrument(level = "info",
148 parent=&self.0.span,
149 target = "buildqueue",
150 name = "Running buildqueue",
151 skip_all
152 )]
153 pub fn start(&self, sem: Semaphore) {
154 let mut state = self.0.state.write();
155 if matches!(&*state, QueueState::Running(_)) {
156 return;
157 }
158 let map = self.0.map.read();
159 let mut running = RunningQueue::new(map.total);
160 tracing::info_span!("sorting...").in_scope(|| {
161 Self::sort(&map, &mut running);
162 tracing::info!("Done");
163 });
164 self.0.sender.lazy_send(|| QueueMessage::Started {
165 running: Vec::new(),
166 queue: running.queue.iter().map(BuildTask::as_message).collect(),
167 blocked: Vec::new(),
168 failed: Vec::new(),
169 done: Vec::new(),
170 });
171 *state = QueueState::Running(running);
172 drop(map);
173 drop(state);
174 match sem {
175 Semaphore::Linear => self.run_sync(),
176 #[cfg(feature = "tokio")]
177 Semaphore::Counting { inner: sem, .. } => {
178 tokio::task::spawn(self.clone().run_async(sem).in_current_span());
179 } }
181 }
182
183 #[inline]
184 fn run_sync(&self) {
185 while let Some((task, id)) = self.get_next() {
186 self.run_task(&task, id);
187 }
188 self.finish();
189 }
190
191 #[cfg(feature = "tokio")]
192 async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
193 loop {
194 let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
195 break;
196 };
197 let Some((task, id)) = self.get_next_async().await else {
198 break;
199 };
200 let selfclone = self.clone();
201 let span = tracing::Span::current();
202 tokio::task::spawn_blocking(move || {
203 span.in_scope(move || selfclone.run_task_async(&task, id, permit));
204 });
205 }
206 loop {
207 if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
208 {
209 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
210 } else {
211 break;
212 }
213 }
214 self.finish();
215 }
216
217 fn finish(&self) {
218 let state = &mut *self.0.state.write();
219 let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
220 unreachable!()
221 };
222 let done = std::mem::take(done);
223 let failed = std::mem::take(failed);
224 self.0.sender.lazy_send(|| QueueMessage::Finished {
225 failed: failed.iter().map(BuildTask::as_message).collect(),
226 done: done.iter().map(BuildTask::as_message).collect(),
227 });
228 *state = QueueState::Finished(FinishedQueue { done, failed });
229 }
230
231 #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
232 pub fn requeue_failed(&self) {
233 let mut state = self.0.state.write();
234 let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
235 return;
236 };
237 let failed = std::mem::take(failed);
238 *state = QueueState::Idle;
239 drop(state);
240 if failed.is_empty() {
241 return;
242 }
243 let map = &mut *self.0.map.write();
244 map.dependents.clear();
245 map.counter = unsafe { NonZeroU32::new_unchecked(1) };
246 map.total = failed.iter().map(|t| t.0.steps.len()).sum();
247 map.map.clear();
248 for t in failed {
249 for s in &t.0.steps {
250 s.0.state.set(TaskState::None);
251 }
252 map.map.insert(
253 (t.archive().archive_id().clone(), t.0.rel_path.clone()),
254 BuildTask(Arc::new(super::BuildTaskI {
255 id: BuildTaskId(map.counter),
256 rel_path: t.0.rel_path.clone(),
257 uri: t.0.uri.clone(),
258 steps: t.0.steps.clone(),
259 source: t.0.source.clone(),
260 })),
261 );
262 map.counter = map.counter.saturating_add(1);
263 }
264 self.0.sender.lazy_send(|| {
265 QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
266 });
267 }
268
269 #[cfg(feature = "tokio")]
270 #[inline]
271 fn run_task_async(
272 &self,
273 task: &BuildTask,
274 target: BuildTargetId,
275 permit: tokio::sync::OwnedSemaphorePermit,
276 ) {
277 self.run_task(task, target);
278 drop(permit);
279 }
280
281 #[allow(clippy::cast_possible_truncation)]
282 #[allow(clippy::significant_drop_tightening)]
283 fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
284 self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
285 id: task.0.id,
286 target,
287 });
288 let spec = task.as_build_spec(&self.0.backend);
289 let BuildResult { log, mut result } =
291 tracing::info_span!(target:"buildqueue","Running task",
292 archive = %task.0.uri.archive_id(),
293 rel_path = %task.0.rel_path,
294 format = %target
295 )
296 .in_scope(|| (target.run)(spec));
297 let mut lock = self.0.state.write();
305 let QueueState::Running(ref mut state) = &mut *lock else {
306 unreachable!()
307 };
308 state.running.retain(|t| t != task);
309 let eta = state.timer.update(1);
310 if let Ok(Some(data)) = result.as_ref() {
311 for e in inventory::iter::<FlamsExtension>() {
312 (e.on_build_result)(
314 &self.0.backend,
315 task.document_uri(),
316 task.rel_path(),
317 &**data,
318 );
319 }
320 }
321
322 if let Err(e) = self.0.backend.save(
323 task.document_uri(),
324 Some(task.rel_path()),
325 log,
326 target,
327 result.as_mut().map_or_else(|_| None, Option::take),
328 ) {
329 result = Err(Vec::new());
330 tracing::error!("Error saving build result: {e}");
331 }
332
333 match result {
334 Err(deps) => {
335 let mut found = false;
353 if deps.is_empty() || state.queue.is_empty() {
354 for s in task.steps() {
355 if s.0.target == target {
356 found = true;
357 }
358 if found {
359 s.0.state.set(TaskState::Failed);
360 }
361 }
362 state.failed.push(task.clone());
363 self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
364 id: task.0.id,
365 target,
366 eta,
367 });
368 } else {
369 let mut found = false;
371 for s in task.steps() {
372 if s.0.target == target {
373 found = true;
374 }
375 if found {
376 s.0.state.set(TaskState::Blocked);
377 }
378 }
379 state.blocked.push(task.clone());
380 self.0.sender.lazy_send(|| QueueMessage::TaskBlocked {
381 id: task.0.id,
382 target,
383 eta,
384 });
385 }
386 drop(lock);
387 }
388 Ok(data) => {
389 let mut found = false;
390 let mut requeue = false;
391 for s in task.steps() {
392 if s.0.target == target {
393 found = true;
394 s.0.state.set(TaskState::Done);
395 } else if found {
396 s.0.state.set(TaskState::Queued);
397 requeue = true;
398 break;
399 }
400 }
401 if requeue {
402 state.queue.push_front(task.clone());
403 } else {
404 state.done.push(task.clone());
405 }
406 drop(lock);
407
408 self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
409 id: task.0.id,
410 target,
411 eta,
412 });
413 }
414 }
415 }
416
417 fn maybe_restart(&self) {
418 let mut state = self.0.state.write();
419 if let QueueState::Finished(_) = &mut *state {
420 drop(state);
421 self.requeue_failed();
422 }
423 }
424
425 #[instrument(level = "info",
426 parent=&self.0.span,
427 target = "buildqueue",
428 name = "Queueing tasks",
429 skip_all
430 )]
431 #[deprecated(note = "needs refactoring: archive need not be local, etc.")]
432 pub fn enqueue_all(&self, target: FormatOrTargets<'_>, stale_only: bool, clean: bool) -> usize {
433 self.maybe_restart();
434 if let AnyBackend::Sandbox(b) = &self.0.backend {
435 b.clear();
436 backend().with_archives(|archives| {
437 for a in archives {
438 let Archive::Local(archive) = a else { continue };
439 b.maybe_copy(archive);
440 if clean {
441 let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
442 }
443 }
444 b.load_all();
445 });
446 };
447 let mut acc = 0;
448 self.0.backend.with_archives(|archives| {
449 for a in archives {
450 let Archive::Local(archive) = a else { continue };
451 acc += archive.with_sources(|d| {
452 let d = d.dfs();
453 let map = &mut *self.0.map.write();
454 Self::enqueue(
455 map,
456 &self.0.backend,
457 a,
458 target,
459 stale_only,
460 d.filter_map(|e| match e {
461 SourceEntry::Dir(_) => None,
462 SourceEntry::File(f) => Some(f),
463 }),
464 )
465 });
466 }
467 });
468 acc
469 }
470
471 #[instrument(level = "info",
472 parent=&self.0.span,
473 target = "buildqueue",
474 name = "Queueing tasks",
475 skip_all
476 )]
477 #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
478 pub fn enqueue_group(
479 &self,
480 id: &ArchiveId,
481 target: FormatOrTargets,
482 stale_only: bool,
483 clean: bool,
484 ) -> usize {
485 self.maybe_restart();
486 if let AnyBackend::Sandbox(b) = &self.0.backend {
487 b.require(id, false);
488 }
489 self.0.backend.with_archive_or_group(id, |g| match g {
490 None => 0,
491 Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
492 let Some(archive) = a else { return 0 };
493 if clean {
494 if let AnyBackend::Sandbox(b) = &self.0.backend {
495 let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
496 } else if let Archive::Local(a) = archive {
497 let _ = std::fs::remove_dir_all(a.out_dir());
498 }
499 }
500 if let Archive::Local(a) = archive {
501 a.with_sources(|d| {
502 let map = &mut *self.0.map.write();
503 Self::enqueue(
504 map,
505 &self.0.backend,
506 archive,
507 target,
508 stale_only,
509 d.dfs().filter_map(|e| match e {
510 SourceEntry::Dir(_) => None,
511 SourceEntry::File(f) => Some(f),
512 }),
513 )
514 })
515 } else {
516 0
517 }
518 }),
519 Some(ArchiveOrGroup::Group(g)) => {
520 let map = &mut *self.0.map.write();
521 let mut ret = 0;
522 for id in g.dfs().filter_map(|e| match e {
523 ArchiveOrGroup::Archive(id) => Some(id),
524 ArchiveOrGroup::Group(_) => None,
525 }) {
526 ret += self.0.backend.with_archive(id, |a| {
527 let Some(archive) = a else { return 0 };
528
529 if clean {
530 if let AnyBackend::Sandbox(b) = &self.0.backend {
531 let _ = std::fs::remove_dir_all(
532 b.path_for(archive.id()).join(".flams"),
533 );
534 } else if let Archive::Local(a) = archive {
535 let _ = std::fs::remove_dir_all(a.out_dir());
536 }
537 }
538 if let Archive::Local(a) = archive {
539 a.with_sources(|d| {
540 Self::enqueue(
541 map,
542 &self.0.backend,
543 archive,
544 target,
545 stale_only,
546 d.dfs().filter_map(|e| match e {
547 SourceEntry::Dir(_) => None,
548 SourceEntry::File(f) => Some(f),
549 }),
550 )
551 })
552 } else {
553 0
554 }
555 });
556 }
557 ret
558 }
559 })
560 }
561
562 #[instrument(level = "info",
563 parent=&self.0.span,
564 target = "buildqueue",
565 name = "Queueing tasks",
566 skip_all
567 )]
568 #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
569 pub fn enqueue_archive(
570 &self,
571 id: &ArchiveId,
572 target: FormatOrTargets,
573 stale_only: bool,
574 rel_path: Option<RelPath<'_>>,
575 clean: bool,
576 ) -> usize {
577 self.maybe_restart();
578 if let AnyBackend::Sandbox(b) = &self.0.backend {
579 b.require(id, true);
580 }
581 self.0.backend.with_archive(id, |archive| {
582 let Some(archive) = archive else { return 0 };
583 if clean {
584 if let AnyBackend::Sandbox(b) = &self.0.backend {
585 let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
586 } else if let Archive::Local(a) = archive {
587 let _ = std::fs::remove_dir_all(a.out_dir());
588 }
589 }
590 if let Archive::Local(a) = archive {
591 a.with_sources(|d| match rel_path {
592 None => {
593 let map = &mut *self.0.map.write();
594 Self::enqueue(
595 map,
596 &self.0.backend,
597 archive,
598 target,
599 stale_only,
600 d.dfs().filter_map(|e| match e {
601 SourceEntry::Dir(_) => None,
602 SourceEntry::File(f) => Some(f),
603 }),
604 )
605 }
606 Some(p) => {
607 let Some(d) = d.find(p) else { return 0 };
608 match d {
609 SourceEntryRef::Dir(d) => {
610 let map = &mut *self.0.map.write();
611 Self::enqueue(
612 map,
613 &self.0.backend,
614 archive,
615 target,
616 stale_only,
617 d.dfs().filter_map(|e| match e {
618 SourceEntry::Dir(_) => None,
619 SourceEntry::File(f) => Some(f),
620 }),
621 )
622 }
623 SourceEntryRef::File(f) => {
624 let map = &mut *self.0.map.write();
625 Self::enqueue(
626 map,
627 &self.0.backend,
628 archive,
629 target,
630 stale_only,
631 std::iter::once(f),
632 )
633 }
634 }
635 }
636 })
637 } else {
638 0
639 }
640 })
641 }
642}
643
644#[derive(Debug)]
645pub struct RunningQueue {
646 pub(super) queue: VecDeque<BuildTask>,
647 pub(super) blocked: Vec<BuildTask>,
648 pub(super) done: Vec<BuildTask>,
649 pub(super) failed: Vec<BuildTask>,
650 pub(super) running: Vec<BuildTask>,
651 timer: Timer,
652}
653impl RunningQueue {
654 fn new(total: usize) -> Self {
655 Self {
656 queue: VecDeque::new(),
657 failed: Vec::new(),
658 blocked: Vec::new(),
659 done: Vec::new(),
660 running: Vec::new(),
661 timer: Timer::new(total),
662 }
663 }
664}
665
666#[derive(Debug)]
667pub struct FinishedQueue {
668 pub(super) done: Vec<BuildTask>,
669 pub(super) failed: Vec<BuildTask>,
670}
671
672#[derive(Debug)]
673struct Timer {
674 started: Timestamp,
675 steps: usize,
676 done: usize,
677}
678impl Timer {
679 fn new(total: usize) -> Self {
680 Self {
681 started: Timestamp::now(),
682 steps: total,
683 done: 0,
684 }
685 }
686 #[allow(clippy::cast_precision_loss)]
687 fn update(&mut self, dones: u8) -> Eta {
688 self.done += dones as usize;
689 let avg = self.started.since_now() * (1.0 / (self.done as f64));
690 let time_left = avg * ((self.steps - self.done) as f64);
691 Eta {
692 time_left,
693 done: self.done,
694 total: self.steps,
695 }
696 }
697}