flams_system/building/
queueing.rs1use std::collections::hash_map::Entry;
2
3use either::Either;
4use flams_ontology::uris::{ArchiveURITrait, URIRefTrait};
5use flams_utils::{triomphe::Arc, vecmap::VecSet};
6use parking_lot::RwLock;
7
8use crate::{
9 backend::{
10 archives::{
11 source_files::{FileState, SourceFile},
12 Archive,
13 },
14 AnyBackend,
15 },
16 formats::{BuildTargetId, FormatOrTargets},
17};
18
19use super::{
20 queue::{Queue, QueueState, RunningQueue, TaskMap},
21 BuildStep, BuildStepI, BuildTask, BuildTaskI, BuildTaskId, Dependency, TaskState,
22};
23
24impl Queue {
25 #[allow(clippy::significant_drop_in_scrutinee)]
26 pub(super) fn sort(map: &TaskMap, state: &mut RunningQueue) {
27 let RunningQueue {
28 queue,
29 done,
30 blocked,
31 failed,
32 ..
33 } = state;
34 let mut tasks = map.map.values().cloned().collect::<Vec<_>>();
35 let mut weak = true;
36 while !tasks.is_empty() {
37 let mut changed = false;
38 for t in &tasks {
39 let mut has_failed = false;
40 let Some(step) = t.steps().iter().find(|s| {
41 let state = s.0.state.read();
42 if *state == TaskState::Failed {
43 has_failed = true;
44 return false;
45 }
46 !matches!(*state, TaskState::Done)
47 }) else {
48 if has_failed {
49 failed.push(t.clone())
50 } else {
51 done.push(t.clone());
52 }
53 continue;
54 };
55 let mut newstate = TaskState::Queued;
56 for d in step.0.requires.read().iter() {
57 match d {
58 Dependency::Resolved { task, strict, step } if *strict || weak => {
59 match *task
60 .get_step(*step)
61 .unwrap_or_else(|| unreachable!())
62 .0
63 .state
64 .read()
65 {
66 TaskState::Done
67 | TaskState::Queued
68 | TaskState::Failed
69 | TaskState::Running => (),
70 TaskState::Blocked => {
71 newstate = TaskState::Blocked;
72 }
73 TaskState::None => {
74 newstate = TaskState::None;
75 break;
76 }
77 }
78 }
79 _ => (),
80 }
81 }
82 let mut found = false;
83 if newstate == TaskState::None {
84 continue;
85 }
86 changed = true;
87 for s in t.steps() {
88 if s == step {
89 found = true;
90 *s.0.state.write() = newstate;
91 } else if found {
92 *s.0.state.write() = TaskState::Blocked;
93 }
94 }
95 match newstate {
96 TaskState::Blocked => blocked.push(t.clone()),
97 TaskState::Queued => queue.push_back(t.clone()),
98 _ => (),
99 }
100 }
101 if changed {
102 tasks.retain(|t| {
103 t.steps()
104 .iter()
105 .any(|s| *s.0.state.read() == TaskState::None)
106 });
107 } else if weak {
108 weak = false;
109 } else {
110 let tasks = std::mem::take(&mut tasks);
111 for t in tasks {
112 for s in t.steps() {
113 let mut s = s.0.state.write();
114 if *s == TaskState::None {
115 *s = TaskState::Blocked;
116 }
117 }
118 blocked.push(t);
119 }
120 }
121 }
122 }
123
124 pub(super) fn get_next(&self) -> Option<(BuildTask, BuildTargetId)> {
125 loop {
126 let mut state = self.0.state.write();
127 let QueueState::Running(RunningQueue {
128 queue,
129 blocked,
130 running,
131 ..
132 }) = &mut *state
133 else {
134 unreachable!()
135 };
136 if queue.is_empty() && blocked.is_empty() && running.is_empty() {
137 return None;
138 }
139 if let Some((i, target)) = queue
140 .iter()
141 .enumerate()
142 .find_map(|(next, e)| Self::can_be_next(e).map(|t| (next, t)))
143 {
144 let Some(task) = queue.remove(i) else {
145 unreachable!()
146 };
147 *task
148 .get_step(target)
149 .unwrap_or_else(|| unreachable!())
150 .0
151 .state
152 .write() = TaskState::Running;
153 running.push(task.clone());
154 return Some((task, target));
155 }
156 if !running.is_empty() {
157 drop(state);
158 std::thread::sleep(std::time::Duration::from_secs(1));
159 } else if !blocked.is_empty() {
160 todo!()
161 } else {
162 todo!()
163 }
164 }
165 }
166
167 fn can_be_next(e: &BuildTask) -> Option<BuildTargetId> {
168 let step =
169 e.0.steps
170 .iter()
171 .find(|step| *step.0.state.read() == TaskState::Queued)?;
172 for d in &step.0.requires.read().0 {
173 if let Dependency::Resolved { task, step, strict } = d {
174 if *strict
175 && *task
176 .get_step(*step)
177 .unwrap_or_else(|| unreachable!())
178 .0
179 .state
180 .read()
181 == TaskState::Running
182 {
183 return None;
184 }
185 }
186 }
187 Some(step.0.target)
188 }
189
190 pub(super) fn enqueue<'a, I: Iterator<Item = &'a SourceFile>>(
191 map: &mut TaskMap,
192 backend: &AnyBackend,
193 archive: &Archive,
194 target: FormatOrTargets,
195 stale_only: bool,
196 files: I,
197 ) -> usize {
198 let targets = match target {
199 FormatOrTargets::Format(f) => f.targets(),
200 FormatOrTargets::Targets(t) => t,
201 };
202 let has_target = |f: &SourceFile, tgt: BuildTargetId| {
203 f.target_state
204 .get(&tgt)
205 .is_some_and(|t| !stale_only || matches!(t, FileState::Stale(_) | FileState::New))
206 };
207 let should_queue = |f: &SourceFile| targets.iter().any(|t| has_target(f, *t));
208 let mut count = 0;
209
210 for f in files.filter(|f| should_queue(f)) {
211 let key = (archive.id().clone(), f.relative_path.clone());
212 let task = match map.map.entry(key) {
213 Entry::Vacant(e) => {
214 count += 1;
215 let steps = targets
216 .iter()
217 .filter(|t| has_target(f, **t))
218 .map(|t| {
219 BuildStep(Arc::new(BuildStepI {
220 target: *t,
221 state: RwLock::new(TaskState::None),
222 requires: RwLock::new(VecSet::default()),
224 dependents: RwLock::new(Vec::new()),
225 }))
226 })
227 .collect::<Vec<_>>()
228 .into_boxed_slice();
229 map.total += steps.len();
230 let id = map.counter;
231 map.counter = map.counter.saturating_add(1);
232 let task_i = Arc::new(BuildTaskI {
233 id: BuildTaskId(id),
234 archive: archive.uri().owned(),
235 steps,
236 source: match archive {
237 Archive::Local(archive) => {
238 Either::Left(archive.source_dir().join(&*f.relative_path))
239 }
240 },
241 rel_path: f.relative_path.clone(),
242 });
243 e.insert(BuildTask(task_i.clone()));
244 BuildTask(task_i)
245 }
246 Entry::Occupied(o) => {
247 count += 1;
248 for s in o.get().steps() {
249 *s.0.state.write() = TaskState::None;
250 }
251 continue;
252 }
253 };
254 if let FormatOrTargets::Format(fmt) = target {
255 (fmt.dependencies())(backend, &task);
256 Self::process_dependencies(&task, map);
257 }
258 }
259 count
260 }
261
262 fn process_dependencies(task: &BuildTask, map: &mut TaskMap) {
263 for s in task.steps() {
264 let key = task.get_task_ref(s.0.target);
265 if let Some(v) = map.dependents.remove(&key) {
266 for (d, i) in v {
267 if let Some(t) = d.get_step(s.0.target) {
268 let mut deps = t.0.requires.write();
269 for d in &mut deps.0 {
270 if let Dependency::Physical {
271 task: ref t,
272 strict,
273 } = d
274 {
275 if *t == key {
276 *d = Dependency::Resolved {
277 task: task.clone(),
278 step: i,
279 strict: *strict,
280 };
281 }
282 }
283 }
284 }
285 }
286 }
287 for dep in &mut s.0.requires.write().0 {
288 if let Dependency::Physical {
289 task: ref deptask,
290 strict,
291 } = dep
292 {
293 if deptask.archive == *task.0.archive.archive_id()
294 && deptask.rel_path == task.0.rel_path
295 {
296 continue;
297 }
299 let key = (deptask.archive.clone(), deptask.rel_path.clone());
300 if let Some(deptasks) = map.map.get(&key) {
301 if let Some(step) =
302 deptasks.steps().iter().find(|bt| bt.0.target == s.0.target)
303 {
304 step.0.dependents.write().push((task.0.id, s.0.target));
305 *dep = Dependency::Resolved {
306 task: deptasks.clone(),
307 step: s.0.target,
308 strict: *strict,
309 };
310 continue;
311 }
312 map.dependents
313 .entry(deptask.clone())
314 .or_default()
315 .push((task.clone(), s.0.target));
316 }
317 }
318 }
319 }
320 }
321}