1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use super::*;
use std::sync::{Arc, Mutex};
/// A cloneable handle onto a `Collection` that is shared behind an
/// `Arc<Mutex<..>>`.
///
/// Cloning a `Connection` clones the `Arc`, so every clone refers to the same
/// underlying collection.
#[derive(Debug, Clone)]
pub struct Connection {
    /// The shared, mutex-guarded collection every operation is forwarded to.
    collection: Arc<Mutex<Collection>>,
}
impl Connection {
pub fn new(collection: Arc<Mutex<Collection>>) -> Connection {
Connection {
collection: collection
}
}
pub fn publish(&self, event: Event) -> Result<u64, DatabaseError> {
self.collection.lock().unwrap().publish(event)
}
pub fn subscribe(&self, query: Query) -> Result<EventStream, DatabaseError> {
self.collection.lock().unwrap().subscribe(query)
}
pub fn close(self) {
drop(self)
}
}
#[cfg(test)]
mod tests {
    use super::super::*;
    use exar_testkit::*;

    /// Publishes one event through a `Connection`, reads it back through a
    /// subscription, then closes the connection and drops the collection.
    #[test]
    fn test_connection() {
        let mut db = Database::new(DatabaseConfig::default());

        // Borrow the generated name once; it is reused for lookup and cleanup.
        let collection_name = &random_collection_name();
        let collection = db.get_collection(collection_name).expect("Unable to get collection");
        let connection = Connection::new(collection);

        // The first publish on this collection is expected to yield `Ok(1)`.
        let test_event = Event::new("data", vec!["tag1", "tag2"]);
        let publish_result = connection.publish(test_event.clone());
        assert_eq!(publish_result, Ok(1));

        // Read a single event back from the subscription.
        let event_stream = connection.subscribe(Query::current()).unwrap();
        let retrieved_events: Vec<_> = event_stream.take(1).collect();

        // The timestamp is copied from the retrieved event, so only the id and
        // the payload are effectively being compared.
        let expected_event = test_event.clone().with_id(1).with_timestamp(retrieved_events[0].timestamp);
        assert_eq!(retrieved_events, vec![expected_event]);

        connection.close();

        assert!(db.drop_collection(collection_name).is_ok());
        assert!(!db.contains_collection(collection_name));
    }
}