1use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16 transport::{TransportClient, TransportRequest},
17 AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18 RequestSettings,
19};
20
21#[derive(Debug, thiserror::Error, Clone)]
23#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
24pub enum DapiClientError {
25 #[error("transport error: {0}")]
27 Transport(
28 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
29 TransportError,
30 ),
31 #[error("no available addresses to use")]
33 NoAvailableAddresses,
34 #[error("no available addresses to retry, last error: {0}")]
37 NoAvailableAddressesToRetry(
38 #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39 Box<TransportError>,
40 ),
41 #[error("address list error: {0}")]
43 AddressList(AddressListError),
44
45 #[cfg(feature = "mocks")]
46 #[error("mock error: {0}")]
47 Mock(#[from] crate::mock::MockError),
49}
50
51impl CanRetry for DapiClientError {
52 fn can_retry(&self) -> bool {
53 use DapiClientError::*;
54 match self {
55 NoAvailableAddresses => false,
56 NoAvailableAddressesToRetry(_) => false,
57 Transport(transport_error) => transport_error.can_retry(),
58 AddressList(_) => false,
59 #[cfg(feature = "mocks")]
60 Mock(_) => false,
61 }
62 }
63
64 fn is_no_available_addresses(&self) -> bool {
65 matches!(
66 self,
67 DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
68 )
69 }
70}
71
72impl Mockable for DapiClientError {
76 #[cfg(feature = "mocks")]
77 fn mock_serialize(&self) -> Option<Vec<u8>> {
78 Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
79 }
80
81 #[cfg(feature = "mocks")]
82 fn mock_deserialize(data: &[u8]) -> Option<Self> {
83 Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
84 }
85}
86
87#[derive(Debug, Clone)]
89pub struct DapiClient {
90 address_list: AddressList,
91 settings: RequestSettings,
92 pool: ConnectionPool,
93 #[cfg(not(target_arch = "wasm32"))]
94 pub ca_certificate: Option<Certificate>,
96 #[cfg(feature = "dump")]
97 pub(crate) dump_dir: Option<std::path::PathBuf>,
98}
99
100impl DapiClient {
101 pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
103 let address_count = 3 * address_list.len();
105
106 Self {
107 address_list,
108 settings,
109 pool: ConnectionPool::new(address_count),
110 #[cfg(feature = "dump")]
111 dump_dir: None,
112 #[cfg(not(target_arch = "wasm32"))]
113 ca_certificate: None,
114 }
115 }
116
117 #[cfg(not(target_arch = "wasm32"))]
126 pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
127 self.ca_certificate = Some(ca_cert);
128
129 self
130 }
131
132 pub fn address_list(&self) -> &AddressList {
134 &self.address_list
135 }
136
137 pub fn get_live_addresses(&self) -> Vec<crate::Address> {
156 self.address_list.get_live_addresses()
157 }
158}
159
160pub fn update_address_ban_status<R, E>(
163 address_list: &AddressList,
164 result: &ExecutionResult<R, E>,
165 applied_settings: &AppliedRequestSettings,
166) where
167 E: CanRetry + Display + Debug,
168{
169 match &result {
170 Ok(response) => {
171 if address_list.is_banned(&response.address) {
173 if address_list.unban(&response.address) {
174 tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
175 } else {
176 tracing::debug!(
179 address = ?response.address,
180 "unable to unban address {} because it's not in the list anymore",
181 response.address
182 );
183 }
184 }
185 }
186 Err(error) => {
187 if error.can_retry() {
188 if let Some(address) = error.address.as_ref() {
189 if applied_settings.ban_failed_address {
190 if address_list.ban(address) {
191 tracing::warn!(
192 ?address,
193 ?error,
194 "ban address {address} due to error: {error}"
195 );
196 } else {
197 tracing::debug!(
200 ?address,
201 ?error,
202 "unable to ban address {address} because it's not in the list anymore"
203 );
204 }
205 } else {
206 tracing::debug!(
207 ?error,
208 ?address,
209 "we should ban the address {address} due to the error but banning is disabled"
210 );
211 }
212 } else {
213 tracing::debug!(
214 ?error,
215 "we should ban an address due to the error but address is absent"
216 );
217 }
218 }
219 }
220 };
221}
222
223#[cfg(test)]
224#[allow(clippy::items_after_test_module)]
225mod tests {
226 use super::*;
227
228 fn mock_address() -> crate::Address {
229 "http://127.0.0.1:3000".parse().expect("valid address")
230 }
231
232 fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
233 AppliedRequestSettings {
234 connect_timeout: None,
235 timeout: Duration::from_secs(10),
236 retries: 5,
237 ban_failed_address: ban,
238 max_decoding_message_size: None,
239 #[cfg(not(target_arch = "wasm32"))]
240 ca_certificate: None,
241 }
242 }
243
244 #[test]
245 fn test_can_retry_no_available_addresses() {
246 let err = DapiClientError::NoAvailableAddresses;
247 assert!(!err.can_retry());
248 }
249
250 #[test]
251 fn test_can_retry_no_available_addresses_to_retry() {
252 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
253 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
254 assert!(!err.can_retry());
255 }
256
257 #[test]
258 fn test_can_retry_transport_retryable() {
259 let transport_err =
260 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
261 let err = DapiClientError::Transport(transport_err);
262 assert!(err.can_retry());
263 }
264
265 #[test]
266 fn test_can_retry_transport_non_retryable() {
267 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
268 let err = DapiClientError::Transport(transport_err);
269 assert!(!err.can_retry());
270 }
271
272 #[test]
273 fn test_can_retry_address_list_error() {
274 let err =
275 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
276 assert!(!err.can_retry());
277 }
278
279 #[cfg(feature = "mocks")]
280 #[test]
281 fn test_can_retry_mock_error() {
282 let err = DapiClientError::Mock(crate::mock::MockError::MockExpectationNotFound(
283 "test".to_string(),
284 ));
285 assert!(!err.can_retry());
286 }
287
288 #[test]
289 fn test_is_no_available_addresses() {
290 assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());
291
292 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
293 assert!(
294 DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err))
295 .is_no_available_addresses()
296 );
297
298 let transport_err =
299 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
300 assert!(!DapiClientError::Transport(transport_err).is_no_available_addresses());
301 }
302
303 #[test]
304 fn test_update_address_ban_status_success_unbans() {
305 let mut address_list = AddressList::new();
306 let addr = mock_address();
307 address_list.add(addr.clone());
308 address_list.ban(&addr);
309 assert!(address_list.is_banned(&addr));
310
311 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
312 inner: 42,
313 retries: 0,
314 address: addr.clone(),
315 });
316
317 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
318
319 assert!(!address_list.is_banned(&addr));
320 }
321
322 #[test]
323 fn test_update_address_ban_status_success_on_unbanned_is_noop() {
324 let mut address_list = AddressList::new();
325 let addr = mock_address();
326 address_list.add(addr.clone());
327
328 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
329 inner: 42,
330 retries: 0,
331 address: addr.clone(),
332 });
333
334 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
336 assert!(!address_list.is_banned(&addr));
337 }
338
339 #[test]
340 fn test_update_address_ban_status_retryable_error_bans_address() {
341 let mut address_list = AddressList::new();
342 let addr = mock_address();
343 address_list.add(addr.clone());
344
345 let transport_err =
346 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
347 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
348 inner: DapiClientError::Transport(transport_err),
349 retries: 0,
350 address: Some(addr.clone()),
351 });
352
353 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
354 assert!(address_list.is_banned(&addr));
355 }
356
357 #[test]
358 fn test_update_address_ban_status_retryable_error_ban_disabled() {
359 let mut address_list = AddressList::new();
360 let addr = mock_address();
361 address_list.add(addr.clone());
362
363 let transport_err =
364 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
365 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
366 inner: DapiClientError::Transport(transport_err),
367 retries: 0,
368 address: Some(addr.clone()),
369 });
370
371 update_address_ban_status(&address_list, &result, &make_applied_settings(false));
372 assert!(!address_list.is_banned(&addr));
374 }
375
376 #[test]
377 fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
378 let mut address_list = AddressList::new();
379 let addr = mock_address();
380 address_list.add(addr.clone());
381
382 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
383 inner: DapiClientError::NoAvailableAddresses,
384 retries: 0,
385 address: Some(addr.clone()),
386 });
387
388 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
389 assert!(!address_list.is_banned(&addr));
390 }
391
392 #[test]
393 fn test_update_address_ban_status_retryable_error_no_address() {
394 let address_list = AddressList::new();
395
396 let transport_err =
397 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
398 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
399 inner: DapiClientError::Transport(transport_err),
400 retries: 0,
401 address: None,
402 });
403
404 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
406 }
407
408 #[test]
409 fn test_update_address_ban_status_unban_removed_address() {
410 let mut address_list = AddressList::new();
411 let addr = mock_address();
412 address_list.add(addr.clone());
413 address_list.ban(&addr);
414
415 address_list.remove(&addr);
417
418 let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
419 inner: 42,
420 retries: 0,
421 address: addr.clone(),
422 });
423
424 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
426 }
427
428 #[test]
429 fn test_update_address_ban_status_ban_removed_address() {
430 let address_list = AddressList::new();
431 let addr = mock_address();
432
433 let transport_err =
434 TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
435 let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
436 inner: DapiClientError::Transport(transport_err),
437 retries: 0,
438 address: Some(addr),
439 });
440
441 update_address_ban_status(&address_list, &result, &make_applied_settings(true));
443 }
444
445 #[test]
446 fn test_dapi_client_new() {
447 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
448 .parse()
449 .unwrap();
450 let client = DapiClient::new(address_list, RequestSettings::default());
451 assert_eq!(client.address_list().len(), 2);
452 }
453
454 #[test]
455 fn test_dapi_client_get_live_addresses() {
456 let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
457 .parse()
458 .unwrap();
459 let client = DapiClient::new(address_list, RequestSettings::default());
460 let live = client.get_live_addresses();
461 assert_eq!(live.len(), 2);
462 }
463
464 #[cfg(not(target_arch = "wasm32"))]
465 #[test]
466 fn test_dapi_client_with_ca_certificate() {
467 let address_list: AddressList = "http://127.0.0.1:3000".parse().unwrap();
468 let client = DapiClient::new(address_list, RequestSettings::default());
469 let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
470 let client = client.with_ca_certificate(cert);
471 assert!(client.ca_certificate.is_some());
472 }
473
474 #[cfg(feature = "mocks")]
475 #[test]
476 fn test_dapi_client_error_mock_serialize_deserialize() {
477 use dapi_grpc::mock::Mockable;
478
479 let err = DapiClientError::NoAvailableAddresses;
480 let serialized = err.mock_serialize().expect("should serialize");
481 let deserialized =
482 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
483 assert!(matches!(
484 deserialized,
485 DapiClientError::NoAvailableAddresses
486 ));
487 }
488
489 #[cfg(feature = "mocks")]
490 #[test]
491 fn test_dapi_client_error_transport_mock_roundtrip() {
492 use dapi_grpc::mock::Mockable;
493
494 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test"));
495 let err = DapiClientError::Transport(transport_err);
496 let serialized = err.mock_serialize().expect("should serialize");
497 let deserialized =
498 DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
499 assert!(matches!(deserialized, DapiClientError::Transport(_)));
500 }
501
502 #[test]
503 fn test_dapi_client_error_display() {
504 let err = DapiClientError::NoAvailableAddresses;
505 let display = format!("{}", err);
506 assert!(display.contains("no available addresses"));
507
508 let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
509 let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
510 let display = format!("{}", err);
511 assert!(display.contains("no available addresses to retry"));
512
513 let err =
514 DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
515 let display = format!("{}", err);
516 assert!(display.contains("address list error"));
517 }
518}
519
520#[async_trait]
521impl DapiRequestExecutor for DapiClient {
522 async fn execute<R>(
524 &self,
525 request: R,
526 settings: RequestSettings,
527 ) -> ExecutionResult<R::Response, DapiClientError>
528 where
529 R: TransportRequest + Mockable,
530 R::Response: Mockable,
531 TransportError: Mockable,
532 {
533 let applied_settings = self
535 .settings
536 .override_by(R::SETTINGS_OVERRIDES)
537 .override_by(settings)
538 .finalize();
539 #[cfg(not(target_arch = "wasm32"))]
540 let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
541
542 #[cfg(feature = "dump")]
544 let dump_dir = self.dump_dir.clone();
545 #[cfg(feature = "dump")]
546 let dump_request = request.clone();
547
548 let max_retries = applied_settings.retries;
549 let retry_delay = Duration::from_millis(10);
550
551 let mut retries: usize = 0;
552 let mut last_transport_error: Option<TransportError> = None;
554
555 let result: ExecutionResult<R::Response, DapiClientError> = async {
556 loop {
557 let Some(address) = self.address_list.get_live_address() else {
559 let error = if let Some(transport_error) = last_transport_error.take() {
561 tracing::debug!(
562 "no addresses available, returning last transport error"
563 );
564 DapiClientError::NoAvailableAddressesToRetry(Box::new(
565 transport_error,
566 ))
567 } else {
568 DapiClientError::NoAvailableAddresses
569 };
570
571 return Err(ExecutionError {
572 inner: error,
573 retries,
574 address: None,
575 });
576 };
577
578 tracing::trace!(
581 target: "dapi_client::dispatch",
582 ?address,
583 method = request.method_name(),
584 request_type = request.request_name(),
585 "dispatching request to DAPI endpoint"
586 );
587 tracing::trace!(
588 ?request,
589 "calling {} with {} request",
590 request.method_name(),
591 request.request_name(),
592 );
593
594 let transport_request = request.clone();
595 let response_name = request.response_name();
596
597 let transport_client_result = R::Client::with_uri_and_settings(
599 address.uri().clone(),
600 &applied_settings,
601 &self.pool,
602 );
603
604 let mut transport_client = match transport_client_result {
605 Ok(client) => client,
606 Err(transport_error) => {
607 let can_retry_error = transport_error.can_retry();
608
609 let cloned_error = transport_error.clone();
611
612 let execution_error = ExecutionError {
613 inner: DapiClientError::Transport(transport_error),
614 retries,
615 address: Some(address.clone()),
616 };
617
618 update_address_ban_status::<R::Response, DapiClientError>(
619 &self.address_list,
620 &Err(execution_error.clone()),
621 &applied_settings,
622 );
623
624 if can_retry_error && retries < max_retries {
625 last_transport_error = Some(cloned_error);
627
628 retries += 1;
629 tracing::warn!(
630 error = ?execution_error,
631 "retrying error with sleeping {} secs",
632 retry_delay.as_secs_f32()
633 );
634 transport::sleep(retry_delay).await;
635 continue;
636 }
637
638 return Err(execution_error);
639 }
640 };
641
642 let result = transport_request
644 .execute_transport(&mut transport_client, &applied_settings)
645 .instrument(tracing::trace_span!(
646 "execute_request",
647 ?address,
648 settings = ?applied_settings,
649 method = request.method_name(),
650 ))
651 .await;
652
653 let execution_result = match result {
654 Ok(response) => {
655 tracing::trace!(response = ?response, "received {} response", response_name);
656 Ok(ExecutionResponse {
657 inner: response,
658 retries,
659 address: address.clone(),
660 })
661 }
662 Err(transport_error) => {
663 tracing::debug!(error = ?transport_error, "received error: {transport_error}");
664 Err(ExecutionError {
665 inner: DapiClientError::Transport(transport_error),
666 retries,
667 address: Some(address.clone()),
668 })
669 }
670 };
671
672 update_address_ban_status::<R::Response, DapiClientError>(
673 &self.address_list,
674 &execution_result,
675 &applied_settings,
676 );
677
678 match execution_result {
679 Ok(response) => return Ok(response),
680 Err(error) => {
681 if error.can_retry() && retries < max_retries {
682 if let DapiClientError::Transport(ref te) = error.inner {
684 last_transport_error = Some(te.clone());
685 }
686
687 retries += 1;
688 tracing::warn!(
689 ?error,
690 "retrying error with sleeping {} secs",
691 retry_delay.as_secs_f32()
692 );
693 transport::sleep(retry_delay).await;
694 continue;
695 }
696
697 return Err(error);
698 }
699 }
700 }
701 }
702 .instrument(tracing::info_span!("request routine"))
703 .await;
704
705 if let Err(error) = &result {
706 if !error.can_retry() {
707 tracing::error!(?error, "request failed");
708 }
709 }
710
711 #[cfg(feature = "dump")]
713 Self::dump_request_response(&dump_request, &result, dump_dir);
714
715 result
716 }
717}