// rs_dapi_client/transport.rs
pub(crate) mod grpc;
4#[cfg(not(target_arch = "wasm32"))]
5pub(crate) mod tonic_channel;
6#[cfg(target_arch = "wasm32")]
7pub(crate) mod wasm_channel;
8
9use crate::connection_pool::ConnectionPool;
10pub use crate::request_settings::AppliedRequestSettings;
11use crate::{CanRetry, RequestSettings, Uri};
12use dapi_grpc::mock::Mockable;
13pub use futures::future::BoxFuture;
14use std::any;
15use std::fmt::Debug;
16use std::time::Duration;
17
18#[cfg(not(target_arch = "wasm32"))]
19pub use tonic_channel::{
20 create_channel, CoreGrpcClient, PlatformGrpcClient, TokioBackonSleeper as BackonSleeper,
21};
22#[cfg(target_arch = "wasm32")]
23pub use wasm_channel::{
24 create_channel, CoreGrpcClient, PlatformGrpcClient, WasmBackonSleeper as BackonSleeper,
25};
26
27#[cfg(not(target_arch = "wasm32"))]
29pub async fn sleep(duration: Duration) {
30 tokio::time::sleep(duration).await;
31}
32
#[cfg(target_arch = "wasm32")]
/// Suspends the calling task for `duration` on `wasm32` targets.
///
/// Delegates to `wasm_channel::into_send_sleep`; how the delay is produced is
/// defined there, not in this function.
pub async fn sleep(duration: Duration) {
    let timer = wasm_channel::into_send_sleep(duration);
    timer.await;
}
38
39pub trait TransportRequest: Clone + Send + Sync + Debug + Mockable {
42 type Client: TransportClient;
44
45 type Response: Mockable + Send + Debug;
47
48 const SETTINGS_OVERRIDES: RequestSettings;
50
51 fn request_name(&self) -> &'static str {
53 any::type_name::<Self>()
54 }
55
56 fn response_name(&self) -> &'static str {
58 any::type_name::<Self::Response>()
59 }
60
61 fn method_name(&self) -> &'static str;
63
64 fn execute_transport<'c>(
66 self,
67 client: &'c mut Self::Client,
68 settings: &AppliedRequestSettings,
69 ) -> BoxFuture<'c, Result<Self::Response, TransportError>>;
70}
71
72#[derive(Debug, thiserror::Error)]
74#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
75pub enum TransportError {
76 #[error("grpc error: {0}")]
78 Grpc(
79 #[from]
80 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
81 dapi_grpc::tonic::Status,
82 ),
83}
84
85impl Clone for TransportError {
86 fn clone(&self) -> Self {
87 match self {
88 TransportError::Grpc(status) => {
89 let cloned_status = dapi_grpc::tonic::Status::with_details_and_metadata(
92 status.code(),
93 status.message(),
94 status.details().to_vec().into(),
95 status.metadata().clone(),
96 );
97 TransportError::Grpc(cloned_status)
98 }
99 }
100 }
101}
102
103impl CanRetry for TransportError {
104 fn can_retry(&self) -> bool {
105 match self {
106 TransportError::Grpc(status) => status.can_retry(),
107 }
108 }
109}
110
111impl Mockable for TransportError {
115 #[cfg(feature = "mocks")]
116 fn mock_serialize(&self) -> Option<Vec<u8>> {
117 Some(serde_json::to_vec(self).expect("serialize Transport error"))
118 }
119
120 #[cfg(feature = "mocks")]
121 fn mock_deserialize(data: &[u8]) -> Option<Self> {
122 Some(serde_json::from_slice(data).expect("deserialize Transport error"))
123 }
124}
125
126impl Mockable for Box<TransportError> {
128 #[cfg(feature = "mocks")]
129 fn mock_serialize(&self) -> Option<Vec<u8>> {
130 self.as_ref().mock_serialize()
131 }
132
133 #[cfg(feature = "mocks")]
134 fn mock_deserialize(data: &[u8]) -> Option<Self> {
135 TransportError::mock_deserialize(data).map(Box::new)
136 }
137}
138
139pub trait TransportClient: Send + Sized {
141 fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, TransportError>;
143
144 fn with_uri_and_settings(
146 uri: Uri,
147 settings: &AppliedRequestSettings,
148 pool: &ConnectionPool,
149 ) -> Result<Self, TransportError>;
150}
151
#[cfg(test)]
mod tests {
    //! Unit tests for retry classification, cloning, display and mock
    //! round-tripping of [`TransportError`].

    use super::*;
    use dapi_grpc::tonic::metadata::MetadataMap;
    use dapi_grpc::tonic::{Code, Status};

    /// Every code in this list must be classified as retryable.
    #[test]
    fn test_tonic_status_can_retry_retryable_codes() {
        let retryable_codes = [
            Code::Ok,
            Code::DataLoss,
            Code::Cancelled,
            Code::Unknown,
            Code::DeadlineExceeded,
            Code::ResourceExhausted,
            Code::Aborted,
            Code::Internal,
            Code::Unavailable,
        ];

        for code in retryable_codes {
            let status = Status::new(code, "test");
            assert!(
                status.can_retry(),
                "Expected code {:?} to be retryable",
                code
            );
        }
    }

    /// Every code in this list must be classified as non-retryable.
    #[test]
    fn test_tonic_status_can_retry_non_retryable_codes() {
        let non_retryable_codes = [
            Code::InvalidArgument,
            Code::NotFound,
            Code::AlreadyExists,
            Code::PermissionDenied,
            Code::FailedPrecondition,
            Code::OutOfRange,
            Code::Unimplemented,
            Code::Unauthenticated,
        ];

        for code in non_retryable_codes {
            let status = Status::new(code, "test");
            assert!(
                !status.can_retry(),
                "Expected code {:?} to be non-retryable",
                code
            );
        }
    }

    /// `TransportError` forwards the retry decision to the wrapped status.
    #[test]
    fn test_transport_error_can_retry() {
        let retryable = TransportError::Grpc(Status::unavailable("temporary"));
        assert!(retryable.can_retry());

        let non_retryable = TransportError::Grpc(Status::not_found("permanent"));
        assert!(!non_retryable.can_retry());
    }

    /// Cloning must preserve code, message, details and metadata: the
    /// hand-written `Clone` impl copies each of them explicitly, so each is
    /// checked here with a non-empty value.
    #[test]
    fn test_transport_error_clone() {
        let mut metadata = MetadataMap::new();
        metadata.insert(
            "x-test",
            "cloned".parse().expect("valid ASCII metadata value"),
        );
        let original = TransportError::Grpc(Status::with_details_and_metadata(
            Code::Unavailable,
            "test message",
            b"detail bytes".to_vec().into(),
            metadata,
        ));

        let cloned = original.clone();

        match (&original, &cloned) {
            (TransportError::Grpc(orig), TransportError::Grpc(clone)) => {
                assert_eq!(orig.code(), clone.code());
                assert_eq!(orig.message(), clone.message());
                assert_eq!(clone.details(), b"detail bytes".as_slice());
                assert_eq!(
                    clone
                        .metadata()
                        .get("x-test")
                        .and_then(|value| value.to_str().ok()),
                    Some("cloned")
                );
            }
        }
    }

    /// The `Display` output includes the status message.
    #[test]
    fn test_transport_error_display() {
        let err = TransportError::Grpc(Status::unavailable("service down"));
        let display = err.to_string();
        assert!(display.contains("service down"));
    }

    /// Serializing and deserializing through `Mockable` keeps the status code.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_transport_error_mock_roundtrip() {
        let original = TransportError::Grpc(Status::unavailable("test roundtrip"));
        let serialized = original.mock_serialize().expect("should serialize");
        let deserialized =
            TransportError::mock_deserialize(&serialized).expect("should deserialize");

        match deserialized {
            TransportError::Grpc(status) => {
                assert_eq!(status.code(), Code::Unavailable);
            }
        }
    }

    /// The boxed `Mockable` impl round-trips the same way as the unboxed one.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_boxed_transport_error_mock_roundtrip() {
        let original = Box::new(TransportError::Grpc(Status::internal("boxed test")));
        let serialized = original.mock_serialize().expect("should serialize");
        let deserialized =
            Box::<TransportError>::mock_deserialize(&serialized).expect("should deserialize");

        match *deserialized {
            TransportError::Grpc(status) => {
                assert_eq!(status.code(), Code::Internal);
            }
        }
    }
}