1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_auto_cfg))]
4
5#[cfg(any(
6 all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
7 not(any(feature = "ssr", feature = "hydrate"))
8))]
9compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_ontology::uris::ArchiveId;
12use flams_router_base::ws;
13use flams_router_base::{LoginState, require_login, ws::WebSocket};
14#[cfg(feature = "hydrate")]
15use flams_router_buildqueue_base::server_fns::get_log;
16use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
17use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
18use flams_utils::time::{Delta, Eta};
19use flams_utils::vecmap::VecMap;
20use flams_web_utils::{components::wait_and_then_fn, inject_css};
21use leptos::{either::EitherOf4, prelude::*};
22use leptos_router::hooks::use_params_map;
23use std::num::NonZeroU32;
24
25#[derive(Copy, Clone)]
26struct UpdateQueues(RwSignal<()>);
27
28#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
29pub struct Entry {
30 id: u32,
31 archive: ArchiveId,
32 rel_path: String,
33 #[cfg(feature = "hydrate")]
34 steps: RwSignal<VecMap<String, TaskState>>,
35 #[cfg(not(feature = "hydrate"))]
36 steps: VecMap<String, TaskState>,
37}
38
39impl Entry {
40 #[cfg(not(feature = "hydrate"))]
41 fn as_view(&self) -> impl IntoView + use<> {
42 view! {
43 <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
44 }
45 }
46
47 #[cfg(feature = "hydrate")]
48 fn as_view(&self) -> impl IntoView + use<> {
49 use flams_router_base::vscode_link;
50 use flams_web_utils::components::{Collapsible, Header};
51
52 let vscode = vscode_link(&self.archive, &self.rel_path);
53
54 let title = format!("[{}]{}", self.archive, self.rel_path);
55 let total = self.steps.with_untracked(|v| v.0.len());
56 let steps = self.steps;
57 let current = move || {
58 steps.with(|v| {
59 v.iter()
60 .enumerate()
61 .find_map(|(i, (e, s))| {
62 if *s == TaskState::Done {
63 None
64 } else {
65 Some((i + 1, e.clone()))
66 }
67 })
68 .unwrap_or_else(|| (total, "Done".to_string()))
69 })
70 };
71 let rel_path = self.rel_path.clone();
72 let archive = self.archive.clone();
73 view! {
74 <li><Collapsible>
75 <Header slot>
76 <b>
77 {title}
78 {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
79 " "{vscode}
80 </b>
81 </Header>
82 <ol>
83 {let rel_path = rel_path.clone();
84 let archive = archive.clone();
85 move || steps.get().iter().map(|(t,e)|
86 view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
87 ).collect_view()}
88 </ol>
89 </Collapsible></li>
90 }
91 }
92}
93
94#[cfg(feature = "ssr")]
95impl From<flams_system::building::QueueEntry> for Entry {
96 fn from(e: flams_system::building::QueueEntry) -> Self {
97 #[cfg(feature = "hydrate")]
98 {
99 unreachable!()
100 }
101 #[cfg(not(feature = "hydrate"))]
102 Self {
103 id: e.id.into(),
104 archive: e.archive,
105 rel_path: e.rel_path.to_string(),
106 steps: e
107 .steps
108 .into_iter()
109 .map(|(k, v)| (k.to_string(), v.into()))
110 .collect(),
111 }
112 }
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
116pub enum TaskState {
117 Running,
118 Queued,
119 Blocked,
120 Done,
121 Failed,
122 None,
123}
124impl TaskState {
125 #[cfg(feature = "hydrate")]
126 fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> impl IntoView + use<> {
127 use flams_web_utils::components::{Header, LazyCollapsible};
128 use thaw::Scrollbar;
129 match self {
130 Self::Running => EitherOf4::A(view! {<i style="color:yellow">{t}" (Running)"</i>}),
131 Self::Queued | Self::Blocked | Self::None => {
132 EitherOf4::B(view! {<span style="color:gray">{t}" (...)"</span>})
133 }
134 Self::Done => {
135 let archive = archive.clone();
136 let rel_path = rel_path.to_string();
137 let tc = t.clone();
138 EitherOf4::C(view! {
139 <LazyCollapsible>
140 <Header slot><span style="color:green">{t}" (Done)"</span></Header>
141 {
142 let archive = archive.clone();
143 let rel_path = rel_path.clone();
144 let tc = tc.clone();
145 let queue = expect_context::<AllQueues>().selected.get_untracked();
146 require_login(move || wait_and_then_fn(
147 move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
148 |s| {
149 view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
150 <pre style="width:fit-content;font-size:smaller;">{s}</pre>
151 </Scrollbar>}
152 }
153 ))
154 }
155 </LazyCollapsible>
156 })
157 }
158 Self::Failed => {
159 let archive = archive.clone();
160 let rel_path = rel_path.to_string();
161 let tc = t.clone();
162 EitherOf4::D(view! {
163 <LazyCollapsible>
164 <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
165 {
166 let archive = archive.clone();
167 let rel_path = rel_path.clone();
168 let tc = tc.clone();
169 let queue = expect_context::<AllQueues>().selected.get_untracked();
170 require_login(move || wait_and_then_fn(
171 move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
172 |s| {
173 view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
174 <pre style="width:fit-content;font-size:smaller;">{s}</pre>
175 </Scrollbar>}
176 }
177 ))
178 }
179 </LazyCollapsible>
180 })
181 }
182 }
183 }
184}
185
186#[cfg(feature = "ssr")]
187impl From<flams_system::building::TaskState> for TaskState {
188 fn from(e: flams_system::building::TaskState) -> Self {
189 use flams_system::building::TaskState;
190 match e {
191 TaskState::Running => Self::Running,
192 TaskState::Queued => Self::Queued,
193 TaskState::Blocked => Self::Blocked,
194 TaskState::Done => Self::Done,
195 TaskState::Failed => Self::Failed,
196 TaskState::None => Self::None,
197 }
198 }
199}
200
201#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
202pub enum QueueMessage {
203 Idle(Vec<Entry>),
204 Started {
205 running: Vec<Entry>,
206 queue: Vec<Entry>,
207 blocked: Vec<Entry>,
208 failed: Vec<Entry>,
209 done: Vec<Entry>,
210 },
211 Finished {
212 failed: Vec<Entry>,
213 done: Vec<Entry>,
214 },
215 TaskStarted {
216 id: u32,
217 target: String,
218 },
219 TaskSuccess {
220 id: u32,
221 target: String,
222 eta: Eta,
223 },
224 TaskFailed {
225 id: u32,
226 target: String,
227 eta: Eta,
228 },
229}
230#[cfg(feature = "ssr")]
231impl From<flams_system::building::QueueMessage> for QueueMessage {
232 fn from(e: flams_system::building::QueueMessage) -> Self {
233 use flams_system::building::QueueMessage;
234 match e {
235 QueueMessage::Idle(v) => Self::Idle(v.into_iter().map(Into::into).collect()),
236 QueueMessage::Started {
237 running,
238 queue,
239 blocked,
240 failed,
241 done,
242 } => Self::Started {
243 running: running.into_iter().map(Into::into).collect(),
244 queue: queue.into_iter().map(Into::into).collect(),
245 blocked: blocked.into_iter().map(Into::into).collect(),
246 failed: failed.into_iter().map(Into::into).collect(),
247 done: done.into_iter().map(Into::into).collect(),
248 },
249 QueueMessage::Finished { failed, done } => Self::Finished {
250 failed: failed.into_iter().map(Into::into).collect(),
251 done: done.into_iter().map(Into::into).collect(),
252 },
253 QueueMessage::TaskStarted { id, target } => Self::TaskStarted {
254 id: id.into(),
255 target: target.to_string(),
256 },
257 QueueMessage::TaskSuccess { id, target, eta } => Self::TaskSuccess {
258 id: id.into(),
259 target: target.to_string(),
260 eta,
261 },
262 QueueMessage::TaskFailed { id, target, eta } => Self::TaskFailed {
263 id: id.into(),
264 target: target.to_string(),
265 eta,
266 },
267 }
268 }
269}
270
271#[component]
274pub fn QueuesTop() -> impl IntoView {
275 use flams_web_utils::components::Spinner;
276 use thaw::{Divider, Layout, Tab, TabList};
277
278 let update = UpdateQueues(RwSignal::new(()));
279 provide_context(update);
280 move || {
281 let () = update.0.get();
282 let params = use_params_map();
283 let id = move || params.read().get("queue");
284
285 require_login(move || {
286 wait_and_then_fn(server_fns::get_queues, move |v| {
287 if v.is_empty() {
288 return leptos::either::Either::Left(view!(<div>"(No running queues)"</div>));
289 }
290 let queues = AllQueues::new(v);
291 if let Some(id) = id() {
292 if let Ok(id) = id.parse() {
293 queues.selected.update_untracked(|v| *v = id);
294 }
295 }
296 provide_context(queues);
297 let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
298 let _ = Effect::new(move |_| {
299 let value = selected_value.get();
300 let selected = queues.selected.get_untracked();
301 let value = value.parse().unwrap_or_else(|_| unreachable!());
302 if selected != value {
303 queues.selected.set(value);
304 }
305 });
306 inject_css(
307 "flams-fullscreen",
308 ".flams-fullscreen { width:100%; height:calc(100% - 44px - 21px) }",
309 );
310 leptos::either::Either::Right(view! {
311 <TabList selected_value>
312 <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
313 <Tab value=i.to_string()>{
314 queues.queue_names.get().get(&i).unwrap_or_else(|| unreachable!()).clone()
315 }</Tab>
316 }/>
317 </TabList>
318 <div style="margin:10px"><Divider/></div>
319 <Layout class="flams-fullscreen">{move || {
320 let curr = queues.selected.get();
321 queues.show.update_untracked(|v| *v = false);
322 QueueSocket::run(queues);
323 move || view! {
324 <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
325 let ls = *queues.queues.get_untracked().get(&curr).unwrap_or_else(|| unreachable!());
326 move || match ls.get() {
327 QueueData::Idle(v) => {
328 EitherOf4::A(idle(curr,v))
329 },
330 QueueData::Running(r) => {
331 EitherOf4::B(running(curr,r))
332 },
333 QueueData::Finished(failed,done) => EitherOf4::C(finished(curr,failed,done)),
334 QueueData::Empty => EitherOf4::D(view!(<div>"Other"</div>))
335 }
336 }</Show>
337 }
338 }}</Layout>
339 })
340 })
341 })
342 }
343}
344
345#[allow(clippy::too_many_lines)]
346fn repos(queue_id: NonZeroU32, allowed: bool) -> impl IntoView {
347 use flams_web_utils::components::{Collapsible, Header};
348 use thaw::{
349 Caption1Strong, Table, TableBody, TableCell, TableCellLayout, TableHeader, TableHeaderCell,
350 TableRow,
351 };
352 if matches!(LoginState::get(), LoginState::NoAccounts) {
353 return None;
354 }
355 let queues: AllQueues = expect_context();
356 let repos = queues
357 .queue_repos
358 .with_untracked(|v| v.get(&queue_id).cloned())
359 .flatten()?;
360 if repos.is_empty() {
361 return None;
362 }
363 let style = if allowed { "" } else { "color:gray;" };
364 inject_css("flams-repo-table", include_str!("repo-table.css"));
365 Some(
366 view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
367 <Header slot><Caption1Strong>"Archives"</Caption1Strong></Header>
368 <Table class="flams-repo-table">
369 <TableHeader><TableRow>
370 <TableHeaderCell><Caption1Strong>"Archive"</Caption1Strong></TableHeaderCell>
371 <TableHeaderCell><Caption1Strong>"Branch"</Caption1Strong></TableHeaderCell>
372 <TableHeaderCell><Caption1Strong>"Commit"</Caption1Strong></TableHeaderCell>
373 </TableRow></TableHeader>
374 <TableBody>{
375 repos.into_iter().map(|d| match d {
376 RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
377 <TableRow>
378 <TableCell><TableCellLayout><span style=style>{id.to_string()}</span></TableCellLayout></TableCell>
379 <TableCell><TableCellLayout>"(Copied from MathHub)"</TableCellLayout></TableCell>
380 </TableRow>
381 }),
382 RepoInfo::Git{id,branch,commit,remote} => leptos::either::Either::Right({
383 let updates = RwSignal::<Option<Vec<(String,flams_git::Commit)>>>::new(None);
384 let style = move || if allowed {
385 updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
386 "color:green;"
387 } else {
388 "color:yellowgreen;"
389 }))
390 } else {style};
391 let idstr = id.to_string();
392 view!{
393 <TableRow>
394 <TableCell><TableCellLayout><span style=style>{idstr}</span></TableCellLayout></TableCell>
395 <TableCell><TableCellLayout>{branch}</TableCellLayout></TableCell>
396 <TableCell><TableCellLayout>
397 {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
398 {if allowed {Some(move || updates.with(|up| {
399 let Some(up) = up else {
400 let aid = id.clone();
401 let toaster = thaw::ToasterInjection::expect_context();
402 let get_updates = Action::new(move |()| {
403 let id = aid.clone();
404 async move {
405 match get_new_commits(Some(queue_id),id).await {
406 Ok(v) => updates.set(Some(v)),
407 Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
408 }
409 }
410 });
411 return leptos::either::EitherOf3::A(view!{
412 <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
413 });
414 };
415 if up.is_empty() {
416 return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
417 }
418 let updates = up.clone();
419
420 leptos::either::EitherOf3::C({
421 use thaw::{Button,ButtonSize,Combobox,ComboboxOption};
422 let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
423 let branch = RwSignal::new(first.clone());
424 let _ = Effect::new(move || if branch.with(String::is_empty) {
425 branch.set(first.clone());
426 });
427 let update : UpdateQueues = expect_context();
428 let archive = id.clone();
431 let remote = remote.clone();
432 let act = flams_web_utils::components::message_action(
433 move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
434 move |(i,_)| {
435 update.0.set(());
436 format!("{i} jobs queued")
437 }
438 );
439 view!{
440 <span style="color:green">
441 " Updates available: "
442 </span>
443 <div style="margin-left:10px">
444 <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
445 " from branch: "
446 <div style="display:inline-block;"><Combobox selected_options=branch>{
447 updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
448 <ComboboxOption text=vname.clone() value=vname>
449 {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
450 </ComboboxOption>
451 }}).collect_view()
452 }</Combobox></div>
453 </div>
454 }
455 })
456 }))} else {None}}
457 </TableCellLayout></TableCell>
458 </TableRow>
459 }
460 }),
461 }).collect_view()
462 }</TableBody>
463 </Table>
464 </Collapsible></div>},
465 )
466}
467
468fn delete_action(id: NonZeroU32) -> Action<(), ()> {
469 use thaw::ToasterInjection;
470 let update: UpdateQueues = expect_context();
471 let toaster = ToasterInjection::expect_context();
472 Action::new(move |()| async move {
473 match flams_router_buildqueue_base::server_fns::delete(id).await {
474 Ok(()) => update.0.set(()),
475 Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
476 }
477 })
478}
479
480fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> impl IntoView {
481 use thaw::Button;
482 let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
483 flams_router_buildqueue_base::server_fns::run(id)
484 });
485 let del = delete_action(id);
486 view! {
487 <div style="width:100%"><div style="position:fixed;right:20px">
488 <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
489 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
490 </div></div>
491 {repos(id,true)}
492 <ol reversed style="margin-left:30px">
493 <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
494 </ol>
495 }
496}
497
498fn running(id: NonZeroU32, queue: RunningQueue) -> impl IntoView {
499 use flams_web_utils::components::{Anchor, AnchorLink, Header};
500 use thaw::{Button, Layout};
501 let del = delete_action(id);
502 let RunningQueue {
503 running,
504 queue,
505 blocked,
506 failed,
507 done,
508 eta,
509 } = queue;
510 view! {
511 <div style="position:fixed;right:20px;z-index:5"><Anchor>
512 <AnchorLink href="#running"><Header slot>"Running"</Header></AnchorLink>
513 <AnchorLink href="#queued"><Header slot>"Queued"</Header></AnchorLink>
514 <AnchorLink href="#blocked"><Header slot>"Blocked"</Header></AnchorLink>
515 <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
516 <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
517 </Anchor></div>
518 {repos(id,false)}
519 <Layout content_style="text-align:left;">
520 {eta.into_view()}
521 <div style="width:100%"><div style="position:fixed;right:20px">
522 <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
523 </div></div>
524 <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
525 <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
526 <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
527 <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
528 <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
529 <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
530 <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
531 <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
532 <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
533 <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
534 </Layout>
535 }
536}
537
538fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> impl IntoView {
539 use flams_web_utils::components::{Anchor, AnchorLink, Header};
540 use thaw::{Button, Layout};
541 let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
542 let num_failed = failed.len();
543 let num_done = done.len();
544 let del = delete_action(id);
545 view! {
546 <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
547 {if num_failed > 0 {Some(view!(
548 <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
549 ))} else { None }}
550 {migrate_button(id,num_failed)}
551 <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
552 </div></div>
553 <div style="position:fixed;right:20px;z-index:5"><Anchor>
554 <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
555 <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
556 </Anchor></div>
557 {repos(id,true)}
558 <Layout content_style="text-align:left;">
559 <h3 id="failed">"Failed ("{num_failed}")"</h3>
560 <ul style="margin-left:30px">{
561 failed.iter().map(Entry::as_view).collect_view()
562 }</ul>
563 <h3 id="finished">"Finished ("{num_done}")"</h3>
564 <ul style="margin-left:30px">{
565 done.iter().map(Entry::as_view).collect_view()
566 }</ul>
567 </Layout>
568 }
569}
570
571fn migrate_button(id: NonZeroU32, num_failed: usize) -> impl IntoView {
572 use leptos::either::EitherOf3;
573 use thaw::{Button, Caption1Strong, Dialog, DialogBody, DialogContent, DialogSurface, Divider};
574 if matches!(LoginState::get(), LoginState::NoAccounts) {
575 return EitherOf3::A(());
576 }
577 let update: UpdateQueues = expect_context();
578 let migrate = flams_web_utils::components::message_action(
579 move |()| flams_router_buildqueue_base::server_fns::migrate(id),
580 move |i| {
581 update.0.set(());
582 format!("{i} archives migrated")
583 },
584 );
585 if num_failed == 0 {
586 EitherOf3::B(view! {
587 <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
588 })
589 } else {
590 let clicked = RwSignal::new(false);
591 EitherOf3::C(view! {
592 <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
593 <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
594 <Caption1Strong><span style="color:red">WARNING</span></Caption1Strong>
595 <Divider/>
596 <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
597 <div>
598 <div style="width:min-content;margin-left:auto;">
599 <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
600 </div>
601 </div>
602 </DialogContent></DialogBody></DialogSurface></Dialog>
603 })
604 }
605}
606
607#[derive(Clone)]
608pub struct QueueSocket {
609 #[cfg(feature = "ssr")]
610 listener:
611 Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
612 #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
613 socket: leptos::web_sys::WebSocket,
614 #[cfg(feature = "docs-only")]
615 socket: (),
616 #[cfg(feature = "hydrate")]
617 _running: RwSignal<bool>,
618}
619impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
620 const SERVER_ENDPOINT: &'static str = "/ws/queue";
621}
622
623#[cfg(feature = "ssr")]
624#[async_trait::async_trait]
625impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
626 async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
627 match account {
628 LoginState::Admin
629 | LoginState::NoAccounts
630 | LoginState::User { is_admin: true, .. } => {
631 let listener = None; Some(Self {
633 listener,
634 #[cfg(feature = "hydrate")]
635 _running: RwSignal::new(false),
636 #[cfg(feature = "hydrate")]
637 socket: unreachable!(),
638 })
639 }
640 _ => None,
641 }
642 }
643 async fn next(&mut self) -> Option<QueueMessage> {
644 loop {
645 match &mut self.listener {
646 None => tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await,
647 Some(l) => return l.read().await.map(Into::into),
648 }
649 }
650 }
651 async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
652 let (lst, msg) = flams_system::building::queue_manager::QueueManager::get()
653 .with_queue(msg.into(), |q| q.map(|q| (q.listener(), q.state_message())))?;
654 self.listener = Some(lst);
655 Some(msg.into())
656 }
657 async fn on_start(&mut self, _: &mut ws::AxumWS) {}
658}
659
660#[cfg(feature = "hydrate")]
661impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
662 fn new(ws: leptos::web_sys::WebSocket) -> Self {
663 Self {
664 #[cfg(not(feature = "docs-only"))]
665 socket: ws,
666 #[cfg(feature = "docs-only")]
667 socket: (),
668 _running: RwSignal::new(false),
669 #[cfg(feature = "ssr")]
670 listener: unreachable!(),
671 }
672 }
673 fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
674 #[cfg(not(feature = "docs-only"))]
675 {
676 &mut self.socket
677 }
678 #[cfg(feature = "docs-only")]
679 {
680 unreachable!()
681 }
682 }
683 #[allow(clippy::used_underscore_binding)]
684 fn on_open(&self) -> Option<Box<dyn FnMut()>> {
685 let running = self._running;
686 Some(Box::new(move || {
687 running.set(true);
688 }))
689 }
690}
691
692#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
693impl QueueSocket {
694 fn run(_: AllQueues) {
695 Self::force_start_server();
696 }
697}
698
699#[cfg(feature = "hydrate")]
700impl QueueSocket {
701 fn run(queues: AllQueues) {
702 use ws::WebSocketClient;
703 Self::force_start_client(
704 move |msg| {
705 let current = queues.selected.get_untracked();
707 queues.queues.with_untracked(|queues| {
708 let Some(queue) = queues.get(¤t) else {
709 tracing::error!("Queue not found: {current}");
710 return;
711 };
712 Self::do_msg(*queue, msg);
713 });
714 if !queues.show.get_untracked() {
715 queues.show.set(true);
716 }
717 None
718 },
719 move |mut socket| {
720 #[allow(clippy::used_underscore_binding)]
721 Effect::new(move |_| {
722 if socket._running.get() {
723 let current = queues.selected.get_untracked();
724 socket.send(¤t);
725 }
726 });
727 },
728 );
729 }
730 fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
731 match msg {
732 QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
733 QueueMessage::Started {
734 running,
735 queue: actual_queue,
736 blocked,
737 failed,
738 done,
739 } => queue.set(QueueData::Running(RunningQueue {
740 running: RwSignal::new(running),
741 queue: RwSignal::new(actual_queue),
742 blocked: RwSignal::new(blocked),
743 failed: RwSignal::new(failed),
744 done: RwSignal::new(done),
745 eta: WrappedEta(RwSignal::new(Eta::default())),
746 })),
747 QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
748 QueueMessage::TaskStarted { id, target } => queue.with_untracked(|queue| {
749 if let QueueData::Running(RunningQueue { queue, running, .. }) = queue {
750 queue.update(|v| {
751 let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
752 return;
753 };
754 let e = v.remove(i);
755 e.steps.update(|m| m.insert(target, TaskState::Running));
756 running.update(|running| running.push(e));
757 });
758 }
759 }),
760 QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
761 if let QueueData::Running(RunningQueue {
762 queue,
763 running,
764 done,
765 eta: etasignal,
766 ..
767 }) = queue
768 {
769 etasignal.0.set(eta);
770 running.update(|v| {
771 let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
772 return;
773 };
774 let e = v.remove(i);
775 e.steps.update(|m| m.insert(target, TaskState::Done));
776 if e.steps.with_untracked(|v| {
777 v.iter()
778 .any(|(_, v)| *v == TaskState::Queued || *v == TaskState::Blocked)
779 }) {
780 queue.update(|v| v.push(e));
781 } else {
782 done.update(|v| v.push(e));
783 }
784 });
785 }
786 }),
787 QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
788 if let QueueData::Running(RunningQueue {
789 running,
790 failed,
791 eta: etasignal,
792 ..
793 }) = queue
794 {
795 etasignal.0.set(eta);
796 running.update(|v| {
797 let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
798 return;
799 };
800 let e = v.remove(i);
801 e.steps.update(|m| m.insert(target, TaskState::Failed));
802 failed.update(|v| v.push(e));
803 });
804 }
805 }),
806 }
807 }
808}
809
810#[derive(Clone, Copy)]
811struct AllQueues {
812 show: RwSignal<bool>,
813 selected: RwSignal<NonZeroU32>,
814 queue_names: RwSignal<VecMap<NonZeroU32, String>>,
815 queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
816 queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
817}
818
819impl AllQueues {
820 fn new(ids: Vec<QueueInfo>) -> Self {
821 let queues = RwSignal::new(
822 ids.iter()
823 .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
824 .collect(),
825 );
826 let selected = ids.first().map_or_else(
827 || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
828 |v| v.id,
829 );
830 let mut queue_names = VecMap::default();
831 let mut queue_repos = VecMap::default();
832 for d in ids {
833 queue_names.insert(d.id, d.name);
834 queue_repos.insert(d.id, d.archives);
835 }
836 Self {
837 show: RwSignal::new(false),
838 selected: RwSignal::new(selected),
839 queues,
840 queue_names: RwSignal::new(queue_names),
841 queue_repos: RwSignal::new(queue_repos),
842 }
843 }
844}
845
846#[derive(Clone)]
847#[allow(dead_code)]
848enum QueueData {
849 Idle(RwSignal<Vec<Entry>>),
850 Running(RunningQueue),
851 Empty,
852 Finished(Vec<Entry>, Vec<Entry>),
853}
854
855#[derive(Clone, Copy)] #[allow(dead_code)]
857struct RunningQueue {
858 running: RwSignal<Vec<Entry>>,
859 queue: RwSignal<Vec<Entry>>,
860 blocked: RwSignal<Vec<Entry>>,
861 failed: RwSignal<Vec<Entry>>,
862 done: RwSignal<Vec<Entry>>,
863 eta: WrappedEta,
864}
865
866#[derive(Clone, Copy)]
867struct WrappedEta(RwSignal<flams_utils::time::Eta>);
868
869#[allow(clippy::cast_precision_loss)]
870impl WrappedEta {
871 fn into_view(self) -> impl IntoView {
872 use thaw::ProgressBar;
873 inject_css(
874 "flams-eta",
875 r"
876.flams-progress-bar {height:10px;}
877 ",
878 );
879 let pctg = Memo::new(move |_| {
880 let eta = self.0.get();
881 ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
882 });
883 let time_left = move || {
884 let eta = self.0.get();
885 if eta.time_left == Delta::default() {
886 "N/A".to_string()
887 } else {
888 eta.time_left.max_seconds().to_string()
889 }
890 };
891 view! {
892 <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
893 {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
894 </div>
895 }
896 }
897}