1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4
5#[cfg(any(
6 all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
7 not(any(feature = "ssr", feature = "hydrate"))
8))]
9compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_router_base::{LoginState, require_login, ws::WebSocket};
12use flams_router_base::{maybe_lazy, ws};
13#[cfg(feature = "hydrate")]
14use flams_router_buildqueue_base::server_fns::get_log;
15use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
16use flams_router_content::checks::{DocumentCheckResult, ResultExt};
17use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
18use flams_utils::vecmap::VecMap;
19use flams_web_utils::components::wait_and_then_fn;
20use ftml_component_utils::{Collapsible, Header, LazyCollapsible};
21use ftml_dom::utils::css::inject_css;
22use ftml_ontology::utils::time::{Delta, Eta};
23use ftml_uris::{ArchiveId, DocumentUri};
24use leptos::{either::EitherOf4, prelude::*};
25use leptos_router::hooks::use_params_map;
26use std::num::NonZeroU32;
27
/// Context handle used to force the queue overview to reload.
///
/// `queues_top` provides one via context and reads the wrapped signal inside
/// its outermost reactive closure; other functions in this file call
/// `.0.set(())` after a successful server action so that closure runs again.
#[derive(Copy, Clone)]
struct UpdateQueues(RwSignal<()>);
30
/// One build job as displayed in the queue UI: a file (`rel_path`) of an
/// archive together with the state of each of its build steps.
///
/// The type is serialized as part of [`QueueMessage`], so the declaration
/// order of the fields is left untouched.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Identifier used to match incoming task messages to this entry.
    id: u32,
    /// Archive the file belongs to.
    archive: ArchiveId,
    /// Path of the file; shown next to the archive in the entry's label.
    rel_path: String,
    /// Step name -> state. With `hydrate` this is a signal, so updates applied
    /// by `QueueSocket::do_msg` re-render the entry in place.
    #[cfg(feature = "hydrate")]
    steps: RwSignal<VecMap<String, TaskState>>,
    /// Step name -> state (plain map when `hydrate` is off).
    #[cfg(not(feature = "hydrate"))]
    steps: VecMap<String, TaskState>,
}
41
42impl Entry {
43 #[cfg(not(feature = "hydrate"))]
44 fn as_view(&self) -> impl IntoView + use<> {
45 view! {
46 <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
47 }
48 }
49
50 #[cfg(feature = "hydrate")]
51 fn as_view(&self) -> AnyView {
52 use flams_router_base::vscode_link;
53
54 let vscode = vscode_link(&self.archive, &self.rel_path);
55
56 let title = format!("[{}]{}", self.archive, self.rel_path);
57 let total = self.steps.with_untracked(|v| v.0.len());
58 let steps = self.steps;
59 let current = move || {
60 steps.with(|v| {
61 v.iter()
62 .enumerate()
63 .find_map(|(i, (e, s))| {
64 if *s == TaskState::Done {
65 None
66 } else {
67 Some((i + 1, e.clone()))
68 }
69 })
70 .unwrap_or_else(|| (total, "Done".to_string()))
71 })
72 };
73 let rel_path = self.rel_path.clone();
74 let archive = self.archive.clone();
75 view! {
76 <li><Collapsible>
77 <Header slot>
78 <b>
79 {title}
80 {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
81 " "{vscode}
82 </b>
83 </Header>
84 <ol>
85 {let rel_path = rel_path.clone();
86 let archive = archive.clone();
87 move || steps.get().iter().map(|(t,e)|
88 view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
89 ).collect_view()}
90 </ol>
91 </Collapsible></li>
92 }
93 .into_any()
94 }
95}
96
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueEntry> for Entry {
    /// Converts a backend queue entry into its UI counterpart, turning step
    /// names into owned strings and step states into [`TaskState`].
    ///
    /// When `hydrate` is enabled as well, `steps` is a signal and the body is
    /// replaced by `unreachable!()`.
    fn from(entry: flams_system::building::QueueEntry) -> Self {
        #[cfg(feature = "hydrate")]
        {
            unreachable!()
        }
        #[cfg(not(feature = "hydrate"))]
        Self {
            id: entry.id.into(),
            archive: entry.archive,
            rel_path: entry.rel_path.to_string(),
            steps: entry
                .steps
                .into_iter()
                .map(|(target, state)| (target.to_string(), state.into()))
                .collect(),
        }
    }
}
117
/// State of a single build step of an [`Entry`].
///
/// Mirrors `flams_system::building::TaskState` variant by variant (see the
/// `From` impl compiled with `ssr`). The type is serialized, so the variant
/// order is left untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TaskState {
    /// Currently being built; rendered in yellow as "(Running)".
    Running,
    /// Rendered in gray as "(...)".
    Queued,
    /// Rendered in gray as "(...)".
    Blocked,
    /// Finished successfully; rendered in green with an expandable log.
    Done,
    /// Finished with an error; rendered in red with an expandable log.
    Failed,
    /// Rendered in gray as "(...)", like `Queued` and `Blocked`.
    None,
}
127impl TaskState {
128 #[cfg(feature = "hydrate")]
129 fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> AnyView {
130 match self {
131 Self::Running => view! {<i style="color:yellow">{t}" (Running)"</i>}.into_any(),
132 Self::Queued | Self::Blocked | Self::None => {
133 view! {<span style="color:gray">{t}" (...)"</span>}.into_any()
134 }
135 Self::Done => {
136 let archive = archive.clone();
137 let rel_path = rel_path.to_string();
138 let tc = t.clone();
139 view! {
140 <LazyCollapsible>
141 <Header slot><span style="color:green">{t}" (Done)"</span></Header>
142 {
143 let archive = archive.clone();
144 let rel_path = rel_path.clone();
145 let tc = tc.clone();
146 let queue = expect_context::<AllQueues>().selected.get_untracked();
147 require_login(Box::new(move || wait_and_then_fn(
148 move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
149 |s| do_log(s)
150 )))
151 }
152 </LazyCollapsible>
153 }
154 .into_any()
155 }
156 Self::Failed => {
157 let archive = archive.clone();
158 let rel_path = rel_path.to_string();
159 let tc = t.clone();
160 view! {
161 <LazyCollapsible>
162 <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
163 {
164 let archive = archive.clone();
165 let rel_path = rel_path.clone();
166 let tc = tc.clone();
167 let queue = expect_context::<AllQueues>().selected.get_untracked();
168 require_login(Box::new(move || wait_and_then_fn(
169 move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
170 do_log
171 )))
172 }
173 </LazyCollapsible>
174 }
175 .into_any()
176 }
177 }
178 }
179}
180
181fn do_log(s: either::Either<String, String>) -> AnyView {
182 use ftml_component_utils::Scrollbar;
183 view! {<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">{
184 match s {
185 either::Left(s) => leptos::either::Either::Left(view!{
186 <pre style="width:fit-content;font-size:smaller;">{s}</pre>
187 }),
188 either::Right(v) => leptos::either::Either::Right({
189 ftml_solver_trace::results::DocumentCheckResult::from_json(&v)
190 .map_or_else(|_| view!{<pre>{v}</pre>}.into_any(),|e| e.render())
191 })
192 }
193 }</Scrollbar>}
194 .into_any()
195}
196
#[cfg(feature = "ssr")]
impl From<flams_system::building::TaskState> for TaskState {
    /// Maps each backend task state onto the UI variant of the same name.
    fn from(e: flams_system::building::TaskState) -> Self {
        use flams_system::building::TaskState as Backend;
        match e {
            Backend::Running => Self::Running,
            Backend::Queued => Self::Queued,
            Backend::Blocked => Self::Blocked,
            Backend::Done => Self::Done,
            Backend::Failed => Self::Failed,
            Backend::None => Self::None,
        }
    }
}
211
/// Message sent from the server to the client over the queue websocket.
///
/// The first three variants carry a full snapshot of a queue; the `Task*`
/// variants are incremental updates that `QueueSocket::do_msg` applies to a
/// running queue. The type is serialized, so variant and field order are left
/// untouched.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum QueueMessage {
    /// The queue is idle; carries its entries.
    Idle(Vec<Entry>),
    /// The queue is running; entries are grouped by their current state.
    Started {
        running: Vec<Entry>,
        queue: Vec<Entry>,
        blocked: Vec<Entry>,
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// The queue has finished; carries the failed and the successful entries.
    Finished {
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// Step `target` of entry `id` has started.
    TaskStarted {
        id: u32,
        target: String,
    },
    /// Step `target` of entry `id` succeeded; `eta` is the updated estimate.
    TaskSuccess {
        id: u32,
        target: String,
        eta: Eta,
    },
    /// Step `target` of entry `id` failed; `eta` is the updated estimate.
    TaskFailed {
        id: u32,
        target: String,
        eta: Eta,
    },
    /// Step `target` of entry `id` is blocked; `eta` is the updated estimate.
    TaskBlocked {
        id: u32,
        target: String,
        eta: Eta,
    },
}
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueMessage> for QueueMessage {
    /// Converts a backend queue message into the serializable UI message:
    /// entry lists are converted element-wise, ids via `Into`, and targets are
    /// turned into owned strings.
    fn from(e: flams_system::building::QueueMessage) -> Self {
        use flams_system::building::QueueMessage as Backend;

        // Element-wise conversion of a backend entry list.
        fn entries<I>(items: I) -> Vec<Entry>
        where
            I: IntoIterator,
            I::Item: Into<Entry>,
        {
            items.into_iter().map(Into::into).collect()
        }

        match e {
            Backend::Idle(all) => Self::Idle(entries(all)),
            Backend::Started {
                running,
                queue,
                blocked,
                failed,
                done,
            } => Self::Started {
                running: entries(running),
                queue: entries(queue),
                blocked: entries(blocked),
                failed: entries(failed),
                done: entries(done),
            },
            Backend::Finished { failed, done } => Self::Finished {
                failed: entries(failed),
                done: entries(done),
            },
            Backend::TaskStarted { id, target } => Self::TaskStarted {
                id: id.into(),
                target: target.to_string(),
            },
            Backend::TaskSuccess { id, target, eta } => Self::TaskSuccess {
                id: id.into(),
                target: target.to_string(),
                eta,
            },
            Backend::TaskFailed { id, target, eta } => Self::TaskFailed {
                id: id.into(),
                target: target.to_string(),
                eta,
            },
            Backend::TaskBlocked { id, target, eta } => Self::TaskBlocked {
                id: id.into(),
                target: target.to_string(),
                eta,
            },
        }
    }
}
291
// Declares `QueuesTop` in terms of `queues_top()` through the imported
// `maybe_lazy!` macro.
// NOTE(review): presumably this yields a route component that may be loaded
// lazily - confirm against the macro definition in `flams_router_base`.
maybe_lazy!(QueuesTop = queues_top());
295
296pub fn queues_top() -> AnyView {
298 use flams_web_utils::components::{Layout, LayoutHeader};
299 use ftml_component_utils::{Divider, Spinner, Tab, TabList};
300
301 let update = UpdateQueues(RwSignal::new(()));
302 provide_context(update);
303 (move || {
304 let () = update.0.get();
305 let params = use_params_map();
306 let id = move || params.read().get("queue");
307
308 require_login(Box::new(move || {
309 wait_and_then_fn(server_fns::get_queues, move |v| {
310 if v.is_empty() {
311 return view!(<div>"(No running queues)"</div>).into_any();
312 }
313 let queues = AllQueues::new(v);
314 if let Some(id) = params.read_untracked().get("queue") && let Ok(id) = id.parse() {
315 queues.selected.update_untracked(|v| *v = id);
316 }
317 provide_context(queues);
318 let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
319 let _ = Effect::new(move |_| {
320 let value = selected_value.get();
321 let selected = queues.selected.get_untracked();
322 let value = value.parse().unwrap_or_else(|_| unreachable!());
323 if selected != value {
324 queues.selected.set(value);
325 }
326 });
327 view! {<Layout>
328 <LayoutHeader slot>
329 <TabList selected_value>
330 <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
331 <Tab value=i.to_string()>{
332 queues.queue_names.with_untracked(|m| m.get(&i).cloned()).unwrap_or_else(|| unreachable!())
333 }</Tab>
334 }/>
335 </TabList>
336 <div style="margin:10px"><Divider/></div>
337 </LayoutHeader>
338 {move || {
339 queues.show.update_untracked(|v| *v = false);
341 QueueSocket::run(queues);
342 move || view! {
343 <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
344 let ls = move || {
345 let curr = queues.selected.get();
346 (curr,queues.queues.with(|m| m.get(&curr).copied()).unwrap_or_else(|| unreachable!()))
347 };
348 move || {
349 let (curr,ls) = ls();
350 match ls.get() {
351 QueueData::Idle(v) => {
352 idle(curr,v)
353 },
354 QueueData::Running(r) => {
355 running(curr,r)
356 },
357 QueueData::Finished(failed,done) => finished(curr,failed,done),
358 QueueData::Empty => view!(<div>"Other"</div>).into_any()
359 }
360 }
361 }</Show>
362 }
363 }}
364 </Layout>}.into_any()
365 })
366 }) as _)
367 }).into_any()
368}
369
/// Renders the collapsible "Archives" table of a queue.
///
/// * `queue_id` - queue whose repositories are listed; looked up in the
///   `AllQueues` context (`queue_repos`).
/// * `allowed` - when `false`, archive names are grayed out and no update
///   controls are shown; when `true`, each git archive gets a
///   "Check for updates" button and, once updates are known, an "Update"
///   button with a branch selector.
///
/// Returns an empty view when the login state is `NoAccounts`, when the queue
/// has no repository information, or when the repository list is empty.
#[allow(clippy::too_many_lines)]
fn repos(queue_id: NonZeroU32, allowed: bool) -> AnyView {
    use ftml_component_utils::{BoldCaption, Table, TableCell, TableHeader, TableRow};
    if matches!(LoginState::get(), LoginState::NoAccounts) {
        return ().into_any();
    }
    let queues: AllQueues = expect_context();
    let Some(repos) = queues
        .queue_repos
        .with_untracked(|v| v.get(&queue_id).cloned())
        .flatten()
    else {
        return ().into_any();
    };
    if repos.is_empty() {
        return ().into_any();
    }
    // Base style for archive names: gray when the queue may not be modified.
    let style = if allowed { "" } else { "color:gray;" };
    inject_css("flams-repo-table", include_str!("repo-table.css"));
    view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
        <Header slot><BoldCaption>"Archives"</BoldCaption></Header>
        <Table class="flams-repo-table">
            <TableHeader slot>
                <TableCell><BoldCaption>"Archive"</BoldCaption></TableCell>
                <TableCell><BoldCaption>"Branch"</BoldCaption></TableCell>
                <TableCell><BoldCaption>"Commit"</BoldCaption></TableCell>
            </TableHeader>
            {
                repos.into_iter().map(|d| match d {
                    RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
                        <TableRow>
                            <TableCell><span style=style>{id.to_string()}</span></TableCell>
                            <TableCell>"(Copied from MathHub)"</TableCell>
                        </TableRow>
                    }),
                    RepoInfo::Git{id,branch,commit,remote} => leptos::either::Either::Right({
                        // `None` until the user has checked for updates; then
                        // the list of (branch name, latest commit) pairs.
                        let updates = RwSignal::<Option<Vec<(String,flams_backend_types::git::Commit)>>>::new(None);
                        // Reactive style: unstyled before the check, green when
                        // up to date, yellow-green when updates exist; the gray
                        // base style when `allowed` is false.
                        let style = move || if allowed {
                            updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
                                "color:green;"
                            } else {
                                "color:yellowgreen;"
                            }))
                        } else {style};
                        let idstr = id.to_string();
                        view!{
                            <TableRow>
                                <TableCell><span style=style>{idstr}</span></TableCell>
                                <TableCell>{branch}</TableCell>
                                <TableCell>
                                    // NOTE(review): `[..8]` panics if `commit.id` is shorter than
                                    // 8 bytes (assuming it is a string) - confirm ids are full hashes.
                                    {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
                                    {if allowed {Some(move || updates.with(|up| {
                                        // Not checked yet: offer a button that fetches new commits.
                                        let Some(up) = up else {
                                            let aid = id.clone();
                                            let toaster = ftml_component_utils::toasts::ToasterInjection::expect_context();
                                            let get_updates = Action::new(move |()| {
                                                let id = aid.clone();
                                                async move {
                                                    match get_new_commits(Some(queue_id),id).await {
                                                        Ok(v) => updates.set(Some(v)),
                                                        Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
                                                    }
                                                }
                                            });
                                            return leptos::either::EitherOf3::A(view!{
                                                <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
                                            });
                                        };
                                        // Checked, nothing new.
                                        if up.is_empty() {
                                            return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
                                        }
                                        let updates = up.clone();

                                        // Checked, updates exist: branch selector plus "Update" button.
                                        leptos::either::EitherOf3::C({
                                            use ftml_component_utils::{Button,ButtonSize,Combobox,ComboboxOption};
                                            let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
                                            let branch = RwSignal::new(first.clone());
                                            // Fall back to the first branch whenever the selection becomes empty.
                                            let _ = Effect::new(move || if branch.with(String::is_empty) {
                                                branch.set(first.clone());
                                            });
                                            let update : UpdateQueues = expect_context();
                                            let archive = id.clone();
                                            let remote = remote.clone();
                                            let act = flams_web_utils::components::message_action(
                                                move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
                                                move |(i,_)| {
                                                    // Reload the queue overview after a successful update.
                                                    update.0.set(());
                                                    format!("{i} jobs queued")
                                                }
                                            );
                                            view!{
                                                <span style="color:green">
                                                    " Updates available: "
                                                </span>
                                                <div style="margin-left:10px">
                                                    <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
                                                    " from branch: "
                                                    <div style="display:inline-block;"><Combobox selected_options=branch>{
                                                        updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
                                                            <ComboboxOption text=vname.clone() value=vname>
                                                                {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
                                                            </ComboboxOption>
                                                        }}).collect_view()
                                                    }</Combobox></div>
                                                </div>
                                            }
                                        })
                                    }))} else {None}}
                                </TableCell>
                            </TableRow>
                        }
                    }),
                }).collect_view()
            }
        </Table>
    </Collapsible></div>
    }.into_any()
}
489
490fn delete_action(id: NonZeroU32) -> Action<(), ()> {
491 use ftml_component_utils::toasts::ToasterInjection;
492 let update: UpdateQueues = expect_context();
493 let toaster = ToasterInjection::expect_context();
494 Action::new(move |()| async move {
495 match flams_router_buildqueue_base::server_fns::delete(id).await {
496 Ok(()) => update.0.set(()),
497 Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
498 }
499 })
500}
501
502fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> AnyView {
503 use ftml_component_utils::Button;
504 let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
505 flams_router_buildqueue_base::server_fns::run(id)
506 });
507 let del = delete_action(id);
508 view! {
509 <div style="width:100%"><div style="position:fixed;right:20px">
510 <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
511 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
512 </div></div>
513 {repos(id,true)}
514 <ol reversed style="margin-left:30px">
515 <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
516 </ol>
517 }
518 .into_any()
519}
520
521fn running(id: NonZeroU32, queue: RunningQueue) -> AnyView {
522 use ftml_component_utils::{AnchorMenu, AnchorMenuEntry, Button};
523 let del = delete_action(id);
524 let RunningQueue {
525 running,
526 queue,
527 blocked,
528 failed,
529 done,
530 eta,
531 } = queue;
532 view! {
533 <div style="position:fixed;right:20px;z-index:5"><AnchorMenu>
534 <AnchorMenuEntry href="#running">"Running"</AnchorMenuEntry>
535 <AnchorMenuEntry href="#queued">"Queued"</AnchorMenuEntry>
536 <AnchorMenuEntry href="#blocked">"Blocked"</AnchorMenuEntry>
537 <AnchorMenuEntry href="#failed">"Failed"</AnchorMenuEntry>
538 <AnchorMenuEntry href="#finished">"Finished"</AnchorMenuEntry>
539 </AnchorMenu></div>
540 {repos(id,false)}
541 <div style="text-align:left;">
542 {eta.into_view()}
543 <div style="width:100%"><div style="position:fixed;right:20px">
544 <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
545 </div></div>
546 <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
547 <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
548 <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
549 <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
550 <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
551 <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
552 <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
553 <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
554 <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
555 <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
556 </div>
557 }.into_any()
558}
559
560fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> AnyView {
561 use ftml_component_utils::{AnchorMenu, AnchorMenuEntry, Button};
562 let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
563 let num_failed = failed.len();
564 let num_done = done.len();
565 let del = delete_action(id);
566 view! {
567 <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
568 {if num_failed > 0 {Some(view!(
569 <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
570 ))} else { None }}
571 {migrate_button(id,num_failed)}
572 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
573 </div></div>
574 <div style="position:fixed;right:20px;z-index:5"><AnchorMenu>
575 <AnchorMenuEntry href="#failed">"Failed"</AnchorMenuEntry>
576 <AnchorMenuEntry href="#finished">"Finished"</AnchorMenuEntry>
577 </AnchorMenu></div>
578 {repos(id,true)}
579 <div style="text-align:left;">
580 <h3 id="failed">"Failed ("{num_failed}")"</h3>
581 <ul style="margin-left:30px">{
582 failed.iter().map(Entry::as_view).collect_view()
583 }</ul>
584 <h3 id="finished">"Finished ("{num_done}")"</h3>
585 <ul style="margin-left:30px">{
586 done.iter().map(Entry::as_view).collect_view()
587 }</ul>
588 </div>
589 }
590 .into_any()
591}
592
593fn migrate_button(id: NonZeroU32, num_failed: usize) -> AnyView {
594 use ftml_component_utils::{
595 BoldCaption, Button, Dialog, DialogBody, DialogContent, DialogSurface, Divider,
596 };
597 use leptos::either::EitherOf3;
598 if matches!(LoginState::get(), LoginState::NoAccounts) {
599 return ().into_any();
600 }
601 let update: UpdateQueues = expect_context();
602 let migrate = flams_web_utils::components::message_action(
603 move |()| flams_router_buildqueue_base::server_fns::migrate(id),
604 move |i| {
605 update.0.set(());
606 format!("{i} archives migrated")
607 },
608 );
609 if num_failed == 0 {
610 view! {
611 <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
612 }
613 .into_any()
614 } else {
615 let clicked = RwSignal::new(false);
616 view! {
617 <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
618 <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
619 <BoldCaption><span style="color:red">WARNING</span></BoldCaption>
620 <Divider/>
621 <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
622 <div>
623 <div style="width:min-content;margin-left:auto;">
624 <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
625 </div>
626 </div>
627 </DialogContent></DialogBody></DialogSurface></Dialog>
628 }
629 .into_any()
630 }
631}
632
/// Websocket endpoint for live queue updates.
///
/// The set of fields depends on the enabled features: the server side (`ssr`)
/// holds an optional change listener for the queue being watched, the client
/// side (`hydrate`) holds the browser socket and a flag signal.
#[derive(Clone)]
pub struct QueueSocket {
    /// Listener of the watched queue; `None` until the client has sent a
    /// queue id (see `handle_message`).
    #[cfg(feature = "ssr")]
    listener:
        Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
    /// Browser websocket.
    #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
    socket: leptos::web_sys::WebSocket,
    /// Placeholder for the socket in `docs-only` builds.
    #[cfg(feature = "docs-only")]
    socket: (),
    /// Set to `true` by the `on_open` callback.
    #[cfg(feature = "hydrate")]
    _running: RwSignal<bool>,
}
/// Clients send a queue id (`NonZeroU32`); the server answers with
/// [`QueueMessage`]s.
impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
    /// Path of the websocket endpoint.
    const SERVER_ENDPOINT: &'static str = "/ws/queue";
}
648
#[cfg(feature = "ssr")]
#[async_trait::async_trait]
impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
    /// Creates the server side of the socket.
    ///
    /// Only `Admin`, `NoAccounts` and users with `is_admin: true` get a
    /// socket; every other login state yields `None`. The socket starts
    /// without a listener.
    async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
        let may_watch = matches!(
            account,
            LoginState::Admin | LoginState::NoAccounts | LoginState::User { is_admin: true, .. }
        );
        if !may_watch {
            return None;
        }
        Some(Self {
            listener: None,
            #[cfg(feature = "hydrate")]
            _running: RwSignal::new(false),
            #[cfg(feature = "hydrate")]
            socket: unreachable!(),
        })
    }

    /// Waits for the next message of the watched queue.
    ///
    /// While no listener is installed this sleeps in half-second steps;
    /// once one exists, the result of its `read()` is converted and returned.
    async fn next(&mut self) -> Option<QueueMessage> {
        loop {
            if let Some(listener) = &mut self.listener {
                return listener.read().await.map(Into::into);
            }
            tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await;
        }
    }

    /// Handles a queue id sent by the client: starts listening to that queue
    /// and replies with its current state message. Returns `None` if the
    /// queue manager does not know the queue.
    async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
        let (listener, state) = flams_system::building::queue_manager::QueueManager::get()
            .with_queue(msg.into(), |q| q.map(|q| (q.listener(), q.state_message())))?;
        self.listener = Some(listener);
        Some(state.into())
    }

    /// Nothing to do when the connection starts.
    async fn on_start(&mut self, _: &mut ws::AxumWS) {}
}
685
#[cfg(feature = "hydrate")]
impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
    /// Wraps the browser websocket; the "open" flag starts out `false`.
    fn new(ws: leptos::web_sys::WebSocket) -> Self {
        let opened = RwSignal::new(false);
        Self {
            #[cfg(not(feature = "docs-only"))]
            socket: ws,
            #[cfg(feature = "docs-only")]
            socket: (),
            _running: opened,
            #[cfg(feature = "ssr")]
            listener: unreachable!(),
        }
    }

    /// Gives access to the underlying browser socket (unreachable in
    /// `docs-only` builds, which hold no socket).
    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
        #[cfg(not(feature = "docs-only"))]
        {
            &mut self.socket
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }

    /// Returns a callback that sets the "open" flag to `true`.
    #[allow(clippy::used_underscore_binding)]
    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
        let opened = self._running;
        let mark_open = move || {
            opened.set(true);
        };
        Some(Box::new(mark_open))
    }
}
717
#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
impl QueueSocket {
    /// Server-only counterpart of the client `run`: ignores the queue state
    /// and just calls `force_start_server`.
    fn run(_queues: AllQueues) {
        Self::force_start_server();
    }
}
724
#[cfg(feature = "hydrate")]
impl QueueSocket {
    /// Starts the client side of the queue websocket.
    ///
    /// Every incoming message is applied (via [`Self::do_msg`]) to the data of
    /// the queue that is selected at that moment; afterwards `queues.show` is
    /// switched on if it was off. Once the socket reports that it is open, the
    /// id of the selected queue is sent to the server.
    fn run(queues: AllQueues) {
        use ws::WebSocketClient;
        Self::force_start_client(
            move |msg| {
                let current = queues.selected.get_untracked();
                queues.queues.with_untracked(|queues| {
                    let Some(queue) = queues.get(&current) else {
                        tracing::error!("Queue not found: {current}");
                        return;
                    };
                    Self::do_msg(*queue, msg);
                });
                if !queues.show.get_untracked() {
                    queues.show.set(true);
                }
                None
            },
            move |mut socket| {
                #[allow(clippy::used_underscore_binding)]
                Effect::new(move |_| {
                    // Tracks the "open" flag, so this runs again once the
                    // connection has been established.
                    if socket._running.get() {
                        let current = queues.selected.get_untracked();
                        socket.send(&current);
                    }
                });
            },
        );
    }

    /// Removes the entry with the given id from `list` and returns it, or
    /// returns `None` (leaving `list` unchanged) if there is no such entry.
    fn take_entry(list: &mut Vec<Entry>, id: u32) -> Option<Entry> {
        let idx = list.iter().position(|e| e.id == id)?;
        Some(list.remove(idx))
    }

    /// Applies a server message to the data of one queue.
    ///
    /// * `Idle`, `Started` and `Finished` replace the queue data wholesale
    ///   (a started queue begins with `Eta::default()`).
    /// * The `Task*` messages only have an effect while the queue data is
    ///   `Running`: the entry with the given id is moved between the state
    ///   lists and the state of step `target` is updated. Messages for
    ///   unknown ids leave the lists as they are.
    fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
        match msg {
            QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
            QueueMessage::Started {
                running,
                queue: actual_queue,
                blocked,
                failed,
                done,
            } => queue.set(QueueData::Running(RunningQueue {
                running: RwSignal::new(running),
                queue: RwSignal::new(actual_queue),
                blocked: RwSignal::new(blocked),
                failed: RwSignal::new(failed),
                done: RwSignal::new(done),
                eta: WrappedEta(RwSignal::new(Eta::default())),
            })),
            QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
            QueueMessage::TaskStarted { id, mut target } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    queue,
                    running,
                    blocked,
                    ..
                }) = queue
                {
                    // The entry is looked up among the queued entries first
                    // and, failing that, among the blocked ones.
                    let mut worked = false;
                    queue.update(|v| {
                        let Some(e) = Self::take_entry(v, id) else {
                            return;
                        };
                        worked = true;
                        // `take` leaves `target` usable for the fallback below
                        // (which only runs if this branch did not).
                        e.steps
                            .update(|m| m.insert(std::mem::take(&mut target), TaskState::Running));
                        running.update(|running| running.push(e));
                    });
                    if !worked {
                        blocked.update(|v| {
                            let Some(e) = Self::take_entry(v, id) else {
                                return;
                            };
                            e.steps.update(|m| m.insert(target, TaskState::Running));
                            running.update(|running| running.push(e));
                        });
                    }
                }
            }),
            QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    queue,
                    running,
                    done,
                    eta: etasignal,
                    ..
                }) = queue
                {
                    etasignal.0.set(eta);
                    running.update(|v| {
                        let Some(e) = Self::take_entry(v, id) else {
                            return;
                        };
                        e.steps.update(|m| m.insert(target, TaskState::Done));
                        // Entries with remaining queued/blocked steps go back
                        // to the queue; all others are finished.
                        let has_pending = e.steps.with_untracked(|steps| {
                            steps
                                .iter()
                                .any(|(_, s)| *s == TaskState::Queued || *s == TaskState::Blocked)
                        });
                        if has_pending {
                            queue.update(|v| v.push(e));
                        } else {
                            done.update(|v| v.push(e));
                        }
                    });
                }
            }),
            QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    running,
                    failed,
                    eta: etasignal,
                    ..
                }) = queue
                {
                    etasignal.0.set(eta);
                    running.update(|v| {
                        let Some(e) = Self::take_entry(v, id) else {
                            return;
                        };
                        e.steps.update(|m| m.insert(target, TaskState::Failed));
                        failed.update(|v| v.push(e));
                    });
                }
            }),
            QueueMessage::TaskBlocked { id, target, eta } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    running,
                    blocked,
                    eta: etasignal,
                    ..
                }) = queue
                {
                    etasignal.0.set(eta);
                    running.update(|v| {
                        let Some(e) = Self::take_entry(v, id) else {
                            return;
                        };
                        e.steps.update(|m| m.insert(target, TaskState::Blocked));
                        blocked.update(|v| v.push(e));
                    });
                }
            }),
        }
    }
}
875
/// Reactive state of the queue page, shared through context.
#[derive(Clone, Copy)]
struct AllQueues {
    /// Whether the queue content is shown; while `false`, `queues_top` renders
    /// a spinner. Set to `true` when a websocket message arrives.
    show: RwSignal<bool>,
    /// Id of the currently selected queue.
    selected: RwSignal<NonZeroU32>,
    /// Display name per queue id (used as tab labels).
    queue_names: RwSignal<VecMap<NonZeroU32, String>>,
    /// Repository information per queue id, if the queue has any.
    queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
    /// Data per queue id, updated from websocket messages.
    queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
}
884
885impl AllQueues {
886 fn new(ids: Vec<QueueInfo>) -> Self {
887 let queues = RwSignal::new(
888 ids.iter()
889 .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
890 .collect(),
891 );
892 let selected = ids.first().map_or_else(
893 || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
894 |v| v.id,
895 );
896 let mut queue_names = VecMap::default();
897 let mut queue_repos = VecMap::default();
898 for d in ids {
899 queue_names.insert(d.id, d.name);
900 queue_repos.insert(d.id, d.archives);
901 }
902 Self {
903 show: RwSignal::new(false),
904 selected: RwSignal::new(selected),
905 queues,
906 queue_names: RwSignal::new(queue_names),
907 queue_repos: RwSignal::new(queue_repos),
908 }
909 }
910}
911
/// Client-side data of a single queue.
///
/// `Idle`, `Running` and `Finished` are only constructed in the
/// `hydrate`-only `QueueSocket::do_msg`, hence the `dead_code` allowance.
#[derive(Clone)]
#[allow(dead_code)]
enum QueueData {
    /// Idle queue with its entries.
    Idle(RwSignal<Vec<Entry>>),
    /// Running queue with one reactive list per entry state.
    Running(RunningQueue),
    /// Initial state, before any message for the queue has arrived.
    Empty,
    /// Finished queue: failed entries, then successfully finished entries.
    Finished(Vec<Entry>, Vec<Entry>),
}
920
/// Reactive state of a running queue: one list of entries per state plus the
/// current progress estimate.
///
/// Only constructed in the `hydrate`-only `QueueSocket::do_msg`, hence the
/// `dead_code` allowance.
#[derive(Clone, Copy)]
#[allow(dead_code)]
struct RunningQueue {
    /// Entries with a step currently running.
    running: RwSignal<Vec<Entry>>,
    /// Entries waiting to be run.
    queue: RwSignal<Vec<Entry>>,
    /// Entries that are blocked.
    blocked: RwSignal<Vec<Entry>>,
    /// Entries with a failed step.
    failed: RwSignal<Vec<Entry>>,
    /// Entries whose steps have all been processed successfully.
    done: RwSignal<Vec<Entry>>,
    /// Progress estimate, updated by task messages.
    eta: WrappedEta,
}
931
/// Signal holding the current progress estimate of a running queue; wrapped
/// in a local type so it can carry its own `into_view` rendering.
#[derive(Clone, Copy)]
struct WrappedEta(RwSignal<ftml_ontology::utils::time::Eta>);
934
935#[allow(clippy::cast_precision_loss)]
936impl WrappedEta {
937 fn into_view(self) -> impl IntoView {
938 use ftml_component_utils::ProgressBar;
939 inject_css(
940 "flams-eta",
941 r"
942.flams-progress-bar {height:10px;}
943 ",
944 );
945 let pctg = Memo::new(move |_| {
946 let eta = self.0.get();
947 ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
948 });
949 let time_left = move || {
950 let eta = self.0.get();
951 if eta.time_left == Delta::default() {
952 "N/A".to_string()
953 } else {
954 eta.time_left.max_seconds().to_string()
955 }
956 };
957 view! {
958 <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
959 {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
960 </div>
961 }
962 }
963}