1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use hyper::{
    client::{Client, HttpConnector},
    Body, Request as HttpRequest,
};
use futures::stream::StreamExt;
use crate::error::{RetryAfter, WebPushError};
use http::header::{RETRY_AFTER, CONTENT_LENGTH};
use hyper_tls::HttpsConnector;
use crate::message::{WebPushMessage, WebPushService};
use crate::services::{autopush, firebase};
use std::future::Future;

/// An async client for sending the notification payload.
///
/// Wraps a single hyper [`Client`] configured with an [`HttpsConnector`]
/// layered over a plain [`HttpConnector`]; every request issued through
/// this type goes through that one underlying client.
pub struct WebPushClient {
    // The hyper client used to dispatch every push request.
    client: Client<HttpsConnector<HttpConnector>>,
}

impl WebPushClient {
    /// Creates a client backed by a hyper [`Client`] with keep-alive enabled
    /// and a freshly constructed [`HttpsConnector`].
    pub fn new() -> WebPushClient {
        let mut builder = Client::builder();
        builder.keep_alive(true);

        WebPushClient {
            client: builder.build(HttpsConnector::new()),
        }
    }

    /// Sends a notification. Never times out.
    ///
    /// The HTTP request is built from `message` synchronously, using the
    /// Firebase request builder for [`WebPushService::Firebase`] and the
    /// autopush builder for every other service. The returned future owns
    /// everything it needs and therefore does not borrow from `self`.
    ///
    /// No timeout is applied here; callers that need one must wrap the
    /// returned future themselves.
    ///
    /// # Errors
    ///
    /// * Failures while performing the request or reading the response body
    ///   are propagated, converted into [`WebPushError`].
    /// * Whatever error the service-specific response parser reports is
    ///   returned as is, with one exception: a `ServerError(None)` is replaced
    ///   by a `ServerError` carrying the parsed `Retry-After` header of the
    ///   response (which is still `None` when the header is absent or
    ///   unparseable).
    pub fn send(&self, message: WebPushMessage) -> impl Future<Output = Result<(), WebPushError>> + 'static {
        /// Largest buffer reserved up front for the response body. The
        /// `Content-Length` header is controlled by the remote server, so it
        /// is only used as a hint and never trusted for an unbounded
        /// allocation.
        const MAX_BODY_PREALLOCATION: usize = 64 * 1024;

        let service = message.service.clone();

        let request: HttpRequest<Body> = match service {
            WebPushService::Firebase => firebase::build_request(message),
            _ => autopush::build_request(message),
        };

        trace!("Request: {:?}", request);

        let requesting = self.client.request(request);

        async move {
            let response = requesting.await?;

            // Read the headers we care about before the response is consumed
            // by `into_body` below.
            let retry_after = response
                .headers()
                .get(RETRY_AFTER)
                .and_then(|ra| ra.to_str().ok())
                .and_then(|ra| RetryAfter::from_str(ra));

            let response_status = response.status();
            trace!("Response status: {}", response_status);

            let content_length: usize = response
                .headers()
                .get(CONTENT_LENGTH)
                .and_then(|s| s.to_str().ok())
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);

            // Clamp the hint: a bogus or hostile `Content-Length` must not be
            // able to trigger a huge allocation (or a capacity-overflow
            // panic). The vector still grows as needed while reading.
            let mut body: Vec<u8> =
                Vec::with_capacity(content_length.min(MAX_BODY_PREALLOCATION));
            let mut chunks = response.into_body();

            while let Some(chunk) = chunks.next().await {
                body.extend_from_slice(&chunk?);
            }
            trace!("Body: {:?}", body);

            // `body` is not used after this point, so it is moved into the
            // parser instead of being copied.
            let response = match service {
                WebPushService::Firebase => firebase::parse_response(response_status, body),
                _ => autopush::parse_response(response_status, body),
            };

            debug!("Response: {:?}", response);

            match response {
                // The parser has no access to the headers; attach the
                // `Retry-After` information gathered above.
                Err(WebPushError::ServerError(None)) => {
                    Err(WebPushError::ServerError(retry_after))
                }
                other => other,
            }
        }
    }
}