flams_router_buildqueue_components/
lib.rs

1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_auto_cfg))]
4
// Build-time feature guard. Compilation is aborted when
//   * both "ssr" and "hydrate" are enabled and "docs-only" is not, or
//   * neither "ssr" nor "hydrate" is enabled.
// Enabling both together is therefore only accepted in "docs-only" builds.
#[cfg(any(
    all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
    not(any(feature = "ssr", feature = "hydrate"))
))]
compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_ontology::uris::ArchiveId;
12use flams_router_base::ws;
13use flams_router_base::{LoginState, require_login, ws::WebSocket};
14#[cfg(feature = "hydrate")]
15use flams_router_buildqueue_base::server_fns::get_log;
16use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
17use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
18use flams_utils::time::{Delta, Eta};
19use flams_utils::vecmap::VecMap;
20use flams_web_utils::{components::wait_and_then_fn, inject_css};
21use leptos::{either::EitherOf4, prelude::*};
22use leptos_router::hooks::use_params_map;
23use std::num::NonZeroU32;
24
/// Copyable handle around a unit signal that is shared through the Leptos context.
///
/// The signal carries no data; setting it merely notifies whoever reads it
/// reactively, which is used in this file to request a refresh of the queue list.
#[derive(Copy, Clone)]
struct UpdateQueues(RwSignal<()>);
27
/// A single build job as shown in the queue views and as sent over the wire.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct Entry {
    /// Identifier of the job; used as the key when rendering lists of entries.
    id: u32,
    /// Archive the job belongs to.
    archive: ArchiveId,
    /// Path of the job's file; rendered right after the archive as `[archive]rel_path`.
    rel_path: String,
    /// Build steps with their current state, keyed by step name.
    /// In `hydrate` builds this is a signal so the view can react to state changes.
    #[cfg(feature = "hydrate")]
    steps: RwSignal<VecMap<String, TaskState>>,
    /// Build steps with their current state, keyed by step name (plain data outside
    /// of `hydrate` builds).
    #[cfg(not(feature = "hydrate"))]
    steps: VecMap<String, TaskState>,
}
38
39impl Entry {
40    #[cfg(not(feature = "hydrate"))]
41    fn as_view(&self) -> impl IntoView + use<> {
42        view! {
43          <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
44        }
45    }
46
47    #[cfg(feature = "hydrate")]
48    fn as_view(&self) -> impl IntoView + use<> {
49        use flams_router_base::vscode_link;
50        use flams_web_utils::components::{Collapsible, Header};
51
52        let vscode = vscode_link(&self.archive, &self.rel_path);
53
54        let title = format!("[{}]{}", self.archive, self.rel_path);
55        let total = self.steps.with_untracked(|v| v.0.len());
56        let steps = self.steps;
57        let current = move || {
58            steps.with(|v| {
59                v.iter()
60                    .enumerate()
61                    .find_map(|(i, (e, s))| {
62                        if *s == TaskState::Done {
63                            None
64                        } else {
65                            Some((i + 1, e.clone()))
66                        }
67                    })
68                    .unwrap_or_else(|| (total, "Done".to_string()))
69            })
70        };
71        let rel_path = self.rel_path.clone();
72        let archive = self.archive.clone();
73        view! {
74          <li><Collapsible>
75            <Header slot>
76              <b>
77                {title}
78                {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
79                " "{vscode}
80              </b>
81            </Header>
82            <ol>
83            {let rel_path = rel_path.clone();
84              let archive = archive.clone();
85              move || steps.get().iter().map(|(t,e)|
86              view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
87            ).collect_view()}
88            </ol>
89          </Collapsible></li>
90        }
91    }
92}
93
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueEntry> for Entry {
    /// Converts a build-system queue entry into its serializable counterpart.
    ///
    /// When `hydrate` is enabled as well, this conversion is not expected to be
    /// called and panics via `unreachable!()`.
    fn from(e: flams_system::building::QueueEntry) -> Self {
        #[cfg(feature = "hydrate")]
        {
            unreachable!()
        }
        #[cfg(not(feature = "hydrate"))]
        {
            // Step names become owned strings, step states are converted via `Into`.
            let steps = e
                .steps
                .into_iter()
                .map(|(name, state)| (name.to_string(), state.into()))
                .collect();
            Self {
                id: e.id.into(),
                archive: e.archive,
                rel_path: e.rel_path.to_string(),
                steps,
            }
        }
    }
}
114
/// State of a single build step of an [`Entry`].
///
/// The variants mirror `flams_system::building::TaskState` one-to-one (see the
/// `From` impl available in `ssr` builds).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TaskState {
    /// The step is currently being executed.
    Running,
    /// The step is queued.
    Queued,
    /// The step is blocked.
    Blocked,
    /// The step finished successfully.
    Done,
    /// The step failed.
    Failed,
    /// No state; mirrors the build system's `None` variant.
    None,
}
impl TaskState {
    /// Renders one build step (client-side only).
    ///
    /// * `t` - name of the step; shown as the label and passed to `get_log`.
    /// * `archive`, `rel_path` - identify the job whose log is fetched.
    ///
    /// `Running` is shown in yellow, `Queued`/`Blocked`/`None` in gray as `(...)`.
    /// `Done` (green) and `Failed` (red) are lazily collapsible; their body fetches
    /// the step's log for the currently selected queue via `get_log` and shows it
    /// in a scrollable `<pre>`.
    #[cfg(feature = "hydrate")]
    fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> impl IntoView + use<> {
        use flams_web_utils::components::{Header, LazyCollapsible};
        use thaw::Scrollbar;
        match self {
            Self::Running => EitherOf4::A(view! {<i style="color:yellow">{t}" (Running)"</i>}),
            // All "not started yet" states share one rendering.
            Self::Queued | Self::Blocked | Self::None => {
                EitherOf4::B(view! {<span style="color:gray">{t}" (...)"</span>})
            }
            Self::Done => {
                // Owned copies are needed because the view outlives the borrowed arguments.
                let archive = archive.clone();
                let rel_path = rel_path.to_string();
                let tc = t.clone();
                EitherOf4::C(view! {
                  <LazyCollapsible>
                    <Header slot><span style="color:green">{t}" (Done)"</span></Header>
                    {
                      let archive = archive.clone();
                      let rel_path = rel_path.clone();
                      let tc = tc.clone();
                      // The queue id is read once, without tracking the selection signal.
                      let queue = expect_context::<AllQueues>().selected.get_untracked();
                      require_login(move || wait_and_then_fn(
                          move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
                          |s| {
                            view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
                                <pre style="width:fit-content;font-size:smaller;">{s}</pre>
                            </Scrollbar>}
                            }
                      ))
                    }
                  </LazyCollapsible>
                })
            }
            // Same structure as `Done`, only the header colour and label differ.
            Self::Failed => {
                let archive = archive.clone();
                let rel_path = rel_path.to_string();
                let tc = t.clone();
                EitherOf4::D(view! {
                  <LazyCollapsible>
                    <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
                    {
                      let archive = archive.clone();
                      let rel_path = rel_path.clone();
                      let tc = tc.clone();
                      let queue = expect_context::<AllQueues>().selected.get_untracked();
                      require_login(move || wait_and_then_fn(
                          move || get_log(queue,archive.clone(),rel_path.to_string(),tc.clone()),
                          |s| {
                                view!{<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">
                                    <pre style="width:fit-content;font-size:smaller;">{s}</pre>
                                </Scrollbar>}
                            }
                      ))
                    }
                  </LazyCollapsible>
                })
            }
        }
    }
}
185
#[cfg(feature = "ssr")]
impl From<flams_system::building::TaskState> for TaskState {
    /// Maps every build-system task state onto the local variant of the same name.
    fn from(e: flams_system::building::TaskState) -> Self {
        // Aliased so the upstream enum cannot be confused with `Self`.
        use flams_system::building::TaskState as Upstream;
        match e {
            Upstream::Running => Self::Running,
            Upstream::Queued => Self::Queued,
            Upstream::Blocked => Self::Blocked,
            Upstream::Done => Self::Done,
            Upstream::Failed => Self::Failed,
            Upstream::None => Self::None,
        }
    }
}
200
/// Messages sent from the server to the client over the queue websocket.
///
/// The variants mirror `flams_system::building::QueueMessage` (see the `From`
/// impl available in `ssr` builds).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum QueueMessage {
    /// The queue is idle; carries its entries.
    Idle(Vec<Entry>),
    /// The queue is running; carries the entries grouped by their current status.
    Started {
        running: Vec<Entry>,
        queue: Vec<Entry>,
        blocked: Vec<Entry>,
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// The queue has finished; carries the failed and the successful entries.
    Finished {
        failed: Vec<Entry>,
        done: Vec<Entry>,
    },
    /// Step `target` of the entry with the given `id` has started.
    TaskStarted {
        id: u32,
        target: String,
    },
    /// Step `target` of the entry with the given `id` succeeded; `eta` is the
    /// updated estimate.
    TaskSuccess {
        id: u32,
        target: String,
        eta: Eta,
    },
    /// Step `target` of the entry with the given `id` failed; `eta` is the
    /// updated estimate.
    TaskFailed {
        id: u32,
        target: String,
        eta: Eta,
    },
}
#[cfg(feature = "ssr")]
impl From<flams_system::building::QueueMessage> for QueueMessage {
    /// Converts a build-system queue message into its serializable counterpart,
    /// converting every contained entry, id and target along the way.
    fn from(e: flams_system::building::QueueMessage) -> Self {
        use flams_system::building::QueueMessage as Upstream;

        /// Converts every item of a collection via `Into` and gathers the results.
        fn convert<I, T>(items: I) -> Vec<T>
        where
            I: IntoIterator,
            I::Item: Into<T>,
        {
            items.into_iter().map(Into::into).collect()
        }

        match e {
            Upstream::Idle(entries) => Self::Idle(convert(entries)),
            Upstream::Started {
                running,
                queue,
                blocked,
                failed,
                done,
            } => Self::Started {
                running: convert(running),
                queue: convert(queue),
                blocked: convert(blocked),
                failed: convert(failed),
                done: convert(done),
            },
            Upstream::Finished { failed, done } => Self::Finished {
                failed: convert(failed),
                done: convert(done),
            },
            Upstream::TaskStarted { id, target } => {
                let (id, target) = (id.into(), target.to_string());
                Self::TaskStarted { id, target }
            }
            Upstream::TaskSuccess { id, target, eta } => {
                let (id, target) = (id.into(), target.to_string());
                Self::TaskSuccess { id, target, eta }
            }
            Upstream::TaskFailed { id, target, eta } => {
                let (id, target) = (id.into(), target.to_string());
                Self::TaskFailed { id, target, eta }
            }
        }
    }
}
270
271// ----------------------------------------------------------------------------------
272
273#[component]
274pub fn QueuesTop() -> impl IntoView {
275    use flams_web_utils::components::Spinner;
276    use thaw::{Divider, Layout, Tab, TabList};
277
278    let update = UpdateQueues(RwSignal::new(()));
279    provide_context(update);
280    move || {
281        let () = update.0.get();
282        let params = use_params_map();
283        let id = move || params.read().get("queue");
284
285        require_login(move || {
286            wait_and_then_fn(server_fns::get_queues, move |v| {
287                if v.is_empty() {
288                    return leptos::either::Either::Left(view!(<div>"(No running queues)"</div>));
289                }
290                let queues = AllQueues::new(v);
291                if let Some(id) = id() {
292                    if let Ok(id) = id.parse() {
293                        queues.selected.update_untracked(|v| *v = id);
294                    }
295                }
296                provide_context(queues);
297                let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
298                let _ = Effect::new(move |_| {
299                    let value = selected_value.get();
300                    let selected = queues.selected.get_untracked();
301                    let value = value.parse().unwrap_or_else(|_| unreachable!());
302                    if selected != value {
303                        queues.selected.set(value);
304                    }
305                });
306                inject_css(
307                    "flams-fullscreen",
308                    ".flams-fullscreen { width:100%; height:calc(100% - 44px - 21px) }",
309                );
310                leptos::either::Either::Right(view! {
311                  <TabList selected_value>
312                    <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
313                      <Tab value=i.to_string()>{
314                        queues.queue_names.get().get(&i).unwrap_or_else(|| unreachable!()).clone()
315                      }</Tab>
316                    }/>
317                  </TabList>
318                  <div style="margin:10px"><Divider/></div>
319                  <Layout class="flams-fullscreen">{move || {
320                    let curr = queues.selected.get();
321                    queues.show.update_untracked(|v| *v = false);
322                    QueueSocket::run(queues);
323                    move || view! {
324                      <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
325                        let ls = *queues.queues.get_untracked().get(&curr).unwrap_or_else(|| unreachable!());
326                        move || match ls.get() {
327                          QueueData::Idle(v) => {
328                              EitherOf4::A(idle(curr,v))
329                          },
330                          QueueData::Running(r) => {
331                              EitherOf4::B(running(curr,r))
332                          },
333                          QueueData::Finished(failed,done) => EitherOf4::C(finished(curr,failed,done)),
334                          QueueData::Empty => EitherOf4::D(view!(<div>"Other"</div>))
335                        }
336                      }</Show>
337                    }
338                  }}</Layout>
339                })
340            })
341        })
342    }
343}
344
/// Renders the collapsible "Archives" table for the queue `queue_id`.
///
/// Returns `None` (nothing rendered) when the login state is `NoAccounts`, when
/// no repository list is recorded for the queue, or when that list is empty.
///
/// * `queue_id` - queue whose repositories are listed.
/// * `allowed` - when `false`, archive names are grayed out and no update
///   controls are offered; when `true`, git rows get a "Check for updates"
///   button and, if new commits exist, a branch selector with an "Update" button.
#[allow(clippy::too_many_lines)]
fn repos(queue_id: NonZeroU32, allowed: bool) -> impl IntoView {
    use flams_web_utils::components::{Collapsible, Header};
    use thaw::{
        Caption1Strong, Table, TableBody, TableCell, TableCellLayout, TableHeader, TableHeaderCell,
        TableRow,
    };
    if matches!(LoginState::get(), LoginState::NoAccounts) {
        return None;
    }
    let queues: AllQueues = expect_context();
    // Missing map entry and an entry holding `None` both end the function here.
    let repos = queues
        .queue_repos
        .with_untracked(|v| v.get(&queue_id).cloned())
        .flatten()?;
    if repos.is_empty() {
        return None;
    }
    let style = if allowed { "" } else { "color:gray;" };
    inject_css("flams-repo-table", include_str!("repo-table.css"));
    Some(
        view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
          <Header slot><Caption1Strong>"Archives"</Caption1Strong></Header>
          <Table class="flams-repo-table">
            <TableHeader><TableRow>
              <TableHeaderCell><Caption1Strong>"Archive"</Caption1Strong></TableHeaderCell>
              <TableHeaderCell><Caption1Strong>"Branch"</Caption1Strong></TableHeaderCell>
              <TableHeaderCell><Caption1Strong>"Commit"</Caption1Strong></TableHeaderCell>
            </TableRow></TableHeader>
            <TableBody>{
              repos.into_iter().map(|d| match d {
                RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
                  <TableRow>
                    <TableCell><TableCellLayout><span style=style>{id.to_string()}</span></TableCellLayout></TableCell>
                    <TableCell><TableCellLayout>"(Copied from MathHub)"</TableCellLayout></TableCell>
                  </TableRow>
                }),
                RepoInfo::Git{id,branch,commit,remote/*,updates */} => leptos::either::Either::Right({
                  // `None`: not checked yet; `Some(vec)`: branches with new commits
                  // as returned by `get_new_commits` (empty = up to date).
                  let updates = RwSignal::<Option<Vec<(String,flams_git::Commit)>>>::new(None);
                  // Name colour: unchanged before a check, green when up to date,
                  // yellow-green when updates exist; the static gray style if not allowed.
                  let style = move || if allowed {
                    updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
                      "color:green;"
                    } else {
                      "color:yellowgreen;"
                    }))
                  } else {style};
                  let idstr = id.to_string();
                  view!{
                    <TableRow>
                      <TableCell><TableCellLayout><span style=style>{idstr}</span></TableCellLayout></TableCell>
                      <TableCell><TableCellLayout>{branch}</TableCellLayout></TableCell>
                      <TableCell><TableCellLayout>
                        // NOTE(review): `[..8]` panics for ids shorter than 8 bytes;
                        // assumes full-length commit hashes — confirm.
                        {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
                        {if allowed {Some(move || updates.with(|up| {
                          // Not checked yet: offer a button that fetches the new commits.
                          let Some(up) = up else {
                            let aid = id.clone();
                            let toaster = thaw::ToasterInjection::expect_context();
                            let get_updates = Action::new(move |()| {
                              let id = aid.clone();
                              async move {
                                match get_new_commits(Some(queue_id),id).await {
                                  Ok(v) => updates.set(Some(v)),
                                  Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
                                }
                              }
                            });
                            return leptos::either::EitherOf3::A(view!{
                              <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
                            });
                          };
                          if up.is_empty() {
                            return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
                          }
                          let updates = up.clone();

                          // Updates exist: branch selector plus an "Update" button.
                          leptos::either::EitherOf3::C({
                            use thaw::{Button,ButtonSize,Combobox,ComboboxOption};
                            let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
                            let branch = RwSignal::new(first.clone());
                            // Fall back to the first branch whenever the selection becomes empty.
                            let _ = Effect::new(move || if branch.with(String::is_empty) {
                              branch.set(first.clone());
                            });
                            let update : UpdateQueues = expect_context();
                            //let toaster = ToasterInjection::expect_context();
                            //let commit_map:VecMap<_,_> = updates.clone().into();
                            let archive = id.clone();
                            let remote = remote.clone();
                            let act = flams_web_utils::components::message_action(
                              move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
                              move |(i,_)| {
                                // Trigger a refresh of the queue list after a successful update.
                                update.0.set(());
                                format!("{i} jobs queued")
                              }
                            );
                            view!{
                              <span style="color:green">
                                " Updates available: "
                              </span>
                              <div style="margin-left:10px">
                                <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
                                " from branch: "
                                <div style="display:inline-block;"><Combobox selected_options=branch>{
                                  updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
                                    <ComboboxOption text=vname.clone() value=vname>
                                      {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
                                    </ComboboxOption>
                                  }}).collect_view()
                                }</Combobox></div>
                              </div>
                            }
                          })
                        }))} else {None}}
                      </TableCellLayout></TableCell>
                    </TableRow>
                  }
                }),
              }).collect_view()
            }</TableBody>
          </Table>
        </Collapsible></div>},
    )
}
467
468fn delete_action(id: NonZeroU32) -> Action<(), ()> {
469    use thaw::ToasterInjection;
470    let update: UpdateQueues = expect_context();
471    let toaster = ToasterInjection::expect_context();
472    Action::new(move |()| async move {
473        match flams_router_buildqueue_base::server_fns::delete(id).await {
474            Ok(()) => update.0.set(()),
475            Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
476        }
477    })
478}
479
480fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> impl IntoView {
481    use thaw::Button;
482    let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
483        flams_router_buildqueue_base::server_fns::run(id)
484    });
485    let del = delete_action(id);
486    view! {
487      <div style="width:100%"><div style="position:fixed;right:20px">
488          <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
489          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
490      </div></div>
491      {repos(id,true)}
492      <ol reversed style="margin-left:30px">
493        <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
494      </ol>
495    }
496}
497
498fn running(id: NonZeroU32, queue: RunningQueue) -> impl IntoView {
499    use flams_web_utils::components::{Anchor, AnchorLink, Header};
500    use thaw::{Button, Layout};
501    let del = delete_action(id);
502    let RunningQueue {
503        running,
504        queue,
505        blocked,
506        failed,
507        done,
508        eta,
509    } = queue;
510    view! {
511      <div style="position:fixed;right:20px;z-index:5"><Anchor>
512          <AnchorLink href="#running"><Header slot>"Running"</Header></AnchorLink>
513          <AnchorLink href="#queued"><Header slot>"Queued"</Header></AnchorLink>
514          <AnchorLink href="#blocked"><Header slot>"Blocked"</Header></AnchorLink>
515          <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
516          <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
517      </Anchor></div>
518      {repos(id,false)}
519      <Layout content_style="text-align:left;">
520          {eta.into_view()}
521          <div style="width:100%"><div style="position:fixed;right:20px">
522              <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
523          </div></div>
524          <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
525          <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
526          <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
527          <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
528          <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
529          <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
530          <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
531          <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
532          <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
533          <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
534      </Layout>
535    }
536}
537
538fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> impl IntoView {
539    use flams_web_utils::components::{Anchor, AnchorLink, Header};
540    use thaw::{Button, Layout};
541    let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
542    let num_failed = failed.len();
543    let num_done = done.len();
544    let del = delete_action(id);
545    view! {
546      <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
547          {if num_failed > 0 {Some(view!(
548            <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
549          ))} else { None }}
550          {migrate_button(id,num_failed)}
551          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
552      </div></div>
553      <div style="position:fixed;right:20px;z-index:5"><Anchor>
554          <AnchorLink href="#failed"><Header slot>"Failed"</Header></AnchorLink>
555          <AnchorLink href="#finished"><Header slot>"Finished"</Header></AnchorLink>
556      </Anchor></div>
557      {repos(id,true)}
558      <Layout content_style="text-align:left;">
559          <h3 id="failed">"Failed ("{num_failed}")"</h3>
560          <ul style="margin-left:30px">{
561            failed.iter().map(Entry::as_view).collect_view()
562          }</ul>
563          <h3 id="finished">"Finished ("{num_done}")"</h3>
564          <ul style="margin-left:30px">{
565            done.iter().map(Entry::as_view).collect_view()
566          }</ul>
567      </Layout>
568    }
569}
570
571fn migrate_button(id: NonZeroU32, num_failed: usize) -> impl IntoView {
572    use leptos::either::EitherOf3;
573    use thaw::{Button, Caption1Strong, Dialog, DialogBody, DialogContent, DialogSurface, Divider};
574    if matches!(LoginState::get(), LoginState::NoAccounts) {
575        return EitherOf3::A(());
576    }
577    let update: UpdateQueues = expect_context();
578    let migrate = flams_web_utils::components::message_action(
579        move |()| flams_router_buildqueue_base::server_fns::migrate(id),
580        move |i| {
581            update.0.set(());
582            format!("{i} archives migrated")
583        },
584    );
585    if num_failed == 0 {
586        EitherOf3::B(view! {
587          <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
588        })
589    } else {
590        let clicked = RwSignal::new(false);
591        EitherOf3::C(view! {
592          <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
593          <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
594            <Caption1Strong><span style="color:red">WARNING</span></Caption1Strong>
595            <Divider/>
596            <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
597            <div>
598              <div style="width:min-content;margin-left:auto;">
599                <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
600              </div>
601            </div>
602          </DialogContent></DialogBody></DialogSurface></Dialog>
603        })
604    }
605}
606
/// Websocket used to stream [`QueueMessage`]s for one queue to the client.
///
/// Which fields exist depends on the enabled features: the server side (`ssr`)
/// holds the change listener of the queue being watched, the client side
/// (`hydrate`) holds the browser socket and a flag signal.
#[derive(Clone)]
pub struct QueueSocket {
    /// Listener on the watched queue; `None` until a queue id has been received
    /// via `handle_message`.
    #[cfg(feature = "ssr")]
    listener:
        Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
    /// The underlying browser websocket.
    #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
    socket: leptos::web_sys::WebSocket,
    /// Placeholder for the socket in `docs-only` builds.
    #[cfg(feature = "docs-only")]
    socket: (),
    /// Set to `true` by the `on_open` callback; starts out `false`.
    #[cfg(feature = "hydrate")]
    _running: RwSignal<bool>,
}
// Shared (client and server) part of the socket definition: the client sends
// queue ids (`NonZeroU32`), the server answers with `QueueMessage`s.
impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
    /// Route under which the queue websocket is served.
    const SERVER_ENDPOINT: &'static str = "/ws/queue";
}
622
#[cfg(feature = "ssr")]
#[async_trait::async_trait]
impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
    /// Creates the server side of the socket for privileged logins only
    /// (`Admin`, `NoAccounts`, or a user flagged `is_admin`); everyone else
    /// gets `None`. No queue is being listened to yet.
    async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
        let privileged = matches!(
            account,
            LoginState::Admin | LoginState::NoAccounts | LoginState::User { is_admin: true, .. }
        );
        if !privileged {
            return None;
        }
        Some(Self {
            listener: None,
            #[cfg(feature = "hydrate")]
            _running: RwSignal::new(false),
            #[cfg(feature = "hydrate")]
            socket: unreachable!(),
        })
    }

    /// Yields the next message of the watched queue. While no queue has been
    /// selected yet, re-checks every half second.
    async fn next(&mut self) -> Option<QueueMessage> {
        loop {
            if let Some(listener) = &mut self.listener {
                return listener.read().await.map(Into::into);
            }
            tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await;
        }
    }

    /// Handles a queue id sent by the client: starts listening to that queue and
    /// answers with its current state. Returns `None` if the queue does not exist.
    async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
        let (queue_listener, state) = flams_system::building::queue_manager::QueueManager::get()
            .with_queue(msg.into(), |q| q.map(|q| (q.listener(), q.state_message())))?;
        self.listener = Some(queue_listener);
        Some(state.into())
    }

    /// Nothing to do when the connection starts.
    async fn on_start(&mut self, _: &mut ws::AxumWS) {}
}
659
#[cfg(feature = "hydrate")]
impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
    /// Wraps a freshly created browser socket; the `_running` flag starts `false`.
    fn new(ws: leptos::web_sys::WebSocket) -> Self {
        Self {
            #[cfg(not(feature = "docs-only"))]
            socket: ws,
            #[cfg(feature = "docs-only")]
            socket: (),
            _running: RwSignal::new(false),
            // Only present when `ssr` is enabled as well; evaluating it panics.
            #[cfg(feature = "ssr")]
            listener: unreachable!(),
        }
    }
    /// Gives access to the underlying browser socket (panics in `docs-only` builds,
    /// where no real socket is stored).
    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
        #[cfg(not(feature = "docs-only"))]
        {
            &mut self.socket
        }
        #[cfg(feature = "docs-only")]
        {
            unreachable!()
        }
    }
    /// Callback that sets the `_running` flag to `true`.
    #[allow(clippy::used_underscore_binding)]
    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
        let running = self._running;
        Some(Box::new(move || {
            running.set(true);
        }))
    }
}
691
#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
impl QueueSocket {
    /// Server-side counterpart of the client `run`: ignores the queues and only
    /// calls `force_start_server`.
    fn run(_: AllQueues) {
        Self::force_start_server();
    }
}
698
699#[cfg(feature = "hydrate")]
700impl QueueSocket {
    /// Starts the client side of the socket for the currently selected queue.
    ///
    /// Two callbacks are handed to `force_start_client`:
    /// * the message handler applies every incoming message to the signal of the
    ///   currently selected queue (via `do_msg`) and then makes the content visible;
    /// * the setup callback sends the selected queue id to the server once the
    ///   socket's `_running` flag is `true`.
    fn run(queues: AllQueues) {
        use ws::WebSocketClient;
        Self::force_start_client(
            move |msg| {
                //tracing::warn!("Starting!");
                // Selection and queue map are read untracked: the handler must not
                // subscribe to them.
                let current = queues.selected.get_untracked();
                queues.queues.with_untracked(|queues| {
                    let Some(queue) = queues.get(&current) else {
                        tracing::error!("Queue not found: {current}");
                        return;
                    };
                    Self::do_msg(*queue, msg);
                });
                // Shown even if the queue was not found above.
                if !queues.show.get_untracked() {
                    queues.show.set(true);
                }
                // presumably `None` means "no reply to the server" — confirm against
                // `force_start_client`.
                None
            },
            move |mut socket| {
                #[allow(clippy::used_underscore_binding)]
                Effect::new(move |_| {
                    // Tracks `_running`, so this runs again when the flag changes.
                    if socket._running.get() {
                        let current = queues.selected.get_untracked();
                        socket.send(&current);
                    }
                });
            },
        );
    }
730    fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
731        match msg {
732            QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
733            QueueMessage::Started {
734                running,
735                queue: actual_queue,
736                blocked,
737                failed,
738                done,
739            } => queue.set(QueueData::Running(RunningQueue {
740                running: RwSignal::new(running),
741                queue: RwSignal::new(actual_queue),
742                blocked: RwSignal::new(blocked),
743                failed: RwSignal::new(failed),
744                done: RwSignal::new(done),
745                eta: WrappedEta(RwSignal::new(Eta::default())),
746            })),
747            QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
748            QueueMessage::TaskStarted { id, target } => queue.with_untracked(|queue| {
749                if let QueueData::Running(RunningQueue { queue, running, .. }) = queue {
750                    queue.update(|v| {
751                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
752                            return;
753                        };
754                        let e = v.remove(i);
755                        e.steps.update(|m| m.insert(target, TaskState::Running));
756                        running.update(|running| running.push(e));
757                    });
758                }
759            }),
760            QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
761                if let QueueData::Running(RunningQueue {
762                    queue,
763                    running,
764                    done,
765                    eta: etasignal,
766                    ..
767                }) = queue
768                {
769                    etasignal.0.set(eta);
770                    running.update(|v| {
771                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
772                            return;
773                        };
774                        let e = v.remove(i);
775                        e.steps.update(|m| m.insert(target, TaskState::Done));
776                        if e.steps.with_untracked(|v| {
777                            v.iter()
778                                .any(|(_, v)| *v == TaskState::Queued || *v == TaskState::Blocked)
779                        }) {
780                            queue.update(|v| v.push(e));
781                        } else {
782                            done.update(|v| v.push(e));
783                        }
784                    });
785                }
786            }),
787            QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
788                if let QueueData::Running(RunningQueue {
789                    running,
790                    failed,
791                    eta: etasignal,
792                    ..
793                }) = queue
794                {
795                    etasignal.0.set(eta);
796                    running.update(|v| {
797                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
798                            return;
799                        };
800                        let e = v.remove(i);
801                        e.steps.update(|m| m.insert(target, TaskState::Failed));
802                        failed.update(|v| v.push(e));
803                    });
804                }
805            }),
806        }
807    }
808}
809
810#[derive(Clone, Copy)]
811struct AllQueues {
812    show: RwSignal<bool>,
813    selected: RwSignal<NonZeroU32>,
814    queue_names: RwSignal<VecMap<NonZeroU32, String>>,
815    queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
816    queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
817}
818
819impl AllQueues {
820    fn new(ids: Vec<QueueInfo>) -> Self {
821        let queues = RwSignal::new(
822            ids.iter()
823                .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
824                .collect(),
825        );
826        let selected = ids.first().map_or_else(
827            || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
828            |v| v.id,
829        );
830        let mut queue_names = VecMap::default();
831        let mut queue_repos = VecMap::default();
832        for d in ids {
833            queue_names.insert(d.id, d.name);
834            queue_repos.insert(d.id, d.archives);
835        }
836        Self {
837            show: RwSignal::new(false),
838            selected: RwSignal::new(selected),
839            queues,
840            queue_names: RwSignal::new(queue_names),
841            queue_repos: RwSignal::new(queue_repos),
842        }
843    }
844}
845
/// Client-side state of a single queue.
#[derive(Clone)]
#[allow(dead_code)]
enum QueueData {
    /// State installed on `QueueMessage::Idle`; wraps the entries carried by that message.
    Idle(RwSignal<Vec<Entry>>),
    /// State installed on `QueueMessage::Started`; see [`RunningQueue`] for the lists.
    Running(RunningQueue),
    /// Initial state assigned to every queue by `AllQueues::new`.
    Empty,
    /// State installed on `QueueMessage::Finished`: the failed entries first, then the
    /// done entries.
    Finished(Vec<Entry>, Vec<Entry>),
}
854
/// Reactive lists describing a started queue.
///
/// Created from `QueueMessage::Started` and afterwards updated in place by the
/// `TaskStarted` / `TaskSuccess` / `TaskFailed` messages.
#[derive(Clone, Copy)] //,serde::Serialize,serde::Deserialize)]
#[allow(dead_code)]
struct RunningQueue {
    /// Entries moved here from `queue` on `TaskStarted`.
    running: RwSignal<Vec<Entry>>,
    /// Entries waiting to be started; a successful entry returns here while any of its
    /// steps is still `Queued` or `Blocked`.
    queue: RwSignal<Vec<Entry>>,
    /// Blocked entries as reported by `QueueMessage::Started`.
    blocked: RwSignal<Vec<Entry>>,
    /// Entries moved here from `running` on `TaskFailed`.
    failed: RwSignal<Vec<Entry>>,
    /// Entries moved here from `running` on `TaskSuccess` once no step is `Queued` or
    /// `Blocked` any more.
    done: RwSignal<Vec<Entry>>,
    /// Latest estimate; replaced by the `eta` carried in `TaskSuccess` and `TaskFailed`.
    eta: WrappedEta,
}
865
866#[derive(Clone, Copy)]
867struct WrappedEta(RwSignal<flams_utils::time::Eta>);
868
869#[allow(clippy::cast_precision_loss)]
870impl WrappedEta {
871    fn into_view(self) -> impl IntoView {
872        use thaw::ProgressBar;
873        inject_css(
874            "flams-eta",
875            r"
876.flams-progress-bar {height:10px;}
877    ",
878        );
879        let pctg = Memo::new(move |_| {
880            let eta = self.0.get();
881            ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
882        });
883        let time_left = move || {
884            let eta = self.0.get();
885            if eta.time_left == Delta::default() {
886                "N/A".to_string()
887            } else {
888                eta.time_left.max_seconds().to_string()
889            }
890        };
891        view! {
892          <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
893            {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
894          </div>
895        }
896    }
897}