1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4
// Build-time guard on the rendering-mode features. The error fires when both
// `ssr` and `hydrate` are enabled (unless `docs-only` is enabled as well, which
// exempts that combination) or when neither of the two is enabled.
#[cfg(any(
    all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
    not(any(feature = "ssr", feature = "hydrate"))
))]
compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_router_base::ws;
12use flams_router_base::{LoginState, require_login, ws::WebSocket};
13#[cfg(feature = "hydrate")]
14use flams_router_buildqueue_base::server_fns::get_log;
15use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
16use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
17use flams_utils::vecmap::VecMap;
18use flams_web_utils::components::wait_and_then_fn;
19use ftml_dom::utils::css::inject_css;
20use ftml_ontology::utils::time::{Delta, Eta};
21use ftml_uris::ArchiveId;
22use leptos::{either::EitherOf4, prelude::*};
23use leptos_router::hooks::use_params_map;
24use std::num::NonZeroU32;
25
/// Context handle around a unit signal. [`QueuesTop`] subscribes to it, so
/// calling `.0.set(())` makes the whole queue view re-run and reload its data.
#[derive(Copy, Clone)]
struct UpdateQueues(RwSignal<()>);
28
/// A single entry of a build queue as it is sent to and rendered by the UI.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Identifier used as the list key and to match incoming task messages.
    id: u32,
    /// Archive the entry belongs to.
    archive: ArchiveId,
    /// Path of the entry, shown next to the archive id in the UI.
    rel_path: String,
    /// Per-target task states. With the `hydrate` feature this is a signal so
    /// that the rendered entry updates when a state changes.
    #[cfg(feature = "hydrate")]
    steps: RwSignal<VecMap<String, TaskState>>,
    /// Per-target task states (plain map when `hydrate` is not enabled).
    #[cfg(not(feature = "hydrate"))]
    steps: VecMap<String, TaskState>,
}
39
40impl Entry {
41 #[cfg(not(feature = "hydrate"))]
42 fn as_view(&self) -> impl IntoView + use<> {
43 view! {
44 <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
45 }
46 }
47
48 #[cfg(feature = "hydrate")]
49 fn as_view(&self) -> AnyView {
50 use flams_router_base::vscode_link;
51 use flams_web_utils::components::{Collapsible, Header};
52
53 let vscode = vscode_link(&self.archive, &self.rel_path);
54
55 let title = format!("[{}]{}", self.archive, self.rel_path);
56 let total = self.steps.with_untracked(|v| v.0.len());
57 let steps = self.steps;
58 let current = move || {
59 steps.with(|v| {
60 v.iter()
61 .enumerate()
62 .find_map(|(i, (e, s))| {
63 if *s == TaskState::Done {
64 None
65 } else {
66 Some((i + 1, e.clone()))
67 }
68 })
69 .unwrap_or_else(|| (total, "Done".to_string()))
70 })
71 };
72 let rel_path = self.rel_path.clone();
73 let archive = self.archive.clone();
74 view! {
75 <li><Collapsible>
76 <Header slot>
77 <b>
78 {title}
79 {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
80 " "{vscode}
81 </b>
82 </Header>
83 <ol>
84 {let rel_path = rel_path.clone();
85 let archive = archive.clone();
86 move || steps.get().iter().map(|(t,e)|
87 view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
88 ).collect_view()}
89 </ol>
90 </Collapsible></li>
91 }.into_any()
92 }
93}
94
/// Converts a backend queue entry into its wire representation.
///
/// If `hydrate` is enabled alongside `ssr`, the body is `unreachable!()`:
/// the conversion is not expected to run in such a build.
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueEntry> for Entry {
    fn from(e: flams_system::building::QueueEntry) -> Self {
        #[cfg(feature = "hydrate")]
        {
            unreachable!()
        }
        #[cfg(not(feature = "hydrate"))]
        {
            // Stringify every target name and map its state to the UI enum.
            let steps: VecMap<String, TaskState> = e
                .steps
                .into_iter()
                .map(|(target, state)| (target.to_string(), state.into()))
                .collect();
            Self {
                id: e.id.into(),
                archive: e.archive,
                rel_path: e.rel_path.to_string(),
                steps,
            }
        }
    }
}
115
/// State of a single build target of an [`Entry`], mirroring the backend's
/// task state so that it can be serialized to the client.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TaskState {
    /// Rendered in yellow as "(Running)".
    Running,
    /// Rendered in gray as "(...)", like `Blocked` and `None`.
    Queued,
    /// Rendered in gray as "(...)", like `Queued` and `None`.
    Blocked,
    /// Rendered in green as "(Done)" with a lazily loaded log.
    Done,
    /// Rendered in red as "(Failed)" with a lazily loaded log.
    Failed,
    /// Rendered in gray as "(...)", like `Queued` and `Blocked`.
    None,
}
125impl TaskState {
126 #[cfg(feature = "hydrate")]
127 fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> AnyView {
128 use flams_web_utils::components::{Header, LazyCollapsible};
129 use thaw::Scrollbar;
130 match self {
131 Self::Running => view! {<i style="color:yellow">{t}" (Running)"</i>}.into_any(),
132 Self::Queued | Self::Blocked | Self::None => {
133 view! {<span style="color:gray">{t}" (...)"</span>}.into_any()
134 }
135 Self::Done => {
136 let archive = archive.clone();
137 let rel_path = rel_path.to_string();
138 let tc = t.clone();
139 view! {
140 <LazyCollapsible>
141 <Header slot><span style="color:green">{t}" (Done)"</span></Header>
142 {
143 let archive = archive.clone();
144 let rel_path = rel_path.clone();
145 let tc = tc.clone();
146 let queue = expect_context::<AllQueues>().selected.get_untracked();
147 require_login(Box::new(move || wait_and_then_fn(
148 move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
149 |s| {
150 view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
151 <pre style="width:fit-content;font-size:smaller;">{s}</pre>
152 </Scrollbar>}.into_any()
153 }
154 )))
155 }
156 </LazyCollapsible>
157 }.into_any()
158 }
159 Self::Failed => {
160 let archive = archive.clone();
161 let rel_path = rel_path.to_string();
162 let tc = t.clone();
163 view! {
164 <LazyCollapsible>
165 <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
166 {
167 let archive = archive.clone();
168 let rel_path = rel_path.clone();
169 let tc = tc.clone();
170 let queue = expect_context::<AllQueues>().selected.get_untracked();
171 require_login(Box::new(move || wait_and_then_fn(
172 move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
173 |s| {
174 view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
175 <pre style="width:fit-content;font-size:smaller;">{s}</pre>
176 </Scrollbar>}
177 }.into_any()
178 )))
179 }
180 </LazyCollapsible>
181 }.into_any()
182 }
183 }
184 }
185}
186
/// One-to-one mapping from the backend task state to the UI task state.
#[cfg(feature = "ssr")]
impl From<flams_system::building::TaskState> for TaskState {
    fn from(e: flams_system::building::TaskState) -> Self {
        // Aliased so the backend enum cannot be confused with `Self`.
        use flams_system::building::TaskState as Backend;
        match e {
            Backend::Running => Self::Running,
            Backend::Queued => Self::Queued,
            Backend::Blocked => Self::Blocked,
            Backend::Done => Self::Done,
            Backend::Failed => Self::Failed,
            Backend::None => Self::None,
        }
    }
}
201
/// Messages sent from the server to the client over the queue websocket.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum QueueMessage {
    /// The queue is idle; carries its entries.
    Idle(Vec<Entry>),
    /// The queue is running; carries its entries grouped by state.
    Started {
        running: Vec<Entry>,
        queue: Vec<Entry>,
        blocked: Vec<Entry>,
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// The queue has finished; carries the failed and the successful entries.
    Finished {
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// Target `target` of the entry with the given `id` has started.
    TaskStarted {
        id: u32,
        target: String,
    },
    /// Target `target` of the entry with the given `id` succeeded; `eta` is
    /// the updated progress estimate.
    TaskSuccess {
        id: u32,
        target: String,
        eta: Eta,
    },
    /// Target `target` of the entry with the given `id` failed; `eta` is the
    /// updated progress estimate.
    TaskFailed {
        id: u32,
        target: String,
        eta: Eta,
    },
}
/// Converts a backend queue message into its wire representation, converting
/// every contained entry, id and target name along the way.
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueMessage> for QueueMessage {
    fn from(e: flams_system::building::QueueMessage) -> Self {
        use flams_system::building::QueueMessage as Backend;

        /// Converts a collection of backend entries into UI entries.
        fn entries<I>(items: I) -> Vec<Entry>
        where
            I: IntoIterator,
            I::Item: Into<Entry>,
        {
            items.into_iter().map(Into::into).collect()
        }

        match e {
            Backend::Idle(v) => Self::Idle(entries(v)),
            Backend::Started {
                running,
                queue,
                blocked,
                failed,
                done,
            } => Self::Started {
                running: entries(running),
                queue: entries(queue),
                blocked: entries(blocked),
                failed: entries(failed),
                done: entries(done),
            },
            Backend::Finished { failed, done } => Self::Finished {
                failed: entries(failed),
                done: entries(done),
            },
            Backend::TaskStarted { id, target } => Self::TaskStarted {
                id: id.into(),
                target: target.to_string(),
            },
            Backend::TaskSuccess { id, target, eta } => Self::TaskSuccess {
                id: id.into(),
                target: target.to_string(),
                eta,
            },
            Backend::TaskFailed { id, target, eta } => Self::TaskFailed {
                id: id.into(),
                target: target.to_string(),
                eta,
            },
        }
    }
}
271
272#[component]
275pub fn QueuesTop() -> AnyView {
276 use flams_web_utils::components::Spinner;
277 use thaw::{Divider, Layout, Tab, TabList};
278
279 let update = UpdateQueues(RwSignal::new(()));
280 provide_context(update);
281 (move || {
282 let () = update.0.get();
283 let params = use_params_map();
284 let id = move || params.read().get("queue");
285
286 require_login(Box::new(move || {
287 wait_and_then_fn(server_fns::get_queues, move |v| {
288 if v.is_empty() {
289 return view!(<div>"(No running queues)"</div>).into_any();
290 }
291 let queues = AllQueues::new(v);
292 if let Some(id) = id() {
293 if let Ok(id) = id.parse() {
294 queues.selected.update_untracked(|v| *v = id);
295 }
296 }
297 provide_context(queues);
298 let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
299 let _ = Effect::new(move |_| {
300 let value = selected_value.get();
301 let selected = queues.selected.get_untracked();
302 let value = value.parse().unwrap_or_else(|_| unreachable!());
303 if selected != value {
304 queues.selected.set(value);
305 }
306 });
307 inject_css(
308 "flams-fullscreen",
309 ".flams-fullscreen { width:100%; height:calc(100% - 44px - 21px) }",
310 );
311 view! {
312 <TabList selected_value>
313 <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
314 <Tab value=i.to_string()>{
315 queues.queue_names.get().get(&i).unwrap_or_else(|| unreachable!()).clone()
316 }</Tab>
317 }/>
318 </TabList>
319 <div style="margin:10px"><Divider/></div>
320 <Layout class="flams-fullscreen">{move || {
321 let curr = queues.selected.get();
322 queues.show.update_untracked(|v| *v = false);
323 QueueSocket::run(queues);
324 move || view! {
325 <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
326 let ls = *queues.queues.get_untracked().get(&curr).unwrap_or_else(|| unreachable!());
327 move || match ls.get() {
328 QueueData::Idle(v) => {
329 idle(curr,v)
330 },
331 QueueData::Running(r) => {
332 running(curr,r)
333 },
334 QueueData::Finished(failed,done) => finished(curr,failed,done),
335 QueueData::Empty => view!(<div>"Other"</div>).into_any()
336 }
337 }</Show>
338 }
339 }}</Layout>
340 }.into_any()
341 })
342 }) as _)
343 }).into_any()
344}
345
346#[allow(clippy::too_many_lines)]
347fn repos(queue_id: NonZeroU32, allowed: bool) -> AnyView {
348 use flams_web_utils::components::{Collapsible, Header};
349 use thaw::{
350 Caption1Strong, Table, TableBody, TableCell, TableCellLayout, TableHeader, TableHeaderCell,
351 TableRow,
352 };
353 if matches!(LoginState::get(), LoginState::NoAccounts) {
354 return ().into_any();
355 }
356 let queues: AllQueues = expect_context();
357 let Some(repos) = queues
358 .queue_repos
359 .with_untracked(|v| v.get(&queue_id).cloned())
360 .flatten() else {
361 return ().into_any()
362 };
363 if repos.is_empty() {
364 return ().into_any();
365 }
366 let style = if allowed { "" } else { "color:gray;" };
367 inject_css("flams-repo-table", include_str!("repo-table.css"));
368 view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
369 <Header slot><Caption1Strong>"Archives"</Caption1Strong></Header>
370 <Table class="flams-repo-table">
371 <TableHeader><TableRow>
372 <TableHeaderCell><Caption1Strong>"Archive"</Caption1Strong></TableHeaderCell>
373 <TableHeaderCell><Caption1Strong>"Branch"</Caption1Strong></TableHeaderCell>
374 <TableHeaderCell><Caption1Strong>"Commit"</Caption1Strong></TableHeaderCell>
375 </TableRow></TableHeader>
376 <TableBody>{
377 repos.into_iter().map(|d| match d {
378 RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
379 <TableRow>
380 <TableCell><TableCellLayout><span style=style>{id.to_string()}</span></TableCellLayout></TableCell>
381 <TableCell><TableCellLayout>"(Copied from MathHub)"</TableCellLayout></TableCell>
382 </TableRow>
383 }),
384 RepoInfo::Git{id,branch,commit,remote} => leptos::either::Either::Right({
385 let updates = RwSignal::<Option<Vec<(String,flams_backend_types::git::Commit)>>>::new(None);
386 let style = move || if allowed {
387 updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
388 "color:green;"
389 } else {
390 "color:yellowgreen;"
391 }))
392 } else {style};
393 let idstr = id.to_string();
394 view!{
395 <TableRow>
396 <TableCell><TableCellLayout><span style=style>{idstr}</span></TableCellLayout></TableCell>
397 <TableCell><TableCellLayout>{branch}</TableCellLayout></TableCell>
398 <TableCell><TableCellLayout>
399 {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
400 {if allowed {Some(move || updates.with(|up| {
401 let Some(up) = up else {
402 let aid = id.clone();
403 let toaster = thaw::ToasterInjection::expect_context();
404 let get_updates = Action::new(move |()| {
405 let id = aid.clone();
406 async move {
407 match get_new_commits(Some(queue_id),id).await {
408 Ok(v) => updates.set(Some(v)),
409 Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
410 }
411 }
412 });
413 return leptos::either::EitherOf3::A(view!{
414 <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
415 });
416 };
417 if up.is_empty() {
418 return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
419 }
420 let updates = up.clone();
421
422 leptos::either::EitherOf3::C({
423 use thaw::{Button,ButtonSize,Combobox,ComboboxOption};
424 let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
425 let branch = RwSignal::new(first.clone());
426 let _ = Effect::new(move || if branch.with(String::is_empty) {
427 branch.set(first.clone());
428 });
429 let update : UpdateQueues = expect_context();
430 let archive = id.clone();
432 let remote = remote.clone();
433 let act = flams_web_utils::components::message_action(
434 move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
435 move |(i,_)| {
436 update.0.set(());
437 format!("{i} jobs queued")
438 }
439 );
440 view!{
441 <span style="color:green">
442 " Updates available: "
443 </span>
444 <div style="margin-left:10px">
445 <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
446 " from branch: "
447 <div style="display:inline-block;"><Combobox selected_options=branch>{
448 updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
449 <ComboboxOption text=vname.clone() value=vname>
450 {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
451 </ComboboxOption>
452 }}).collect_view()
453 }</Combobox></div>
454 </div>
455 }
456 })
457 }))} else {None}}
458 </TableCellLayout></TableCell>
459 </TableRow>
460 }
461 }),
462 }).collect_view()
463 }</TableBody>
464 </Table>
465 </Collapsible></div>
466 }.into_any()
467}
468
469fn delete_action(id: NonZeroU32) -> Action<(), ()> {
470 use thaw::ToasterInjection;
471 let update: UpdateQueues = expect_context();
472 let toaster = ToasterInjection::expect_context();
473 Action::new(move |()| async move {
474 match flams_router_buildqueue_base::server_fns::delete(id).await {
475 Ok(()) => update.0.set(()),
476 Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
477 }
478 })
479}
480
481fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> AnyView {
482 use thaw::Button;
483 let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
484 flams_router_buildqueue_base::server_fns::run(id)
485 });
486 let del = delete_action(id);
487 view! {
488 <div style="width:100%"><div style="position:fixed;right:20px">
489 <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
490 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
491 </div></div>
492 {repos(id,true)}
493 <ol reversed style="margin-left:30px">
494 <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
495 </ol>
496 }.into_any()
497}
498
499fn running(id: NonZeroU32, queue: RunningQueue) -> AnyView {
500 use flams_web_utils::components::{Anchor, AnchorLink, Header};
501 use thaw::{Button, Layout};
502 let del = delete_action(id);
503 let RunningQueue {
504 running,
505 queue,
506 blocked,
507 failed,
508 done,
509 eta,
510 } = queue;
511 view! {
512 <div style="position:fixed;right:20px;z-index:5"><Anchor>
513 <AnchorLink href="#running"><Header slot>"Running"</Header></AnchorLink>
514 <AnchorLink href="#queued"><Header slot>"Queued"</Header></AnchorLink>
515 <AnchorLink href="#blocked"><Header slot>"Blocked"</Header></AnchorLink>
516 <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
517 <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
518 </Anchor></div>
519 {repos(id,false)}
520 <Layout content_style="text-align:left;">
521 {eta.into_view()}
522 <div style="width:100%"><div style="position:fixed;right:20px">
523 <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
524 </div></div>
525 <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
526 <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
527 <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
528 <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
529 <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
530 <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
531 <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
532 <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
533 <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
534 <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
535 </Layout>
536 }.into_any()
537}
538
539fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> AnyView {
540 use flams_web_utils::components::{Anchor, AnchorLink, Header};
541 use thaw::{Button, Layout};
542 let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
543 let num_failed = failed.len();
544 let num_done = done.len();
545 let del = delete_action(id);
546 view! {
547 <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
548 {if num_failed > 0 {Some(view!(
549 <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
550 ))} else { None }}
551 {migrate_button(id,num_failed)}
552 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
553 </div></div>
554 <div style="position:fixed;right:20px;z-index:5"><Anchor>
555 <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
556 <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
557 </Anchor></div>
558 {repos(id,true)}
559 <Layout content_style="text-align:left;">
560 <h3 id="failed">"Failed ("{num_failed}")"</h3>
561 <ul style="margin-left:30px">{
562 failed.iter().map(Entry::as_view).collect_view()
563 }</ul>
564 <h3 id="finished">"Finished ("{num_done}")"</h3>
565 <ul style="margin-left:30px">{
566 done.iter().map(Entry::as_view).collect_view()
567 }</ul>
568 </Layout>
569 }.into_any()
570}
571
572fn migrate_button(id: NonZeroU32, num_failed: usize) -> AnyView {
573 use leptos::either::EitherOf3;
574 use thaw::{Button, Caption1Strong, Dialog, DialogBody, DialogContent, DialogSurface, Divider};
575 if matches!(LoginState::get(), LoginState::NoAccounts) {
576 return ().into_any();
577 }
578 let update: UpdateQueues = expect_context();
579 let migrate = flams_web_utils::components::message_action(
580 move |()| flams_router_buildqueue_base::server_fns::migrate(id),
581 move |i| {
582 update.0.set(());
583 format!("{i} archives migrated")
584 },
585 );
586 if num_failed == 0 {
587 view! {
588 <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
589 }.into_any()
590 } else {
591 let clicked = RwSignal::new(false);
592 view! {
593 <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
594 <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
595 <Caption1Strong><span style="color:red">WARNING</span></Caption1Strong>
596 <Divider/>
597 <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
598 <div>
599 <div style="width:min-content;margin-left:auto;">
600 <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
601 </div>
602 </div>
603 </DialogContent></DialogBody></DialogSurface></Dialog>
604 }.into_any()
605 }
606}
607
/// Websocket endpoint streaming [`QueueMessage`]s for one build queue. Which
/// fields exist depends on the enabled features.
#[derive(Clone)]
pub struct QueueSocket {
    /// Server side: listener of the queue being watched; `None` until the
    /// client has sent the id of a queue.
    #[cfg(feature = "ssr")]
    listener:
        Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
    /// Client side: the underlying browser websocket.
    #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
    socket: leptos::web_sys::WebSocket,
    /// Placeholder used for `docs-only` builds.
    #[cfg(feature = "docs-only")]
    socket: (),
    /// Client side: set to `true` by the `on_open` callback.
    #[cfg(feature = "hydrate")]
    _running: RwSignal<bool>,
}
/// The client sends the id of a queue (`NonZeroU32`), the server answers with
/// [`QueueMessage`]s.
impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
    /// Path under which the socket is served.
    const SERVER_ENDPOINT: &'static str = "/ws/queue";
}
623
/// Server-side half of the queue socket.
#[cfg(feature = "ssr")]
#[async_trait::async_trait]
impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
    /// Creates a socket without a listener for admins (or when no accounts
    /// exist); every other login state is refused with `None`.
    async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
        let privileged = matches!(
            account,
            LoginState::Admin | LoginState::NoAccounts | LoginState::User { is_admin: true, .. }
        );
        if !privileged {
            return None;
        }
        Some(Self {
            listener: None,
            #[cfg(feature = "hydrate")]
            _running: RwSignal::new(false),
            #[cfg(feature = "hydrate")]
            socket: unreachable!(),
        })
    }

    /// Waits for the next message of the watched queue. While no queue has
    /// been selected yet, re-checks every half second.
    async fn next(&mut self) -> Option<QueueMessage> {
        loop {
            if let Some(listener) = &mut self.listener {
                return listener.read().await.map(Into::into);
            }
            tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await;
        }
    }

    /// Switches to the queue with the received id: stores its listener and
    /// answers with its current state. Returns `None` if no such queue exists.
    async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
        let (listener, state) = flams_system::building::queue_manager::QueueManager::get()
            .with_queue(msg.into(), |queue| {
                queue.map(|queue| (queue.listener(), queue.state_message()))
            })?;
        self.listener = Some(listener);
        Some(state.into())
    }

    /// Nothing to do when the connection starts.
    async fn on_start(&mut self, _: &mut ws::AxumWS) {}
}
660
/// Browser-side half of the queue socket (only built with `hydrate`).
#[cfg(feature = "hydrate")]
impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
    /// Wraps the given browser websocket; `_running` starts out as `false`.
    fn new(ws: leptos::web_sys::WebSocket) -> Self {
        Self {
            #[cfg(not(feature = "docs-only"))]
            socket: ws,
            #[cfg(feature = "docs-only")]
            socket: (),
            _running: RwSignal::new(false),
            // Only compiled when `ssr` is enabled alongside `hydrate`; the
            // initialiser panics if it is ever evaluated.
            #[cfg(feature = "ssr")]
            listener: unreachable!(),
        }
    }
    /// Gives access to the underlying browser websocket. In `docs-only`
    /// builds there is no real socket and calling this panics.
    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
        #[cfg(not(feature = "docs-only"))]
        {
            &mut self.socket
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }
    /// Returns a callback that sets `_running` to `true`.
    #[allow(clippy::used_underscore_binding)]
    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
        let running = self._running;
        Some(Box::new(move || {
            running.set(true);
        }))
    }
}
692
#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
impl QueueSocket {
    /// Server-only counterpart of the client-side `run`: ignores the queues
    /// and only calls `force_start_server`.
    fn run(_: AllQueues) {
        Self::force_start_server();
    }
}
699
#[cfg(feature = "hydrate")]
impl QueueSocket {
    /// Starts the client socket for the currently selected queue.
    ///
    /// Every incoming message is applied to the selected queue's data via
    /// [`Self::do_msg`] and makes the queue view visible. Once the socket
    /// reports that it is running, the id of the selected queue is sent to
    /// the server.
    fn run(queues: AllQueues) {
        use ws::WebSocketClient;
        Self::force_start_client(
            move |msg| {
                let current = queues.selected.get_untracked();
                queues.queues.with_untracked(|queues| {
                    let Some(queue) = queues.get(&current) else {
                        tracing::error!("Queue not found: {current}");
                        return;
                    };
                    Self::do_msg(*queue, msg);
                });
                if !queues.show.get_untracked() {
                    queues.show.set(true);
                }
                // No reply is sent back to the server.
                None
            },
            move |mut socket| {
                #[allow(clippy::used_underscore_binding)]
                Effect::new(move |_| {
                    // Tracks only `_running`; the selection is read untracked.
                    if socket._running.get() {
                        let current = queues.selected.get_untracked();
                        socket.send(&current);
                    }
                });
            },
        );
    }

    /// Applies one server message to the data of a queue.
    ///
    /// `Idle`, `Started` and `Finished` replace the queue data wholesale. The
    /// task messages only have an effect while the queue data is `Running`:
    /// they record the new state of the target and move the entry with the
    /// matching id between the lists (silently doing nothing if the entry is
    /// not in the list it is expected in).
    fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
        match msg {
            QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
            QueueMessage::Started {
                running,
                queue: actual_queue,
                blocked,
                failed,
                done,
            } => queue.set(QueueData::Running(RunningQueue {
                running: RwSignal::new(running),
                queue: RwSignal::new(actual_queue),
                blocked: RwSignal::new(blocked),
                failed: RwSignal::new(failed),
                done: RwSignal::new(done),
                eta: WrappedEta(RwSignal::new(Eta::default())),
            })),
            QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
            // queued -> running
            QueueMessage::TaskStarted { id, target } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue { queue, running, .. }) = queue {
                    queue.update(|v| {
                        let Some(i) = v.iter().position(|e| e.id == id) else {
                            return;
                        };
                        let e = v.remove(i);
                        e.steps.update(|m| m.insert(target, TaskState::Running));
                        running.update(|running| running.push(e));
                    });
                }
            }),
            // running -> queued (if steps are still outstanding) or done
            QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    queue,
                    running,
                    done,
                    eta: etasignal,
                    ..
                }) = queue
                {
                    etasignal.0.set(eta);
                    running.update(|v| {
                        let Some(i) = v.iter().position(|e| e.id == id) else {
                            return;
                        };
                        let e = v.remove(i);
                        e.steps.update(|m| m.insert(target, TaskState::Done));
                        if e.steps.with_untracked(|v| {
                            v.iter()
                                .any(|(_, v)| *v == TaskState::Queued || *v == TaskState::Blocked)
                        }) {
                            queue.update(|v| v.push(e));
                        } else {
                            done.update(|v| v.push(e));
                        }
                    });
                }
            }),
            // running -> failed
            QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
                if let QueueData::Running(RunningQueue {
                    running,
                    failed,
                    eta: etasignal,
                    ..
                }) = queue
                {
                    etasignal.0.set(eta);
                    running.update(|v| {
                        let Some(i) = v.iter().position(|e| e.id == id) else {
                            return;
                        };
                        let e = v.remove(i);
                        e.steps.update(|m| m.insert(target, TaskState::Failed));
                        failed.update(|v| v.push(e));
                    });
                }
            }),
        }
    }
}
810
/// Shared (context-provided) state of the queue view. All fields are signals,
/// which makes the struct `Copy`.
#[derive(Clone, Copy)]
struct AllQueues {
    /// Whether the content of the selected queue is shown (otherwise a spinner).
    show: RwSignal<bool>,
    /// Id of the currently selected queue.
    selected: RwSignal<NonZeroU32>,
    /// Display name of every queue, by id.
    queue_names: RwSignal<VecMap<NonZeroU32, String>>,
    /// Repository information of every queue, by id (if available).
    queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
    /// Current data of every queue, by id.
    queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
}
819
820impl AllQueues {
821 fn new(ids: Vec<QueueInfo>) -> Self {
822 let queues = RwSignal::new(
823 ids.iter()
824 .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
825 .collect(),
826 );
827 let selected = ids.first().map_or_else(
828 || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
829 |v| v.id,
830 );
831 let mut queue_names = VecMap::default();
832 let mut queue_repos = VecMap::default();
833 for d in ids {
834 queue_names.insert(d.id, d.name);
835 queue_repos.insert(d.id, d.archives);
836 }
837 Self {
838 show: RwSignal::new(false),
839 selected: RwSignal::new(selected),
840 queues,
841 queue_names: RwSignal::new(queue_names),
842 queue_repos: RwSignal::new(queue_repos),
843 }
844 }
845}
846
/// Client-side data of a single queue, selecting which view is rendered.
#[derive(Clone)]
#[allow(dead_code)]
enum QueueData {
    /// The queue is idle; holds its entries.
    Idle(RwSignal<Vec<Entry>>),
    /// The queue is running; holds its entries grouped by state.
    Running(RunningQueue),
    /// Initial state before any message for the queue has been received.
    Empty,
    /// The queue has finished; holds the failed and the finished entries (in
    /// that order).
    Finished(Vec<Entry>, Vec<Entry>),
}
855
/// Entries of a running queue, grouped by state, plus the progress estimate.
/// Each list is its own signal so that it can be updated independently.
#[derive(Clone, Copy)] #[allow(dead_code)]
struct RunningQueue {
    /// Entries with a target currently being built.
    running: RwSignal<Vec<Entry>>,
    /// Entries waiting to be built.
    queue: RwSignal<Vec<Entry>>,
    /// Entries reported as blocked by the server.
    blocked: RwSignal<Vec<Entry>>,
    /// Entries with a failed target.
    failed: RwSignal<Vec<Entry>>,
    /// Entries that are completely done.
    done: RwSignal<Vec<Entry>>,
    /// Latest progress estimate received from the server.
    eta: WrappedEta,
}
866
/// Signal holding the latest progress estimate; wrapped in a local type so
/// that rendering logic can be attached to it.
#[derive(Clone, Copy)]
struct WrappedEta(RwSignal<ftml_ontology::utils::time::Eta>);
869
870#[allow(clippy::cast_precision_loss)]
871impl WrappedEta {
872 fn into_view(self) -> impl IntoView {
873 use thaw::ProgressBar;
874 inject_css(
875 "flams-eta",
876 r"
877.flams-progress-bar {height:10px;}
878 ",
879 );
880 let pctg = Memo::new(move |_| {
881 let eta = self.0.get();
882 ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
883 });
884 let time_left = move || {
885 let eta = self.0.get();
886 if eta.time_left == Delta::default() {
887 "N/A".to_string()
888 } else {
889 eta.time_left.max_seconds().to_string()
890 }
891 };
892 view! {
893 <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
894 {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
895 </div>
896 }
897 }
898}