Skip to main content

rs_dapi_client/
transport.rs

1//! Transport options that DAPI requests use under the hood.
2
3pub(crate) mod grpc;
4#[cfg(not(target_arch = "wasm32"))]
5pub(crate) mod tonic_channel;
6#[cfg(target_arch = "wasm32")]
7pub(crate) mod wasm_channel;
8
9use crate::connection_pool::ConnectionPool;
10pub use crate::request_settings::AppliedRequestSettings;
11use crate::{CanRetry, RequestSettings, Uri};
12use dapi_grpc::mock::Mockable;
13pub use futures::future::BoxFuture;
14use std::any;
15use std::fmt::Debug;
16use std::time::Duration;
17
18#[cfg(not(target_arch = "wasm32"))]
19pub use tonic_channel::{
20    create_channel, CoreGrpcClient, PlatformGrpcClient, TokioBackonSleeper as BackonSleeper,
21};
22#[cfg(target_arch = "wasm32")]
23pub use wasm_channel::{
24    create_channel, CoreGrpcClient, PlatformGrpcClient, WasmBackonSleeper as BackonSleeper,
25};
26
27/// Sleep for the given duration.
28#[cfg(not(target_arch = "wasm32"))]
29pub async fn sleep(duration: Duration) {
30    tokio::time::sleep(duration).await;
31}
32
/// Suspends the calling task for `duration`.
///
/// `wasm32` variant, backed by `wasm_channel::into_send_sleep`.
#[cfg(target_arch = "wasm32")]
pub async fn sleep(duration: Duration) {
    let delay = wasm_channel::into_send_sleep(duration);
    delay.await;
}
38
39/// Generic transport layer request.
40/// Requires [Clone] as could be retried and a client in general consumes a request.
41pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable {
42    /// A client specific to this type of transport.
43    type Client: TransportClient;
44
45    /// Transport layer response.
46    type Response: Mockable + Send + Debug;
47
48    /// Settings that will override [DapiClient](crate::DapiClient)'s ones each time the request is executed.
49    const SETTINGS_OVERRIDES: RequestSettings;
50
51    /// gRPC request name
52    fn request_name(&self) -> &'static str {
53        any::type_name::<Self>()
54    }
55
56    /// gRPC response name
57    fn response_name(&self) -> &'static str {
58        any::type_name::<Self::Response>()
59    }
60
61    /// gRPC method name
62    fn method_name(&self) -> &'static str;
63
64    /// Perform transport request asynchronously.
65    fn execute_transport<'c>(
66        self,
67        client: &'c mut Self::Client,
68        settings: &AppliedRequestSettings,
69    ) -> BoxFuture<'c, Result<Self::Response, TransportError>>;
70}
71
72/// Transport error type.
73#[derive(Debug, thiserror::Error)]
74#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
75pub enum TransportError {
76    /// gRPC error
77    #[error("grpc error: {0}")]
78    Grpc(
79        #[from]
80        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
81        dapi_grpc::tonic::Status,
82    ),
83}
84
85impl Clone for TransportError {
86    fn clone(&self) -> Self {
87        match self {
88            TransportError::Grpc(status) => {
89                // tonic::Status doesn't implement Clone, so we reconstruct it
90                // from its components. Note: this loses the original error source.
91                let cloned_status = dapi_grpc::tonic::Status::with_details_and_metadata(
92                    status.code(),
93                    status.message(),
94                    status.details().to_vec().into(),
95                    status.metadata().clone(),
96                );
97                TransportError::Grpc(cloned_status)
98            }
99        }
100    }
101}
102
103impl CanRetry for TransportError {
104    fn can_retry(&self) -> bool {
105        match self {
106            TransportError::Grpc(status) => status.can_retry(),
107        }
108    }
109
110    fn rate_limit_ban_duration(&self) -> Option<std::time::Duration> {
111        match self {
112            TransportError::Grpc(status) => status.rate_limit_ban_duration(),
113        }
114    }
115}
116
117/// Serialization of [TransportError].
118///
119/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
120impl Mockable for TransportError {
121    #[cfg(feature = "mocks")]
122    fn mock_serialize(&self) -> Option<Vec<u8>> {
123        Some(serde_json::to_vec(self).expect("serialize Transport error"))
124    }
125
126    #[cfg(feature = "mocks")]
127    fn mock_deserialize(data: &[u8]) -> Option<Self> {
128        Some(serde_json::from_slice(data).expect("deserialize Transport error"))
129    }
130}
131
132/// Serialization of boxed [TransportError].
133impl Mockable for Box<TransportError> {
134    #[cfg(feature = "mocks")]
135    fn mock_serialize(&self) -> Option<Vec<u8>> {
136        self.as_ref().mock_serialize()
137    }
138
139    #[cfg(feature = "mocks")]
140    fn mock_deserialize(data: &[u8]) -> Option<Self> {
141        TransportError::mock_deserialize(data).map(Box::new)
142    }
143}
144
145/// Generic way to create a transport client from provided [Uri].
146pub trait TransportClient: Send + Sized {
147    /// Build client using node's url.
148    fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, TransportError>;
149
150    /// Build client using node's url and [AppliedRequestSettings].
151    fn with_uri_and_settings(
152        uri: Uri,
153        settings: &AppliedRequestSettings,
154        pool: &ConnectionPool,
155    ) -> Result<Self, TransportError>;
156}
157
/// Unit tests for the retry policy, cloning, display and mock serialization of
/// [TransportError] and of the gRPC status it wraps.
#[cfg(test)]
mod tests {
    use super::*;
    use dapi_grpc::tonic::Code;

    /// Every code listed here must be reported as retryable by the status itself.
    #[test]
    fn test_tonic_status_can_retry_retryable_codes() {
        let retryable_codes = [
            Code::Ok,
            Code::DataLoss,
            Code::Cancelled,
            Code::Unknown,
            Code::DeadlineExceeded,
            Code::ResourceExhausted,
            Code::Aborted,
            Code::Internal,
            Code::Unavailable,
            // Retryable on another node: an old build in a mixed-version network
            // returns Unimplemented for methods newer nodes already serve.
            Code::Unimplemented,
        ];

        for code in retryable_codes {
            let status = dapi_grpc::tonic::Status::new(code, "test");
            assert!(status.can_retry(), "Expected code {code:?} to be retryable");
        }
    }

    /// Every code listed here must be reported as non-retryable by the status itself.
    #[test]
    fn test_tonic_status_can_retry_non_retryable_codes() {
        let non_retryable_codes = [
            Code::InvalidArgument,
            Code::NotFound,
            Code::AlreadyExists,
            Code::PermissionDenied,
            Code::FailedPrecondition,
            Code::OutOfRange,
            Code::Unauthenticated,
        ];

        for code in non_retryable_codes {
            let status = dapi_grpc::tonic::Status::new(code, "test");
            assert!(
                !status.can_retry(),
                "Expected code {code:?} to be non-retryable"
            );
        }
    }

    /// [TransportError] must report the same retry decision as the status it wraps.
    #[test]
    fn test_transport_error_can_retry() {
        let retryable = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
        assert!(retryable.can_retry());

        let non_retryable = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
        assert!(!non_retryable.can_retry());
    }

    /// `rate_limit_ban_duration` returns `Some` only for `ResourceExhausted` with
    /// a parseable positive `ratelimit-reset` header.  Every other code returns
    /// `None` regardless of headers.
    #[test]
    fn test_tonic_status_rate_limit_ban_duration() {
        use dapi_grpc::tonic::metadata::MetadataValue;

        // ResourceExhausted + valid header → Some.
        let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429");
        status
            .metadata_mut()
            .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap());
        assert_eq!(
            status.rate_limit_ban_duration(),
            Some(std::time::Duration::from_secs(30))
        );

        // ResourceExhausted without header → None.
        let no_header = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429");
        assert!(no_header.rate_limit_ban_duration().is_none());

        // Non-ResourceExhausted codes → None regardless.
        for code in [
            Code::Ok,
            Code::Unavailable,
            Code::Internal,
            Code::DeadlineExceeded,
        ] {
            let mut s = dapi_grpc::tonic::Status::new(code, "x");
            s.metadata_mut()
                .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap());
            assert!(
                s.rate_limit_ban_duration().is_none(),
                "code {code:?} must return None"
            );
        }
    }

    /// [TransportError] must forward `rate_limit_ban_duration` to the wrapped status.
    #[test]
    fn test_transport_error_rate_limit_ban_duration_delegates() {
        use dapi_grpc::tonic::metadata::MetadataValue;

        let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429");
        status
            .metadata_mut()
            .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap());
        let rate_limited = TransportError::Grpc(status);
        assert_eq!(
            rate_limited.rate_limit_ban_duration(),
            Some(std::time::Duration::from_secs(45))
        );
        // Still retryable — rate-limit ban duration doesn't affect can_retry.
        assert!(rate_limited.can_retry());

        let unavailable = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("down"));
        assert!(unavailable.rate_limit_ban_duration().is_none());
        assert!(unavailable.can_retry());
    }

    /// The hand-written `Clone` rebuilds the status from code, message, details and
    /// metadata, so all four must survive cloning.
    #[test]
    fn test_transport_error_clone() {
        use dapi_grpc::tonic::metadata::MetadataValue;

        let mut status = dapi_grpc::tonic::Status::with_details(
            Code::ResourceExhausted,
            "test message",
            vec![1u8, 2, 3].into(),
        );
        status
            .metadata_mut()
            .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap());
        let original = TransportError::Grpc(status);

        let cloned = original.clone();

        match (&original, &cloned) {
            (TransportError::Grpc(orig), TransportError::Grpc(clone)) => {
                assert_eq!(orig.code(), clone.code());
                assert_eq!(orig.message(), clone.message());
                assert_eq!(orig.details(), clone.details());
            }
        }

        // The metadata copy is observable through the rate-limit header it carries.
        assert_eq!(
            cloned.rate_limit_ban_duration(),
            Some(std::time::Duration::from_secs(30))
        );
        assert_eq!(
            cloned.rate_limit_ban_duration(),
            original.rate_limit_ban_duration()
        );
    }

    /// The display output carries the variant prefix and the status message.
    #[test]
    fn test_transport_error_display() {
        let err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("service down"));
        let display = err.to_string();
        assert!(display.starts_with("grpc error: "));
        assert!(display.contains("service down"));
    }

    /// A serialized error must deserialize back to an error with the same status code.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_transport_error_mock_roundtrip() {
        let original =
            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test roundtrip"));
        let serialized = original.mock_serialize().expect("should serialize");
        let deserialized =
            TransportError::mock_deserialize(&serialized).expect("should deserialize");

        match deserialized {
            TransportError::Grpc(status) => {
                assert_eq!(status.code(), Code::Unavailable);
            }
        }
    }

    /// Same round trip as above, going through the `Box<TransportError>` implementation.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_boxed_transport_error_mock_roundtrip() {
        let original = Box::new(TransportError::Grpc(dapi_grpc::tonic::Status::internal(
            "boxed test",
        )));
        let serialized = original.mock_serialize().expect("should serialize");
        let deserialized =
            Box::<TransportError>::mock_deserialize(&serialized).expect("should deserialize");

        match *deserialized {
            TransportError::Grpc(status) => {
                assert_eq!(status.code(), Code::Internal);
            }
        }
    }
}