rs_dapi_client/
transport.rs1pub(crate) mod grpc;
4#[cfg(not(target_arch = "wasm32"))]
5pub(crate) mod tonic_channel;
6#[cfg(target_arch = "wasm32")]
7pub(crate) mod wasm_channel;
8
9use crate::connection_pool::ConnectionPool;
10pub use crate::request_settings::AppliedRequestSettings;
11use crate::{CanRetry, RequestSettings, Uri};
12use dapi_grpc::mock::Mockable;
13pub use futures::future::BoxFuture;
14use std::any;
15use std::fmt::Debug;
16use std::time::Duration;
17
18#[cfg(not(target_arch = "wasm32"))]
19pub use tonic_channel::{
20 create_channel, CoreGrpcClient, PlatformGrpcClient, TokioBackonSleeper as BackonSleeper,
21};
22#[cfg(target_arch = "wasm32")]
23pub use wasm_channel::{
24 create_channel, CoreGrpcClient, PlatformGrpcClient, WasmBackonSleeper as BackonSleeper,
25};
26
27#[cfg(not(target_arch = "wasm32"))]
29pub async fn sleep(duration: Duration) {
30 tokio::time::sleep(duration).await;
31}
32
#[cfg(target_arch = "wasm32")]
/// Suspends the current task for `duration` (`wasm32` targets).
///
/// Delegates to `wasm_channel::into_send_sleep`.
// NOTE(review): judging by its name, `into_send_sleep` wraps a browser timer
// into a `Send` future — confirm against `wasm_channel`.
pub async fn sleep(duration: Duration) {
    let timer = wasm_channel::into_send_sleep(duration);
    timer.await;
}
38
39pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable {
42 type Client: TransportClient;
44
45 type Response: Mockable + Send + Debug;
47
48 const SETTINGS_OVERRIDES: RequestSettings;
50
51 fn request_name(&self) -> &'static str {
53 any::type_name::<Self>()
54 }
55
56 fn response_name(&self) -> &'static str {
58 any::type_name::<Self::Response>()
59 }
60
61 fn method_name(&self) -> &'static str;
63
64 fn execute_transport<'c>(
66 self,
67 client: &'c mut Self::Client,
68 settings: &AppliedRequestSettings,
69 ) -> BoxFuture<'c, Result<Self::Response, TransportError>>;
70}
71
72#[derive(Debug, thiserror::Error)]
74#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
75pub enum TransportError {
76 #[error("grpc error: {0}")]
78 Grpc(
79 #[from]
80 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
81 dapi_grpc::tonic::Status,
82 ),
83}
84
85impl Clone for TransportError {
86 fn clone(&self) -> Self {
87 match self {
88 TransportError::Grpc(status) => {
89 let cloned_status = dapi_grpc::tonic::Status::with_details_and_metadata(
92 status.code(),
93 status.message(),
94 status.details().to_vec().into(),
95 status.metadata().clone(),
96 );
97 TransportError::Grpc(cloned_status)
98 }
99 }
100 }
101}
102
103impl CanRetry for TransportError {
104 fn can_retry(&self) -> bool {
105 match self {
106 TransportError::Grpc(status) => status.can_retry(),
107 }
108 }
109
110 fn rate_limit_ban_duration(&self) -> Option<std::time::Duration> {
111 match self {
112 TransportError::Grpc(status) => status.rate_limit_ban_duration(),
113 }
114 }
115}
116
117impl Mockable for TransportError {
121 #[cfg(feature = "mocks")]
122 fn mock_serialize(&self) -> Option<Vec<u8>> {
123 Some(serde_json::to_vec(self).expect("serialize Transport error"))
124 }
125
126 #[cfg(feature = "mocks")]
127 fn mock_deserialize(data: &[u8]) -> Option<Self> {
128 Some(serde_json::from_slice(data).expect("deserialize Transport error"))
129 }
130}
131
132impl Mockable for Box<TransportError> {
134 #[cfg(feature = "mocks")]
135 fn mock_serialize(&self) -> Option<Vec<u8>> {
136 self.as_ref().mock_serialize()
137 }
138
139 #[cfg(feature = "mocks")]
140 fn mock_deserialize(data: &[u8]) -> Option<Self> {
141 TransportError::mock_deserialize(data).map(Box::new)
142 }
143}
144
145pub trait TransportClient: Send + Sized {
147 fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, TransportError>;
149
150 fn with_uri_and_settings(
152 uri: Uri,
153 settings: &AppliedRequestSettings,
154 pool: &ConnectionPool,
155 ) -> Result<Self, TransportError>;
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use dapi_grpc::tonic::metadata::MetadataValue;
    use dapi_grpc::tonic::{Code, Status};

    /// Builds a status with the given code and message that carries a
    /// `ratelimit-reset` metadata entry set to `reset`.
    fn status_with_reset(code: Code, message: &str, reset: &'static str) -> Status {
        let mut status = Status::new(code, message);
        status
            .metadata_mut()
            .insert("ratelimit-reset", MetadataValue::try_from(reset).unwrap());
        status
    }

    /// Every code listed here must be reported as retryable.
    #[test]
    fn test_tonic_status_can_retry_retryable_codes() {
        let retryable_codes = [
            Code::Ok,
            Code::DataLoss,
            Code::Cancelled,
            Code::Unknown,
            Code::DeadlineExceeded,
            Code::ResourceExhausted,
            Code::Aborted,
            Code::Internal,
            Code::Unavailable,
            Code::Unimplemented,
        ];

        for code in retryable_codes {
            let status = Status::new(code, "test");
            assert!(status.can_retry(), "Expected code {code:?} to be retryable");
        }
    }

    /// Every code listed here must be reported as non-retryable.
    #[test]
    fn test_tonic_status_can_retry_non_retryable_codes() {
        let non_retryable_codes = [
            Code::InvalidArgument,
            Code::NotFound,
            Code::AlreadyExists,
            Code::PermissionDenied,
            Code::FailedPrecondition,
            Code::OutOfRange,
            Code::Unauthenticated,
        ];

        for code in non_retryable_codes {
            let status = Status::new(code, "test");
            assert!(
                !status.can_retry(),
                "Expected code {code:?} to be non-retryable"
            );
        }
    }

    /// `TransportError::can_retry` mirrors the wrapped status.
    #[test]
    fn test_transport_error_can_retry() {
        let transient = TransportError::Grpc(Status::unavailable("temporary"));
        assert!(transient.can_retry());

        let permanent = TransportError::Grpc(Status::not_found("permanent"));
        assert!(!permanent.can_retry());
    }

    /// The ban duration is reported only for `ResourceExhausted` statuses that
    /// carry a `ratelimit-reset` entry.
    #[test]
    fn test_tonic_status_rate_limit_ban_duration() {
        let limited = status_with_reset(Code::ResourceExhausted, "429", "30");
        assert_eq!(
            limited.rate_limit_ban_duration(),
            Some(std::time::Duration::from_secs(30))
        );

        // Same code, but without the metadata entry.
        let no_header = Status::new(Code::ResourceExhausted, "429");
        assert!(no_header.rate_limit_ban_duration().is_none());

        // Metadata entry present, but on codes other than `ResourceExhausted`.
        let other_codes = [
            Code::Ok,
            Code::Unavailable,
            Code::Internal,
            Code::DeadlineExceeded,
        ];
        for code in other_codes {
            let s = status_with_reset(code, "x", "30");
            assert!(
                s.rate_limit_ban_duration().is_none(),
                "code {code:?} must return None"
            );
        }
    }

    /// `TransportError` forwards `rate_limit_ban_duration` to the wrapped status.
    #[test]
    fn test_transport_error_rate_limit_ban_duration_delegates() {
        let rate_limited =
            TransportError::Grpc(status_with_reset(Code::ResourceExhausted, "429", "45"));
        assert_eq!(
            rate_limited.rate_limit_ban_duration(),
            Some(std::time::Duration::from_secs(45))
        );
        assert!(rate_limited.can_retry());

        let unavailable = TransportError::Grpc(Status::unavailable("down"));
        assert!(unavailable.rate_limit_ban_duration().is_none());
        assert!(unavailable.can_retry());
    }

    /// Cloning keeps the status code and message.
    #[test]
    fn test_transport_error_clone() {
        let original = TransportError::Grpc(Status::unavailable("test message"));
        let cloned = original.clone();

        let TransportError::Grpc(before) = &original;
        let TransportError::Grpc(after) = &cloned;
        assert_eq!(before.code(), after.code());
        assert_eq!(before.message(), after.message());
    }

    /// The `Display` output contains the status message.
    #[test]
    fn test_transport_error_display() {
        let err = TransportError::Grpc(Status::unavailable("service down"));
        let rendered = err.to_string();
        assert!(rendered.contains("service down"));
    }

    /// Serializing and deserializing through `Mockable` keeps the status code.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_transport_error_mock_roundtrip() {
        let original = TransportError::Grpc(Status::unavailable("test roundtrip"));
        let bytes = original.mock_serialize().expect("should serialize");
        let restored = TransportError::mock_deserialize(&bytes).expect("should deserialize");

        let TransportError::Grpc(status) = restored;
        assert_eq!(status.code(), Code::Unavailable);
    }

    /// Same round trip, going through the `Box<TransportError>` implementation.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_boxed_transport_error_mock_roundtrip() {
        let original = Box::new(TransportError::Grpc(Status::internal("boxed test")));
        let bytes = original.mock_serialize().expect("should serialize");
        let restored =
            Box::<TransportError>::mock_deserialize(&bytes).expect("should deserialize");

        let TransportError::Grpc(status) = *restored;
        assert_eq!(status.code(), Code::Internal);
    }
}