flams_router_buildqueue_components/
lib.rs

1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4
5#[cfg(any(
6    all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
7    not(any(feature = "ssr", feature = "hydrate"))
8))]
9compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_router_base::ws;
12use flams_router_base::{LoginState, require_login, ws::WebSocket};
13#[cfg(feature = "hydrate")]
14use flams_router_buildqueue_base::server_fns::get_log;
15use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
16use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
17use flams_utils::vecmap::VecMap;
18use flams_web_utils::components::wait_and_then_fn;
19use ftml_dom::utils::css::inject_css;
20use ftml_ontology::utils::time::{Delta, Eta};
21use ftml_uris::ArchiveId;
22use leptos::{either::EitherOf4, prelude::*};
23use leptos_router::hooks::use_params_map;
24use std::num::NonZeroU32;
25
26#[derive(Copy, Clone)]
27struct UpdateQueues(RwSignal<()>);
28
29#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
30pub struct Entry {
31    id: u32,
32    archive: ArchiveId,
33    rel_path: String,
34    #[cfg(feature = "hydrate")]
35    steps: RwSignal<VecMap<String, TaskState>>,
36    #[cfg(not(feature = "hydrate"))]
37    steps: VecMap<String, TaskState>,
38}
39
40impl Entry {
41    #[cfg(not(feature = "hydrate"))]
42    fn as_view(&self) -> impl IntoView + use<> {
43        view! {
44          <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
45        }
46    }
47
48    #[cfg(feature = "hydrate")]
49    fn as_view(&self) -> AnyView {
50        use flams_router_base::vscode_link;
51        use flams_web_utils::components::{Collapsible, Header};
52
53        let vscode = vscode_link(&self.archive, &self.rel_path);
54
55        let title = format!("[{}]{}", self.archive, self.rel_path);
56        let total = self.steps.with_untracked(|v| v.0.len());
57        let steps = self.steps;
58        let current = move || {
59            steps.with(|v| {
60                v.iter()
61                    .enumerate()
62                    .find_map(|(i, (e, s))| {
63                        if *s == TaskState::Done {
64                            None
65                        } else {
66                            Some((i + 1, e.clone()))
67                        }
68                    })
69                    .unwrap_or_else(|| (total, "Done".to_string()))
70            })
71        };
72        let rel_path = self.rel_path.clone();
73        let archive = self.archive.clone();
74        view! {
75          <li><Collapsible>
76            <Header slot>
77              <b>
78                {title}
79                {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
80                " "{vscode}
81              </b>
82            </Header>
83            <ol>
84            {let rel_path = rel_path.clone();
85              let archive = archive.clone();
86              move || steps.get().iter().map(|(t,e)|
87              view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
88            ).collect_view()}
89            </ol>
90          </Collapsible></li>
91        }.into_any()
92    }
93}
94
95#[cfg(feature = "ssr")]
96impl From<flams_system::building::QueueEntry> for Entry {
97    fn from(e: flams_system::building::QueueEntry) -> Self {
98        #[cfg(feature = "hydrate")]
99        {
100            unreachable!()
101        }
102        #[cfg(not(feature = "hydrate"))]
103        Self {
104            id: e.id.into(),
105            archive: e.archive,
106            rel_path: e.rel_path.to_string(),
107            steps: e
108                .steps
109                .into_iter()
110                .map(|(k, v)| (k.to_string(), v.into()))
111                .collect(),
112        }
113    }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
117pub enum TaskState {
118    Running,
119    Queued,
120    Blocked,
121    Done,
122    Failed,
123    None,
124}
125impl TaskState {
126    #[cfg(feature = "hydrate")]
127    fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> AnyView {
128        use flams_web_utils::components::{Header, LazyCollapsible};
129        use thaw::Scrollbar;
130        match self {
131            Self::Running => view! {<i style="color:yellow">{t}" (Running)"</i>}.into_any(),
132            Self::Queued | Self::Blocked | Self::None => {
133                view! {<span style="color:gray">{t}" (...)"</span>}.into_any()
134            }
135            Self::Done => {
136                let archive = archive.clone();
137                let rel_path = rel_path.to_string();
138                let tc = t.clone();
139                view! {
140                  <LazyCollapsible>
141                    <Header slot><span style="color:green">{t}" (Done)"</span></Header>
142                    {
143                      let archive = archive.clone();
144                      let rel_path = rel_path.clone();
145                      let tc = tc.clone();
146                      let queue = expect_context::<AllQueues>().selected.get_untracked();
147                      require_login(Box::new(move || wait_and_then_fn(
148                          move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
149                          |s| {
150                            view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
151                                <pre style="width:fit-content;font-size:smaller;">{s}</pre>
152                            </Scrollbar>}.into_any()
153                            }
154                      )))
155                    }
156                  </LazyCollapsible>
157                }.into_any()
158            }
159            Self::Failed => {
160                let archive = archive.clone();
161                let rel_path = rel_path.to_string();
162                let tc = t.clone();
163                view! {
164                  <LazyCollapsible>
165                    <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
166                    {
167                      let archive = archive.clone();
168                      let rel_path = rel_path.clone();
169                      let tc = tc.clone();
170                      let queue = expect_context::<AllQueues>().selected.get_untracked();
171                      require_login(Box::new(move || wait_and_then_fn(
172                          move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
173                          |s| {
174                                view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
175                                    <pre style="width:fit-content;font-size:smaller;">{s}</pre>
176                                </Scrollbar>}
177                            }.into_any()
178                      )))
179                    }
180                  </LazyCollapsible>
181                }.into_any()
182            }
183        }
184    }
185}
186
187#[cfg(feature = "ssr")]
188impl From<flams_system::building::TaskState> for TaskState {
189    fn from(e: flams_system::building::TaskState) -> Self {
190        use flams_system::building::TaskState;
191        match e {
192            TaskState::Running => Self::Running,
193            TaskState::Queued => Self::Queued,
194            TaskState::Blocked => Self::Blocked,
195            TaskState::Done => Self::Done,
196            TaskState::Failed => Self::Failed,
197            TaskState::None => Self::None,
198        }
199    }
200}
201
202#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
203pub enum QueueMessage {
204    Idle(Vec<Entry>),
205    Started {
206        running: Vec<Entry>,
207        queue: Vec<Entry>,
208        blocked: Vec<Entry>,
209        failed: Vec<Entry>,
210        done: Vec<Entry>,
211    },
212    Finished {
213        failed: Vec<Entry>,
214        done: Vec<Entry>,
215    },
216    TaskStarted {
217        id: u32,
218        target: String,
219    },
220    TaskSuccess {
221        id: u32,
222        target: String,
223        eta: Eta,
224    },
225    TaskFailed {
226        id: u32,
227        target: String,
228        eta: Eta,
229    },
230}
231#[cfg(feature = "ssr")]
232impl From<flams_system::building::QueueMessage> for QueueMessage {
233    fn from(e: flams_system::building::QueueMessage) -> Self {
234        use flams_system::building::QueueMessage;
235        match e {
236            QueueMessage::Idle(v) => Self::Idle(v.into_iter().map(Into::into).collect()),
237            QueueMessage::Started {
238                running,
239                queue,
240                blocked,
241                failed,
242                done,
243            } => Self::Started {
244                running: running.into_iter().map(Into::into).collect(),
245                queue: queue.into_iter().map(Into::into).collect(),
246                blocked: blocked.into_iter().map(Into::into).collect(),
247                failed: failed.into_iter().map(Into::into).collect(),
248                done: done.into_iter().map(Into::into).collect(),
249            },
250            QueueMessage::Finished { failed, done } => Self::Finished {
251                failed: failed.into_iter().map(Into::into).collect(),
252                done: done.into_iter().map(Into::into).collect(),
253            },
254            QueueMessage::TaskStarted { id, target } => Self::TaskStarted {
255                id: id.into(),
256                target: target.to_string(),
257            },
258            QueueMessage::TaskSuccess { id, target, eta } => Self::TaskSuccess {
259                id: id.into(),
260                target: target.to_string(),
261                eta,
262            },
263            QueueMessage::TaskFailed { id, target, eta } => Self::TaskFailed {
264                id: id.into(),
265                target: target.to_string(),
266                eta,
267            },
268        }
269    }
270}
271
272// ----------------------------------------------------------------------------------
273
274#[component]
275pub fn QueuesTop() -> AnyView {
276    use flams_web_utils::components::Spinner;
277    use thaw::{Divider, Layout, Tab, TabList};
278
279    let update = UpdateQueues(RwSignal::new(()));
280    provide_context(update);
281    (move || {
282        let () = update.0.get();
283        let params = use_params_map();
284        let id = move || params.read().get("queue");
285
286        require_login(Box::new(move || {
287            wait_and_then_fn(server_fns::get_queues, move |v| {
288                if v.is_empty() {
289                    return view!(<div>"(No running queues)"</div>).into_any();
290                }
291                let queues = AllQueues::new(v);
292                if let Some(id) = id() {
293                    if let Ok(id) = id.parse() {
294                        queues.selected.update_untracked(|v| *v = id);
295                    }
296                }
297                provide_context(queues);
298                let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
299                let _ = Effect::new(move |_| {
300                    let value = selected_value.get();
301                    let selected = queues.selected.get_untracked();
302                    let value = value.parse().unwrap_or_else(|_| unreachable!());
303                    if selected != value {
304                        queues.selected.set(value);
305                    }
306                });
307                inject_css(
308                    "flams-fullscreen",
309                    ".flams-fullscreen { width:100%; height:calc(100% - 44px - 21px) }",
310                );
311                view! {
312                  <TabList selected_value>
313                    <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
314                      <Tab value=i.to_string()>{
315                        queues.queue_names.get().get(&i).unwrap_or_else(|| unreachable!()).clone()
316                      }</Tab>
317                    }/>
318                  </TabList>
319                  <div style="margin:10px"><Divider/></div>
320                  <Layout class="flams-fullscreen">{move || {
321                    let curr = queues.selected.get();
322                    queues.show.update_untracked(|v| *v = false);
323                    QueueSocket::run(queues);
324                    move || view! {
325                      <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
326                        let ls = *queues.queues.get_untracked().get(&curr).unwrap_or_else(|| unreachable!());
327                        move || match ls.get() {
328                          QueueData::Idle(v) => {
329                              idle(curr,v)
330                          },
331                          QueueData::Running(r) => {
332                              running(curr,r)
333                          },
334                          QueueData::Finished(failed,done) => finished(curr,failed,done),
335                          QueueData::Empty => view!(<div>"Other"</div>).into_any()
336                        }
337                      }</Show>
338                    }
339                  }}</Layout>
340                }.into_any()
341            })
342        }) as _)
343    }).into_any()
344}
345
346#[allow(clippy::too_many_lines)]
347fn repos(queue_id: NonZeroU32, allowed: bool) -> AnyView {
348    use flams_web_utils::components::{Collapsible, Header};
349    use thaw::{
350        Caption1Strong, Table, TableBody, TableCell, TableCellLayout, TableHeader, TableHeaderCell,
351        TableRow,
352    };
353    if matches!(LoginState::get(), LoginState::NoAccounts) {
354        return ().into_any();
355    }
356    let queues: AllQueues = expect_context();
357    let Some(repos) = queues
358        .queue_repos
359        .with_untracked(|v| v.get(&queue_id).cloned())
360        .flatten() else {
361            return ().into_any()
362        };
363    if repos.is_empty() {
364        return ().into_any();
365    }
366    let style = if allowed { "" } else { "color:gray;" };
367    inject_css("flams-repo-table", include_str!("repo-table.css"));
368        view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
369          <Header slot><Caption1Strong>"Archives"</Caption1Strong></Header>
370          <Table class="flams-repo-table">
371            <TableHeader><TableRow>
372              <TableHeaderCell><Caption1Strong>"Archive"</Caption1Strong></TableHeaderCell>
373              <TableHeaderCell><Caption1Strong>"Branch"</Caption1Strong></TableHeaderCell>
374              <TableHeaderCell><Caption1Strong>"Commit"</Caption1Strong></TableHeaderCell>
375            </TableRow></TableHeader>
376            <TableBody>{
377              repos.into_iter().map(|d| match d {
378                RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
379                  <TableRow>
380                    <TableCell><TableCellLayout><span style=style>{id.to_string()}</span></TableCellLayout></TableCell>
381                    <TableCell><TableCellLayout>"(Copied from MathHub)"</TableCellLayout></TableCell>
382                  </TableRow>
383                }),
384                RepoInfo::Git{id,branch,commit,remote/*,updates */} => leptos::either::Either::Right({
385                  let updates = RwSignal::<Option<Vec<(String,flams_backend_types::git::Commit)>>>::new(None);
386                  let style = move || if allowed {
387                    updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
388                      "color:green;"
389                    } else {
390                      "color:yellowgreen;"
391                    }))
392                  } else {style};
393                  let idstr = id.to_string();
394                  view!{
395                    <TableRow>
396                      <TableCell><TableCellLayout><span style=style>{idstr}</span></TableCellLayout></TableCell>
397                      <TableCell><TableCellLayout>{branch}</TableCellLayout></TableCell>
398                      <TableCell><TableCellLayout>
399                        {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
400                        {if allowed {Some(move || updates.with(|up| {
401                          let Some(up) = up else {
402                            let aid = id.clone();
403                            let toaster = thaw::ToasterInjection::expect_context();
404                            let get_updates = Action::new(move |()| {
405                              let id = aid.clone();
406                              async move {
407                                match get_new_commits(Some(queue_id),id).await {
408                                  Ok(v) => updates.set(Some(v)),
409                                  Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
410                                }
411                              }
412                            });
413                            return leptos::either::EitherOf3::A(view!{
414                              <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
415                            });
416                          };
417                          if up.is_empty() {
418                            return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
419                          }
420                          let updates = up.clone();
421
422                          leptos::either::EitherOf3::C({
423                            use thaw::{Button,ButtonSize,Combobox,ComboboxOption};
424                            let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
425                            let branch = RwSignal::new(first.clone());
426                            let _ = Effect::new(move || if branch.with(String::is_empty) {
427                              branch.set(first.clone());
428                            });
429                            let update : UpdateQueues = expect_context();
430                            //let commit_map:VecMap<_,_> = updates.clone().into();
431                            let archive = id.clone();
432                            let remote = remote.clone();
433                            let act = flams_web_utils::components::message_action(
434                              move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
435                              move |(i,_)| {
436                                update.0.set(());
437                                format!("{i} jobs queued")
438                              }
439                            );
440                            view!{
441                              <span style="color:green">
442                                " Updates available: "
443                              </span>
444                              <div style="margin-left:10px">
445                                <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
446                                " from branch: "
447                                <div style="display:inline-block;"><Combobox selected_options=branch>{
448                                  updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
449                                    <ComboboxOption text=vname.clone() value=vname>
450                                      {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
451                                    </ComboboxOption>
452                                  }}).collect_view()
453                                }</Combobox></div>
454                              </div>
455                            }
456                          })
457                        }))} else {None}}
458                      </TableCellLayout></TableCell>
459                    </TableRow>
460                  }
461                }),
462              }).collect_view()
463            }</TableBody>
464          </Table>
465        </Collapsible></div>
466    }.into_any()
467}
468
469fn delete_action(id: NonZeroU32) -> Action<(), ()> {
470    use thaw::ToasterInjection;
471    let update: UpdateQueues = expect_context();
472    let toaster = ToasterInjection::expect_context();
473    Action::new(move |()| async move {
474        match flams_router_buildqueue_base::server_fns::delete(id).await {
475            Ok(()) => update.0.set(()),
476            Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
477        }
478    })
479}
480
481fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> AnyView {
482    use thaw::Button;
483    let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
484        flams_router_buildqueue_base::server_fns::run(id)
485    });
486    let del = delete_action(id);
487    view! {
488      <div style="width:100%"><div style="position:fixed;right:20px">
489          <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
490          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
491      </div></div>
492      {repos(id,true)}
493      <ol reversed style="margin-left:30px">
494        <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
495      </ol>
496    }.into_any()
497}
498
499fn running(id: NonZeroU32, queue: RunningQueue) -> AnyView {
500    use flams_web_utils::components::{Anchor, AnchorLink, Header};
501    use thaw::{Button, Layout};
502    let del = delete_action(id);
503    let RunningQueue {
504        running,
505        queue,
506        blocked,
507        failed,
508        done,
509        eta,
510    } = queue;
511    view! {
512      <div style="position:fixed;right:20px;z-index:5"><Anchor>
513          <AnchorLink href="#running"><Header slot>"Running"</Header></AnchorLink>
514          <AnchorLink href="#queued"><Header slot>"Queued"</Header></AnchorLink>
515          <AnchorLink href="#blocked"><Header slot>"Blocked"</Header></AnchorLink>
516          <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
517          <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
518      </Anchor></div>
519      {repos(id,false)}
520      <Layout content_style="text-align:left;">
521          {eta.into_view()}
522          <div style="width:100%"><div style="position:fixed;right:20px">
523              <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
524          </div></div>
525          <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
526          <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
527          <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
528          <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
529          <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
530          <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
531          <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
532          <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
533          <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
534          <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
535      </Layout>
536    }.into_any()
537}
538
539fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> AnyView {
540    use flams_web_utils::components::{Anchor, AnchorLink, Header};
541    use thaw::{Button, Layout};
542    let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
543    let num_failed = failed.len();
544    let num_done = done.len();
545    let del = delete_action(id);
546    view! {
547      <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
548          {if num_failed > 0 {Some(view!(
549            <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
550          ))} else { None }}
551          {migrate_button(id,num_failed)}
552          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
553      </div></div>
554      <div style="position:fixed;right:20px;z-index:5"><Anchor>
555          <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
556          <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
557      </Anchor></div>
558      {repos(id,true)}
559      <Layout content_style="text-align:left;">
560          <h3 id="failed">"Failed ("{num_failed}")"</h3>
561          <ul style="margin-left:30px">{
562            failed.iter().map(Entry::as_view).collect_view()
563          }</ul>
564          <h3 id="finished">"Finished ("{num_done}")"</h3>
565          <ul style="margin-left:30px">{
566            done.iter().map(Entry::as_view).collect_view()
567          }</ul>
568      </Layout>
569    }.into_any()
570}
571
572fn migrate_button(id: NonZeroU32, num_failed: usize) -> AnyView {
573    use leptos::either::EitherOf3;
574    use thaw::{Button, Caption1Strong, Dialog, DialogBody, DialogContent, DialogSurface, Divider};
575    if matches!(LoginState::get(), LoginState::NoAccounts) {
576        return ().into_any();
577    }
578    let update: UpdateQueues = expect_context();
579    let migrate = flams_web_utils::components::message_action(
580        move |()| flams_router_buildqueue_base::server_fns::migrate(id),
581        move |i| {
582            update.0.set(());
583            format!("{i} archives migrated")
584        },
585    );
586    if num_failed == 0 {
587        view! {
588          <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
589        }.into_any()
590    } else {
591        let clicked = RwSignal::new(false);
592        view! {
593          <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
594          <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
595            <Caption1Strong><span style="color:red">WARNING</span></Caption1Strong>
596            <Divider/>
597            <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
598            <div>
599              <div style="width:min-content;margin-left:auto;">
600                <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
601              </div>
602            </div>
603          </DialogContent></DialogBody></DialogSurface></Dialog>
604        }.into_any()
605    }
606}
607
608#[derive(Clone)]
609pub struct QueueSocket {
610    #[cfg(feature = "ssr")]
611    listener:
612        Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
613    #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
614    socket: leptos::web_sys::WebSocket,
615    #[cfg(feature = "docs-only")]
616    socket: (),
617    #[cfg(feature = "hydrate")]
618    _running: RwSignal<bool>,
619}
620impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
621    const SERVER_ENDPOINT: &'static str = "/ws/queue";
622}
623
624#[cfg(feature = "ssr")]
625#[async_trait::async_trait]
626impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
627    async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
628        match account {
629            LoginState::Admin
630            | LoginState::NoAccounts
631            | LoginState::User { is_admin: true, .. } => {
632                let listener = None; //flams_system::logger().listener();
633                Some(Self {
634                    listener,
635                    #[cfg(feature = "hydrate")]
636                    _running: RwSignal::new(false),
637                    #[cfg(feature = "hydrate")]
638                    socket: unreachable!(),
639                })
640            }
641            _ => None,
642        }
643    }
644    async fn next(&mut self) -> Option<QueueMessage> {
645        loop {
646            match &mut self.listener {
647                None => tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await,
648                Some(l) => return l.read().await.map(Into::into),
649            }
650        }
651    }
652    async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
653        let (lst, msg) = flams_system::building::queue_manager::QueueManager::get()
654            .with_queue(msg.into(), |q| q.map(|q| (q.listener(), q.state_message())))?;
655        self.listener = Some(lst);
656        Some(msg.into())
657    }
658    async fn on_start(&mut self, _: &mut ws::AxumWS) {}
659}
660
661#[cfg(feature = "hydrate")]
662impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
663    fn new(ws: leptos::web_sys::WebSocket) -> Self {
664        Self {
665            #[cfg(not(feature = "docs-only"))]
666            socket: ws,
667            #[cfg(feature = "docs-only")]
668            socket: (),
669            _running: RwSignal::new(false),
670            #[cfg(feature = "ssr")]
671            listener: unreachable!(),
672        }
673    }
674    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
675        #[cfg(not(feature = "docs-only"))]
676        {
677            &mut self.socket
678        }
679        #[cfg(feature = "docs-only")]
680        {
681            unreachable!()
682        }
683    }
684    #[allow(clippy::used_underscore_binding)]
685    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
686        let running = self._running;
687        Some(Box::new(move || {
688            running.set(true);
689        }))
690    }
691}
692
693#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
694impl QueueSocket {
695    fn run(_: AllQueues) {
696        Self::force_start_server();
697    }
698}
699
700#[cfg(feature = "hydrate")]
701impl QueueSocket {
702    fn run(queues: AllQueues) {
703        use ws::WebSocketClient;
704        Self::force_start_client(
705            move |msg| {
706                //tracing::warn!("Starting!");
707                let current = queues.selected.get_untracked();
708                queues.queues.with_untracked(|queues| {
709                    let Some(queue) = queues.get(&current) else {
710                        tracing::error!("Queue not found: {current}");
711                        return;
712                    };
713                    Self::do_msg(*queue, msg);
714                });
715                if !queues.show.get_untracked() {
716                    queues.show.set(true);
717                }
718                None
719            },
720            move |mut socket| {
721                #[allow(clippy::used_underscore_binding)]
722                Effect::new(move |_| {
723                    if socket._running.get() {
724                        let current = queues.selected.get_untracked();
725                        socket.send(&current);
726                    }
727                });
728            },
729        );
730    }
731    fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
732        match msg {
733            QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
734            QueueMessage::Started {
735                running,
736                queue: actual_queue,
737                blocked,
738                failed,
739                done,
740            } => queue.set(QueueData::Running(RunningQueue {
741                running: RwSignal::new(running),
742                queue: RwSignal::new(actual_queue),
743                blocked: RwSignal::new(blocked),
744                failed: RwSignal::new(failed),
745                done: RwSignal::new(done),
746                eta: WrappedEta(RwSignal::new(Eta::default())),
747            })),
748            QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
749            QueueMessage::TaskStarted { id, target } => queue.with_untracked(|queue| {
750                if let QueueData::Running(RunningQueue { queue, running, .. }) = queue {
751                    queue.update(|v| {
752                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
753                            return;
754                        };
755                        let e = v.remove(i);
756                        e.steps.update(|m| m.insert(target, TaskState::Running));
757                        running.update(|running| running.push(e));
758                    });
759                }
760            }),
761            QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
762                if let QueueData::Running(RunningQueue {
763                    queue,
764                    running,
765                    done,
766                    eta: etasignal,
767                    ..
768                }) = queue
769                {
770                    etasignal.0.set(eta);
771                    running.update(|v| {
772                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
773                            return;
774                        };
775                        let e = v.remove(i);
776                        e.steps.update(|m| m.insert(target, TaskState::Done));
777                        if e.steps.with_untracked(|v| {
778                            v.iter()
779                                .any(|(_, v)| *v == TaskState::Queued || *v == TaskState::Blocked)
780                        }) {
781                            queue.update(|v| v.push(e));
782                        } else {
783                            done.update(|v| v.push(e));
784                        }
785                    });
786                }
787            }),
788            QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
789                if let QueueData::Running(RunningQueue {
790                    running,
791                    failed,
792                    eta: etasignal,
793                    ..
794                }) = queue
795                {
796                    etasignal.0.set(eta);
797                    running.update(|v| {
798                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
799                            return;
800                        };
801                        let e = v.remove(i);
802                        e.steps.update(|m| m.insert(target, TaskState::Failed));
803                        failed.update(|v| v.push(e));
804                    });
805                }
806            }),
807        }
808    }
809}
810
811#[derive(Clone, Copy)]
812struct AllQueues {
813    show: RwSignal<bool>,
814    selected: RwSignal<NonZeroU32>,
815    queue_names: RwSignal<VecMap<NonZeroU32, String>>,
816    queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
817    queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
818}
819
820impl AllQueues {
821    fn new(ids: Vec<QueueInfo>) -> Self {
822        let queues = RwSignal::new(
823            ids.iter()
824                .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
825                .collect(),
826        );
827        let selected = ids.first().map_or_else(
828            || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
829            |v| v.id,
830        );
831        let mut queue_names = VecMap::default();
832        let mut queue_repos = VecMap::default();
833        for d in ids {
834            queue_names.insert(d.id, d.name);
835            queue_repos.insert(d.id, d.archives);
836        }
837        Self {
838            show: RwSignal::new(false),
839            selected: RwSignal::new(selected),
840            queues,
841            queue_names: RwSignal::new(queue_names),
842            queue_repos: RwSignal::new(queue_repos),
843        }
844    }
845}
846
847#[derive(Clone)]
848#[allow(dead_code)]
849enum QueueData {
850    Idle(RwSignal<Vec<Entry>>),
851    Running(RunningQueue),
852    Empty,
853    Finished(Vec<Entry>, Vec<Entry>),
854}
855
856#[derive(Clone, Copy)] //,serde::Serialize,serde::Deserialize)]
857#[allow(dead_code)]
858struct RunningQueue {
859    running: RwSignal<Vec<Entry>>,
860    queue: RwSignal<Vec<Entry>>,
861    blocked: RwSignal<Vec<Entry>>,
862    failed: RwSignal<Vec<Entry>>,
863    done: RwSignal<Vec<Entry>>,
864    eta: WrappedEta,
865}
866
867#[derive(Clone, Copy)]
868struct WrappedEta(RwSignal<ftml_ontology::utils::time::Eta>);
869
870#[allow(clippy::cast_precision_loss)]
871impl WrappedEta {
872    fn into_view(self) -> impl IntoView {
873        use thaw::ProgressBar;
874        inject_css(
875            "flams-eta",
876            r"
877.flams-progress-bar {height:10px;}
878    ",
879        );
880        let pctg = Memo::new(move |_| {
881            let eta = self.0.get();
882            ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
883        });
884        let time_left = move || {
885            let eta = self.0.get();
886            if eta.time_left == Delta::default() {
887                "N/A".to_string()
888            } else {
889                eta.time_left.max_seconds().to_string()
890            }
891        };
892        view! {
893          <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
894            {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
895          </div>
896        }
897    }
898}