flams_system/building/
queueing.rs

1use either::Either;
2use flams_math_archives::{
3    backend::AnyBackend,
4    formats::{BuildTargetId, FormatOrTargets},
5    source_files::{FileState, SourceFile},
6    Archive, MathArchive,
7};
8use flams_utils::{triomphe::Arc, vecmap::VecSet};
9use ftml_uris::UriWithArchive;
10use parking_lot::RwLock;
11use std::collections::hash_map::Entry;
12
13use super::{
14    queue::{Queue, QueueState, RunningQueue, TaskMap},
15    BuildStep, BuildStepI, BuildTask, BuildTaskId, Dependency, TaskState,
16};
17
18impl Queue {
19    #[allow(clippy::significant_drop_in_scrutinee)]
20    pub(super) fn sort(map: &TaskMap, state: &mut RunningQueue) {
21        let RunningQueue {
22            queue,
23            done,
24            blocked,
25            failed,
26            ..
27        } = state;
28        let mut tasks = map.map.values().cloned().collect::<Vec<_>>();
29        let mut weak = true;
30        while !tasks.is_empty() {
31            let mut changed = false;
32            for t in &tasks {
33                let mut has_failed = false;
34                let Some(step) = t.steps().iter().find(|s| {
35                    let state = s.0.state.read();
36                    if *state == TaskState::Failed {
37                        has_failed = true;
38                        return false;
39                    }
40                    !matches!(*state, TaskState::Done)
41                }) else {
42                    if has_failed {
43                        failed.push(t.clone());
44                    } else {
45                        done.push(t.clone());
46                    }
47                    continue;
48                };
49                let mut newstate = TaskState::Queued;
50                for d in step.0.requires.read().iter() {
51                    match d {
52                        Dependency::Resolved { task, strict, step } if *strict || weak => {
53                            match *task
54                                .get_step(*step)
55                                .unwrap_or_else(|| unreachable!())
56                                .0
57                                .state
58                                .read()
59                            {
60                                TaskState::Done
61                                | TaskState::Queued
62                                | TaskState::Failed
63                                | TaskState::Running => (),
64                                TaskState::Blocked => {
65                                    newstate = TaskState::Blocked;
66                                }
67                                TaskState::None => {
68                                    newstate = TaskState::None;
69                                    break;
70                                }
71                            }
72                        }
73                        _ => (),
74                    }
75                }
76                let mut found = false;
77                if newstate == TaskState::None {
78                    continue;
79                }
80                changed = true;
81                for s in t.steps() {
82                    if s == step {
83                        found = true;
84                        *s.0.state.write() = newstate;
85                    } else if found {
86                        *s.0.state.write() = TaskState::Blocked;
87                    }
88                }
89                match newstate {
90                    TaskState::Blocked => blocked.push(t.clone()),
91                    TaskState::Queued => queue.push_back(t.clone()),
92                    _ => (),
93                }
94            }
95            if changed {
96                tasks.retain(|t| {
97                    t.steps()
98                        .iter()
99                        .any(|s| *s.0.state.read() == TaskState::None)
100                });
101            } else if weak {
102                weak = false;
103            } else {
104                let tasks = std::mem::take(&mut tasks);
105                for t in tasks {
106                    for s in t.steps() {
107                        let mut s = s.0.state.write();
108                        if *s == TaskState::None {
109                            *s = TaskState::Blocked;
110                        }
111                    }
112                    blocked.push(t);
113                }
114            }
115        }
116    }
117
118    pub(super) fn get_next(&self) -> Option<(BuildTask, BuildTargetId)> {
119        loop {
120            let mut state = self.0.state.write();
121            let QueueState::Running(RunningQueue {
122                queue,
123                blocked,
124                running,
125                ..
126            }) = &mut *state
127            else {
128                unreachable!()
129            };
130            if queue.is_empty() && blocked.is_empty() && running.is_empty() {
131                return None;
132            }
133            if let Some((i, target)) = queue
134                .iter()
135                .enumerate()
136                .find_map(|(next, e)| Self::can_be_next(e).map(|t| (next, t)))
137            {
138                let Some(task) = queue.remove(i) else {
139                    unreachable!()
140                };
141                *task
142                    .get_step(target)
143                    .unwrap_or_else(|| unreachable!())
144                    .0
145                    .state
146                    .write() = TaskState::Running;
147                running.push(task.clone());
148                return Some((task, target));
149            }
150            if !running.is_empty() {
151                drop(state);
152                std::thread::sleep(std::time::Duration::from_secs(1));
153            } else if !blocked.is_empty() {
154                todo!()
155            } else {
156                todo!()
157            }
158        }
159    }
160
161    fn can_be_next(e: &BuildTask) -> Option<BuildTargetId> {
162        let step =
163            e.0.steps
164                .iter()
165                .find(|step| *step.0.state.read() == TaskState::Queued)?;
166        for d in &step.0.requires.read().0 {
167            if let Dependency::Resolved { task, step, strict } = d {
168                if *strict
169                    && *task
170                        .get_step(*step)
171                        .unwrap_or_else(|| unreachable!())
172                        .0
173                        .state
174                        .read()
175                        == TaskState::Running
176                {
177                    return None;
178                }
179            }
180        }
181        Some(step.0.target)
182    }
183
184    #[deprecated(note = "assumes local archives")]
185    pub(super) fn enqueue<'a, I: Iterator<Item = &'a SourceFile>>(
186        map: &mut TaskMap,
187        backend: &AnyBackend,
188        archive: &Archive,
189        target: FormatOrTargets,
190        stale_only: bool,
191        files: I,
192    ) -> usize {
193        let targets = match target {
194            FormatOrTargets::Format(f) => f.targets,
195            FormatOrTargets::Targets(t) => t,
196        };
197        let has_target = |f: &SourceFile, tgt: BuildTargetId| {
198            f.target_state
199                .iter()
200                .find_map(|(k, v)| if *k == tgt { Some(v) } else { None })
201                .is_some_and(|t| !stale_only || matches!(t, FileState::Stale(_) | FileState::New))
202        };
203        let should_queue = |f: &SourceFile| targets.iter().any(|t| has_target(f, *t));
204        let mut count = 0;
205
206        for f in files.filter(|f| should_queue(f)) {
207            let key = (archive.id().clone(), f.relative_path.clone());
208            let task = match map.map.entry(key) {
209                Entry::Vacant(e) => {
210                    count += 1;
211                    let steps = targets
212                        .iter()
213                        .filter(|t| has_target(f, **t))
214                        .map(|t| {
215                            BuildStep(Arc::new(BuildStepI {
216                                target: *t,
217                                state: RwLock::new(TaskState::None),
218                                //yields:RwLock::new(Vec::new()),
219                                requires: RwLock::new(VecSet::default()),
220                                dependents: RwLock::new(Vec::new()),
221                            }))
222                        })
223                        .collect::<Vec<_>>()
224                        .into_boxed_slice();
225                    map.total += steps.len();
226                    let id = map.counter;
227                    map.counter = map.counter.saturating_add(1);
228                    let task = BuildTask::new(
229                        BuildTaskId(id),
230                        archive.uri().clone(),
231                        steps,
232                        match archive {
233                            Archive::Local(archive) => {
234                                Either::Left(archive.source_dir().join(f.relative_path.as_ref()))
235                            }
236                            Archive::Ext(..) => todo!("foreign archives"),
237                        },
238                        f.relative_path.clone(),
239                    )
240                    .expect("this is a bug");
241                    e.insert(task.clone());
242                    task
243                }
244                Entry::Occupied(o) => {
245                    count += 1;
246                    for s in o.get().steps() {
247                        *s.0.state.write() = TaskState::None;
248                    }
249                    continue;
250                }
251            };
252            let spec = task.as_build_spec(backend);
253            if let FormatOrTargets::Format(fmt) = target {
254                for (f, d) in (fmt.dependencies)(spec) {
255                    if let Some(step) = task.get_step(f) {
256                        step.add_dependency(d.into());
257                    }
258                }
259                Self::process_dependencies(&task, map);
260            }
261        }
262        count
263    }
264
265    fn process_dependencies(task: &BuildTask, map: &mut TaskMap) {
266        for s in task.steps() {
267            let key = task.as_task_ref(s.0.target);
268            if let Some(v) = map.dependents.remove(&key) {
269                for (d, i) in v {
270                    if let Some(t) = d.get_step(s.0.target) {
271                        let mut deps = t.0.requires.write();
272                        for d in &mut deps.0 {
273                            if let Dependency::Physical {
274                                task: ref t,
275                                strict,
276                            } = d
277                            {
278                                if *t == key {
279                                    *d = Dependency::Resolved {
280                                        task: task.clone(),
281                                        step: i,
282                                        strict: *strict,
283                                    };
284                                }
285                            }
286                        }
287                    }
288                }
289            }
290            for dep in &mut s.0.requires.write().0 {
291                if let Dependency::Physical {
292                    task: ref deptask,
293                    strict,
294                } = dep
295                {
296                    if deptask.archive == *task.0.uri.archive_id()
297                        && deptask.rel_path == task.0.rel_path
298                    {
299                        continue;
300                        // TODO check for more
301                    }
302                    let key = (deptask.archive.clone(), deptask.rel_path.clone());
303                    if let Some(deptasks) = map.map.get(&key) {
304                        if let Some(step) =
305                            deptasks.steps().iter().find(|bt| bt.0.target == s.0.target)
306                        {
307                            step.0.dependents.write().push((task.0.id, s.0.target));
308                            *dep = Dependency::Resolved {
309                                task: deptasks.clone(),
310                                step: s.0.target,
311                                strict: *strict,
312                            };
313                            continue;
314                        }
315                        map.dependents
316                            .entry(deptask.clone())
317                            .or_default()
318                            .push((task.clone(), s.0.target));
319                    }
320                }
321            }
322        }
323    }
324}