1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use hyper::client::{HttpConnector, Client};
use hyper_tls::HttpsConnector;
use hyper::client::{Request as HttpRequest, Response as HttpResponse};
use hyper::header::RetryAfter;
use futures::{Future, Poll};
use futures::future::{ok, err};
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use tokio_service::Service;
use tokio_timer::{Timer, Timeout};
use std::fmt;
use std::time::{SystemTime, Duration};
use services::{firebase, autopush};
use error::WebPushError;
use message::{WebPushMessage, WebPushService};
/// The future handed back for a push request.
///
/// Wraps a boxed future that yields `()` on success and a `WebPushError`
/// on failure, so the concrete combinator chain is hidden behind one type.
pub struct WebPushResponse(Box<Future<Item = (), Error = WebPushError> + 'static>);
/// Debug output is a fixed placeholder; the boxed future inside is not
/// inspected.
impl fmt::Debug for WebPushResponse {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` honours any width/alignment flags given by the caller.
        formatter.pad("Future<Response>")
    }
}
impl Future for WebPushResponse {
type Item = ();
type Error = WebPushError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
/// Client that sends `WebPushMessage`s over HTTPS.
///
/// Messages are sent either through `send` / `send_with_timeout` or through
/// the `Service` implementation.
pub struct WebPushClient {
// hyper client running on top of the TLS connector
client: Client<HttpsConnector<HttpConnector>>,
// timer used by `send_with_timeout` to wrap the response future
timer: Timer,
}
impl WebPushClient {
pub fn new(handle: &Handle) -> Result<WebPushClient, WebPushError> {
let client = Client::configure()
.connector(HttpsConnector::new(4, handle)?)
.keep_alive(true)
.build(handle);
Ok(WebPushClient {
client: client,
timer: Timer::default(),
})
}
pub fn send(&self, message: WebPushMessage) -> WebPushResponse {
self.call(message)
}
pub fn send_with_timeout(
&self,
message: WebPushMessage,
timeout: Duration,
) -> Timeout<WebPushResponse> {
self.timer.timeout(self.send(message), timeout)
}
}
/// `Service` implementation: each call sends one `WebPushMessage` and
/// resolves to `()` or a `WebPushError`.
impl Service for WebPushClient {
    type Request = WebPushMessage;
    type Response = ();
    type Error = WebPushError;
    type Future = WebPushResponse;

    /// Builds the HTTP request for `message`, sends it and parses the reply.
    ///
    /// Firebase messages use the `firebase` request builder and response
    /// parser; every other service value goes through `autopush`.
    ///
    /// Failures while sending the request or reading the body are reported
    /// as `WebPushError::Unspecified`. When the parser returns
    /// `ServerError(None)`, the error is re-issued with the delay taken from
    /// the response's `Retry-After` header, or `None` if the header is
    /// missing.
    fn call(&self, message: Self::Request) -> Self::Future {
        // The request builders consume `message`, so copy out the service
        // kind first; it is needed again when the response is parsed.
        let service = message.service.clone();

        let http_request: HttpRequest = match service {
            WebPushService::Firebase => firebase::build_request(message),
            _ => autopush::build_request(message),
        };

        // The underlying transport error is discarded here.
        let pending = self.client
            .request(http_request)
            .map_err(|_| WebPushError::Unspecified);

        let outcome = pending.and_then(move |http_response: HttpResponse| {
            // Header and status are read before `body()` is called on the
            // response.
            let retry_after = http_response.headers().get::<RetryAfter>().cloned();
            let status = http_response.status().clone();

            // Collect the streamed body into a single buffer.
            let whole_body = http_response
                .body()
                .map_err(|_| WebPushError::Unspecified)
                .concat2();

            whole_body.and_then(move |chunk| {
                let bytes = chunk.to_vec();
                let parsed = match service {
                    WebPushService::Firebase => firebase::parse_response(status, bytes),
                    _ => autopush::parse_response(status, bytes),
                };

                match parsed {
                    Ok(()) => ok(()),
                    Err(WebPushError::ServerError(None)) => {
                        let wait = retry_after.map(|hint| match hint {
                            RetryAfter::Delay(delay) => delay,
                            RetryAfter::DateTime(when) => {
                                let deadline: SystemTime = when.into();
                                // A date that is not in the future becomes a
                                // zero-length wait.
                                deadline
                                    .duration_since(SystemTime::now())
                                    .unwrap_or(Duration::new(0, 0))
                            }
                        });
                        err(WebPushError::ServerError(wait))
                    }
                    Err(other) => err(other),
                }
            })
        });

        WebPushResponse(Box::new(outcome))
    }
}