flams_utils/
change_listener.rs1#[derive(Debug)]
2pub struct ChangeListener<T: Clone> {
3 pub inner: async_broadcast::Receiver<T>,
4}
5impl<T: Clone + Send + Sync> ChangeListener<T> {
6 pub fn get(&mut self) -> Option<T> {
7 match self.inner.try_recv() {
8 Ok(t) => Some(t),
9 Err(async_broadcast::TryRecvError::Empty | async_broadcast::TryRecvError::Closed) => {
10 None
11 }
12 Err(e) => {
13 tracing::error!("Error in ChangeListener::get: {:?}", e);
14 None
15 }
16 }
17 }
18 pub async fn read(&mut self) -> Option<T> {
19 self.inner.recv_direct().await.ok()
20 }
21}
22impl<T: Clone> Clone for ChangeListener<T> {
23 fn clone(&self) -> Self {
24 Self {
25 inner: self.inner.new_receiver(),
26 }
27 }
28}
29
30#[derive(Debug, Clone)]
31pub struct ChangeSender<T: Clone> {
32 inner: async_broadcast::Sender<T>,
33 _recv: async_broadcast::InactiveReceiver<T>,
35}
36impl<T: Clone> ChangeSender<T> {
37 #[must_use]
38 pub fn new(cap: usize) -> Self {
39 let (mut s, r) = async_broadcast::broadcast(cap);
40 s.set_overflow(true);
41 Self {
42 inner: s,
43 _recv: r.deactivate(),
44 }
45 }
46 pub fn send(&self, msg: T) {
47 match self.inner.try_broadcast(msg) {
48 Ok(_) | Err(async_broadcast::TrySendError::Inactive(_)) => (),
49 _ => todo!(),
50 }
51 }
52
53 #[inline]
54 pub fn lazy_send<F: FnOnce() -> T>(&self, msg: F) {
55 if self.inner.receiver_count() > 0 {
56 self.send(msg());
57 }
58 }
59 #[inline]
60 #[must_use]
61 pub fn listener(&self) -> ChangeListener<T> {
62 ChangeListener {
63 inner: self.inner.new_receiver(),
64 }
65 }
66}