1use crate::{backend::backend, FlamsExtension};
2
3use super::{
4 queue_manager::{QueueId, Semaphore},
5 BuildTask, BuildTaskId, Eta, QueueMessage, TaskRef, TaskState,
6};
7use flams_math_archives::{
8 backend::{AnyBackend, LocalBackend},
9 formats::{BuildResult, BuildTargetId, FormatOrTargets},
10 manager::ArchiveOrGroup,
11 source_files::{SourceEntry, SourceEntryRef},
12 utils::path_ext::RelPath,
13 Archive, LocallyBuilt, MathArchive,
14};
15use flams_utils::{
16 change_listener::{ChangeListener, ChangeSender},
17 prelude::HMap,
18 triomphe::Arc,
19};
20use ftml_ontology::utils::{time::Timestamp, RefTree};
21use ftml_uris::{ArchiveId, UriPath, UriWithArchive};
22use parking_lot::RwLock;
23use std::{collections::VecDeque, num::NonZeroU32};
24use tracing::{instrument, Instrument};
25
/// Bookkeeping shared by a queue: every enqueued task plus the
/// reverse-dependency edges between tasks.
#[derive(Debug)]
pub(super) struct TaskMap {
    /// All enqueued tasks, keyed by (archive id, source-relative path).
    pub(super) map: HMap<(ArchiveId, UriPath), BuildTask>,
    /// NOTE(review): presumably the (task, target) pairs waiting on the
    /// keyed task to finish — confirm against the scheduler code.
    pub(super) dependents: HMap<TaskRef, Vec<(BuildTask, BuildTargetId)>>,
    /// Next task id to hand out; starts at 1 (see `Default` and
    /// `Queue::requeue_failed`).
    pub(super) counter: NonZeroU32,
    /// Total number of build steps across all tasks (feeds ETA estimates).
    pub(super) total: usize,
}
33
34impl Default for TaskMap {
35 fn default() -> Self {
36 Self {
37 map: HMap::default(),
38 dependents: HMap::default(),
39 counter: NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
40 total: 0,
41 }
42 }
43}
44
/// Lifecycle of a queue: tasks are collected while `Idle`, executed while
/// `Running`, and the outcome is retained in `Finished`.
#[derive(Debug)]
pub enum QueueState {
    /// Tasks are currently being executed.
    Running(RunningQueue),
    /// Tasks may still be enqueued; nothing is executing.
    Idle,
    /// A run completed; holds the final done/failed task lists.
    Finished(FinishedQueue),
}
51
/// Identity of a queue: the single global queue or a named sandbox
/// instance (rendered as `<name><idx>` by its `Display` impl).
#[derive(Debug, Clone)]
pub enum QueueName {
    /// The global build queue.
    Global,
    /// A sandboxed queue; `idx` presumably disambiguates sandboxes that
    /// share a name — TODO confirm at the construction site.
    Sandbox { name: std::sync::Arc<str>, idx: u16 },
}
57impl std::fmt::Display for QueueName {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 Self::Global => f.write_str("global"),
61 Self::Sandbox { name, idx } => {
62 f.write_str(name)?;
63 idx.fmt(f)
64 }
65 }
66 }
67}
68
/// Shared state behind a [`Queue`] handle.
#[derive(Debug)]
pub(super) struct QueueI {
    /// Backend (global or sandbox) that tasks are built against.
    backend: AnyBackend,
    /// Human-readable identity of this queue.
    name: QueueName,
    pub id: QueueId,
    /// Span captured at construction; all queue tracing is parented to it.
    span: tracing::Span,
    /// All known tasks (see [`TaskMap`]).
    pub(super) map: RwLock<TaskMap>,
    /// Broadcast channel pushing [`QueueMessage`] updates to listeners.
    sender: ChangeSender<QueueMessage>,
    /// Current lifecycle state of the queue.
    pub(super) state: RwLock<QueueState>,
}
79
/// Cheaply cloneable handle to a build queue (state shared via [`Arc`]).
#[derive(Debug, Clone)]
pub struct Queue(pub(super) Arc<QueueI>);
82
83impl Queue {
84 pub(crate) fn new(id: QueueId, name: QueueName, backend: AnyBackend) -> Self {
85 Self(Arc::new(QueueI {
86 id,
87 name,
88 backend,
89 span: tracing::Span::current(),
90 map: RwLock::default(),
91 sender: ChangeSender::new(32),
92 state: RwLock::new(QueueState::Idle),
93 }))
94 }
95
    /// The backend this queue builds against.
    #[inline]
    #[must_use]
    pub fn backend(&self) -> &AnyBackend {
        &self.0.backend
    }
101
    /// A fresh listener receiving all future [`QueueMessage`] broadcasts.
    #[must_use]
    pub fn listener(&self) -> ChangeListener<QueueMessage> {
        self.0.sender.listener()
    }
106
107 #[instrument(level="info",parent=&self.0.span,skip_all,name="Collecting queue state")]
108 pub fn state_message(&self) -> QueueMessage {
109 match &*self.0.state.read() {
110 QueueState::Running(RunningQueue {
111 running,
112 queue,
113 blocked,
114 failed,
115 done,
116 ..
117 }) => QueueMessage::Started {
118 running: running.iter().map(BuildTask::as_message).collect(),
119 queue: queue.iter().map(BuildTask::as_message).collect(),
120 blocked: blocked.iter().map(BuildTask::as_message).collect(),
121 failed: failed.iter().map(BuildTask::as_message).collect(),
122 done: done.iter().map(BuildTask::as_message).collect(),
123 },
124 QueueState::Idle => QueueMessage::Idle(
125 self.0
126 .map
127 .read()
128 .map
129 .values()
130 .map(BuildTask::as_message)
131 .collect(),
132 ),
133 QueueState::Finished(FinishedQueue { done, failed }) => QueueMessage::Finished {
134 failed: failed.iter().map(BuildTask::as_message).collect(),
135 done: done.iter().map(BuildTask::as_message).collect(),
136 },
137 }
138 }
139
    /// This queue's name (global or sandbox).
    #[inline]
    #[must_use]
    pub fn name(&self) -> &QueueName {
        &self.0.name
    }
145
    /// Starts executing the enqueued tasks. A no-op if the queue is
    /// already running. With [`Semaphore::Linear`] the run blocks the
    /// current thread; with a counting semaphore (tokio feature) an async
    /// driver is spawned and this returns immediately.
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Running buildqueue",
        skip_all
    )]
    pub fn start(&self, sem: Semaphore) {
        let mut state = self.0.state.write();
        // Already running => nothing to do.
        if matches!(&*state, QueueState::Running(_)) {
            return;
        }
        // Sort the task map into a runnable order inside a dedicated span.
        let map = self.0.map.read();
        let mut running = RunningQueue::new(map.total);
        tracing::info_span!("sorting...").in_scope(|| {
            Self::sort(&map, &mut running);
            tracing::info!("Done");
        });
        // Announce the initial (everything-queued) state to listeners.
        self.0.sender.lazy_send(|| QueueMessage::Started {
            running: Vec::new(),
            queue: running.queue.iter().map(BuildTask::as_message).collect(),
            blocked: Vec::new(),
            failed: Vec::new(),
            done: Vec::new(),
        });
        *state = QueueState::Running(running);
        // Release both locks before any task actually executes.
        drop(map);
        drop(state);
        match sem {
            Semaphore::Linear => self.run_sync(),
            #[cfg(feature = "tokio")]
            Semaphore::Counting { inner: sem, .. } => {
                tokio::task::spawn(self.clone().run_async(sem).in_current_span());
            }
        }
    }
181
182 #[inline]
183 fn run_sync(&self) {
184 while let Some((task, id)) = self.get_next() {
185 self.run_task(&task, id);
186 }
187 self.finish();
188 }
189
    /// Asynchronous driver: repeatedly acquires a semaphore permit and
    /// spawns the next runnable task step onto a blocking thread, then
    /// waits for all in-flight work to drain before finalizing.
    #[cfg(feature = "tokio")]
    async fn run_async(self, sem: std::sync::Arc<tokio::sync::Semaphore>) {
        loop {
            // A closed semaphore aborts scheduling.
            let Ok(permit) = tokio::sync::Semaphore::acquire_owned(sem.clone()).await else {
                break;
            };
            // No runnable step left => stop spawning.
            let Some((task, id)) = self.get_next() else {
                break;
            };
            let selfclone = self.clone();
            let span = tracing::Span::current();
            // Build steps are blocking work; keep them off the async runtime.
            // The permit travels into the task and is released when it ends.
            tokio::task::spawn_blocking(move || {
                span.in_scope(move || selfclone.run_task_async(&task, id, permit));
            });
        }
        // Wait until no task is marked running anymore.
        // NOTE(review): 10 ms busy-polling; a notification mechanism would
        // avoid the wakeups, but this keeps the loop simple.
        loop {
            if matches!(&*self.0.state.read(),QueueState::Running(RunningQueue{running,..}) if !running.is_empty())
            {
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            } else {
                break;
            }
        }
        self.finish();
    }
215
216 fn finish(&self) {
217 let state = &mut *self.0.state.write();
218 let QueueState::Running(RunningQueue { done, failed, .. }) = state else {
219 unreachable!()
220 };
221 let done = std::mem::take(done);
222 let failed = std::mem::take(failed);
223 self.0.sender.lazy_send(|| QueueMessage::Finished {
224 failed: failed.iter().map(BuildTask::as_message).collect(),
225 done: done.iter().map(BuildTask::as_message).collect(),
226 });
227 *state = QueueState::Finished(FinishedQueue { done, failed });
228 }
229
230 #[instrument(level="info",parent=&self.0.span,skip_all,name="Requeueing failed")]
231 pub fn requeue_failed(&self) {
232 let mut state = self.0.state.write();
233 let QueueState::Finished(FinishedQueue { failed, .. }) = &mut *state else {
234 return;
235 };
236 let failed = std::mem::take(failed);
237 *state = QueueState::Idle;
238 drop(state);
239 if failed.is_empty() {
240 return;
241 }
242 let map = &mut *self.0.map.write();
243 map.dependents.clear();
244 map.counter = unsafe { NonZeroU32::new_unchecked(1) };
245 map.total = failed.iter().map(|t| t.0.steps.len()).sum();
246 map.map.clear();
247 for t in failed {
248 for s in &t.0.steps {
249 *s.0.state.write() = TaskState::None;
250 }
251 map.map.insert(
252 (t.archive().archive_id().clone(), t.0.rel_path.clone()),
253 BuildTask(Arc::new(super::BuildTaskI {
254 id: BuildTaskId(map.counter),
255 rel_path: t.0.rel_path.clone(),
256 uri: t.0.uri.clone(),
257 steps: t.0.steps.clone(),
258 source: t.0.source.clone(),
259 })),
260 );
261 map.counter = map.counter.saturating_add(1);
262 }
263 self.0.sender.lazy_send(|| {
264 QueueMessage::Idle(map.map.values().map(BuildTask::as_message).collect())
265 });
266 }
267
268 #[cfg(feature = "tokio")]
269 #[inline]
270 fn run_task_async(
271 &self,
272 task: &BuildTask,
273 target: BuildTargetId,
274 permit: tokio::sync::OwnedSemaphorePermit,
275 ) {
276 self.run_task(task, target);
277 drop(permit);
278 }
279
    /// Executes the `target` build step of `task`: runs the build
    /// function, records the outcome in the queue state (done / failed /
    /// requeued for the next step), persists the log and result through
    /// the backend, and broadcasts the result to listeners.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::significant_drop_tightening)]
    fn run_task(&self, task: &BuildTask, target: BuildTargetId) {
        self.0.sender.lazy_send(|| QueueMessage::TaskStarted {
            id: task.0.id,
            target,
        });
        // Run the actual build function without holding any queue lock.
        let spec = task.as_build_spec(&self.0.backend);
        let BuildResult { log, result } = tracing::info_span!(target:"buildqueue","Running task",
            archive = %task.0.uri.archive_id(),
            rel_path = %task.0.rel_path,
            format = %target
        )
        .in_scope(|| (target.run)(spec));
        // Index of the step just run within the task's step list.
        // NOTE(review): `idx` is currently unused below — leftover?
        let (idx, _) = task
            .steps()
            .iter()
            .enumerate()
            .find(|(_, s)| s.0.target == target)
            .unwrap_or_else(|| unreachable!());
        let mut lock = self.0.state.write();
        let QueueState::Running(ref mut state) = &mut *lock else {
            unreachable!()
        };
        // This task is no longer in flight; fold one finished step into ETA.
        state.running.retain(|t| t != task);
        let eta = state.timer.update(1);

        match result {
            Err(_deps) => {
                // Mark this step and every step after it as failed.
                let mut found = false;
                for s in task.steps() {
                    if s.0.target == target {
                        found = true;
                    }
                    if found {
                        *s.0.state.write() = TaskState::Failed;
                    }
                }
                state.failed.push(task.clone());
                // Release the state lock before touching the backend.
                drop(lock);

                // Persist the build log even on failure (no result data).
                let _ = self.0.backend.save(
                    task.document_uri(),
                    Some(task.rel_path()),
                    log,
                    target,
                    None,
                );
                self.0.sender.lazy_send(|| QueueMessage::TaskFailed {
                    id: task.0.id,
                    target,
                    eta,
                });
            }
            Ok(data) => {
                // Mark this step done; if a later step exists, flag it as
                // queued and push the task back to the queue front.
                let mut found = false;
                let mut requeue = false;
                for s in task.steps() {
                    if s.0.target == target {
                        found = true;
                        *s.0.state.write() = TaskState::Done;
                    } else if found {
                        *s.0.state.write() = TaskState::Queued;
                        requeue = true;
                        break;
                    }
                }
                if requeue {
                    state.queue.push_front(task.clone());
                } else {
                    state.done.push(task.clone());
                }
                drop(lock);
                // Let registered extensions react to the build result.
                if let Some(data) = data.as_ref() {
                    for e in inventory::iter::<FlamsExtension>() {
                        (e.on_build_result)(
                            &self.0.backend,
                            task.document_uri(),
                            task.rel_path(),
                            &**data,
                        );
                    }
                }

                let _ = self.0.backend.save(
                    task.document_uri(),
                    Some(task.rel_path()),
                    log,
                    target,
                    data,
                );

                self.0.sender.lazy_send(|| QueueMessage::TaskSuccess {
                    id: task.0.id,
                    target,
                    eta,
                });
            }
        }
    }
381
382 fn maybe_restart(&self) {
383 let mut state = self.0.state.write();
384 if let QueueState::Finished(_) = &mut *state {
385 drop(state);
386 self.requeue_failed();
387 }
388 }
389
    /// Enqueues build tasks for **all** local archives known to the
    /// backend. For a sandbox backend, the sandbox is first cleared and
    /// re-populated from the global backend; with `clean`, each archive's
    /// `.flams` output directory is removed beforehand.
    ///
    /// `stale_only` presumably restricts to out-of-date sources — TODO
    /// confirm in `Self::enqueue` (defined elsewhere in this module).
    ///
    /// Returns the number of newly enqueued tasks.
    #[instrument(level = "info",
        parent=&self.0.span,
        target = "buildqueue",
        name = "Queueing tasks",
        skip_all
    )]
    #[deprecated(note = "needs refactoring: archive need not be local, etc.")]
    pub fn enqueue_all(&self, target: FormatOrTargets<'_>, stale_only: bool, clean: bool) -> usize {
        self.maybe_restart();
        // Sandbox backends mirror the global archives before building.
        if let AnyBackend::Sandbox(b) = &self.0.backend {
            b.clear();
            backend().with_archives(|archives| {
                for a in archives {
                    // Non-local archives are skipped (see deprecation note).
                    let Archive::Local(archive) = a else { continue };
                    b.maybe_copy(archive);
                    if clean {
                        let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
                    }
                }
                b.load_all();
            });
        };
        let mut acc = 0;
        self.0.backend.with_archives(|archives| {
            for a in archives {
                let Archive::Local(archive) = a else { continue };
                acc += archive.with_sources(|d| {
                    // Enqueue every source *file* (directories are skipped).
                    let d = d.dfs();
                    let map = &mut *self.0.map.write();
                    Self::enqueue(
                        map,
                        &self.0.backend,
                        a,
                        target,
                        stale_only,
                        d.filter_map(|e| match e {
                            SourceEntry::Dir(_) => None,
                            SourceEntry::File(f) => Some(f),
                        }),
                    )
                });
            }
        });
        acc
    }
435
436 #[instrument(level = "info",
437 parent=&self.0.span,
438 target = "buildqueue",
439 name = "Queueing tasks",
440 skip_all
441 )]
442 #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
443 pub fn enqueue_group(
444 &self,
445 id: &ArchiveId,
446 target: FormatOrTargets,
447 stale_only: bool,
448 clean: bool,
449 ) -> usize {
450 self.maybe_restart();
451 if let AnyBackend::Sandbox(b) = &self.0.backend {
452 b.require(id);
453 }
454 self.0.backend.with_archive_or_group(id, |g| match g {
455 None => 0,
456 Some(ArchiveOrGroup::Archive(id)) => self.0.backend.with_archive(id, |a| {
457 let Some(archive) = a else { return 0 };
458 if clean {
459 if let AnyBackend::Sandbox(b) = &self.0.backend {
460 let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
461 } else if let Archive::Local(a) = archive {
462 let _ = std::fs::remove_dir_all(a.out_dir());
463 }
464 }
465 if let Archive::Local(a) = archive {
466 a.with_sources(|d| {
467 let map = &mut *self.0.map.write();
468 Self::enqueue(
469 map,
470 &self.0.backend,
471 archive,
472 target,
473 stale_only,
474 d.dfs().filter_map(|e| match e {
475 SourceEntry::Dir(_) => None,
476 SourceEntry::File(f) => Some(f),
477 }),
478 )
479 })
480 } else {
481 0
482 }
483 }),
484 Some(ArchiveOrGroup::Group(g)) => {
485 let map = &mut *self.0.map.write();
486 let mut ret = 0;
487 for id in g.dfs().filter_map(|e| match e {
488 ArchiveOrGroup::Archive(id) => Some(id),
489 ArchiveOrGroup::Group(_) => None,
490 }) {
491 ret += self.0.backend.with_archive(id, |a| {
492 let Some(archive) = a else { return 0 };
493
494 if clean {
495 if let AnyBackend::Sandbox(b) = &self.0.backend {
496 let _ = std::fs::remove_dir_all(
497 b.path_for(archive.id()).join(".flams"),
498 );
499 } else if let Archive::Local(a) = archive {
500 let _ = std::fs::remove_dir_all(a.out_dir());
501 }
502 }
503 if let Archive::Local(a) = archive {
504 a.with_sources(|d| {
505 Self::enqueue(
506 map,
507 &self.0.backend,
508 archive,
509 target,
510 stale_only,
511 d.dfs().filter_map(|e| match e {
512 SourceEntry::Dir(_) => None,
513 SourceEntry::File(f) => Some(f),
514 }),
515 )
516 })
517 } else {
518 0
519 }
520 });
521 }
522 ret
523 }
524 })
525 }
526
527 #[instrument(level = "info",
528 parent=&self.0.span,
529 target = "buildqueue",
530 name = "Queueing tasks",
531 skip_all
532 )]
533 #[deprecated(note = "needs refatoring: assumes LocalArchive everywhere")]
534 pub fn enqueue_archive(
535 &self,
536 id: &ArchiveId,
537 target: FormatOrTargets,
538 stale_only: bool,
539 rel_path: Option<RelPath<'_>>,
540 clean: bool,
541 ) -> usize {
542 self.maybe_restart();
543 if let AnyBackend::Sandbox(b) = &self.0.backend {
544 b.require(id);
545 }
546 self.0.backend.with_archive(id, |archive| {
547 let Some(archive) = archive else { return 0 };
548 if clean {
549 if let AnyBackend::Sandbox(b) = &self.0.backend {
550 let _ = std::fs::remove_dir_all(b.path_for(archive.id()).join(".flams"));
551 } else if let Archive::Local(a) = archive {
552 let _ = std::fs::remove_dir_all(a.out_dir());
553 }
554 }
555 if let Archive::Local(a) = archive {
556 a.with_sources(|d| match rel_path {
557 None => {
558 let map = &mut *self.0.map.write();
559 Self::enqueue(
560 map,
561 &self.0.backend,
562 archive,
563 target,
564 stale_only,
565 d.dfs().filter_map(|e| match e {
566 SourceEntry::Dir(_) => None,
567 SourceEntry::File(f) => Some(f),
568 }),
569 )
570 }
571 Some(p) => {
572 let Some(d) = d.find(p) else { return 0 };
573 match d {
574 SourceEntryRef::Dir(d) => {
575 let map = &mut *self.0.map.write();
576 Self::enqueue(
577 map,
578 &self.0.backend,
579 archive,
580 target,
581 stale_only,
582 d.dfs().filter_map(|e| match e {
583 SourceEntry::Dir(_) => None,
584 SourceEntry::File(f) => Some(f),
585 }),
586 )
587 }
588 SourceEntryRef::File(f) => {
589 let map = &mut *self.0.map.write();
590 Self::enqueue(
591 map,
592 &self.0.backend,
593 archive,
594 target,
595 stale_only,
596 std::iter::once(f),
597 )
598 }
599 }
600 }
601 })
602 } else {
603 0
604 }
605 })
606 }
607}
608
/// State of an actively executing queue run.
#[derive(Debug)]
pub struct RunningQueue {
    /// Tasks waiting to run, in execution order.
    pub(super) queue: VecDeque<BuildTask>,
    /// Tasks that cannot run yet (presumably waiting on dependencies —
    /// TODO confirm in the scheduler).
    pub(super) blocked: Vec<BuildTask>,
    /// Tasks whose steps all completed successfully.
    pub(super) done: Vec<BuildTask>,
    /// Tasks where some step failed.
    pub(super) failed: Vec<BuildTask>,
    /// Tasks currently executing.
    pub(super) running: Vec<BuildTask>,
    /// Progress/ETA accounting for this run.
    timer: Timer,
}
618impl RunningQueue {
619 fn new(total: usize) -> Self {
620 Self {
621 queue: VecDeque::new(),
622 failed: Vec::new(),
623 blocked: Vec::new(),
624 done: Vec::new(),
625 running: Vec::new(),
626 timer: Timer::new(total),
627 }
628 }
629}
630
/// Outcome of a completed queue run.
#[derive(Debug)]
pub struct FinishedQueue {
    /// Tasks whose steps all completed successfully.
    pub(super) done: Vec<BuildTask>,
    /// Tasks where some step failed.
    pub(super) failed: Vec<BuildTask>,
}
636
/// Tracks elapsed time and completed step count to extrapolate an ETA.
#[derive(Debug)]
struct Timer {
    /// When the run started.
    started: Timestamp,
    /// Total number of steps in the run.
    steps: usize,
    /// Number of steps completed so far.
    done: usize,
}
643impl Timer {
644 fn new(total: usize) -> Self {
645 Self {
646 started: Timestamp::now(),
647 steps: total,
648 done: 0,
649 }
650 }
651 #[allow(clippy::cast_precision_loss)]
652 fn update(&mut self, dones: u8) -> Eta {
653 self.done += dones as usize;
654 let avg = self.started.since_now() * (1.0 / (self.done as f64));
655 let time_left = avg * ((self.steps - self.done) as f64);
656 Eta {
657 time_left,
658 done: self.done,
659 total: self.steps,
660 }
661 }
662}