rs_dapi_client/
transport.rs

1//! Transport options that DAPI requests use under the hood.
2
3pub(crate) mod grpc;
4#[cfg(not(target_arch = "wasm32"))]
5pub(crate) mod tonic_channel;
6#[cfg(target_arch = "wasm32")]
7pub(crate) mod wasm_channel;
8
9use crate::connection_pool::ConnectionPool;
10pub use crate::request_settings::AppliedRequestSettings;
11use crate::{CanRetry, RequestSettings, Uri};
12use dapi_grpc::mock::Mockable;
13pub use futures::future::BoxFuture;
14use std::any;
15use std::fmt::Debug;
16use std::time::Duration;
17
18#[cfg(not(target_arch = "wasm32"))]
19pub use tonic_channel::{
20    create_channel, CoreGrpcClient, PlatformGrpcClient, TokioBackonSleeper as BackonSleeper,
21};
22#[cfg(target_arch = "wasm32")]
23pub use wasm_channel::{
24    create_channel, CoreGrpcClient, PlatformGrpcClient, WasmBackonSleeper as BackonSleeper,
25};
26
27/// Sleep for the given duration.
28#[cfg(not(target_arch = "wasm32"))]
29pub async fn sleep(duration: Duration) {
30    tokio::time::sleep(duration).await;
31}
32
33/// Sleep for the given duration.
34#[cfg(target_arch = "wasm32")]
35pub async fn sleep(duration: Duration) {
36    wasm_channel::into_send_sleep(duration).await;
37}
38
39/// Generic transport layer request.
40/// Requires [Clone] as could be retried and a client in general consumes a request.
41pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable {
42    /// A client specific to this type of transport.
43    type Client: TransportClient;
44
45    /// Transport layer response.
46    type Response: Mockable + Send + Debug;
47
48    /// Settings that will override [DapiClient](crate::DapiClient)'s ones each time the request is executed.
49    const SETTINGS_OVERRIDES: RequestSettings;
50
51    /// gRPC request name
52    fn request_name(&self) -> &'static str {
53        any::type_name::<Self>()
54    }
55
56    /// gRPC response name
57    fn response_name(&self) -> &'static str {
58        any::type_name::<Self::Response>()
59    }
60
61    /// gRPC method name
62    fn method_name(&self) -> &'static str;
63
64    /// Perform transport request asynchronously.
65    fn execute_transport<'c>(
66        self,
67        client: &'c mut Self::Client,
68        settings: &AppliedRequestSettings,
69    ) -> BoxFuture<'c, Result<Self::Response, TransportError>>;
70}
71
72/// Transport error type.
73#[derive(Debug, thiserror::Error)]
74#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
75pub enum TransportError {
76    /// gRPC error
77    #[error("grpc error: {0}")]
78    Grpc(
79        #[from]
80        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
81        dapi_grpc::tonic::Status,
82    ),
83}
84
85impl Clone for TransportError {
86    fn clone(&self) -> Self {
87        match self {
88            TransportError::Grpc(status) => {
89                // tonic::Status doesn't implement Clone, so we reconstruct it
90                // from its components. Note: this loses the original error source.
91                let cloned_status = dapi_grpc::tonic::Status::with_details_and_metadata(
92                    status.code(),
93                    status.message(),
94                    status.details().to_vec().into(),
95                    status.metadata().clone(),
96                );
97                TransportError::Grpc(cloned_status)
98            }
99        }
100    }
101}
102
103impl CanRetry for TransportError {
104    fn can_retry(&self) -> bool {
105        match self {
106            TransportError::Grpc(status) => status.can_retry(),
107        }
108    }
109}
110
111/// Serialization of [TransportError].
112///
113/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
114impl Mockable for TransportError {
115    #[cfg(feature = "mocks")]
116    fn mock_serialize(&self) -> Option<Vec<u8>> {
117        Some(serde_json::to_vec(self).expect("serialize Transport error"))
118    }
119
120    #[cfg(feature = "mocks")]
121    fn mock_deserialize(data: &[u8]) -> Option<Self> {
122        Some(serde_json::from_slice(data).expect("deserialize Transport error"))
123    }
124}
125
126/// Serialization of boxed [TransportError].
127impl Mockable for Box<TransportError> {
128    #[cfg(feature = "mocks")]
129    fn mock_serialize(&self) -> Option<Vec<u8>> {
130        self.as_ref().mock_serialize()
131    }
132
133    #[cfg(feature = "mocks")]
134    fn mock_deserialize(data: &[u8]) -> Option<Self> {
135        TransportError::mock_deserialize(data).map(Box::new)
136    }
137}
138
139/// Generic way to create a transport client from provided [Uri].
140pub trait TransportClient: Send + Sized {
141    /// Build client using node's url.
142    fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, TransportError>;
143
144    /// Build client using node's url and [AppliedRequestSettings].
145    fn with_uri_and_settings(
146        uri: Uri,
147        settings: &AppliedRequestSettings,
148        pool: &ConnectionPool,
149    ) -> Result<Self, TransportError>;
150}
151
152#[cfg(test)]
153mod tests {
154    use super::*;
155    use dapi_grpc::tonic::Code;
156
157    #[test]
158    fn test_tonic_status_can_retry_retryable_codes() {
159        let retryable_codes = vec![
160            Code::Ok,
161            Code::DataLoss,
162            Code::Cancelled,
163            Code::Unknown,
164            Code::DeadlineExceeded,
165            Code::ResourceExhausted,
166            Code::Aborted,
167            Code::Internal,
168            Code::Unavailable,
169        ];
170
171        for code in retryable_codes {
172            let status = dapi_grpc::tonic::Status::new(code, "test");
173            assert!(
174                status.can_retry(),
175                "Expected code {:?} to be retryable",
176                code
177            );
178        }
179    }
180
181    #[test]
182    fn test_tonic_status_can_retry_non_retryable_codes() {
183        let non_retryable_codes = vec![
184            Code::InvalidArgument,
185            Code::NotFound,
186            Code::AlreadyExists,
187            Code::PermissionDenied,
188            Code::FailedPrecondition,
189            Code::OutOfRange,
190            Code::Unimplemented,
191            Code::Unauthenticated,
192        ];
193
194        for code in non_retryable_codes {
195            let status = dapi_grpc::tonic::Status::new(code, "test");
196            assert!(
197                !status.can_retry(),
198                "Expected code {:?} to be non-retryable",
199                code
200            );
201        }
202    }
203
204    #[test]
205    fn test_transport_error_can_retry() {
206        let retryable = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
207        assert!(retryable.can_retry());
208
209        let non_retryable = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
210        assert!(!non_retryable.can_retry());
211    }
212
213    #[test]
214    fn test_transport_error_clone() {
215        let original = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test message"));
216
217        let cloned = original.clone();
218
219        match (&original, &cloned) {
220            (TransportError::Grpc(orig), TransportError::Grpc(clone)) => {
221                assert_eq!(orig.code(), clone.code());
222                assert_eq!(orig.message(), clone.message());
223            }
224        }
225    }
226
227    #[test]
228    fn test_transport_error_display() {
229        let err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("service down"));
230        let display = format!("{}", err);
231        assert!(display.contains("service down"));
232    }
233
234    #[cfg(feature = "mocks")]
235    #[test]
236    fn test_transport_error_mock_roundtrip() {
237        let original =
238            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test roundtrip"));
239        let serialized = original.mock_serialize().expect("should serialize");
240        let deserialized =
241            TransportError::mock_deserialize(&serialized).expect("should deserialize");
242
243        match deserialized {
244            TransportError::Grpc(status) => {
245                assert_eq!(status.code(), Code::Unavailable);
246            }
247        }
248    }
249
250    #[cfg(feature = "mocks")]
251    #[test]
252    fn test_boxed_transport_error_mock_roundtrip() {
253        let original = Box::new(TransportError::Grpc(dapi_grpc::tonic::Status::internal(
254            "boxed test",
255        )));
256        let serialized = original.mock_serialize().expect("should serialize");
257        let deserialized =
258            Box::<TransportError>::mock_deserialize(&serialized).expect("should deserialize");
259
260        match *deserialized {
261            TransportError::Grpc(status) => {
262                assert_eq!(status.code(), Code::Internal);
263            }
264        }
265    }
266}