1use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16 transport::{TransportClient, TransportRequest},
17 AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18 RequestSettings,
19};
20
21#[derive(Debug, thiserror::Error, Clone)]
23#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
24pub enum DapiClientError {
25 #[error("transport error: {0}")]
27 Transport(
28 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
29 TransportError,
30 ),
31 #[error("no available addresses to use")]
33 NoAvailableAddresses,
34 #[error("no available addresses to retry, last error: {0}")]
37 NoAvailableAddressesToRetry(
38 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39 Box<TransportError>,
40 ),
41 #[error("address list error: {0}")]
43 AddressList(AddressListError),
44
45 #[cfg(feature = "mocks")]
46 #[error("mock error: {0}")]
47 Mock(#[from] crate::mock::MockError),
49}
50
51impl CanRetry for DapiClientError {
52 fn can_retry(&self) -> bool {
53 use DapiClientError::*;
54 match self {
55 NoAvailableAddresses => false,
56 NoAvailableAddressesToRetry(_) => false,
57 Transport(transport_error) => transport_error.can_retry(),
58 AddressList(_) => false,
59 #[cfg(feature = "mocks")]
60 Mock(_) => false,
61 }
62 }
63
64 fn is_no_available_addresses(&self) -> bool {
65 matches!(
66 self,
67 DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
68 )
69 }
70}
71
72impl Mockable for DapiClientError {
76 #[cfg(feature = "mocks")]
77 fn mock_serialize(&self) -> Option<Vec<u8>> {
78 Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
79 }
80
81 #[cfg(feature = "mocks")]
82 fn mock_deserialize(data: &[u8]) -> Option<Self> {
83 Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct DapiClient {
90 address_list: AddressList,
91 settings: RequestSettings,
92 pool: ConnectionPool,
93 #[cfg(not(target_arch = "wasm32"))]
94 pub ca_certificate: Option<Certificate>,
96 #[cfg(feature = "dump")]
97 pub(crate) dump_dir: Option<std::path::PathBuf>,
98}
99
100impl DapiClient {
101 pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
103 let address_count = 3 * address_list.len();
105
106 Self {
107 address_list,
108 settings,
109 pool: ConnectionPool::new(address_count),
110 #[cfg(feature = "dump")]
111 dump_dir: None,
112 #[cfg(not(target_arch = "wasm32"))]
113 ca_certificate: None,
114 }
115 }
116
117 #[cfg(not(target_arch = "wasm32"))]
126 pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
127 self.ca_certificate = Some(ca_cert);
128
129 self
130 }
131
132 pub fn address_list(&self) -> &AddressList {
134 &self.address_list
135 }
136
137 pub fn get_live_addresses(&self) -> Vec<crate::Address> {
156 self.address_list.get_live_addresses()
157 }
158}
159
160pub fn update_address_ban_status<R, E>(
163 address_list: &AddressList,
164 result: &ExecutionResult<R, E>,
165 applied_settings: &AppliedRequestSettings,
166) where
167 E: CanRetry + Display + Debug,
168{
169 match &result {
170 Ok(response) => {
171 if address_list.is_banned(&response.address) {
173 if address_list.unban(&response.address) {
174 tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
175 } else {
176 tracing::debug!(
179 address = ?response.address,
180 "unable to unban address {} because it's not in the list anymore",
181 response.address
182 );
183 }
184 }
185 }
186 Err(error) => {
187 if error.can_retry() {
188 if let Some(address) = error.address.as_ref() {
189 if applied_settings.ban_failed_address {
190 if address_list.ban(address) {
191 tracing::warn!(
192 ?address,
193 ?error,
194 "ban address {address} due to error: {error}"
195 );
196 } else {
197 tracing::debug!(
200 ?address,
201 ?error,
202 "unable to ban address {address} because it's not in the list anymore"
203 );
204 }
205 } else {
206 tracing::debug!(
207 ?error,
208 ?address,
209 "we should ban the address {address} due to the error but banning is disabled"
210 );
211 }
212 } else {
213 tracing::debug!(
214 ?error,
215 "we should ban an address due to the error but address is absent"
216 );
217 }
218 }
219 }
220 };
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226
227 fn mock_address() -> crate::Address {
228 "http://127.0.0.1:3000".parse().expect("valid address")
229 }
230
231 fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
232 AppliedRequestSettings {
233 connect_timeout: None,
234 timeout: Duration::from_secs(10),
235 retries: 5,
236 ban_failed_address: ban,
237 max_decoding_message_size: None,
238 #[cfg(not(target_arch = "wasm32"))]
239 ca_certificate: None,
240 }
241 }
242
243 #[test]
244 fn test_can_retry_no_available_addresses() {
245 let err = DapiClientError::NoAvailableAddresses;
246 assert!(!err.can_retry());
247 }
248
249 #[test]
250 fn test_can_retry_no_available_addresses_to_retry() {
251 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
252 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
253 assert!(!err.can_retry());
254 }
255
256 #[test]
257 fn test_can_retry_transport_retryable() {
258 let transport_err =
259 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
260 let err = DapiClientError::Transport(transport_err);
261 assert!(err.can_retry());
262 }
263
264 #[test]
265 fn test_can_retry_transport_non_retryable() {
266 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
267 let err = DapiClientError::Transport(transport_err);
268 assert!(!err.can_retry());
269 }
270
271 #[test]
272 fn test_can_retry_address_list_error() {
273 let err =
274 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
275 assert!(!err.can_retry());
276 }
277
278 #[cfg(feature = "mocks")]
279 #[test]
280 fn test_can_retry_mock_error() {
281 let err = DapiClientError::Mock(crate::mock::MockError::MockExpectationNotFound(
282 "test".to_string(),
283 ));
284 assert!(!err.can_retry());
285 }
286
287 #[test]
288 fn test_is_no_available_addresses() {
289 assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());
290
291 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
292 assert!(
293 DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err))
294 .is_no_available_addresses()
295 );
296
297 let transport_err =
298 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
299 assert!(!DapiClientError::Transport(transport_err).is_no_available_addresses());
300 }
301
302 #[test]
303 fn test_update_address_ban_status_success_unbans() {
304 let mut address_list = AddressList::new();
305 let addr = mock_address();
306 address_list.add(addr.clone());
307 address_list.ban(&addr);
308 assert!(address_list.is_banned(&addr));
309
310 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
311 inner: 42,
312 retries: 0,
313 address: addr.clone(),
314 });
315
316 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
317
318 assert!(!address_list.is_banned(&addr));
319 }
320
321 #[test]
322 fn test_update_address_ban_status_success_on_unbanned_is_noop() {
323 let mut address_list = AddressList::new();
324 let addr = mock_address();
325 address_list.add(addr.clone());
326
327 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
328 inner: 42,
329 retries: 0,
330 address: addr.clone(),
331 });
332
333 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
335 assert!(!address_list.is_banned(&addr));
336 }
337
338 #[test]
339 fn test_update_address_ban_status_retryable_error_bans_address() {
340 let mut address_list = AddressList::new();
341 let addr = mock_address();
342 address_list.add(addr.clone());
343
344 let transport_err =
345 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
346 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
347 inner: DapiClientError::Transport(transport_err),
348 retries: 0,
349 address: Some(addr.clone()),
350 });
351
352 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
353 assert!(address_list.is_banned(&addr));
354 }
355
356 #[test]
357 fn test_update_address_ban_status_retryable_error_ban_disabled() {
358 let mut address_list = AddressList::new();
359 let addr = mock_address();
360 address_list.add(addr.clone());
361
362 let transport_err =
363 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
364 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
365 inner: DapiClientError::Transport(transport_err),
366 retries: 0,
367 address: Some(addr.clone()),
368 });
369
370 update_address_ban_status(&address_list, &result, &make_applied_settings(false));
371 assert!(!address_list.is_banned(&addr));
373 }
374
375 #[test]
376 fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
377 let mut address_list = AddressList::new();
378 let addr = mock_address();
379 address_list.add(addr.clone());
380
381 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
382 inner: DapiClientError::NoAvailableAddresses,
383 retries: 0,
384 address: Some(addr.clone()),
385 });
386
387 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
388 assert!(!address_list.is_banned(&addr));
389 }
390
391 #[test]
392 fn test_update_address_ban_status_retryable_error_no_address() {
393 let address_list = AddressList::new();
394
395 let transport_err =
396 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
397 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
398 inner: DapiClientError::Transport(transport_err),
399 retries: 0,
400 address: None,
401 });
402
403 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
405 }
406
407 #[test]
408 fn test_update_address_ban_status_unban_removed_address() {
409 let mut address_list = AddressList::new();
410 let addr = mock_address();
411 address_list.add(addr.clone());
412 address_list.ban(&addr);
413
414 address_list.remove(&addr);
416
417 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
418 inner: 42,
419 retries: 0,
420 address: addr.clone(),
421 });
422
423 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
425 }
426
427 #[test]
428 fn test_update_address_ban_status_ban_removed_address() {
429 let address_list = AddressList::new();
430 let addr = mock_address();
431
432 let transport_err =
433 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
434 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
435 inner: DapiClientError::Transport(transport_err),
436 retries: 0,
437 address: Some(addr),
438 });
439
440 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
442 }
443
444 #[test]
445 fn test_dapi_client_new() {
446 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
447 .parse()
448 .unwrap();
449 let client = DapiClient::new(address_list, RequestSettings::default());
450 assert_eq!(client.address_list().len(), 2);
451 }
452
453 #[test]
454 fn test_dapi_client_get_live_addresses() {
455 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
456 .parse()
457 .unwrap();
458 let client = DapiClient::new(address_list, RequestSettings::default());
459 let live = client.get_live_addresses();
460 assert_eq!(live.len(), 2);
461 }
462
463 #[cfg(not(target_arch = "wasm32"))]
464 #[test]
465 fn test_dapi_client_with_ca_certificate() {
466 let address_list: AddressList = "http://127.0.0.1:3000".parse().unwrap();
467 let client = DapiClient::new(address_list, RequestSettings::default());
468 let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
469 let client = client.with_ca_certificate(cert);
470 assert!(client.ca_certificate.is_some());
471 }
472
473 #[cfg(feature = "mocks")]
474 #[test]
475 fn test_dapi_client_error_mock_serialize_deserialize() {
476 use dapi_grpc::mock::Mockable;
477
478 let err = DapiClientError::NoAvailableAddresses;
479 let serialized = err.mock_serialize().expect("should serialize");
480 let deserialized =
481 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
482 assert!(matches!(
483 deserialized,
484 DapiClientError::NoAvailableAddresses
485 ));
486 }
487
488 #[cfg(feature = "mocks")]
489 #[test]
490 fn test_dapi_client_error_transport_mock_roundtrip() {
491 use dapi_grpc::mock::Mockable;
492
493 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test"));
494 let err = DapiClientError::Transport(transport_err);
495 let serialized = err.mock_serialize().expect("should serialize");
496 let deserialized =
497 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
498 assert!(matches!(deserialized, DapiClientError::Transport(_)));
499 }
500
501 #[test]
502 fn test_dapi_client_error_display() {
503 let err = DapiClientError::NoAvailableAddresses;
504 let display = format!("{}", err);
505 assert!(display.contains("no available addresses"));
506
507 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
508 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
509 let display = format!("{}", err);
510 assert!(display.contains("no available addresses to retry"));
511
512 let err =
513 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
514 let display = format!("{}", err);
515 assert!(display.contains("address list error"));
516 }
517}
518
519#[async_trait]
520impl DapiRequestExecutor for DapiClient {
521 async fn execute<R>(
523 &self,
524 request: R,
525 settings: RequestSettings,
526 ) -> ExecutionResult<R::Response, DapiClientError>
527 where
528 R: TransportRequest + Mockable,
529 R::Response: Mockable,
530 TransportError: Mockable,
531 {
532 let applied_settings = self
534 .settings
535 .override_by(R::SETTINGS_OVERRIDES)
536 .override_by(settings)
537 .finalize();
538 #[cfg(not(target_arch = "wasm32"))]
539 let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
540
541 #[cfg(feature = "dump")]
543 let dump_dir = self.dump_dir.clone();
544 #[cfg(feature = "dump")]
545 let dump_request = request.clone();
546
547 let max_retries = applied_settings.retries;
548 let retry_delay = Duration::from_millis(10);
549
550 let mut retries: usize = 0;
551 let mut last_transport_error: Option<TransportError> = None;
553
554 let result: ExecutionResult<R::Response, DapiClientError> = async {
555 loop {
556 let Some(address) = self.address_list.get_live_address() else {
558 let error = if let Some(transport_error) = last_transport_error.take() {
560 tracing::debug!(
561 "no addresses available, returning last transport error"
562 );
563 DapiClientError::NoAvailableAddressesToRetry(Box::new(
564 transport_error,
565 ))
566 } else {
567 DapiClientError::NoAvailableAddresses
568 };
569
570 return Err(ExecutionError {
571 inner: error,
572 retries,
573 address: None,
574 });
575 };
576
577 tracing::trace!(
578 ?request,
579 "calling {} with {} request",
580 request.method_name(),
581 request.request_name(),
582 );
583
584 let transport_request = request.clone();
585 let response_name = request.response_name();
586
587 let transport_client_result = R::Client::with_uri_and_settings(
589 address.uri().clone(),
590 &applied_settings,
591 &self.pool,
592 );
593
594 let mut transport_client = match transport_client_result {
595 Ok(client) => client,
596 Err(transport_error) => {
597 let can_retry_error = transport_error.can_retry();
598
599 let cloned_error = transport_error.clone();
601
602 let execution_error = ExecutionError {
603 inner: DapiClientError::Transport(transport_error),
604 retries,
605 address: Some(address.clone()),
606 };
607
608 update_address_ban_status::<R::Response, DapiClientError>(
609 &self.address_list,
610 &Err(execution_error.clone()),
611 &applied_settings,
612 );
613
614 if can_retry_error && retries < max_retries {
615 last_transport_error = Some(cloned_error);
617
618 retries += 1;
619 tracing::warn!(
620 error = ?execution_error,
621 "retrying error with sleeping {} secs",
622 retry_delay.as_secs_f32()
623 );
624 transport::sleep(retry_delay).await;
625 continue;
626 }
627
628 return Err(execution_error);
629 }
630 };
631
632 let result = transport_request
634 .execute_transport(&mut transport_client, &applied_settings)
635 .instrument(tracing::trace_span!(
636 "execute_request",
637 ?address,
638 settings = ?applied_settings,
639 method = request.method_name(),
640 ))
641 .await;
642
643 let execution_result = match result {
644 Ok(response) => {
645 tracing::trace!(response = ?response, "received {} response", response_name);
646 Ok(ExecutionResponse {
647 inner: response,
648 retries,
649 address: address.clone(),
650 })
651 }
652 Err(transport_error) => {
653 tracing::debug!(error = ?transport_error, "received error: {transport_error}");
654 Err(ExecutionError {
655 inner: DapiClientError::Transport(transport_error),
656 retries,
657 address: Some(address.clone()),
658 })
659 }
660 };
661
662 update_address_ban_status::<R::Response, DapiClientError>(
663 &self.address_list,
664 &execution_result,
665 &applied_settings,
666 );
667
668 match execution_result {
669 Ok(response) => return Ok(response),
670 Err(error) => {
671 if error.can_retry() && retries < max_retries {
672 if let DapiClientError::Transport(ref te) = error.inner {
674 last_transport_error = Some(te.clone());
675 }
676
677 retries += 1;
678 tracing::warn!(
679 ?error,
680 "retrying error with sleeping {} secs",
681 retry_delay.as_secs_f32()
682 );
683 transport::sleep(retry_delay).await;
684 continue;
685 }
686
687 return Err(error);
688 }
689 }
690 }
691 }
692 .instrument(tracing::info_span!("request routine"))
693 .await;
694
695 if let Err(error) = &result {
696 if !error.can_retry() {
697 tracing::error!(?error, "request failed");
698 }
699 }
700
701 #[cfg(feature = "dump")]
703 Self::dump_request_response(&dump_request, &result, dump_dir);
704
705 result
706 }
707}