flams_utils/
change_listener.rs

1#[derive(Debug)]
2pub struct ChangeListener<T: Clone> {
3    pub inner: async_broadcast::Receiver<T>,
4}
5impl<T: Clone + Send + Sync> ChangeListener<T> {
6    pub fn get(&mut self) -> Option<T> {
7        match self.inner.try_recv() {
8            Ok(t) => Some(t),
9            Err(async_broadcast::TryRecvError::Empty | async_broadcast::TryRecvError::Closed) => {
10                None
11            }
12            Err(e) => {
13                tracing::error!("Error in ChangeListener::get: {:?}", e);
14                None
15            }
16        }
17    }
18    pub async fn read(&mut self) -> Option<T> {
19        self.inner.recv_direct().await.ok()
20    }
21}
22impl<T: Clone> Clone for ChangeListener<T> {
23    fn clone(&self) -> Self {
24        Self {
25            inner: self.inner.new_receiver(),
26        }
27    }
28}
29
30#[derive(Debug, Clone)]
31pub struct ChangeSender<T: Clone> {
32    inner: async_broadcast::Sender<T>,
33    // keeps the channel open:
34    _recv: async_broadcast::InactiveReceiver<T>,
35}
36impl<T: Clone> ChangeSender<T> {
37    #[must_use]
38    pub fn new(cap: usize) -> Self {
39        let (mut s, r) = async_broadcast::broadcast(cap);
40        s.set_overflow(true);
41        Self {
42            inner: s,
43            _recv: r.deactivate(),
44        }
45    }
46    pub fn send(&self, msg: T) {
47        match self.inner.try_broadcast(msg) {
48            Ok(_) | Err(async_broadcast::TrySendError::Inactive(_)) => (),
49            _ => todo!(),
50        }
51    }
52
53    #[inline]
54    pub fn lazy_send<F: FnOnce() -> T>(&self, msg: F) {
55        if self.inner.receiver_count() > 0 {
56            self.send(msg());
57        }
58    }
59    #[inline]
60    #[must_use]
61    pub fn listener(&self) -> ChangeListener<T> {
62        ChangeListener {
63            inner: self.inner.new_receiver(),
64        }
65    }
66}