1use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16 transport::{TransportClient, TransportRequest},
17 AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18 RequestSettings,
19};
20
21#[derive(Debug, thiserror::Error, Clone)]
23#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
24pub enum DapiClientError {
25 #[error("transport error: {0}")]
27 Transport(
28 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
29 TransportError,
30 ),
31 #[error("no available addresses to use")]
33 NoAvailableAddresses,
34 #[error("no available addresses to retry, last error: {0}")]
37 NoAvailableAddressesToRetry(
38 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39 Box<TransportError>,
40 ),
41 #[error("address list error: {0}")]
43 AddressList(AddressListError),
44
45 #[cfg(feature = "mocks")]
46 #[error("mock error: {0}")]
47 Mock(#[from] crate::mock::MockError),
49}
50
51impl CanRetry for DapiClientError {
52 fn can_retry(&self) -> bool {
53 use DapiClientError::*;
54 match self {
55 NoAvailableAddresses => false,
56 NoAvailableAddressesToRetry(_) => false,
57 Transport(transport_error) => transport_error.can_retry(),
58 AddressList(_) => false,
59 #[cfg(feature = "mocks")]
60 Mock(_) => false,
61 }
62 }
63
64 fn is_no_available_addresses(&self) -> bool {
65 matches!(
66 self,
67 DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
68 )
69 }
70}
71
72impl Mockable for DapiClientError {
76 #[cfg(feature = "mocks")]
77 fn mock_serialize(&self) -> Option<Vec<u8>> {
78 Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
79 }
80
81 #[cfg(feature = "mocks")]
82 fn mock_deserialize(data: &[u8]) -> Option<Self> {
83 Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct DapiClient {
90 address_list: AddressList,
91 settings: RequestSettings,
92 pool: ConnectionPool,
93 #[cfg(not(target_arch = "wasm32"))]
94 pub ca_certificate: Option<Certificate>,
96 #[cfg(feature = "dump")]
97 pub(crate) dump_dir: Option<std::path::PathBuf>,
98}
99
100impl DapiClient {
101 pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
103 let address_count = 3 * address_list.len();
105
106 Self {
107 address_list,
108 settings,
109 pool: ConnectionPool::new(address_count),
110 #[cfg(feature = "dump")]
111 dump_dir: None,
112 #[cfg(not(target_arch = "wasm32"))]
113 ca_certificate: None,
114 }
115 }
116
117 #[cfg(not(target_arch = "wasm32"))]
126 pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
127 self.ca_certificate = Some(ca_cert);
128
129 self
130 }
131
132 pub fn address_list(&self) -> &AddressList {
134 &self.address_list
135 }
136
137 pub fn get_live_addresses(&self) -> Vec<crate::Address> {
156 self.address_list.get_live_addresses()
157 }
158}
159
160pub fn update_address_ban_status<R, E>(
163 address_list: &AddressList,
164 result: &ExecutionResult<R, E>,
165 applied_settings: &AppliedRequestSettings,
166) where
167 E: CanRetry + Display + Debug,
168{
169 match &result {
170 Ok(response) => {
171 if address_list.is_banned(&response.address) {
173 if address_list.unban(&response.address) {
174 tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
175 } else {
176 tracing::debug!(
179 address = ?response.address,
180 "unable to unban address {} because it's not in the list anymore",
181 response.address
182 );
183 }
184 }
185 }
186 Err(error) => {
187 if error.can_retry() {
188 if let Some(address) = error.address.as_ref() {
189 if applied_settings.ban_failed_address {
190 if address_list.ban(address) {
191 tracing::warn!(
192 ?address,
193 ?error,
194 "ban address {address} due to error: {error}"
195 );
196 } else {
197 tracing::debug!(
200 ?address,
201 ?error,
202 "unable to ban address {address} because it's not in the list anymore"
203 );
204 }
205 } else {
206 tracing::debug!(
207 ?error,
208 ?address,
209 "we should ban the address {address} due to the error but banning is disabled"
210 );
211 }
212 } else {
213 tracing::debug!(
214 ?error,
215 "we should ban an address due to the error but address is absent"
216 );
217 }
218 }
219 }
220 };
221}
222
223#[cfg(test)]
224#[allow(clippy::items_after_test_module)]
225mod tests {
226 use super::*;
227
228 fn mock_address() -> crate::Address {
229 "http://127.0.0.1:3000".parse().expect("valid address")
230 }
231
232 fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
233 AppliedRequestSettings {
234 connect_timeout: None,
235 timeout: Duration::from_secs(10),
236 retries: 5,
237 ban_failed_address: ban,
238 max_decoding_message_size: None,
239 #[cfg(not(target_arch = "wasm32"))]
240 ca_certificate: None,
241 }
242 }
243
244 #[test]
245 fn test_can_retry_no_available_addresses() {
246 let err = DapiClientError::NoAvailableAddresses;
247 assert!(!err.can_retry());
248 }
249
250 #[test]
251 fn test_can_retry_no_available_addresses_to_retry() {
252 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
253 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
254 assert!(!err.can_retry());
255 }
256
257 #[test]
258 fn test_can_retry_transport_retryable() {
259 let transport_err =
260 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
261 let err = DapiClientError::Transport(transport_err);
262 assert!(err.can_retry());
263 }
264
265 #[test]
266 fn test_can_retry_transport_non_retryable() {
267 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
268 let err = DapiClientError::Transport(transport_err);
269 assert!(!err.can_retry());
270 }
271
272 #[test]
273 fn test_can_retry_address_list_error() {
274 let err =
275 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
276 assert!(!err.can_retry());
277 }
278
279 #[cfg(feature = "mocks")]
280 #[test]
281 fn test_can_retry_mock_error() {
282 let err = DapiClientError::Mock(crate::mock::MockError::MockExpectationNotFound(
283 "test".to_string(),
284 ));
285 assert!(!err.can_retry());
286 }
287
288 #[test]
289 fn test_is_no_available_addresses() {
290 assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());
291
292 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
293 assert!(
294 DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err))
295 .is_no_available_addresses()
296 );
297
298 let transport_err =
299 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
300 assert!(!DapiClientError::Transport(transport_err).is_no_available_addresses());
301 }
302
303 #[test]
304 fn test_update_address_ban_status_success_unbans() {
305 let mut address_list = AddressList::new();
306 let addr = mock_address();
307 address_list.add(addr.clone());
308 address_list.ban(&addr);
309 assert!(address_list.is_banned(&addr));
310
311 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
312 inner: 42,
313 retries: 0,
314 address: addr.clone(),
315 });
316
317 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
318
319 assert!(!address_list.is_banned(&addr));
320 }
321
322 #[test]
323 fn test_update_address_ban_status_success_on_unbanned_is_noop() {
324 let mut address_list = AddressList::new();
325 let addr = mock_address();
326 address_list.add(addr.clone());
327
328 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
329 inner: 42,
330 retries: 0,
331 address: addr.clone(),
332 });
333
334 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
336 assert!(!address_list.is_banned(&addr));
337 }
338
339 #[test]
340 fn test_update_address_ban_status_retryable_error_bans_address() {
341 let mut address_list = AddressList::new();
342 let addr = mock_address();
343 address_list.add(addr.clone());
344
345 let transport_err =
346 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
347 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
348 inner: DapiClientError::Transport(transport_err),
349 retries: 0,
350 address: Some(addr.clone()),
351 });
352
353 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
354 assert!(address_list.is_banned(&addr));
355 }
356
357 #[test]
358 fn test_update_address_ban_status_retryable_error_ban_disabled() {
359 let mut address_list = AddressList::new();
360 let addr = mock_address();
361 address_list.add(addr.clone());
362
363 let transport_err =
364 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
365 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
366 inner: DapiClientError::Transport(transport_err),
367 retries: 0,
368 address: Some(addr.clone()),
369 });
370
371 update_address_ban_status(&address_list, &result, &make_applied_settings(false));
372 assert!(!address_list.is_banned(&addr));
374 }
375
376 #[test]
377 fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
378 let mut address_list = AddressList::new();
379 let addr = mock_address();
380 address_list.add(addr.clone());
381
382 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
383 inner: DapiClientError::NoAvailableAddresses,
384 retries: 0,
385 address: Some(addr.clone()),
386 });
387
388 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
389 assert!(!address_list.is_banned(&addr));
390 }
391
392 #[test]
393 fn test_update_address_ban_status_retryable_error_no_address() {
394 let address_list = AddressList::new();
395
396 let transport_err =
397 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
398 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
399 inner: DapiClientError::Transport(transport_err),
400 retries: 0,
401 address: None,
402 });
403
404 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
406 }
407
408 #[test]
409 fn test_update_address_ban_status_unban_removed_address() {
410 let mut address_list = AddressList::new();
411 let addr = mock_address();
412 address_list.add(addr.clone());
413 address_list.ban(&addr);
414
415 address_list.remove(&addr);
417
418 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
419 inner: 42,
420 retries: 0,
421 address: addr.clone(),
422 });
423
424 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
426 }
427
428 #[test]
429 fn test_update_address_ban_status_ban_removed_address() {
430 let address_list = AddressList::new();
431 let addr = mock_address();
432
433 let transport_err =
434 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
435 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
436 inner: DapiClientError::Transport(transport_err),
437 retries: 0,
438 address: Some(addr),
439 });
440
441 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
443 }
444
445 #[test]
446 fn test_dapi_client_new() {
447 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
448 .parse()
449 .unwrap();
450 let client = DapiClient::new(address_list, RequestSettings::default());
451 assert_eq!(client.address_list().len(), 2);
452 }
453
454 #[test]
455 fn test_dapi_client_get_live_addresses() {
456 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
457 .parse()
458 .unwrap();
459 let client = DapiClient::new(address_list, RequestSettings::default());
460 let live = client.get_live_addresses();
461 assert_eq!(live.len(), 2);
462 }
463
464 #[cfg(not(target_arch = "wasm32"))]
465 #[test]
466 fn test_dapi_client_with_ca_certificate() {
467 let address_list: AddressList = "http://127.0.0.1:3000".parse().unwrap();
468 let client = DapiClient::new(address_list, RequestSettings::default());
469 let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
470 let client = client.with_ca_certificate(cert);
471 assert!(client.ca_certificate.is_some());
472 }
473
474 #[cfg(feature = "mocks")]
475 #[test]
476 fn test_dapi_client_error_mock_serialize_deserialize() {
477 use dapi_grpc::mock::Mockable;
478
479 let err = DapiClientError::NoAvailableAddresses;
480 let serialized = err.mock_serialize().expect("should serialize");
481 let deserialized =
482 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
483 assert!(matches!(
484 deserialized,
485 DapiClientError::NoAvailableAddresses
486 ));
487 }
488
489 #[cfg(feature = "mocks")]
490 #[test]
491 fn test_dapi_client_error_transport_mock_roundtrip() {
492 use dapi_grpc::mock::Mockable;
493
494 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test"));
495 let err = DapiClientError::Transport(transport_err);
496 let serialized = err.mock_serialize().expect("should serialize");
497 let deserialized =
498 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
499 assert!(matches!(deserialized, DapiClientError::Transport(_)));
500 }
501
502 #[test]
503 fn test_dapi_client_error_display() {
504 let err = DapiClientError::NoAvailableAddresses;
505 let display = format!("{}", err);
506 assert!(display.contains("no available addresses"));
507
508 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
509 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
510 let display = format!("{}", err);
511 assert!(display.contains("no available addresses to retry"));
512
513 let err =
514 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
515 let display = format!("{}", err);
516 assert!(display.contains("address list error"));
517 }
518}
519
520#[async_trait]
521impl DapiRequestExecutor for DapiClient {
522 async fn execute<R>(
524 &self,
525 request: R,
526 settings: RequestSettings,
527 ) -> ExecutionResult<R::Response, DapiClientError>
528 where
529 R: TransportRequest + Mockable,
530 R::Response: Mockable,
531 TransportError: Mockable,
532 {
533 let applied_settings = self
535 .settings
536 .override_by(R::SETTINGS_OVERRIDES)
537 .override_by(settings)
538 .finalize();
539 #[cfg(not(target_arch = "wasm32"))]
540 let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
541
542 #[cfg(feature = "dump")]
544 let dump_dir = self.dump_dir.clone();
545 #[cfg(feature = "dump")]
546 let dump_request = request.clone();
547
548 let max_retries = applied_settings.retries;
549 let retry_delay = Duration::from_millis(10);
550
551 let mut retries: usize = 0;
552 let mut last_transport_error: Option<TransportError> = None;
554
555 let result: ExecutionResult<R::Response, DapiClientError> = async {
556 loop {
557 let Some(address) = self.address_list.get_live_address() else {
559 let error = if let Some(transport_error) = last_transport_error.take() {
561 tracing::debug!(
562 "no addresses available, returning last transport error"
563 );
564 DapiClientError::NoAvailableAddressesToRetry(Box::new(
565 transport_error,
566 ))
567 } else {
568 DapiClientError::NoAvailableAddresses
569 };
570
571 return Err(ExecutionError {
572 inner: error,
573 retries,
574 address: None,
575 });
576 };
577
578 tracing::trace!(
579 ?request,
580 "calling {} with {} request",
581 request.method_name(),
582 request.request_name(),
583 );
584
585 let transport_request = request.clone();
586 let response_name = request.response_name();
587
588 let transport_client_result = R::Client::with_uri_and_settings(
590 address.uri().clone(),
591 &applied_settings,
592 &self.pool,
593 );
594
595 let mut transport_client = match transport_client_result {
596 Ok(client) => client,
597 Err(transport_error) => {
598 let can_retry_error = transport_error.can_retry();
599
600 let cloned_error = transport_error.clone();
602
603 let execution_error = ExecutionError {
604 inner: DapiClientError::Transport(transport_error),
605 retries,
606 address: Some(address.clone()),
607 };
608
609 update_address_ban_status::<R::Response, DapiClientError>(
610 &self.address_list,
611 &Err(execution_error.clone()),
612 &applied_settings,
613 );
614
615 if can_retry_error && retries < max_retries {
616 last_transport_error = Some(cloned_error);
618
619 retries += 1;
620 tracing::warn!(
621 error = ?execution_error,
622 "retrying error with sleeping {} secs",
623 retry_delay.as_secs_f32()
624 );
625 transport::sleep(retry_delay).await;
626 continue;
627 }
628
629 return Err(execution_error);
630 }
631 };
632
633 let result = transport_request
635 .execute_transport(&mut transport_client, &applied_settings)
636 .instrument(tracing::trace_span!(
637 "execute_request",
638 ?address,
639 settings = ?applied_settings,
640 method = request.method_name(),
641 ))
642 .await;
643
644 let execution_result = match result {
645 Ok(response) => {
646 tracing::trace!(response = ?response, "received {} response", response_name);
647 Ok(ExecutionResponse {
648 inner: response,
649 retries,
650 address: address.clone(),
651 })
652 }
653 Err(transport_error) => {
654 tracing::debug!(error = ?transport_error, "received error: {transport_error}");
655 Err(ExecutionError {
656 inner: DapiClientError::Transport(transport_error),
657 retries,
658 address: Some(address.clone()),
659 })
660 }
661 };
662
663 update_address_ban_status::<R::Response, DapiClientError>(
664 &self.address_list,
665 &execution_result,
666 &applied_settings,
667 );
668
669 match execution_result {
670 Ok(response) => return Ok(response),
671 Err(error) => {
672 if error.can_retry() && retries < max_retries {
673 if let DapiClientError::Transport(ref te) = error.inner {
675 last_transport_error = Some(te.clone());
676 }
677
678 retries += 1;
679 tracing::warn!(
680 ?error,
681 "retrying error with sleeping {} secs",
682 retry_delay.as_secs_f32()
683 );
684 transport::sleep(retry_delay).await;
685 continue;
686 }
687
688 return Err(error);
689 }
690 }
691 }
692 }
693 .instrument(tracing::info_span!("request routine"))
694 .await;
695
696 if let Err(error) = &result {
697 if !error.can_retry() {
698 tracing::error!(?error, "request failed");
699 }
700 }
701
702 #[cfg(feature = "dump")]
704 Self::dump_request_response(&dump_request, &result, dump_dir);
705
706 result
707 }
708}