Skip to main content

rs_dapi_client/
dapi_client.rs

1//! [DapiClient] definition.
2
3use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16    transport::{TransportClient, TransportRequest},
17    AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18    RequestSettings,
19};
20
21/// Intended minimum for the Envoy-advertised `RateLimit-Reset` ban duration.
22/// Note: the `> 0` filter applied before the clamp already rejects 0 → `None`,
23/// so this constant never actively clamps the lower bound — it documents intent
24/// (the smallest meaningful reset is 1 s) and acts as the `.clamp(MIN, MAX)`
25/// lower argument for clarity.
26pub(crate) const MIN_RATE_LIMIT_BAN_SECS: u64 = 1;
27/// Ceiling for the Envoy-advertised `RateLimit-Reset` ban duration.
28/// Prevents a misconfigured or hostile header from parking a healthy node for
29/// an unreasonably long time.
30pub(crate) const MAX_RATE_LIMIT_BAN_SECS: u64 = 600;
31
32/// General DAPI request error type.
33#[derive(Debug, thiserror::Error, Clone)]
34#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
35pub enum DapiClientError {
36    /// The error happened on transport layer
37    #[error("transport error: {0}")]
38    Transport(
39        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
40        TransportError,
41    ),
42    /// There are no valid DAPI addresses to use.
43    #[error("no available addresses to use")]
44    NoAvailableAddresses,
45    /// All available addresses have been exhausted (banned due to errors).
46    /// Contains the last meaningful error that caused addresses to be banned.
47    #[error("no available addresses to retry, last error: {0}")]
48    NoAvailableAddressesToRetry(
49        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
50        Box<TransportError>,
51    ),
52    /// [AddressListError] errors
53    #[error("address list error: {0}")]
54    AddressList(AddressListError),
55
56    #[cfg(feature = "mocks")]
57    #[error("mock error: {0}")]
58    /// Error happened in mock client
59    Mock(#[from] crate::mock::MockError),
60}
61
62impl CanRetry for DapiClientError {
63    fn can_retry(&self) -> bool {
64        use DapiClientError::*;
65        match self {
66            NoAvailableAddresses => false,
67            NoAvailableAddressesToRetry(_) => false,
68            Transport(transport_error) => transport_error.can_retry(),
69            AddressList(_) => false,
70            #[cfg(feature = "mocks")]
71            Mock(_) => false,
72        }
73    }
74
75    fn is_no_available_addresses(&self) -> bool {
76        matches!(
77            self,
78            DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
79        )
80    }
81
82    fn rate_limit_ban_duration(&self) -> Option<Duration> {
83        match self {
84            DapiClientError::Transport(te) => te.rate_limit_ban_duration(),
85            _ => None,
86        }
87    }
88}
89
90/// Serialization of [DapiClientError].
91///
92/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
93impl Mockable for DapiClientError {
94    #[cfg(feature = "mocks")]
95    fn mock_serialize(&self) -> Option<Vec<u8>> {
96        Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
97    }
98
99    #[cfg(feature = "mocks")]
100    fn mock_deserialize(data: &[u8]) -> Option<Self> {
101        Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
102    }
103}
104
105/// Access point to DAPI.
106#[derive(Debug, Clone)]
107pub struct DapiClient {
108    address_list: AddressList,
109    settings: RequestSettings,
110    pool: ConnectionPool,
111    #[cfg(not(target_arch = "wasm32"))]
112    /// Certificate Authority certificate to use for verifying the server's certificate.
113    pub ca_certificate: Option<Certificate>,
114    #[cfg(feature = "dump")]
115    pub(crate) dump_dir: Option<std::path::PathBuf>,
116}
117
118impl DapiClient {
119    /// Initialize new [DapiClient] and optionally override default settings.
120    pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
121        // multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case
122        let address_count = 3 * address_list.len();
123
124        Self {
125            address_list,
126            settings,
127            pool: ConnectionPool::new(address_count),
128            #[cfg(feature = "dump")]
129            dump_dir: None,
130            #[cfg(not(target_arch = "wasm32"))]
131            ca_certificate: None,
132        }
133    }
134
135    /// Set CA certificate to use when verifying the server's certificate.
136    ///
137    /// # Arguments
138    ///
139    /// * `pem_ca_cert` - CA certificate in PEM format.
140    ///
141    /// # Returns
142    /// [DapiClient] with CA certificate set.
143    #[cfg(not(target_arch = "wasm32"))]
144    pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
145        self.ca_certificate = Some(ca_cert);
146
147        self
148    }
149
150    /// Return the [DapiClient] address list.
151    pub fn address_list(&self) -> &AddressList {
152        &self.address_list
153    }
154
155    /// Get all non-banned addresses from the address list.
156    ///
157    /// Returns a vector of addresses that are not currently banned or whose ban period has expired.
158    /// This is useful for diagnostics, monitoring, or when you need to know which DAPI nodes are
159    /// currently available for making requests.
160    ///
161    /// # Examples
162    ///
163    /// ```no_run
164    /// use rs_dapi_client::{DapiClient, AddressList, RequestSettings};
165    ///
166    /// let address_list = "http://127.0.0.1:3000,http://127.0.0.1:3001".parse().unwrap();
167    /// let client = DapiClient::new(address_list, RequestSettings::default());
168    ///
169    /// // Get all currently available (non-banned) addresses
170    /// let live_addresses = client.get_live_addresses();
171    /// println!("Available DAPI nodes: {}", live_addresses.len());
172    /// ```
173    pub fn get_live_addresses(&self) -> Vec<crate::Address> {
174        self.address_list.get_live_addresses()
175    }
176}
177
178/// Ban address in case of retryable error or unban it
179/// if it was banned, and the request was successful.
180pub fn update_address_ban_status<R, E>(
181    address_list: &AddressList,
182    result: &ExecutionResult<R, E>,
183    applied_settings: &AppliedRequestSettings,
184) where
185    E: CanRetry + Display + Debug,
186{
187    match &result {
188        Ok(response) => {
189            // Unban the address if it was banned and node responded successfully this time
190            if address_list.is_banned(&response.address) {
191                if address_list.unban(&response.address) {
192                    tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
193                } else {
194                    // The address might be already removed from the list
195                    // by background process (i.e., SML update), and it's fine.
196                    tracing::debug!(
197                        address = ?response.address,
198                        "unable to unban address {} because it's not in the list anymore",
199                        response.address
200                    );
201                }
202            }
203        }
204        Err(error) => {
205            if error.can_retry() {
206                if let Some(address) = error.address.as_ref() {
207                    if applied_settings.ban_failed_address {
208                        let reason = Some(error.to_string());
209                        let period_opt = error.rate_limit_ban_duration();
210                        let banned = match period_opt {
211                            // Envoy advertised a reset window: ban for exactly that period.
212                            // ban_count is set to max(ban_count,1) so diagnostics see the node
213                            // as banned, but the exponential ladder is not inflated.
214                            Some(period) => address_list.ban_for(address, period, reason),
215                            // No rate-limit hint: normal exponential health-ban ladder.
216                            None => address_list.ban_with_reason(address, reason),
217                        };
218                        if banned {
219                            if let Some(period) = period_opt {
220                                tracing::debug!(
221                                    ?address,
222                                    ban_secs = period.as_secs(),
223                                    "rate-limited (ResourceExhausted): banning {address} for {}s (from RateLimit-Reset header)",
224                                    period.as_secs()
225                                );
226                            }
227                            tracing::warn!(
228                                ?address,
229                                ?error,
230                                "ban address {address} due to error: {error}"
231                            );
232                        } else {
233                            // The address might be already removed from the list
234                            // by background process (i.e., SML update), and it's fine.
235                            tracing::debug!(
236                                ?address,
237                                ?error,
238                                "unable to ban address {address} because it's not in the list anymore"
239                            );
240                        }
241                    } else {
242                        tracing::debug!(
243                            ?error,
244                            ?address,
245                            "we should ban the address {address} due to the error but banning is disabled"
246                        );
247                    }
248                } else {
249                    tracing::debug!(
250                        ?error,
251                        "we should ban an address due to the error but address is absent"
252                    );
253                }
254            }
255        }
256    };
257}
258
259#[cfg(test)]
260#[allow(clippy::items_after_test_module)]
261mod tests {
262    use super::*;
263
264    fn mock_address() -> crate::Address {
265        "http://127.0.0.1:3000".parse().expect("valid address")
266    }
267
268    fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
269        AppliedRequestSettings {
270            connect_timeout: None,
271            timeout: Duration::from_secs(10),
272            retries: 5,
273            ban_failed_address: ban,
274            max_decoding_message_size: None,
275            #[cfg(not(target_arch = "wasm32"))]
276            ca_certificate: None,
277        }
278    }
279
280    #[test]
281    fn test_can_retry_no_available_addresses() {
282        let err = DapiClientError::NoAvailableAddresses;
283        assert!(!err.can_retry());
284    }
285
286    #[test]
287    fn test_can_retry_no_available_addresses_to_retry() {
288        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
289        let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
290        assert!(!err.can_retry());
291    }
292
293    #[test]
294    fn test_can_retry_transport_retryable() {
295        let transport_err =
296            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
297        let err = DapiClientError::Transport(transport_err);
298        assert!(err.can_retry());
299    }
300
301    #[test]
302    fn test_can_retry_transport_non_retryable() {
303        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
304        let err = DapiClientError::Transport(transport_err);
305        assert!(!err.can_retry());
306    }
307
308    #[test]
309    fn test_can_retry_address_list_error() {
310        let err =
311            DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
312        assert!(!err.can_retry());
313    }
314
315    /// `rate_limit_ban_duration` returns `Some` only when the `ratelimit-reset`
316    /// header is present and positive on a `ResourceExhausted` response, and
317    /// the value is clamped to `[MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS]`.
318    #[test]
319    fn test_rate_limit_ban_duration_header_parse() {
320        use dapi_grpc::tonic::metadata::MetadataValue;
321
322        // Helper: build a ResourceExhausted status with a ratelimit-reset header.
323        let make_rl_status = |header: Option<&str>| -> dapi_grpc::tonic::Status {
324            let mut status = dapi_grpc::tonic::Status::resource_exhausted("429");
325            if let Some(v) = header {
326                status
327                    .metadata_mut()
328                    .insert("ratelimit-reset", MetadataValue::try_from(v).unwrap());
329            }
330            status
331        };
332
333        // Normal header value: returned clamped.
334        let s = make_rl_status(Some("45"));
335        let dur = TransportError::Grpc(s).rate_limit_ban_duration();
336        assert_eq!(dur, Some(Duration::from_secs(45)));
337
338        // Value above MAX → clamped to MAX.
339        let s = make_rl_status(Some("9999"));
340        let dur = TransportError::Grpc(s).rate_limit_ban_duration();
341        assert_eq!(dur, Some(Duration::from_secs(MAX_RATE_LIMIT_BAN_SECS)));
342
343        // Clamp edge: exactly MIN (1) → 1 s (passes through unchanged).
344        let s = make_rl_status(Some("1"));
345        assert_eq!(
346            TransportError::Grpc(s).rate_limit_ban_duration(),
347            Some(Duration::from_secs(1))
348        );
349
350        // Clamp edge: exactly MAX (600) → 600 s (not clamped).
351        let s = make_rl_status(Some("600"));
352        assert_eq!(
353            TransportError::Grpc(s).rate_limit_ban_duration(),
354            Some(Duration::from_secs(600))
355        );
356
357        // One above MAX (601) → clamped to 600 s.
358        let s = make_rl_status(Some("601"));
359        assert_eq!(
360            TransportError::Grpc(s).rate_limit_ban_duration(),
361            Some(Duration::from_secs(600))
362        );
363
364        // Value below MIN (0) → filtered to None before clamp.
365        let s = make_rl_status(Some("0"));
366        assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none());
367
368        // Non-numeric → None.
369        let s = make_rl_status(Some("garbage"));
370        assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none());
371
372        // Header absent → None.
373        let s = make_rl_status(None);
374        assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none());
375
376        // Non-ResourceExhausted code → None regardless of header.
377        let mut unavail = dapi_grpc::tonic::Status::unavailable("down");
378        unavail
379            .metadata_mut()
380            .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap());
381        assert!(TransportError::Grpc(unavail)
382            .rate_limit_ban_duration()
383            .is_none());
384    }
385
386    /// When `ResourceExhausted` carries a valid `ratelimit-reset` header,
387    /// `update_address_ban_status` calls `ban_for` (exact period, no ladder
388    /// inflation); when the header is absent it falls through to `ban_with_reason`
389    /// (normal exponential ladder).
390    #[test]
391    fn test_update_address_ban_status_rate_limit_ban_path() {
392        use dapi_grpc::tonic::metadata::MetadataValue;
393
394        let mut address_list = AddressList::new();
395        let addr = mock_address();
396        address_list.add(addr.clone());
397
398        // Build a ResourceExhausted status with ratelimit-reset: 45.
399        let mut status = dapi_grpc::tonic::Status::resource_exhausted("429");
400        status
401            .metadata_mut()
402            .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap());
403
404        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
405            inner: DapiClientError::Transport(TransportError::Grpc(status)),
406            retries: 0,
407            address: Some(addr.clone()),
408        });
409        let before = chrono::Utc::now();
410        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
411        let after = chrono::Utc::now();
412
413        let info = address_list.ban_info();
414        let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap();
415
416        // Node is banned for ~45 s.
417        assert!(entry.banned, "rate-limited node must be banned");
418        assert_eq!(entry.ban_count, 1, "ban_count must be 1 after ban_for");
419        let until = entry.banned_until.expect("banned_until set");
420        let lo = (until - before).num_milliseconds() as f64 / 1000.0;
421        let hi = (until - after).num_milliseconds() as f64 / 1000.0;
422        assert!(
423            lo >= 44.9 && hi <= 45.1,
424            "ban window must be ~45 s, got lo={lo} hi={hi}"
425        );
426    }
427
428    /// When `ResourceExhausted` has NO `ratelimit-reset` header,
429    /// `update_address_ban_status` must fall back to the normal `ban_with_reason`
430    /// ladder (not produce a zero-second or panic ban).
431    #[test]
432    fn test_update_address_ban_status_rate_limit_no_header_uses_ladder() {
433        let mut address_list = AddressList::new();
434        let addr = mock_address();
435        address_list.add(addr.clone());
436
437        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
438            inner: DapiClientError::Transport(TransportError::Grpc(
439                dapi_grpc::tonic::Status::resource_exhausted("429"),
440            )),
441            retries: 0,
442            address: Some(addr.clone()),
443        });
444        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
445
446        // The ban ladder is invoked: first ban → ban_count = 1, window = 60 s.
447        let info = address_list.ban_info();
448        let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap();
449        assert!(
450            entry.banned,
451            "node must be banned on ResourceExhausted without header"
452        );
453        assert_eq!(
454            entry.ban_count, 1,
455            "first health-ladder ban → ban_count = 1"
456        );
457    }
458
459    #[cfg(feature = "mocks")]
460    #[test]
461    fn test_can_retry_mock_error() {
462        let err = DapiClientError::Mock(crate::mock::MockError::MockExpectationNotFound(
463            "test".to_string(),
464        ));
465        assert!(!err.can_retry());
466    }
467
468    #[test]
469    fn test_is_no_available_addresses() {
470        assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());
471
472        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
473        assert!(
474            DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err))
475                .is_no_available_addresses()
476        );
477
478        let transport_err =
479            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
480        assert!(!DapiClientError::Transport(transport_err).is_no_available_addresses());
481    }
482
483    #[test]
484    fn test_update_address_ban_status_success_unbans() {
485        let mut address_list = AddressList::new();
486        let addr = mock_address();
487        address_list.add(addr.clone());
488        address_list.ban(&addr);
489        assert!(address_list.is_banned(&addr));
490
491        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
492            inner: 42,
493            retries: 0,
494            address: addr.clone(),
495        });
496
497        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
498
499        assert!(!address_list.is_banned(&addr));
500    }
501
502    #[test]
503    fn test_update_address_ban_status_success_on_unbanned_is_noop() {
504        let mut address_list = AddressList::new();
505        let addr = mock_address();
506        address_list.add(addr.clone());
507
508        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
509            inner: 42,
510            retries: 0,
511            address: addr.clone(),
512        });
513
514        // Should not panic or change anything
515        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
516        assert!(!address_list.is_banned(&addr));
517    }
518
519    #[test]
520    fn test_update_address_ban_status_retryable_error_bans_address() {
521        let mut address_list = AddressList::new();
522        let addr = mock_address();
523        address_list.add(addr.clone());
524
525        let transport_err =
526            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
527        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
528            inner: DapiClientError::Transport(transport_err),
529            retries: 0,
530            address: Some(addr.clone()),
531        });
532
533        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
534        assert!(address_list.is_banned(&addr));
535
536        // The ban reason must be propagated from the error via this call path,
537        // not just the ban itself.
538        let info = address_list.ban_info();
539        assert_eq!(info.len(), 1);
540        let reason = info[0].reason.as_deref().expect("ban reason recorded");
541        assert!(
542            reason.contains("temporary"),
543            "ban reason should carry the underlying error, got: {reason}"
544        );
545    }
546
547    #[test]
548    fn test_update_address_ban_status_retryable_error_ban_disabled() {
549        let mut address_list = AddressList::new();
550        let addr = mock_address();
551        address_list.add(addr.clone());
552
553        let transport_err =
554            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
555        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
556            inner: DapiClientError::Transport(transport_err),
557            retries: 0,
558            address: Some(addr.clone()),
559        });
560
561        update_address_ban_status(&address_list, &result, &make_applied_settings(false));
562        // With ban disabled, the address should NOT be banned
563        assert!(!address_list.is_banned(&addr));
564    }
565
566    #[test]
567    fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
568        let mut address_list = AddressList::new();
569        let addr = mock_address();
570        address_list.add(addr.clone());
571
572        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
573            inner: DapiClientError::NoAvailableAddresses,
574            retries: 0,
575            address: Some(addr.clone()),
576        });
577
578        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
579        assert!(!address_list.is_banned(&addr));
580    }
581
582    #[test]
583    fn test_update_address_ban_status_retryable_error_no_address() {
584        let address_list = AddressList::new();
585
586        let transport_err =
587            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
588        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
589            inner: DapiClientError::Transport(transport_err),
590            retries: 0,
591            address: None,
592        });
593
594        // Should not panic when address is None
595        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
596    }
597
598    #[test]
599    fn test_update_address_ban_status_unban_removed_address() {
600        let mut address_list = AddressList::new();
601        let addr = mock_address();
602        address_list.add(addr.clone());
603        address_list.ban(&addr);
604
605        // Remove the address
606        address_list.remove(&addr);
607
608        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
609            inner: 42,
610            retries: 0,
611            address: addr.clone(),
612        });
613
614        // Should not panic when trying to unban a removed address
615        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
616    }
617
618    #[test]
619    fn test_update_address_ban_status_ban_removed_address() {
620        let address_list = AddressList::new();
621        let addr = mock_address();
622
623        let transport_err =
624            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
625        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
626            inner: DapiClientError::Transport(transport_err),
627            retries: 0,
628            address: Some(addr),
629        });
630
631        // Should not panic when trying to ban an address not in the list
632        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
633    }
634
635    #[test]
636    fn test_dapi_client_new() {
637        let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
638            .parse()
639            .unwrap();
640        let client = DapiClient::new(address_list, RequestSettings::default());
641        assert_eq!(client.address_list().len(), 2);
642    }
643
644    #[test]
645    fn test_dapi_client_get_live_addresses() {
646        let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
647            .parse()
648            .unwrap();
649        let client = DapiClient::new(address_list, RequestSettings::default());
650        let live = client.get_live_addresses();
651        assert_eq!(live.len(), 2);
652    }
653
654    #[cfg(not(target_arch = "wasm32"))]
655    #[test]
656    fn test_dapi_client_with_ca_certificate() {
657        let address_list: AddressList = "http://127.0.0.1:3000".parse().unwrap();
658        let client = DapiClient::new(address_list, RequestSettings::default());
659        let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
660        let client = client.with_ca_certificate(cert);
661        assert!(client.ca_certificate.is_some());
662    }
663
664    #[cfg(feature = "mocks")]
665    #[test]
666    fn test_dapi_client_error_mock_serialize_deserialize() {
667        use dapi_grpc::mock::Mockable;
668
669        let err = DapiClientError::NoAvailableAddresses;
670        let serialized = err.mock_serialize().expect("should serialize");
671        let deserialized =
672            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
673        assert!(matches!(
674            deserialized,
675            DapiClientError::NoAvailableAddresses
676        ));
677    }
678
679    #[cfg(feature = "mocks")]
680    #[test]
681    fn test_dapi_client_error_transport_mock_roundtrip() {
682        use dapi_grpc::mock::Mockable;
683
684        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test"));
685        let err = DapiClientError::Transport(transport_err);
686        let serialized = err.mock_serialize().expect("should serialize");
687        let deserialized =
688            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
689        assert!(matches!(deserialized, DapiClientError::Transport(_)));
690    }
691
692    #[test]
693    fn test_dapi_client_error_display() {
694        let err = DapiClientError::NoAvailableAddresses;
695        let display = format!("{}", err);
696        assert!(display.contains("no available addresses"));
697
698        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
699        let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
700        let display = format!("{}", err);
701        assert!(display.contains("no available addresses to retry"));
702
703        let err =
704            DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
705        let display = format!("{}", err);
706        assert!(display.contains("address list error"));
707    }
708}
709
710#[async_trait]
711impl DapiRequestExecutor for DapiClient {
712    /// Execute the [DapiRequest](crate::DapiRequest).
713    async fn execute<R>(
714        &self,
715        request: R,
716        settings: RequestSettings,
717    ) -> ExecutionResult<R::Response, DapiClientError>
718    where
719        R: TransportRequest + Mockable,
720        R::Response: Mockable,
721        TransportError: Mockable,
722    {
723        // Join settings of different sources to get final version of the settings for this execution:
724        let applied_settings = self
725            .settings
726            .override_by(R::SETTINGS_OVERRIDES)
727            .override_by(settings)
728            .finalize();
729        #[cfg(not(target_arch = "wasm32"))]
730        let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
731
732        // Save dump dir for later use
733        #[cfg(feature = "dump")]
734        let dump_dir = self.dump_dir.clone();
735        #[cfg(feature = "dump")]
736        let dump_request = request.clone();
737
738        let max_retries = applied_settings.retries;
739        let retry_delay = Duration::from_millis(10);
740
741        let mut retries: usize = 0;
742        // Track the last transport error for when all addresses get exhausted
743        let mut last_transport_error: Option<TransportError> = None;
744
745        let result: ExecutionResult<R::Response, DapiClientError> = async {
746            loop {
747                // Try to get an address to initialize transport on:
748                let Some(address) = self.address_list.get_live_address() else {
749                    // No available addresses - wrap with last meaningful error if we have one
750                    let error = if let Some(transport_error) = last_transport_error.take() {
751                        tracing::debug!(
752                            "no addresses available, returning last transport error"
753                        );
754                        DapiClientError::NoAvailableAddressesToRetry(Box::new(
755                            transport_error,
756                        ))
757                    } else {
758                        DapiClientError::NoAvailableAddresses
759                    };
760
761                    return Err(ExecutionError {
762                        inner: error,
763                        retries,
764                        address: None,
765                    });
766                };
767
768                // Rec 3 — explicit trace event so the resolved DAPI endpoint
769                // appears in flat plain-text log output (not just the span context).
770                tracing::trace!(
771                    target: "dapi_client::dispatch",
772                    ?address,
773                    method = request.method_name(),
774                    request_type = request.request_name(),
775                    "dispatching request to DAPI endpoint"
776                );
777                tracing::trace!(
778                    ?request,
779                    "calling {} with {} request",
780                    request.method_name(),
781                    request.request_name(),
782                );
783
784                let transport_request = request.clone();
785                let response_name = request.response_name();
786
787                // Try to create transport client
788                let transport_client_result = R::Client::with_uri_and_settings(
789                    address.uri().clone(),
790                    &applied_settings,
791                    &self.pool,
792                );
793
794                let mut transport_client = match transport_client_result {
795                    Ok(client) => client,
796                    Err(transport_error) => {
797                        let can_retry_error = transport_error.can_retry();
798
799                        // Clone error before moving it
800                        let cloned_error = transport_error.clone();
801
802                        let execution_error = ExecutionError {
803                            inner: DapiClientError::Transport(transport_error),
804                            retries,
805                            address: Some(address.clone()),
806                        };
807
808                        update_address_ban_status::<R::Response, DapiClientError>(
809                            &self.address_list,
810                            &Err(execution_error.clone()),
811                            &applied_settings,
812                        );
813
814                        if can_retry_error && retries < max_retries {
815                            // Store last transport error
816                            last_transport_error = Some(cloned_error);
817
818                            retries += 1;
819                            tracing::warn!(
820                                error = ?execution_error,
821                                "retrying error with sleeping {} secs",
822                                retry_delay.as_secs_f32()
823                            );
824                            transport::sleep(retry_delay).await;
825                            continue;
826                        }
827
828                        return Err(execution_error);
829                    }
830                };
831
832                // Execute the transport request
833                let result = transport_request
834                    .execute_transport(&mut transport_client, &applied_settings)
835                    .instrument(tracing::trace_span!(
836                        "execute_request",
837                        ?address,
838                        settings = ?applied_settings,
839                        method = request.method_name(),
840                    ))
841                    .await;
842
843                let execution_result = match result {
844                    Ok(response) => {
845                        tracing::trace!(response = ?response, "received {} response", response_name);
846                        Ok(ExecutionResponse {
847                            inner: response,
848                            retries,
849                            address: address.clone(),
850                        })
851                    }
852                    Err(transport_error) => {
853                        tracing::debug!(error = ?transport_error, "received error: {transport_error}");
854                        Err(ExecutionError {
855                            inner: DapiClientError::Transport(transport_error),
856                            retries,
857                            address: Some(address.clone()),
858                        })
859                    }
860                };
861
862                update_address_ban_status::<R::Response, DapiClientError>(
863                    &self.address_list,
864                    &execution_result,
865                    &applied_settings,
866                );
867
868                match execution_result {
869                    Ok(response) => return Ok(response),
870                    Err(error) => {
871                        if error.can_retry() && retries < max_retries {
872                            // Store last transport error
873                            if let DapiClientError::Transport(ref te) = error.inner {
874                                last_transport_error = Some(te.clone());
875                            }
876
877                            retries += 1;
878                            tracing::warn!(
879                                ?error,
880                                "retrying error with sleeping {} secs",
881                                retry_delay.as_secs_f32()
882                            );
883                            transport::sleep(retry_delay).await;
884                            continue;
885                        }
886
887                        return Err(error);
888                    }
889                }
890            }
891        }
892        .instrument(tracing::info_span!("request routine"))
893        .await;
894
895        if let Err(error) = &result {
896            if !error.can_retry() {
897                tracing::error!(?error, "request failed");
898            }
899        }
900
901        // Dump request and response to disk if dump_dir is set:
902        #[cfg(feature = "dump")]
903        Self::dump_request_response(&dump_request, &result, dump_dir);
904
905        result
906    }
907}