1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use hyper::{
client::{Client, HttpConnector},
Body, Request as HttpRequest,
};
use futures::stream::StreamExt;
use crate::error::{RetryAfter, WebPushError};
use http::header::{RETRY_AFTER, CONTENT_LENGTH};
use hyper_tls::HttpsConnector;
use crate::message::{WebPushMessage, WebPushService};
use crate::services::{autopush, firebase};
use std::future::Future;
/// Client used to deliver [`WebPushMessage`]s to a push service over HTTPS.
///
/// Wraps a single hyper [`Client`] so that every call to `send` on the same
/// `WebPushClient` goes through the same underlying HTTP client.
pub struct WebPushClient {
    /// Underlying hyper client, using a TLS-capable connector.
    client: Client<HttpsConnector<HttpConnector>>,
}
impl WebPushClient {
    /// Creates a new client with HTTP keep-alive enabled and a default
    /// [`HttpsConnector`].
    pub fn new() -> WebPushClient {
        let mut builder = Client::builder();
        builder.keep_alive(true);

        WebPushClient {
            client: builder.build(HttpsConnector::new()),
        }
    }

    /// Sends `message` to the push service selected by `message.service`.
    ///
    /// The HTTP request is built and handed to the underlying client before
    /// this method returns; the returned future does not borrow `self`
    /// (it is `'static`) and, when awaited, reads the whole response body and
    /// lets the service-specific parser decide the outcome.
    ///
    /// # Errors
    ///
    /// * Any error raised while performing the request or while reading a
    ///   body chunk is propagated with `?` (converted into [`WebPushError`]).
    /// * Any error returned by the `firebase` / `autopush` response parser is
    ///   returned as-is, except that `WebPushError::ServerError(None)` is
    ///   replaced by `WebPushError::ServerError(retry_after)`, where
    ///   `retry_after` is whatever could be parsed from the response's
    ///   `Retry-After` header (possibly still `None`).
    pub fn send(
        &self,
        message: WebPushMessage,
    ) -> impl Future<Output = Result<(), WebPushError>> + 'static {
        // Upper bound for the buffer reserved up front. `Content-Length` is
        // supplied by the remote server and must not be trusted for sizing:
        // `Vec::with_capacity` with an absurd value would allocate (or fail
        // to allocate) that much memory before a single byte is received.
        const MAX_BODY_PREALLOCATION: usize = 64 * 1024;

        // `message` is consumed by `build_request`, so keep a copy of the
        // service kind for choosing the response parser later.
        let service = message.service.clone();

        let request: HttpRequest<Body> = match service {
            WebPushService::Firebase => firebase::build_request(message),
            _ => autopush::build_request(message),
        };

        trace!("Request: {:?}", request);

        // Created outside the async block so the block only captures owned
        // values and the returned future can be `'static`.
        let requesting = self.client.request(request);

        async move {
            let response = requesting.await?;

            // Capture header-derived data before the response is consumed
            // by `into_body()` below.
            let retry_after = response
                .headers()
                .get(RETRY_AFTER)
                .and_then(|ra| ra.to_str().ok())
                .and_then(|ra| RetryAfter::from_str(ra));

            let response_status = response.status();
            trace!("Response status: {}", response_status);

            // A missing or unparsable Content-Length is treated as 0; the
            // value is only a capacity hint, the body is read until the
            // stream ends regardless.
            let content_length: usize = response
                .headers()
                .get(CONTENT_LENGTH)
                .and_then(|s| s.to_str().ok())
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            let mut body: Vec<u8> =
                Vec::with_capacity(content_length.min(MAX_BODY_PREALLOCATION));
            let mut chunks = response.into_body();

            while let Some(chunk) = chunks.next().await {
                body.extend_from_slice(&chunk?);
            }
            trace!("Body: {:?}", body);

            // `body` is not needed afterwards, so it is moved into the
            // parser instead of being copied.
            let response = match service {
                WebPushService::Firebase => firebase::parse_response(response_status, body),
                _ => autopush::parse_response(response_status, body),
            };

            debug!("Response: {:?}", response);

            // Enrich a bare server error with the Retry-After information
            // taken from the headers.
            if let Err(WebPushError::ServerError(None)) = response {
                Err(WebPushError::ServerError(retry_after))
            } else {
                Ok(response?)
            }
        }
    }
}