Skip to content

Commit a70922f

Browse files
committed
Breaking: Remove Disconnect Channel in ChannelReporter implementation
Previously we used a oneshot channel to rendezvous between the reporter and the listener when disconnecting the event channel between them. Doing so was required for `ChannelReporter::disconnect`, to wait for already received events to be handled. A disadvantage of how the listener was setup in conjunction with the reporter, made it difficult to have the thread used by the channel based implementation, return a value (at the end of the scope of the thread, i.e. without more channels). In the new implementation, in Reporter::disconnect, we just use the fact that when all senders of the channel are dropped, the channel will be in a disconnected state. Since already received events may still be processed, we split up the disconnecting and finishing up of the processing of events in two parts. The user is now responsible to wait for the processing of events to finish, instead of the disconnect method. This can be achieved by running FinishProcessing::finish_processing which will block until all events have been processed.
1 parent 8c4d485 commit a70922f

File tree

12 files changed

+293
-286
lines changed

12 files changed

+293
-286
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "storyteller"
3-
version = "0.4.2"
3+
version = "0.5.0"
44
edition = "2018"
55

66
authors = ["Martijn Gribnau <[email protected]>"]
@@ -17,8 +17,6 @@ exclude = ["/.github", "docs/sketches/*.png"]
1717
default = ["channel_reporter"]
1818
channel_reporter = ["crossbeam-channel"]
1919

20-
experimental_handle_disconnect_ack = []
21-
2220
[dependencies.crossbeam-channel]
2321
version = "0.5"
2422
optional = true
@@ -28,6 +26,9 @@ serde = { version = "1", features = ["derive"] }
2826
serde_json = "1"
2927
indicatif = "0.16.2"
3028

29+
# parameterized tests
30+
yare = "1.0.1"
31+
3132
[[example]]
3233
name = "json_lines"
3334
required-features = ["channel_reporter"]

examples/json_lines.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,13 @@ use std::io::{Stderr, Write};
22
use std::sync::{Arc, Mutex};
33
use std::time::Duration;
44
use std::{io, thread};
5-
use storyteller::EventHandler;
5+
use storyteller::{EventHandler, FinishProcessing};
66

7-
use storyteller::{
8-
disconnect_channel, event_channel, ChannelEventListener, ChannelReporter, EventListener,
9-
Reporter,
10-
};
7+
use storyteller::{event_channel, ChannelEventListener, ChannelReporter, EventListener, Reporter};
118

129
// See the test function `bar` in src/tests.rs for an example where the handler is a progress bar.
1310
fn main() {
1411
let (sender, receiver) = event_channel::<ExampleEvent>();
15-
let (disconnect_sender, disconnect_receiver) = disconnect_channel();
1612

1713
// Handlers are implemented by you. Here you find one which writes jsonlines messages to stderr.
1814
// This can be anything, for example a progress bar (see src/tests.rs for an example of this),
@@ -21,17 +17,17 @@ fn main() {
2117
let handler = JsonHandler::default();
2218

2319
// This one is included with the library. It just needs to be hooked up with a channel.
24-
let reporter = ChannelReporter::new(sender, disconnect_receiver);
20+
let reporter = ChannelReporter::new(sender);
2521

2622
// This one is also included with the library. It also needs to be hooked up with a channel.
27-
let listener = ChannelEventListener::new(receiver, disconnect_sender);
23+
let listener = ChannelEventListener::new(receiver);
2824

2925
// Here we use the jsonlines handler we defined above, in combination with the default `EventListener`
3026
// implementation on the `ChannelEventListener` we used above.
3127
//
3228
// If we don't run the handler, we'll end up in an infinite loop, because our `reporter.disconnect()`
3329
// below will block until it receives a Disconnect message.
34-
listener.run_handler(handler);
30+
let fin = listener.run_handler(handler);
3531

3632
#[allow(unused_must_use)]
3733
// sending events can fail, but we'll assume they won't for this example
@@ -55,7 +51,12 @@ fn main() {
5551
reporter.report_event(ExampleEvent::text("[status]\t\tFour"));
5652
}
5753

54+
// Within the ChannelReporter, the sender is dropped, thereby disconnecting the channel
55+
// Already sent events can still be processed.
5856
let _ = reporter.disconnect();
57+
58+
// To keep the processing of already sent events alive, we block the handler
59+
let _ = fin.finish_processing();
5960
}
6061

6162
// ------- Events + Disconnect

src/channel_reporter/channel.rs

Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! Channels which can be used by the `ChannelReporter` and `ChannelEventListener`.
22
3-
use crate::Disconnect;
43
use std::fmt::Formatter;
54
use std::{any, fmt};
65

@@ -14,12 +13,18 @@ pub fn event_channel<Event>() -> (EventSender<Event>, EventReceiver<Event>) {
1413
}
1514

1615
/// A sender, used by `ChannelReporter` and `ChannelEventListener`.
16+
#[derive(Clone)]
1717
pub struct EventSender<T>(crossbeam_channel::Sender<T>);
1818

1919
impl<T> EventSender<T> {
2020
pub fn send(&self, message: T) -> Result<(), EventSendError<T>> {
2121
self.0.send(message).map_err(|err| EventSendError(err.0))
2222
}
23+
24+
/// When all senders are disconnected, the channel is disconnected
25+
pub fn disconnect(self) {
26+
drop(self.0)
27+
}
2328
}
2429

2530
/// A receiver, used by `ChannelReporter` and `ChannelEventListener`.
@@ -31,6 +36,12 @@ impl<T> EventReceiver<T> {
3136
}
3237
}
3338

39+
impl<T> Clone for EventReceiver<T> {
40+
fn clone(&self) -> Self {
41+
Self(self.0.clone())
42+
}
43+
}
44+
3445
#[derive(PartialEq, Eq, Clone, Copy)]
3546
pub struct EventSendError<T>(pub T);
3647

@@ -42,38 +53,3 @@ impl<T> fmt::Debug for EventSendError<T> {
4253

4354
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
4455
pub struct EventRecvError;
45-
46-
// --- Disconnect channel variants
47-
48-
/// A sender, used to communicate Disconnect's between the `ChannelReporter` and `ChannelEventListener`.
49-
pub struct DisconnectSender(crossbeam_channel::Sender<Disconnect>);
50-
51-
impl DisconnectSender {
52-
pub fn acknowledge_disconnection(&self) -> Result<(), DisconnectSendError> {
53-
self.0.send(Disconnect).map_err(|_| DisconnectSendError)
54-
}
55-
}
56-
57-
/// A receiver, used to communicate Disconnect's between the `ChannelReporter` and `ChannelEventListener`.
58-
pub struct DisconnectReceiver(crossbeam_channel::Receiver<Disconnect>);
59-
60-
impl DisconnectReceiver {
61-
pub(crate) fn recv(&self) -> Result<Disconnect, DisconnectRecvError> {
62-
self.0.recv().map_err(|_| DisconnectRecvError)
63-
}
64-
}
65-
66-
/// A channel used to by the `ChannelEventListener` to acknowledge the disconnection of the `ChannelReporter`.
67-
///
68-
/// Allows us to wait
69-
pub fn disconnect_channel() -> (DisconnectSender, DisconnectReceiver) {
70-
let (sender, receiver) = crossbeam_channel::bounded::<Disconnect>(0);
71-
72-
(DisconnectSender(sender), DisconnectReceiver(receiver))
73-
}
74-
75-
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
76-
pub struct DisconnectSendError;
77-
78-
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
79-
pub struct DisconnectRecvError;

src/channel_reporter/listener.rs

Lines changed: 70 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,33 @@
1-
use crate::{DisconnectSender, EventHandler, EventListener, EventReceiver};
1+
use crate::listener::FinishProcessing;
2+
use crate::{EventHandler, EventListener, EventReceiver};
23
use std::thread;
4+
use std::thread::JoinHandle;
35

46
/// A listener which uses a channel to receive messages of type `Event`, and uses
5-
/// a thread to run the event handler (in [`crate::ChannelEventListener::run_handler`].
7+
/// a thread to run the event handler (in [`ChannelEventListener::run_handler`]).
68
///
7-
/// The channels required to create an instance can be created by calling the [`crate::event_channel`]
8-
/// and [`crate::disconnect_channel`] functions.
9+
/// The channel based receiver required to create an instance can be created by calling the
10+
/// [`event_channel()`] function.
911
///
10-
/// The [`crate::Reporter`] associated with this event listener is the [`crate::ChannelReporter`].
12+
/// The [`Reporter`] associated with this event listener is the [`ChannelReporter`].
13+
///
14+
/// [`ChannelEventListener::run_handler`]: crate::ChannelEventListener::run_handler
15+
/// [`event_channel()`]: crate::event_channel
16+
/// [`Reporter`]: crate::Reporter
17+
/// [`ChannelReporter`]: crate::ChannelReporter
1118
pub struct ChannelEventListener<Event> {
12-
message_receiver: EventReceiver<Event>,
13-
disconnect_sender: DisconnectSender,
19+
event_receiver: EventReceiver<Event>,
1420
}
1521

1622
impl<Event> ChannelEventListener<Event> {
1723
/// Create a new channel based event listener.
1824
///
19-
/// The channels required to create an instance can be created by calling the [`crate::event_channel`]
20-
/// and [`crate::disconnect_channel`] functions.
21-
pub fn new(
22-
message_receiver: EventReceiver<Event>,
23-
disconnect_sender: DisconnectSender,
24-
) -> Self {
25-
Self {
26-
message_receiver,
27-
disconnect_sender,
28-
}
29-
}
30-
31-
/// If you use `ChannelEventListener` by wrapping it, instead of using it directly,
32-
/// for example if you want to write your own `EventListener` implementation,
33-
/// you will need this `&EventReceiver` to receive events.
34-
///
35-
/// ### Example
36-
///
37-
/// **NB:** This example should **not** be used on its own! It does not contain a fully working listener!
38-
/// See [`crate::EventListener`] on how to implement your own listener instead.
39-
///
40-
/// ```no_run
41-
/// // NB: This example is incomplete!
42-
/// // It does not contain a fully working listener!
43-
///
44-
/// use storyteller::{ChannelEventListener, EventHandler, EventListener};
45-
///
46-
/// struct MyEvent;
47-
///
48-
/// struct WrappingListener {
49-
/// listener: ChannelEventListener<MyEvent>,
50-
/// }
51-
///
52-
/// impl EventListener for WrappingListener {
53-
/// type Event = MyEvent;
54-
///
55-
/// fn run_handler<H>(self, handler: H) where H: EventHandler<Event=Self::Event> {
56-
///
57-
/// let disconnect_sender = self.listener.disconnect_sender();
58-
/// let message_receiver = self.listener.message_receiver(); // <---
59-
///
60-
/// loop {
61-
/// if let Err(_) = message_receiver.recv() {
62-
/// disconnect_sender.acknowledge_disconnection().unwrap();
63-
/// }
64-
/// }
65-
/// }
66-
/// }
67-
/// ```
68-
pub fn message_receiver(&self) -> &EventReceiver<Event> {
69-
&self.message_receiver
70-
}
71-
72-
/// If you use `ChannelEventListener` by wrapping it, instead of using it directly,
73-
/// for example if you want to write your own `EventListener` implementation,
74-
/// you will need this `&DisconnectSender` to acknowledge when a reporter disconnects.
75-
///
76-
/// ### Example
77-
///
78-
/// **NB:** This example should **not*** be used on its own! It does not contain a fully working listener!
79-
/// See [`crate::EventListener`] on how to implement your own listener instead.
80-
///
81-
/// ```no_run
82-
/// // NB: This example is incomplete!
83-
/// // It does not contain a fully working listener!
84-
///
85-
/// use storyteller::{ChannelEventListener, EventHandler, EventListener};
86-
///
87-
/// struct MyEvent;
88-
///
89-
/// struct WrappingListener {
90-
/// listener: ChannelEventListener<MyEvent>,
91-
/// }
25+
/// The channel based receiver required to create an instance can be created by calling the
26+
/// [`event_channel()`] function.
9227
///
93-
/// impl EventListener for WrappingListener {
94-
/// type Event = MyEvent;
95-
///
96-
/// fn run_handler<H>(self, handler: H) where H: EventHandler<Event=Self::Event> {
97-
///
98-
/// let disconnect_sender = self.listener.disconnect_sender(); // <---
99-
/// let message_receiver = self.listener.message_receiver();
100-
///
101-
/// loop {
102-
/// if let Err(_) = message_receiver.recv() {
103-
/// disconnect_sender.acknowledge_disconnection().unwrap();
104-
/// }
105-
/// }
106-
/// }
107-
/// }
108-
/// ```
109-
pub fn disconnect_sender(&self) -> &DisconnectSender {
110-
&self.disconnect_sender
28+
/// [`event_channel()`]: crate::event_channel
29+
pub fn new(event_receiver: EventReceiver<Event>) -> Self {
30+
Self { event_receiver }
11131
}
11232
}
11333

@@ -116,32 +36,68 @@ where
11636
Event: Send + 'static,
11737
{
11838
type Event = Event;
39+
type FinishProcessingHandle = ChannelFinalizeHandler;
11940

120-
fn run_handler<H>(self, handler: H)
41+
fn run_handler<H>(&self, handler: H) -> Self::FinishProcessingHandle
12142
where
122-
H: EventHandler<Event = Self::Event>,
43+
H: EventHandler<Event = Self::Event> + 'static,
12344
{
124-
thread::spawn(move || {
125-
let disconnect_sender = self.disconnect_sender();
126-
let message_receiver = self.message_receiver();
45+
let event_receiver = self.event_receiver.clone();
12746

128-
loop {
129-
match message_receiver.recv() {
47+
let handle = thread::spawn(move || {
48+
//
49+
'evl: loop {
50+
match event_receiver.recv() {
13051
Ok(message) => handler.handle(message),
13152
Err(_disconnect) => {
13253
handler.finish();
133-
134-
let _ack = disconnect_sender.acknowledge_disconnection();
135-
136-
#[cfg(not(feature = "experimental_handle_disconnect_ack"))]
137-
{
138-
_ack.expect("Failed to send disconnect acknowledgement!");
139-
}
140-
141-
break;
54+
break 'evl;
14255
}
14356
}
14457
}
14558
});
59+
60+
ChannelFinalizeHandler::new(handle)
61+
}
62+
}
63+
64+
/// A [`FinishProcessing`] implementation for the [`ChannelEventListener`].
65+
/// Used to wait for the [`EventHandler`] ran by the `listener` to finish processing
66+
/// events.
67+
///
68+
/// ### Caution: Infinite looping
69+
///
70+
/// Calling [`FinishProcessing::finish_processing`] without first disconnecting
71+
/// the sender channel of the reporter will cause the program to be stuck in an infinite
72+
/// loop.
73+
///
74+
/// The reason for this is that disconnecting the channel causes the loop to process
75+
/// a disconnect event, where we break out of the loop. If this disconnect does not
76+
/// happen, the thread processing events will not be finished, and
77+
/// [`FinishProcessing::finish_processing`] will block, since it waits for the thread
78+
/// to be finished.
79+
///
80+
/// To disconnect the sender channel of the reporter, call [`ChannelReporter::disconnect`].
81+
///
82+
/// [`FinishProcessing`]: crate::FinishProcessing
83+
/// [`EventHandler`]: crate::EventHandler
84+
/// [`ChannelEventListener`]: crate::ChannelEventListener
85+
/// [`ChannelReporter::disconnect`]: crate::ChannelReporter::disconnect
86+
#[must_use]
87+
pub struct ChannelFinalizeHandler {
88+
handle: JoinHandle<()>,
89+
}
90+
91+
impl ChannelFinalizeHandler {
92+
fn new(handle: JoinHandle<()>) -> Self {
93+
Self { handle }
94+
}
95+
}
96+
97+
impl FinishProcessing for ChannelFinalizeHandler {
98+
type Err = ();
99+
100+
fn finish_processing(self) -> Result<(), Self::Err> {
101+
self.handle.join().map_err(|_| ())
146102
}
147103
}

0 commit comments

Comments
 (0)