Skip to main content

flams_router_buildqueue_components/
lib.rs

1#![recursion_limit = "256"]
2#![allow(clippy::must_use_candidate)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4
5#[cfg(any(
6    all(feature = "ssr", feature = "hydrate", not(feature = "docs-only")),
7    not(any(feature = "ssr", feature = "hydrate"))
8))]
9compile_error!("exactly one of the features \"ssr\" or \"hydrate\" must be enabled");
10
11use flams_router_base::{LoginState, require_login, ws::WebSocket};
12use flams_router_base::{maybe_lazy, ws};
13#[cfg(feature = "hydrate")]
14use flams_router_buildqueue_base::server_fns::get_log;
15use flams_router_buildqueue_base::{QueueInfo, RepoInfo, server_fns};
16use flams_router_content::checks::{DocumentCheckResult, ResultExt};
17use flams_router_git_base::server_fns::{get_new_commits, update_from_branch};
18use flams_utils::vecmap::VecMap;
19use flams_web_utils::components::wait_and_then_fn;
20use ftml_component_utils::{Collapsible, Header, LazyCollapsible};
21use ftml_dom::utils::css::inject_css;
22use ftml_ontology::utils::time::{Delta, Eta};
23use ftml_uris::{ArchiveId, DocumentUri};
24use leptos::{either::EitherOf4, prelude::*};
25use leptos_router::hooks::use_params_map;
26use std::num::NonZeroU32;
27
28#[derive(Copy, Clone)]
29struct UpdateQueues(RwSignal<()>);
30
31#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
32pub struct Entry {
33    id: u32,
34    archive: ArchiveId,
35    rel_path: String,
36    #[cfg(feature = "hydrate")]
37    steps: RwSignal<VecMap<String, TaskState>>,
38    #[cfg(not(feature = "hydrate"))]
39    steps: VecMap<String, TaskState>,
40}
41
42impl Entry {
43    #[cfg(not(feature = "hydrate"))]
44    fn as_view(&self) -> impl IntoView + use<> {
45        view! {
46          <li>{format!("[{}]{}",self.archive,self.rel_path)}</li>
47        }
48    }
49
50    #[cfg(feature = "hydrate")]
51    fn as_view(&self) -> AnyView {
52        use flams_router_base::vscode_link;
53
54        let vscode = vscode_link(&self.archive, &self.rel_path);
55
56        let title = format!("[{}]{}", self.archive, self.rel_path);
57        let total = self.steps.with_untracked(|v| v.0.len());
58        let steps = self.steps;
59        let current = move || {
60            steps.with(|v| {
61                v.iter()
62                    .enumerate()
63                    .find_map(|(i, (e, s))| {
64                        if *s == TaskState::Done {
65                            None
66                        } else {
67                            Some((i + 1, e.clone()))
68                        }
69                    })
70                    .unwrap_or_else(|| (total, "Done".to_string()))
71            })
72        };
73        let rel_path = self.rel_path.clone();
74        let archive = self.archive.clone();
75        view! {
76          <li><Collapsible>
77            <Header slot>
78              <b>
79                {title}
80                {move || {let (i,s) = current(); format!(" ({i}/{total}) {s}")}}
81                " "{vscode}
82              </b>
83            </Header>
84            <ol>
85            {let rel_path = rel_path.clone();
86              let archive = archive.clone();
87              move || steps.get().iter().map(|(t,e)|
88              view!(<li>{e.into_view(t.clone(),&archive,&rel_path)}</li>)
89            ).collect_view()}
90            </ol>
91          </Collapsible></li>
92        }
93        .into_any()
94    }
95}
96
97#[cfg(feature = "ssr")]
98impl From<flams_system::building::QueueEntry> for Entry {
99    fn from(e: flams_system::building::QueueEntry) -> Self {
100        #[cfg(feature = "hydrate")]
101        {
102            unreachable!()
103        }
104        #[cfg(not(feature = "hydrate"))]
105        Self {
106            id: e.id.into(),
107            archive: e.archive,
108            rel_path: e.rel_path.to_string(),
109            steps: e
110                .steps
111                .into_iter()
112                .map(|(k, v)| (k.to_string(), v.into()))
113                .collect(),
114        }
115    }
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
119pub enum TaskState {
120    Running,
121    Queued,
122    Blocked,
123    Done,
124    Failed,
125    None,
126}
127impl TaskState {
128    #[cfg(feature = "hydrate")]
129    fn into_view(self, t: String, archive: &ArchiveId, rel_path: &str) -> AnyView {
130        match self {
131            Self::Running => view! {<i style="color:yellow">{t}" (Running)"</i>}.into_any(),
132            Self::Queued | Self::Blocked | Self::None => {
133                view! {<span style="color:gray">{t}" (...)"</span>}.into_any()
134            }
135            Self::Done => {
136                let archive = archive.clone();
137                let rel_path = rel_path.to_string();
138                let tc = t.clone();
139                view! {
140                  <LazyCollapsible>
141                    <Header slot><span style="color:green">{t}" (Done)"</span></Header>
142                    {
143                      let archive = archive.clone();
144                      let rel_path = rel_path.clone();
145                      let tc = tc.clone();
146                      let queue = expect_context::<AllQueues>().selected.get_untracked();
147                      require_login(Box::new(move || wait_and_then_fn(
148                          move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
149                          |s| do_log(s)
150                      )))
151                    }
152                  </LazyCollapsible>
153                }
154                .into_any()
155            }
156            Self::Failed => {
157                let archive = archive.clone();
158                let rel_path = rel_path.to_string();
159                let tc = t.clone();
160                view! {
161                  <LazyCollapsible>
162                    <Header slot><span style="color:red">{t}" (Failed)"</span></Header>
163                    {
164                      let archive = archive.clone();
165                      let rel_path = rel_path.clone();
166                      let tc = tc.clone();
167                      let queue = expect_context::<AllQueues>().selected.get_untracked();
168                      require_login(Box::new(move || wait_and_then_fn(
169                          move || get_log(queue,archive.clone(),rel_path.clone(),tc.clone()),
170                          do_log
171                      )))
172                    }
173                  </LazyCollapsible>
174                }
175                .into_any()
176            }
177        }
178    }
179}
180
181fn do_log(s: either::Either<String, String>) -> AnyView {
182    use ftml_component_utils::Scrollbar;
183    view! {<Scrollbar style="max-height: 160px;max-width:80vw;border:2px solid black;padding:5px;">{
184        match s {
185            either::Left(s) => leptos::either::Either::Left(view!{
186                <pre style="width:fit-content;font-size:smaller;">{s}</pre>
187            }),
188            either::Right(v) => leptos::either::Either::Right({
189                ftml_solver_trace::results::DocumentCheckResult::from_json(&v)
190                    .map_or_else(|_| view!{<pre>{v}</pre>}.into_any(),|e| e.render())
191            })
192        }
193    }</Scrollbar>}
194    .into_any()
195}
196
197#[cfg(feature = "ssr")]
198impl From<flams_system::building::TaskState> for TaskState {
199    fn from(e: flams_system::building::TaskState) -> Self {
200        use flams_system::building::TaskState;
201        match e {
202            TaskState::Running => Self::Running,
203            TaskState::Queued => Self::Queued,
204            TaskState::Blocked => Self::Blocked,
205            TaskState::Done => Self::Done,
206            TaskState::Failed => Self::Failed,
207            TaskState::None => Self::None,
208        }
209    }
210}
211
212#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
213pub enum QueueMessage {
214    Idle(Vec<Entry>),
215    Started {
216        running: Vec<Entry>,
217        queue: Vec<Entry>,
218        blocked: Vec<Entry>,
219        failed: Vec<Entry>,
220        done: Vec<Entry>,
221    },
222    Finished {
223        failed: Vec<Entry>,
224        done: Vec<Entry>,
225    },
226    TaskStarted {
227        id: u32,
228        target: String,
229    },
230    TaskSuccess {
231        id: u32,
232        target: String,
233        eta: Eta,
234    },
235    TaskFailed {
236        id: u32,
237        target: String,
238        eta: Eta,
239    },
240    TaskBlocked {
241        id: u32,
242        target: String,
243        eta: Eta,
244    },
245}
246#[cfg(feature = "ssr")]
247impl From<flams_system::building::QueueMessage> for QueueMessage {
248    fn from(e: flams_system::building::QueueMessage) -> Self {
249        use flams_system::building::QueueMessage;
250        match e {
251            QueueMessage::Idle(v) => Self::Idle(v.into_iter().map(Into::into).collect()),
252            QueueMessage::Started {
253                running,
254                queue,
255                blocked,
256                failed,
257                done,
258            } => Self::Started {
259                running: running.into_iter().map(Into::into).collect(),
260                queue: queue.into_iter().map(Into::into).collect(),
261                blocked: blocked.into_iter().map(Into::into).collect(),
262                failed: failed.into_iter().map(Into::into).collect(),
263                done: done.into_iter().map(Into::into).collect(),
264            },
265            QueueMessage::Finished { failed, done } => Self::Finished {
266                failed: failed.into_iter().map(Into::into).collect(),
267                done: done.into_iter().map(Into::into).collect(),
268            },
269            QueueMessage::TaskStarted { id, target } => Self::TaskStarted {
270                id: id.into(),
271                target: target.to_string(),
272            },
273            QueueMessage::TaskSuccess { id, target, eta } => Self::TaskSuccess {
274                id: id.into(),
275                target: target.to_string(),
276                eta,
277            },
278            QueueMessage::TaskFailed { id, target, eta } => Self::TaskFailed {
279                id: id.into(),
280                target: target.to_string(),
281                eta,
282            },
283            QueueMessage::TaskBlocked { id, target, eta } => Self::TaskBlocked {
284                id: id.into(),
285                target: target.to_string(),
286                eta,
287            },
288        }
289    }
290}
291
292// ----------------------------------------------------------------------------------
293
294maybe_lazy!(QueuesTop = queues_top());
295
296//#[component]
297pub fn queues_top() -> AnyView {
298    use flams_web_utils::components::{Layout, LayoutHeader};
299    use ftml_component_utils::{Divider, Spinner, Tab, TabList};
300
301    let update = UpdateQueues(RwSignal::new(()));
302    provide_context(update);
303    (move || {
304        let () = update.0.get();
305        let params = use_params_map();
306        let id = move || params.read().get("queue");
307
308        require_login(Box::new(move || {
309            wait_and_then_fn(server_fns::get_queues, move |v| {
310                if v.is_empty() {
311                    return view!(<div>"(No running queues)"</div>).into_any();
312                }
313                let queues = AllQueues::new(v);
314                if let Some(id) = params.read_untracked().get("queue") && let Ok(id) = id.parse() {
315                    queues.selected.update_untracked(|v| *v = id);
316                }
317                provide_context(queues);
318                let selected_value = RwSignal::new(queues.selected.get_untracked().to_string());
319                let _ = Effect::new(move |_| {
320                    let value = selected_value.get();
321                    let selected = queues.selected.get_untracked();
322                    let value = value.parse().unwrap_or_else(|_| unreachable!());
323                    if selected != value {
324                        queues.selected.set(value);
325                    }
326                });
327                view! {<Layout>
328                    <LayoutHeader slot>
329                        <TabList selected_value>
330                            <For each=move || queues.queues.get() key=|e| e.0 children=move |(i,_)| view!{
331                            <Tab value=i.to_string()>{
332                                queues.queue_names.with_untracked(|m| m.get(&i).cloned()).unwrap_or_else(|| unreachable!())
333                            }</Tab>
334                            }/>
335                        </TabList>
336                        <div style="margin:10px"><Divider/></div>
337                    </LayoutHeader>
338                    {move || {
339                        //let curr = queues.selected.get();
340                        queues.show.update_untracked(|v| *v = false);
341                        QueueSocket::run(queues);
342                        move || view! {
343                        <Show when=move || queues.show.get() fallback=|| view!(<Spinner/>)>{
344                            let ls = move || {
345                                let curr = queues.selected.get();
346                                (curr,queues.queues.with(|m| m.get(&curr).copied()).unwrap_or_else(|| unreachable!()))
347                            };
348                            move || {
349                                let (curr,ls) = ls();
350                                match ls.get() {
351                                    QueueData::Idle(v) => {
352                                        idle(curr,v)
353                                    },
354                                    QueueData::Running(r) => {
355                                        running(curr,r)
356                                    },
357                                    QueueData::Finished(failed,done) => finished(curr,failed,done),
358                                    QueueData::Empty => view!(<div>"Other"</div>).into_any()
359                                }
360                            }
361                        }</Show>
362                        }
363                    }}
364                </Layout>}.into_any()
365            })
366        }) as _)
367    }).into_any()
368}
369
370#[allow(clippy::too_many_lines)]
371fn repos(queue_id: NonZeroU32, allowed: bool) -> AnyView {
372    use ftml_component_utils::{BoldCaption, Table, TableCell, TableHeader, TableRow};
373    if matches!(LoginState::get(), LoginState::NoAccounts) {
374        return ().into_any();
375    }
376    let queues: AllQueues = expect_context();
377    let Some(repos) = queues
378        .queue_repos
379        .with_untracked(|v| v.get(&queue_id).cloned())
380        .flatten()
381    else {
382        return ().into_any();
383    };
384    if repos.is_empty() {
385        return ().into_any();
386    }
387    let style = if allowed { "" } else { "color:gray;" };
388    inject_css("flams-repo-table", include_str!("repo-table.css"));
389    view! {<div style="margin-left:45px;width:fit-content;"><Collapsible>
390          <Header slot><BoldCaption>"Archives"</BoldCaption></Header>
391          <Table class="flams-repo-table">
392            <TableHeader slot>
393              <TableCell><BoldCaption>"Archive"</BoldCaption></TableCell>
394              <TableCell><BoldCaption>"Branch"</BoldCaption></TableCell>
395              <TableCell><BoldCaption>"Commit"</BoldCaption></TableCell>
396            </TableHeader>
397            {
398              repos.into_iter().map(|d| match d {
399                RepoInfo::Copy(id) => leptos::either::Either::Left(view!{
400                  <TableRow>
401                    <TableCell><span style=style>{id.to_string()}</span></TableCell>
402                    <TableCell>"(Copied from MathHub)"</TableCell>
403                  </TableRow>
404                }),
405                RepoInfo::Git{id,branch,commit,remote/*,updates */} => leptos::either::Either::Right({
406                  let updates = RwSignal::<Option<Vec<(String,flams_backend_types::git::Commit)>>>::new(None);
407                  let style = move || if allowed {
408                    updates.with(|updates| updates.as_ref().map_or("",|updates| if updates.is_empty() {
409                      "color:green;"
410                    } else {
411                      "color:yellowgreen;"
412                    }))
413                  } else {style};
414                  let idstr = id.to_string();
415                  view!{
416                    <TableRow>
417                      <TableCell><span style=style>{idstr}</span></TableCell>
418                      <TableCell>{branch}</TableCell>
419                      <TableCell>
420                        {commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}
421                        {if allowed {Some(move || updates.with(|up| {
422                          let Some(up) = up else {
423                            let aid = id.clone();
424                            let toaster = ftml_component_utils::toasts::ToasterInjection::expect_context();
425                            let get_updates = Action::new(move |()| {
426                              let id = aid.clone();
427                              async move {
428                                match get_new_commits(Some(queue_id),id).await {
429                                  Ok(v) => updates.set(Some(v)),
430                                  Err(e) => flams_web_utils::components::error_with_toaster(e, toaster),
431                                }
432                              }
433                            });
434                            return leptos::either::EitherOf3::A(view!{
435                              <button on:click=move |_| {get_updates.dispatch(());}>"Check for updates"</button>
436                            });
437                          };
438                          if up.is_empty() {
439                            return leptos::either::EitherOf3::B(view!(<span style=style>" (already up-to-date)"</span>))
440                          }
441                          let updates = up.clone();
442
443                          leptos::either::EitherOf3::C({
444                            use ftml_component_utils::{Button,ButtonSize,Combobox,ComboboxOption};
445                            let first = updates.first().map(|(name,_)| name.clone()).unwrap_or_default();
446                            let branch = RwSignal::new(first.clone());
447                            let _ = Effect::new(move || if branch.with(String::is_empty) {
448                              branch.set(first.clone());
449                            });
450                            let update : UpdateQueues = expect_context();
451                            //let commit_map:VecMap<_,_> = updates.clone().into();
452                            let archive = id.clone();
453                            let remote = remote.clone();
454                            let act = flams_web_utils::components::message_action(
455                              move |()| update_from_branch(Some(queue_id),archive.clone(),remote.clone(),branch.get_untracked()),
456                              move |(i,_)| {
457                                update.0.set(());
458                                format!("{i} jobs queued")
459                              }
460                            );
461                            view!{
462                              <span style="color:green">
463                                " Updates available: "
464                              </span>
465                              <div style="margin-left:10px">
466                                <Button size=ButtonSize::Small on_click=move |_| {act.dispatch(());}>"Update"</Button>
467                                " from branch: "
468                                <div style="display:inline-block;"><Combobox selected_options=branch>{
469                                  updates.into_iter().map(|(name,commit)| {let vname = name.clone(); view!{
470                                    <ComboboxOption text=vname.clone() value=vname>
471                                      {name}<span style="font-size:x-small">" (Last commit "{commit.id[..8].to_string()}" at "{commit.created_at.to_string()}" by "{commit.author_name}")"</span>
472                                    </ComboboxOption>
473                                  }}).collect_view()
474                                }</Combobox></div>
475                              </div>
476                            }
477                          })
478                        }))} else {None}}
479                      </TableCell>
480                    </TableRow>
481                  }
482                }),
483              }).collect_view()
484            }
485          </Table>
486        </Collapsible></div>
487    }.into_any()
488}
489
490fn delete_action(id: NonZeroU32) -> Action<(), ()> {
491    use ftml_component_utils::toasts::ToasterInjection;
492    let update: UpdateQueues = expect_context();
493    let toaster = ToasterInjection::expect_context();
494    Action::new(move |()| async move {
495        match flams_router_buildqueue_base::server_fns::delete(id).await {
496            Ok(()) => update.0.set(()),
497            Err(e) => flams_web_utils::components::error_with_toaster(e.to_string(), toaster),
498        }
499    })
500}
501
502fn idle(id: NonZeroU32, ls: RwSignal<Vec<Entry>>) -> AnyView {
503    use ftml_component_utils::Button;
504    let act = Action::<(), Result<(), ServerFnError<String>>>::new(move |()| {
505        flams_router_buildqueue_base::server_fns::run(id)
506    });
507    let del = delete_action(id);
508    view! {
509      <div style="width:100%"><div style="position:fixed;right:20px">
510          <Button on_click=move |_| {act.dispatch(());}>"Run"</Button>
511          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
512      </div></div>
513      {repos(id,true)}
514      <ol reversed style="margin-left:30px">
515        <For each=move || ls.get() key=|e| e.id children=|e| e.as_view()/>
516      </ol>
517    }
518    .into_any()
519}
520
521fn running(id: NonZeroU32, queue: RunningQueue) -> AnyView {
522    use ftml_component_utils::{AnchorMenu, AnchorMenuEntry, Button};
523    let del = delete_action(id);
524    let RunningQueue {
525        running,
526        queue,
527        blocked,
528        failed,
529        done,
530        eta,
531    } = queue;
532    view! {
533      <div style="position:fixed;right:20px;z-index:5"><AnchorMenu>
534          <AnchorMenuEntry href="#running">"Running"</AnchorMenuEntry>
535          <AnchorMenuEntry href="#queued">"Queued"</AnchorMenuEntry>
536          <AnchorMenuEntry href="#blocked">"Blocked"</AnchorMenuEntry>
537          <AnchorMenuEntry href="#failed">"Failed"</AnchorMenuEntry>
538          <AnchorMenuEntry href="#finished">"Finished"</AnchorMenuEntry>
539      </AnchorMenu></div>
540      {repos(id,false)}
541      <div style="text-align:left;">
542          {eta.into_view()}
543          <div style="width:100%"><div style="position:fixed;right:20px">
544              <Button on_click=move |_| {del.dispatch(());}>"Abort and Delete"</Button>
545          </div></div>
546          <h3 id="running">"Running ("{move || running.with(Vec::len)}")"</h3>
547          <ul style="margin-left:30px"><For each=move || running.get() key=|e| e.id children=|e| e.as_view()/></ul>
548          <h3 id="queued">"Queued ("{move || queue.with(Vec::len)}")"</h3>
549          <ul style="margin-left:30px"><For each=move || queue.get() key=|e| e.id children=|e| e.as_view()/></ul>
550          <h3 id="blocked">"Blocked ("{move || blocked.with(Vec::len)}")"</h3>
551          <ul style="margin-left:30px"><For each=move || blocked.get() key=|e| e.id children=|e| e.as_view()/></ul>
552          <h3 id="failed">"Failed ("{move || failed.with(Vec::len)}")"</h3>
553          <ul style="margin-left:30px"><For each=move || failed.get() key=|e| e.id children=|e| e.as_view()/></ul>
554          <h3 id="finished">"Finished ("{move || done.with(Vec::len)}")"</h3>
555          <ul style="margin-left:30px"><For each=move || done.get() key=|e| e.id children=|e| e.as_view()/></ul>
556      </div>
557    }.into_any()
558}
559
560fn finished(id: NonZeroU32, failed: Vec<Entry>, done: Vec<Entry>) -> AnyView {
561    use ftml_component_utils::{AnchorMenu, AnchorMenuEntry, Button};
562    let requeue = Action::new(move |()| flams_router_buildqueue_base::server_fns::requeue(id));
563    let num_failed = failed.len();
564    let num_done = done.len();
565    let del = delete_action(id);
566    view! {
567      <div style="width:100%"><div style="position:fixed;right:120px;z-index:10">
568          {if num_failed > 0 {Some(view!(
569            <Button on_click=move |_| {requeue.dispatch(());}>"Requeue Failed"</Button>
570          ))} else { None }}
571          {migrate_button(id,num_failed)}
572          <Button on_click=move |_| {del.dispatch(());}>"Delete"</Button>
573      </div></div>
574      <div style="position:fixed;right:20px;z-index:5"><AnchorMenu>
575          <AnchorMenuEntry href="#failed">"Failed"</AnchorMenuEntry>
576          <AnchorMenuEntry href="#finished">"Finished"</AnchorMenuEntry>
577      </AnchorMenu></div>
578      {repos(id,true)}
579      <div style="text-align:left;">
580          <h3 id="failed">"Failed ("{num_failed}")"</h3>
581          <ul style="margin-left:30px">{
582            failed.iter().map(Entry::as_view).collect_view()
583          }</ul>
584          <h3 id="finished">"Finished ("{num_done}")"</h3>
585          <ul style="margin-left:30px">{
586            done.iter().map(Entry::as_view).collect_view()
587          }</ul>
588      </div>
589    }
590    .into_any()
591}
592
593fn migrate_button(id: NonZeroU32, num_failed: usize) -> AnyView {
594    use ftml_component_utils::{
595        BoldCaption, Button, Dialog, DialogBody, DialogContent, DialogSurface, Divider,
596    };
597    use leptos::either::EitherOf3;
598    if matches!(LoginState::get(), LoginState::NoAccounts) {
599        return ().into_any();
600    }
601    let update: UpdateQueues = expect_context();
602    let migrate = flams_web_utils::components::message_action(
603        move |()| flams_router_buildqueue_base::server_fns::migrate(id),
604        move |i| {
605            update.0.set(());
606            format!("{i} archives migrated")
607        },
608    );
609    if num_failed == 0 {
610        view! {
611          <Button on_click=move |_| {migrate.dispatch(());}>"Migrate"</Button>
612        }
613        .into_any()
614    } else {
615        let clicked = RwSignal::new(false);
616        view! {
617          <Button on_click=move |_| {clicked.set(true);}>"Migrate"</Button>
618          <Dialog open=clicked><DialogSurface><DialogBody><DialogContent>
619            <BoldCaption><span style="color:red">WARNING</span></BoldCaption>
620            <Divider/>
621            <p>{num_failed}" jobs have failed to build!"<br/>"Migrate anyway?"</p>
622            <div>
623              <div style="width:min-content;margin-left:auto;">
624                <Button on_click=move |_| {migrate.dispatch(());}>"Force Migration"</Button>
625              </div>
626            </div>
627          </DialogContent></DialogBody></DialogSurface></Dialog>
628        }
629        .into_any()
630    }
631}
632
633#[derive(Clone)]
634pub struct QueueSocket {
635    #[cfg(feature = "ssr")]
636    listener:
637        Option<flams_utils::change_listener::ChangeListener<flams_system::building::QueueMessage>>,
638    #[cfg(all(not(feature = "docs-only"), feature = "hydrate"))]
639    socket: leptos::web_sys::WebSocket,
640    #[cfg(feature = "docs-only")]
641    socket: (),
642    #[cfg(feature = "hydrate")]
643    _running: RwSignal<bool>,
644}
645impl WebSocket<NonZeroU32, QueueMessage> for QueueSocket {
646    const SERVER_ENDPOINT: &'static str = "/ws/queue";
647}
648
649#[cfg(feature = "ssr")]
650#[async_trait::async_trait]
651impl ws::WebSocketServer<NonZeroU32, QueueMessage> for QueueSocket {
652    async fn new(account: LoginState, _db: ws::DBBackend) -> Option<Self> {
653        match account {
654            LoginState::Admin
655            | LoginState::NoAccounts
656            | LoginState::User { is_admin: true, .. } => {
657                let listener = None; //flams_system::logger().listener();
658                Some(Self {
659                    listener,
660                    #[cfg(feature = "hydrate")]
661                    _running: RwSignal::new(false),
662                    #[cfg(feature = "hydrate")]
663                    socket: unreachable!(),
664                })
665            }
666            _ => None,
667        }
668    }
669    async fn next(&mut self) -> Option<QueueMessage> {
670        loop {
671            match &mut self.listener {
672                None => tokio::time::sleep(tokio::time::Duration::from_secs_f32(0.5)).await,
673                Some(l) => return l.read().await.map(Into::into),
674            }
675        }
676    }
677    async fn handle_message(&mut self, msg: NonZeroU32) -> Option<QueueMessage> {
678        let (lst, msg) = flams_system::building::queue_manager::QueueManager::get()
679            .with_queue(msg.into(), |q| q.map(|q| (q.listener(), q.state_message())))?;
680        self.listener = Some(lst);
681        Some(msg.into())
682    }
683    async fn on_start(&mut self, _: &mut ws::AxumWS) {}
684}
685
686#[cfg(feature = "hydrate")]
687impl ws::WebSocketClient<NonZeroU32, QueueMessage> for QueueSocket {
688    fn new(ws: leptos::web_sys::WebSocket) -> Self {
689        Self {
690            #[cfg(not(feature = "docs-only"))]
691            socket: ws,
692            #[cfg(feature = "docs-only")]
693            socket: (),
694            _running: RwSignal::new(false),
695            #[cfg(feature = "ssr")]
696            listener: unreachable!(),
697        }
698    }
699    fn socket(&mut self) -> &mut leptos::web_sys::WebSocket {
700        #[cfg(not(feature = "docs-only"))]
701        {
702            &mut self.socket
703        }
704        #[cfg(feature = "docs-only")]
705        {
706            unreachable!()
707        }
708    }
709    #[allow(clippy::used_underscore_binding)]
710    fn on_open(&self) -> Option<Box<dyn FnMut()>> {
711        let running = self._running;
712        Some(Box::new(move || {
713            running.set(true);
714        }))
715    }
716}
717
718#[cfg(all(feature = "ssr", not(feature = "hydrate")))]
719impl QueueSocket {
720    fn run(_: AllQueues) {
721        Self::force_start_server();
722    }
723}
724
725#[cfg(feature = "hydrate")]
726impl QueueSocket {
727    fn run(queues: AllQueues) {
728        use ws::WebSocketClient;
729        Self::force_start_client(
730            move |msg| {
731                //tracing::warn!("Starting!");
732                let current = queues.selected.get_untracked();
733                queues.queues.with_untracked(|queues| {
734                    let Some(queue) = queues.get(&current) else {
735                        tracing::error!("Queue not found: {current}");
736                        return;
737                    };
738                    Self::do_msg(*queue, msg);
739                });
740                if !queues.show.get_untracked() {
741                    queues.show.set(true);
742                }
743                None
744            },
745            move |mut socket| {
746                #[allow(clippy::used_underscore_binding)]
747                Effect::new(move |_| {
748                    if socket._running.get() {
749                        let current = queues.selected.get_untracked();
750                        socket.send(&current);
751                    }
752                });
753            },
754        );
755    }
756    fn do_msg(queue: RwSignal<QueueData>, msg: QueueMessage) {
757        match msg {
758            QueueMessage::Idle(entries) => queue.set(QueueData::Idle(RwSignal::new(entries))),
759            QueueMessage::Started {
760                running,
761                queue: actual_queue,
762                blocked,
763                failed,
764                done,
765            } => queue.set(QueueData::Running(RunningQueue {
766                running: RwSignal::new(running),
767                queue: RwSignal::new(actual_queue),
768                blocked: RwSignal::new(blocked),
769                failed: RwSignal::new(failed),
770                done: RwSignal::new(done),
771                eta: WrappedEta(RwSignal::new(Eta::default())),
772            })),
773            QueueMessage::Finished { failed, done } => queue.set(QueueData::Finished(failed, done)),
774            QueueMessage::TaskStarted { id, mut target } => queue.with_untracked(|queue| {
775                if let QueueData::Running(RunningQueue {
776                    queue,
777                    running,
778                    blocked,
779                    ..
780                }) = queue
781                {
782                    let mut worked = false;
783                    queue.update(|v| {
784                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
785                            return;
786                        };
787                        worked = true;
788                        let e = v.remove(i);
789                        e.steps
790                            .update(|m| m.insert(std::mem::take(&mut target), TaskState::Running));
791                        running.update(|running| running.push(e));
792                    });
793                    if !worked {
794                        blocked.update(|v| {
795                            let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id)
796                            else {
797                                return;
798                            };
799                            worked = true;
800                            let e = v.remove(i);
801                            e.steps.update(|m| m.insert(target, TaskState::Running));
802                            running.update(|running| running.push(e));
803                        });
804                    }
805                }
806            }),
807            QueueMessage::TaskSuccess { id, target, eta } => queue.with_untracked(|queue| {
808                if let QueueData::Running(RunningQueue {
809                    queue,
810                    running,
811                    done,
812                    eta: etasignal,
813                    ..
814                }) = queue
815                {
816                    etasignal.0.set(eta);
817                    running.update(|v| {
818                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
819                            return;
820                        };
821                        let e = v.remove(i);
822                        e.steps.update(|m| m.insert(target, TaskState::Done));
823                        if e.steps.with_untracked(|v| {
824                            v.iter()
825                                .any(|(_, v)| *v == TaskState::Queued || *v == TaskState::Blocked)
826                        }) {
827                            queue.update(|v| v.push(e));
828                        } else {
829                            done.update(|v| v.push(e));
830                        }
831                    });
832                }
833            }),
834            QueueMessage::TaskFailed { id, target, eta } => queue.with_untracked(|queue| {
835                if let QueueData::Running(RunningQueue {
836                    running,
837                    failed,
838                    eta: etasignal,
839                    ..
840                }) = queue
841                {
842                    etasignal.0.set(eta);
843                    running.update(|v| {
844                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
845                            return;
846                        };
847                        let e = v.remove(i);
848                        e.steps.update(|m| m.insert(target, TaskState::Failed));
849                        failed.update(|v| v.push(e));
850                    });
851                }
852            }),
853            QueueMessage::TaskBlocked { id, target, eta } => queue.with_untracked(|queue| {
854                if let QueueData::Running(RunningQueue {
855                    running,
856                    blocked,
857                    eta: etasignal,
858                    ..
859                }) = queue
860                {
861                    etasignal.0.set(eta);
862                    running.update(|v| {
863                        let Some((i, _)) = v.iter().enumerate().find(|(_, e)| e.id == id) else {
864                            return;
865                        };
866                        let e = v.remove(i);
867                        e.steps.update(|m| m.insert(target, TaskState::Blocked));
868                        blocked.update(|v| v.push(e));
869                    });
870                }
871            }),
872        }
873    }
874}
875
876#[derive(Clone, Copy)]
877struct AllQueues {
878    show: RwSignal<bool>,
879    selected: RwSignal<NonZeroU32>,
880    queue_names: RwSignal<VecMap<NonZeroU32, String>>,
881    queue_repos: RwSignal<VecMap<NonZeroU32, Option<Vec<RepoInfo>>>>,
882    queues: RwSignal<VecMap<NonZeroU32, RwSignal<QueueData>>>,
883}
884
885impl AllQueues {
886    fn new(ids: Vec<QueueInfo>) -> Self {
887        let queues = RwSignal::new(
888            ids.iter()
889                .map(|v| (v.id, RwSignal::new(QueueData::Empty)))
890                .collect(),
891        );
892        let selected = ids.first().map_or_else(
893            || NonZeroU32::new(1).unwrap_or_else(|| unreachable!()),
894            |v| v.id,
895        );
896        let mut queue_names = VecMap::default();
897        let mut queue_repos = VecMap::default();
898        for d in ids {
899            queue_names.insert(d.id, d.name);
900            queue_repos.insert(d.id, d.archives);
901        }
902        Self {
903            show: RwSignal::new(false),
904            selected: RwSignal::new(selected),
905            queues,
906            queue_names: RwSignal::new(queue_names),
907            queue_repos: RwSignal::new(queue_repos),
908        }
909    }
910}
911
912#[derive(Clone)]
913#[allow(dead_code)]
914enum QueueData {
915    Idle(RwSignal<Vec<Entry>>),
916    Running(RunningQueue),
917    Empty,
918    Finished(Vec<Entry>, Vec<Entry>),
919}
920
921#[derive(Clone, Copy)] //,serde::Serialize,serde::Deserialize)]
922#[allow(dead_code)]
923struct RunningQueue {
924    running: RwSignal<Vec<Entry>>,
925    queue: RwSignal<Vec<Entry>>,
926    blocked: RwSignal<Vec<Entry>>,
927    failed: RwSignal<Vec<Entry>>,
928    done: RwSignal<Vec<Entry>>,
929    eta: WrappedEta,
930}
931
932#[derive(Clone, Copy)]
933struct WrappedEta(RwSignal<ftml_ontology::utils::time::Eta>);
934
935#[allow(clippy::cast_precision_loss)]
936impl WrappedEta {
937    fn into_view(self) -> impl IntoView {
938        use ftml_component_utils::ProgressBar;
939        inject_css(
940            "flams-eta",
941            r"
942.flams-progress-bar {height:10px;}
943    ",
944        );
945        let pctg = Memo::new(move |_| {
946            let eta = self.0.get();
947            ((eta.done as f64 / eta.total as f64) * 1000.0).round() / 1000.0
948        });
949        let time_left = move || {
950            let eta = self.0.get();
951            if eta.time_left == Delta::default() {
952                "N/A".to_string()
953            } else {
954                eta.time_left.max_seconds().to_string()
955            }
956        };
957        view! {
958          <div style="width:500px;"><ProgressBar class="flams-progress-bar" value=pctg/>
959            {move || (pctg.get() * 100.0).to_string().chars().take(4).collect::<String>()} "%; ca. "{time_left}" remaining"
960          </div>
961        }
962    }
963}