flams_system/building/
queueing.rs

1use std::collections::hash_map::Entry;
2
3use either::Either;
4use flams_ontology::uris::{ArchiveURITrait, URIRefTrait};
5use flams_utils::{triomphe::Arc, vecmap::VecSet};
6use parking_lot::RwLock;
7
8use crate::{
9    backend::{
10        archives::{
11            source_files::{FileState, SourceFile},
12            Archive,
13        },
14        AnyBackend,
15    },
16    formats::{BuildTargetId, FormatOrTargets},
17};
18
19use super::{
20    queue::{Queue, QueueState, RunningQueue, TaskMap},
21    BuildStep, BuildStepI, BuildTask, BuildTaskI, BuildTaskId, Dependency, TaskState,
22};
23
24impl Queue {
25    #[allow(clippy::significant_drop_in_scrutinee)]
26    pub(super) fn sort(map: &TaskMap, state: &mut RunningQueue) {
27        let RunningQueue {
28            queue,
29            done,
30            blocked,
31            failed,
32            ..
33        } = state;
34        let mut tasks = map.map.values().cloned().collect::<Vec<_>>();
35        let mut weak = true;
36        while !tasks.is_empty() {
37            let mut changed = false;
38            for t in &tasks {
39                let mut has_failed = false;
40                let Some(step) = t.steps().iter().find(|s| {
41                    let state = s.0.state.read();
42                    if *state == TaskState::Failed {
43                        has_failed = true;
44                        return false;
45                    }
46                    !matches!(*state, TaskState::Done)
47                }) else {
48                    if has_failed {
49                        failed.push(t.clone())
50                    } else {
51                        done.push(t.clone());
52                    }
53                    continue;
54                };
55                let mut newstate = TaskState::Queued;
56                for d in step.0.requires.read().iter() {
57                    match d {
58                        Dependency::Resolved { task, strict, step } if *strict || weak => {
59                            match *task
60                                .get_step(*step)
61                                .unwrap_or_else(|| unreachable!())
62                                .0
63                                .state
64                                .read()
65                            {
66                                TaskState::Done
67                                | TaskState::Queued
68                                | TaskState::Failed
69                                | TaskState::Running => (),
70                                TaskState::Blocked => {
71                                    newstate = TaskState::Blocked;
72                                }
73                                TaskState::None => {
74                                    newstate = TaskState::None;
75                                    break;
76                                }
77                            }
78                        }
79                        _ => (),
80                    }
81                }
82                let mut found = false;
83                if newstate == TaskState::None {
84                    continue;
85                }
86                changed = true;
87                for s in t.steps() {
88                    if s == step {
89                        found = true;
90                        *s.0.state.write() = newstate;
91                    } else if found {
92                        *s.0.state.write() = TaskState::Blocked;
93                    }
94                }
95                match newstate {
96                    TaskState::Blocked => blocked.push(t.clone()),
97                    TaskState::Queued => queue.push_back(t.clone()),
98                    _ => (),
99                }
100            }
101            if changed {
102                tasks.retain(|t| {
103                    t.steps()
104                        .iter()
105                        .any(|s| *s.0.state.read() == TaskState::None)
106                });
107            } else if weak {
108                weak = false;
109            } else {
110                let tasks = std::mem::take(&mut tasks);
111                for t in tasks {
112                    for s in t.steps() {
113                        let mut s = s.0.state.write();
114                        if *s == TaskState::None {
115                            *s = TaskState::Blocked;
116                        }
117                    }
118                    blocked.push(t);
119                }
120            }
121        }
122    }
123
124    pub(super) fn get_next(&self) -> Option<(BuildTask, BuildTargetId)> {
125        loop {
126            let mut state = self.0.state.write();
127            let QueueState::Running(RunningQueue {
128                queue,
129                blocked,
130                running,
131                ..
132            }) = &mut *state
133            else {
134                unreachable!()
135            };
136            if queue.is_empty() && blocked.is_empty() && running.is_empty() {
137                return None;
138            }
139            if let Some((i, target)) = queue
140                .iter()
141                .enumerate()
142                .find_map(|(next, e)| Self::can_be_next(e).map(|t| (next, t)))
143            {
144                let Some(task) = queue.remove(i) else {
145                    unreachable!()
146                };
147                *task
148                    .get_step(target)
149                    .unwrap_or_else(|| unreachable!())
150                    .0
151                    .state
152                    .write() = TaskState::Running;
153                running.push(task.clone());
154                return Some((task, target));
155            }
156            if !running.is_empty() {
157                drop(state);
158                std::thread::sleep(std::time::Duration::from_secs(1));
159            } else if !blocked.is_empty() {
160                todo!()
161            } else {
162                todo!()
163            }
164        }
165    }
166
167    fn can_be_next(e: &BuildTask) -> Option<BuildTargetId> {
168        let step =
169            e.0.steps
170                .iter()
171                .find(|step| *step.0.state.read() == TaskState::Queued)?;
172        for d in &step.0.requires.read().0 {
173            if let Dependency::Resolved { task, step, strict } = d {
174                if *strict
175                    && *task
176                        .get_step(*step)
177                        .unwrap_or_else(|| unreachable!())
178                        .0
179                        .state
180                        .read()
181                        == TaskState::Running
182                {
183                    return None;
184                }
185            }
186        }
187        Some(step.0.target)
188    }
189
190    pub(super) fn enqueue<'a, I: Iterator<Item = &'a SourceFile>>(
191        map: &mut TaskMap,
192        backend: &AnyBackend,
193        archive: &Archive,
194        target: FormatOrTargets,
195        stale_only: bool,
196        files: I,
197    ) -> usize {
198        let targets = match target {
199            FormatOrTargets::Format(f) => f.targets(),
200            FormatOrTargets::Targets(t) => t,
201        };
202        let has_target = |f: &SourceFile, tgt: BuildTargetId| {
203            f.target_state
204                .get(&tgt)
205                .is_some_and(|t| !stale_only || matches!(t, FileState::Stale(_) | FileState::New))
206        };
207        let should_queue = |f: &SourceFile| targets.iter().any(|t| has_target(f, *t));
208        let mut count = 0;
209
210        for f in files.filter(|f| should_queue(f)) {
211            let key = (archive.id().clone(), f.relative_path.clone());
212            let task = match map.map.entry(key) {
213                Entry::Vacant(e) => {
214                    count += 1;
215                    let steps = targets
216                        .iter()
217                        .filter(|t| has_target(f, **t))
218                        .map(|t| {
219                            BuildStep(Arc::new(BuildStepI {
220                                target: *t,
221                                state: RwLock::new(TaskState::None),
222                                //yields:RwLock::new(Vec::new()),
223                                requires: RwLock::new(VecSet::default()),
224                                dependents: RwLock::new(Vec::new()),
225                            }))
226                        })
227                        .collect::<Vec<_>>()
228                        .into_boxed_slice();
229                    map.total += steps.len();
230                    let id = map.counter;
231                    map.counter = map.counter.saturating_add(1);
232                    let task_i = Arc::new(BuildTaskI {
233                        id: BuildTaskId(id),
234                        archive: archive.uri().owned(),
235                        steps,
236                        source: match archive {
237                            Archive::Local(archive) => {
238                                Either::Left(archive.source_dir().join(&*f.relative_path))
239                            }
240                        },
241                        rel_path: f.relative_path.clone(),
242                    });
243                    e.insert(BuildTask(task_i.clone()));
244                    BuildTask(task_i)
245                }
246                Entry::Occupied(o) => {
247                    count += 1;
248                    for s in o.get().steps() {
249                        *s.0.state.write() = TaskState::None;
250                    }
251                    continue;
252                }
253            };
254            if let FormatOrTargets::Format(fmt) = target {
255                (fmt.dependencies())(backend, &task);
256                Self::process_dependencies(&task, map);
257            }
258        }
259        count
260    }
261
262    fn process_dependencies(task: &BuildTask, map: &mut TaskMap) {
263        for s in task.steps() {
264            let key = task.get_task_ref(s.0.target);
265            if let Some(v) = map.dependents.remove(&key) {
266                for (d, i) in v {
267                    if let Some(t) = d.get_step(s.0.target) {
268                        let mut deps = t.0.requires.write();
269                        for d in &mut deps.0 {
270                            if let Dependency::Physical {
271                                task: ref t,
272                                strict,
273                            } = d
274                            {
275                                if *t == key {
276                                    *d = Dependency::Resolved {
277                                        task: task.clone(),
278                                        step: i,
279                                        strict: *strict,
280                                    };
281                                }
282                            }
283                        }
284                    }
285                }
286            }
287            for dep in &mut s.0.requires.write().0 {
288                if let Dependency::Physical {
289                    task: ref deptask,
290                    strict,
291                } = dep
292                {
293                    if deptask.archive == *task.0.archive.archive_id()
294                        && deptask.rel_path == task.0.rel_path
295                    {
296                        continue;
297                        // TODO check for more
298                    }
299                    let key = (deptask.archive.clone(), deptask.rel_path.clone());
300                    if let Some(deptasks) = map.map.get(&key) {
301                        if let Some(step) =
302                            deptasks.steps().iter().find(|bt| bt.0.target == s.0.target)
303                        {
304                            step.0.dependents.write().push((task.0.id, s.0.target));
305                            *dep = Dependency::Resolved {
306                                task: deptasks.clone(),
307                                step: s.0.target,
308                                strict: *strict,
309                            };
310                            continue;
311                        }
312                        map.dependents
313                            .entry(deptask.clone())
314                            .or_default()
315                            .push((task.clone(), s.0.target));
316                    }
317                }
318            }
319        }
320    }
321}