flams_system/building/
queueing.rs1use either::Either;
2use flams_math_archives::{
3 backend::AnyBackend,
4 formats::{BuildTargetId, FormatOrTargets},
5 source_files::{FileState, SourceFile},
6 Archive, MathArchive,
7};
8use flams_utils::{triomphe::Arc, vecmap::VecSet};
9use ftml_uris::UriWithArchive;
10use parking_lot::RwLock;
11use std::collections::hash_map::Entry;
12
13use super::{
14 queue::{Queue, QueueState, RunningQueue, TaskMap},
15 BuildStep, BuildStepI, BuildTask, BuildTaskId, Dependency, TaskState,
16};
17
18impl Queue {
19 #[allow(clippy::significant_drop_in_scrutinee)]
20 pub(super) fn sort(map: &TaskMap, state: &mut RunningQueue) {
21 let RunningQueue {
22 queue,
23 done,
24 blocked,
25 failed,
26 ..
27 } = state;
28 let mut tasks = map.map.values().cloned().collect::<Vec<_>>();
29 let mut weak = true;
30 while !tasks.is_empty() {
31 let mut changed = false;
32 for t in &tasks {
33 let mut has_failed = false;
34 let Some(step) = t.steps().iter().find(|s| {
35 let state = s.0.state.read();
36 if *state == TaskState::Failed {
37 has_failed = true;
38 return false;
39 }
40 !matches!(*state, TaskState::Done)
41 }) else {
42 if has_failed {
43 failed.push(t.clone());
44 } else {
45 done.push(t.clone());
46 }
47 continue;
48 };
49 let mut newstate = TaskState::Queued;
50 for d in step.0.requires.read().iter() {
51 match d {
52 Dependency::Resolved { task, strict, step } if *strict || weak => {
53 match *task
54 .get_step(*step)
55 .unwrap_or_else(|| unreachable!())
56 .0
57 .state
58 .read()
59 {
60 TaskState::Done
61 | TaskState::Queued
62 | TaskState::Failed
63 | TaskState::Running => (),
64 TaskState::Blocked => {
65 newstate = TaskState::Blocked;
66 }
67 TaskState::None => {
68 newstate = TaskState::None;
69 break;
70 }
71 }
72 }
73 _ => (),
74 }
75 }
76 let mut found = false;
77 if newstate == TaskState::None {
78 continue;
79 }
80 changed = true;
81 for s in t.steps() {
82 if s == step {
83 found = true;
84 *s.0.state.write() = newstate;
85 } else if found {
86 *s.0.state.write() = TaskState::Blocked;
87 }
88 }
89 match newstate {
90 TaskState::Blocked => blocked.push(t.clone()),
91 TaskState::Queued => queue.push_back(t.clone()),
92 _ => (),
93 }
94 }
95 if changed {
96 tasks.retain(|t| {
97 t.steps()
98 .iter()
99 .any(|s| *s.0.state.read() == TaskState::None)
100 });
101 } else if weak {
102 weak = false;
103 } else {
104 let tasks = std::mem::take(&mut tasks);
105 for t in tasks {
106 for s in t.steps() {
107 let mut s = s.0.state.write();
108 if *s == TaskState::None {
109 *s = TaskState::Blocked;
110 }
111 }
112 blocked.push(t);
113 }
114 }
115 }
116 }
117
118 pub(super) fn get_next(&self) -> Option<(BuildTask, BuildTargetId)> {
119 loop {
120 let mut state = self.0.state.write();
121 let QueueState::Running(RunningQueue {
122 queue,
123 blocked,
124 running,
125 ..
126 }) = &mut *state
127 else {
128 unreachable!()
129 };
130 if queue.is_empty() && blocked.is_empty() && running.is_empty() {
131 return None;
132 }
133 if let Some((i, target)) = queue
134 .iter()
135 .enumerate()
136 .find_map(|(next, e)| Self::can_be_next(e).map(|t| (next, t)))
137 {
138 let Some(task) = queue.remove(i) else {
139 unreachable!()
140 };
141 *task
142 .get_step(target)
143 .unwrap_or_else(|| unreachable!())
144 .0
145 .state
146 .write() = TaskState::Running;
147 running.push(task.clone());
148 return Some((task, target));
149 }
150 if !running.is_empty() {
151 drop(state);
152 std::thread::sleep(std::time::Duration::from_secs(1));
153 } else if !blocked.is_empty() {
154 todo!()
155 } else {
156 todo!()
157 }
158 }
159 }
160
161 fn can_be_next(e: &BuildTask) -> Option<BuildTargetId> {
162 let step =
163 e.0.steps
164 .iter()
165 .find(|step| *step.0.state.read() == TaskState::Queued)?;
166 for d in &step.0.requires.read().0 {
167 if let Dependency::Resolved { task, step, strict } = d {
168 if *strict
169 && *task
170 .get_step(*step)
171 .unwrap_or_else(|| unreachable!())
172 .0
173 .state
174 .read()
175 == TaskState::Running
176 {
177 return None;
178 }
179 }
180 }
181 Some(step.0.target)
182 }
183
184 #[deprecated(note = "assumes local archives")]
185 pub(super) fn enqueue<'a, I: Iterator<Item = &'a SourceFile>>(
186 map: &mut TaskMap,
187 backend: &AnyBackend,
188 archive: &Archive,
189 target: FormatOrTargets,
190 stale_only: bool,
191 files: I,
192 ) -> usize {
193 let targets = match target {
194 FormatOrTargets::Format(f) => f.targets,
195 FormatOrTargets::Targets(t) => t,
196 };
197 let has_target = |f: &SourceFile, tgt: BuildTargetId| {
198 f.target_state
199 .iter()
200 .find_map(|(k, v)| if *k == tgt { Some(v) } else { None })
201 .is_some_and(|t| !stale_only || matches!(t, FileState::Stale(_) | FileState::New))
202 };
203 let should_queue = |f: &SourceFile| targets.iter().any(|t| has_target(f, *t));
204 let mut count = 0;
205
206 for f in files.filter(|f| should_queue(f)) {
207 let key = (archive.id().clone(), f.relative_path.clone());
208 let task = match map.map.entry(key) {
209 Entry::Vacant(e) => {
210 count += 1;
211 let steps = targets
212 .iter()
213 .filter(|t| has_target(f, **t))
214 .map(|t| {
215 BuildStep(Arc::new(BuildStepI {
216 target: *t,
217 state: RwLock::new(TaskState::None),
218 requires: RwLock::new(VecSet::default()),
220 dependents: RwLock::new(Vec::new()),
221 }))
222 })
223 .collect::<Vec<_>>()
224 .into_boxed_slice();
225 map.total += steps.len();
226 let id = map.counter;
227 map.counter = map.counter.saturating_add(1);
228 let task = BuildTask::new(
229 BuildTaskId(id),
230 archive.uri().clone(),
231 steps,
232 match archive {
233 Archive::Local(archive) => {
234 Either::Left(archive.source_dir().join(f.relative_path.as_ref()))
235 }
236 Archive::Ext(..) => todo!("foreign archives"),
237 },
238 f.relative_path.clone(),
239 )
240 .expect("this is a bug");
241 e.insert(task.clone());
242 task
243 }
244 Entry::Occupied(o) => {
245 count += 1;
246 for s in o.get().steps() {
247 *s.0.state.write() = TaskState::None;
248 }
249 continue;
250 }
251 };
252 let spec = task.as_build_spec(backend);
253 if let FormatOrTargets::Format(fmt) = target {
254 for (f, d) in (fmt.dependencies)(spec) {
255 if let Some(step) = task.get_step(f) {
256 step.add_dependency(d.into());
257 }
258 }
259 Self::process_dependencies(&task, map);
260 }
261 }
262 count
263 }
264
265 fn process_dependencies(task: &BuildTask, map: &mut TaskMap) {
266 for s in task.steps() {
267 let key = task.as_task_ref(s.0.target);
268 if let Some(v) = map.dependents.remove(&key) {
269 for (d, i) in v {
270 if let Some(t) = d.get_step(s.0.target) {
271 let mut deps = t.0.requires.write();
272 for d in &mut deps.0 {
273 if let Dependency::Physical {
274 task: ref t,
275 strict,
276 } = d
277 {
278 if *t == key {
279 *d = Dependency::Resolved {
280 task: task.clone(),
281 step: i,
282 strict: *strict,
283 };
284 }
285 }
286 }
287 }
288 }
289 }
290 for dep in &mut s.0.requires.write().0 {
291 if let Dependency::Physical {
292 task: ref deptask,
293 strict,
294 } = dep
295 {
296 if deptask.archive == *task.0.uri.archive_id()
297 && deptask.rel_path == task.0.rel_path
298 {
299 continue;
300 }
302 let key = (deptask.archive.clone(), deptask.rel_path.clone());
303 if let Some(deptasks) = map.map.get(&key) {
304 if let Some(step) =
305 deptasks.steps().iter().find(|bt| bt.0.target == s.0.target)
306 {
307 step.0.dependents.write().push((task.0.id, s.0.target));
308 *dep = Dependency::Resolved {
309 task: deptasks.clone(),
310 step: s.0.target,
311 strict: *strict,
312 };
313 continue;
314 }
315 map.dependents
316 .entry(deptask.clone())
317 .or_default()
318 .push((task.clone(), s.0.target));
319 }
320 }
321 }
322 }
323 }
324}