rs_dapi_client/
dapi_client.rs

1//! [DapiClient] definition.
2
3use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16    transport::{TransportClient, TransportRequest},
17    AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18    RequestSettings,
19};
20
21/// General DAPI request error type.
22#[derive(Debug, thiserror::Error, Clone)]
23#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
24pub enum DapiClientError {
25    /// The error happened on transport layer
26    #[error("transport error: {0}")]
27    Transport(
28        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
29        TransportError,
30    ),
31    /// There are no valid DAPI addresses to use.
32    #[error("no available addresses to use")]
33    NoAvailableAddresses,
34    /// All available addresses have been exhausted (banned due to errors).
35    /// Contains the last meaningful error that caused addresses to be banned.
36    #[error("no available addresses to retry, last error: {0}")]
37    NoAvailableAddressesToRetry(
38        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39        Box<TransportError>,
40    ),
41    /// [AddressListError] errors
42    #[error("address list error: {0}")]
43    AddressList(AddressListError),
44
45    #[cfg(feature = "mocks")]
46    #[error("mock error: {0}")]
47    /// Error happened in mock client
48    Mock(#[from] crate::mock::MockError),
49}
50
51impl CanRetry for DapiClientError {
52    fn can_retry(&self) -> bool {
53        use DapiClientError::*;
54        match self {
55            NoAvailableAddresses => false,
56            NoAvailableAddressesToRetry(_) => false,
57            Transport(transport_error) => transport_error.can_retry(),
58            AddressList(_) => false,
59            #[cfg(feature = "mocks")]
60            Mock(_) => false,
61        }
62    }
63
64    fn is_no_available_addresses(&self) -> bool {
65        matches!(
66            self,
67            DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
68        )
69    }
70}
71
72/// Serialization of [DapiClientError].
73///
74/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
75impl Mockable for DapiClientError {
76    #[cfg(feature = "mocks")]
77    fn mock_serialize(&self) -> Option<Vec<u8>> {
78        Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
79    }
80
81    #[cfg(feature = "mocks")]
82    fn mock_deserialize(data: &[u8]) -> Option<Self> {
83        Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
84    }
85}
86
87/// Access point to DAPI.
88#[derive(Debug, Clone)]
89pub struct DapiClient {
90    address_list: AddressList,
91    settings: RequestSettings,
92    pool: ConnectionPool,
93    #[cfg(not(target_arch = "wasm32"))]
94    /// Certificate Authority certificate to use for verifying the server's certificate.
95    pub ca_certificate: Option<Certificate>,
96    #[cfg(feature = "dump")]
97    pub(crate) dump_dir: Option<std::path::PathBuf>,
98}
99
100impl DapiClient {
101    /// Initialize new [DapiClient] and optionally override default settings.
102    pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
103        // multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case
104        let address_count = 3 * address_list.len();
105
106        Self {
107            address_list,
108            settings,
109            pool: ConnectionPool::new(address_count),
110            #[cfg(feature = "dump")]
111            dump_dir: None,
112            #[cfg(not(target_arch = "wasm32"))]
113            ca_certificate: None,
114        }
115    }
116
117    /// Set CA certificate to use when verifying the server's certificate.
118    ///
119    /// # Arguments
120    ///
121    /// * `pem_ca_cert` - CA certificate in PEM format.
122    ///
123    /// # Returns
124    /// [DapiClient] with CA certificate set.
125    #[cfg(not(target_arch = "wasm32"))]
126    pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
127        self.ca_certificate = Some(ca_cert);
128
129        self
130    }
131
132    /// Return the [DapiClient] address list.
133    pub fn address_list(&self) -> &AddressList {
134        &self.address_list
135    }
136
137    /// Get all non-banned addresses from the address list.
138    ///
139    /// Returns a vector of addresses that are not currently banned or whose ban period has expired.
140    /// This is useful for diagnostics, monitoring, or when you need to know which DAPI nodes are
141    /// currently available for making requests.
142    ///
143    /// # Examples
144    ///
145    /// ```no_run
146    /// use rs_dapi_client::{DapiClient, AddressList, RequestSettings};
147    ///
148    /// let address_list = "http://127.0.0.1:3000,http://127.0.0.1:3001".parse().unwrap();
149    /// let client = DapiClient::new(address_list, RequestSettings::default());
150    ///
151    /// // Get all currently available (non-banned) addresses
152    /// let live_addresses = client.get_live_addresses();
153    /// println!("Available DAPI nodes: {}", live_addresses.len());
154    /// ```
155    pub fn get_live_addresses(&self) -> Vec<crate::Address> {
156        self.address_list.get_live_addresses()
157    }
158}
159
160/// Ban address in case of retryable error or unban it
161/// if it was banned, and the request was successful.
162pub fn update_address_ban_status<R, E>(
163    address_list: &AddressList,
164    result: &ExecutionResult<R, E>,
165    applied_settings: &AppliedRequestSettings,
166) where
167    E: CanRetry + Display + Debug,
168{
169    match &result {
170        Ok(response) => {
171            // Unban the address if it was banned and node responded successfully this time
172            if address_list.is_banned(&response.address) {
173                if address_list.unban(&response.address) {
174                    tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
175                } else {
176                    // The address might be already removed from the list
177                    // by background process (i.e., SML update), and it's fine.
178                    tracing::debug!(
179                        address = ?response.address,
180                        "unable to unban address {} because it's not in the list anymore",
181                        response.address
182                    );
183                }
184            }
185        }
186        Err(error) => {
187            if error.can_retry() {
188                if let Some(address) = error.address.as_ref() {
189                    if applied_settings.ban_failed_address {
190                        if address_list.ban(address) {
191                            tracing::warn!(
192                                ?address,
193                                ?error,
194                                "ban address {address} due to error: {error}"
195                            );
196                        } else {
197                            // The address might be already removed from the list
198                            // by background process (i.e., SML update), and it's fine.
199                            tracing::debug!(
200                                ?address,
201                                ?error,
202                                "unable to ban address {address} because it's not in the list anymore"
203                            );
204                        }
205                    } else {
206                        tracing::debug!(
207                            ?error,
208                            ?address,
209                            "we should ban the address {address} due to the error but banning is disabled"
210                        );
211                    }
212                } else {
213                    tracing::debug!(
214                        ?error,
215                        "we should ban an address due to the error but address is absent"
216                    );
217                }
218            }
219        }
220    };
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226
227    fn mock_address() -> crate::Address {
228        "http://127.0.0.1:3000".parse().expect("valid address")
229    }
230
231    fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
232        AppliedRequestSettings {
233            connect_timeout: None,
234            timeout: Duration::from_secs(10),
235            retries: 5,
236            ban_failed_address: ban,
237            max_decoding_message_size: None,
238            #[cfg(not(target_arch = "wasm32"))]
239            ca_certificate: None,
240        }
241    }
242
243    #[test]
244    fn test_can_retry_no_available_addresses() {
245        let err = DapiClientError::NoAvailableAddresses;
246        assert!(!err.can_retry());
247    }
248
249    #[test]
250    fn test_can_retry_no_available_addresses_to_retry() {
251        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
252        let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
253        assert!(!err.can_retry());
254    }
255
256    #[test]
257    fn test_can_retry_transport_retryable() {
258        let transport_err =
259            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
260        let err = DapiClientError::Transport(transport_err);
261        assert!(err.can_retry());
262    }
263
264    #[test]
265    fn test_can_retry_transport_non_retryable() {
266        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::not_found("permanent"));
267        let err = DapiClientError::Transport(transport_err);
268        assert!(!err.can_retry());
269    }
270
271    #[test]
272    fn test_can_retry_address_list_error() {
273        let err =
274            DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
275        assert!(!err.can_retry());
276    }
277
278    #[cfg(feature = "mocks")]
279    #[test]
280    fn test_can_retry_mock_error() {
281        let err = DapiClientError::Mock(crate::mock::MockError::MockExpectationNotFound(
282            "test".to_string(),
283        ));
284        assert!(!err.can_retry());
285    }
286
287    #[test]
288    fn test_is_no_available_addresses() {
289        assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());
290
291        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
292        assert!(
293            DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err))
294                .is_no_available_addresses()
295        );
296
297        let transport_err =
298            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
299        assert!(!DapiClientError::Transport(transport_err).is_no_available_addresses());
300    }
301
302    #[test]
303    fn test_update_address_ban_status_success_unbans() {
304        let mut address_list = AddressList::new();
305        let addr = mock_address();
306        address_list.add(addr.clone());
307        address_list.ban(&addr);
308        assert!(address_list.is_banned(&addr));
309
310        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
311            inner: 42,
312            retries: 0,
313            address: addr.clone(),
314        });
315
316        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
317
318        assert!(!address_list.is_banned(&addr));
319    }
320
321    #[test]
322    fn test_update_address_ban_status_success_on_unbanned_is_noop() {
323        let mut address_list = AddressList::new();
324        let addr = mock_address();
325        address_list.add(addr.clone());
326
327        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
328            inner: 42,
329            retries: 0,
330            address: addr.clone(),
331        });
332
333        // Should not panic or change anything
334        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
335        assert!(!address_list.is_banned(&addr));
336    }
337
338    #[test]
339    fn test_update_address_ban_status_retryable_error_bans_address() {
340        let mut address_list = AddressList::new();
341        let addr = mock_address();
342        address_list.add(addr.clone());
343
344        let transport_err =
345            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
346        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
347            inner: DapiClientError::Transport(transport_err),
348            retries: 0,
349            address: Some(addr.clone()),
350        });
351
352        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
353        assert!(address_list.is_banned(&addr));
354    }
355
356    #[test]
357    fn test_update_address_ban_status_retryable_error_ban_disabled() {
358        let mut address_list = AddressList::new();
359        let addr = mock_address();
360        address_list.add(addr.clone());
361
362        let transport_err =
363            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
364        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
365            inner: DapiClientError::Transport(transport_err),
366            retries: 0,
367            address: Some(addr.clone()),
368        });
369
370        update_address_ban_status(&address_list, &result, &make_applied_settings(false));
371        // With ban disabled, the address should NOT be banned
372        assert!(!address_list.is_banned(&addr));
373    }
374
375    #[test]
376    fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
377        let mut address_list = AddressList::new();
378        let addr = mock_address();
379        address_list.add(addr.clone());
380
381        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
382            inner: DapiClientError::NoAvailableAddresses,
383            retries: 0,
384            address: Some(addr.clone()),
385        });
386
387        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
388        assert!(!address_list.is_banned(&addr));
389    }
390
391    #[test]
392    fn test_update_address_ban_status_retryable_error_no_address() {
393        let address_list = AddressList::new();
394
395        let transport_err =
396            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
397        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
398            inner: DapiClientError::Transport(transport_err),
399            retries: 0,
400            address: None,
401        });
402
403        // Should not panic when address is None
404        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
405    }
406
407    #[test]
408    fn test_update_address_ban_status_unban_removed_address() {
409        let mut address_list = AddressList::new();
410        let addr = mock_address();
411        address_list.add(addr.clone());
412        address_list.ban(&addr);
413
414        // Remove the address
415        address_list.remove(&addr);
416
417        let result: ExecutionResult<i32, DapiClientError> = Ok(ExecutionResponse {
418            inner: 42,
419            retries: 0,
420            address: addr.clone(),
421        });
422
423        // Should not panic when trying to unban a removed address
424        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
425    }
426
427    #[test]
428    fn test_update_address_ban_status_ban_removed_address() {
429        let address_list = AddressList::new();
430        let addr = mock_address();
431
432        let transport_err =
433            TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("temporary"));
434        let result: ExecutionResult<i32, DapiClientError> = Err(ExecutionError {
435            inner: DapiClientError::Transport(transport_err),
436            retries: 0,
437            address: Some(addr),
438        });
439
440        // Should not panic when trying to ban an address not in the list
441        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
442    }
443
444    #[test]
445    fn test_dapi_client_new() {
446        let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
447            .parse()
448            .unwrap();
449        let client = DapiClient::new(address_list, RequestSettings::default());
450        assert_eq!(client.address_list().len(), 2);
451    }
452
453    #[test]
454    fn test_dapi_client_get_live_addresses() {
455        let address_list: AddressList = "http://127.0.0.1:3000,http://127.0.0.1:3001"
456            .parse()
457            .unwrap();
458        let client = DapiClient::new(address_list, RequestSettings::default());
459        let live = client.get_live_addresses();
460        assert_eq!(live.len(), 2);
461    }
462
463    #[cfg(not(target_arch = "wasm32"))]
464    #[test]
465    fn test_dapi_client_with_ca_certificate() {
466        let address_list: AddressList = "http://127.0.0.1:3000".parse().unwrap();
467        let client = DapiClient::new(address_list, RequestSettings::default());
468        let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
469        let client = client.with_ca_certificate(cert);
470        assert!(client.ca_certificate.is_some());
471    }
472
473    #[cfg(feature = "mocks")]
474    #[test]
475    fn test_dapi_client_error_mock_serialize_deserialize() {
476        use dapi_grpc::mock::Mockable;
477
478        let err = DapiClientError::NoAvailableAddresses;
479        let serialized = err.mock_serialize().expect("should serialize");
480        let deserialized =
481            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
482        assert!(matches!(
483            deserialized,
484            DapiClientError::NoAvailableAddresses
485        ));
486    }
487
488    #[cfg(feature = "mocks")]
489    #[test]
490    fn test_dapi_client_error_transport_mock_roundtrip() {
491        use dapi_grpc::mock::Mockable;
492
493        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test"));
494        let err = DapiClientError::Transport(transport_err);
495        let serialized = err.mock_serialize().expect("should serialize");
496        let deserialized =
497            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");
498        assert!(matches!(deserialized, DapiClientError::Transport(_)));
499    }
500
501    #[test]
502    fn test_dapi_client_error_display() {
503        let err = DapiClientError::NoAvailableAddresses;
504        let display = format!("{}", err);
505        assert!(display.contains("no available addresses"));
506
507        let transport_err = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("gone"));
508        let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(transport_err));
509        let display = format!("{}", err);
510        assert!(display.contains("no available addresses to retry"));
511
512        let err =
513            DapiClientError::AddressList(AddressListError::InvalidAddressUri("bad".to_string()));
514        let display = format!("{}", err);
515        assert!(display.contains("address list error"));
516    }
517}
518
519#[async_trait]
520impl DapiRequestExecutor for DapiClient {
521    /// Execute the [DapiRequest](crate::DapiRequest).
522    async fn execute<R>(
523        &self,
524        request: R,
525        settings: RequestSettings,
526    ) -> ExecutionResult<R::Response, DapiClientError>
527    where
528        R: TransportRequest + Mockable,
529        R::Response: Mockable,
530        TransportError: Mockable,
531    {
532        // Join settings of different sources to get final version of the settings for this execution:
533        let applied_settings = self
534            .settings
535            .override_by(R::SETTINGS_OVERRIDES)
536            .override_by(settings)
537            .finalize();
538        #[cfg(not(target_arch = "wasm32"))]
539        let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
540
541        // Save dump dir for later use
542        #[cfg(feature = "dump")]
543        let dump_dir = self.dump_dir.clone();
544        #[cfg(feature = "dump")]
545        let dump_request = request.clone();
546
547        let max_retries = applied_settings.retries;
548        let retry_delay = Duration::from_millis(10);
549
550        let mut retries: usize = 0;
551        // Track the last transport error for when all addresses get exhausted
552        let mut last_transport_error: Option<TransportError> = None;
553
554        let result: ExecutionResult<R::Response, DapiClientError> = async {
555            loop {
556                // Try to get an address to initialize transport on:
557                let Some(address) = self.address_list.get_live_address() else {
558                    // No available addresses - wrap with last meaningful error if we have one
559                    let error = if let Some(transport_error) = last_transport_error.take() {
560                        tracing::debug!(
561                            "no addresses available, returning last transport error"
562                        );
563                        DapiClientError::NoAvailableAddressesToRetry(Box::new(
564                            transport_error,
565                        ))
566                    } else {
567                        DapiClientError::NoAvailableAddresses
568                    };
569
570                    return Err(ExecutionError {
571                        inner: error,
572                        retries,
573                        address: None,
574                    });
575                };
576
577                tracing::trace!(
578                    ?request,
579                    "calling {} with {} request",
580                    request.method_name(),
581                    request.request_name(),
582                );
583
584                let transport_request = request.clone();
585                let response_name = request.response_name();
586
587                // Try to create transport client
588                let transport_client_result = R::Client::with_uri_and_settings(
589                    address.uri().clone(),
590                    &applied_settings,
591                    &self.pool,
592                );
593
594                let mut transport_client = match transport_client_result {
595                    Ok(client) => client,
596                    Err(transport_error) => {
597                        let can_retry_error = transport_error.can_retry();
598
599                        // Clone error before moving it
600                        let cloned_error = transport_error.clone();
601
602                        let execution_error = ExecutionError {
603                            inner: DapiClientError::Transport(transport_error),
604                            retries,
605                            address: Some(address.clone()),
606                        };
607
608                        update_address_ban_status::<R::Response, DapiClientError>(
609                            &self.address_list,
610                            &Err(execution_error.clone()),
611                            &applied_settings,
612                        );
613
614                        if can_retry_error && retries < max_retries {
615                            // Store last transport error
616                            last_transport_error = Some(cloned_error);
617
618                            retries += 1;
619                            tracing::warn!(
620                                error = ?execution_error,
621                                "retrying error with sleeping {} secs",
622                                retry_delay.as_secs_f32()
623                            );
624                            transport::sleep(retry_delay).await;
625                            continue;
626                        }
627
628                        return Err(execution_error);
629                    }
630                };
631
632                // Execute the transport request
633                let result = transport_request
634                    .execute_transport(&mut transport_client, &applied_settings)
635                    .instrument(tracing::trace_span!(
636                        "execute_request",
637                        ?address,
638                        settings = ?applied_settings,
639                        method = request.method_name(),
640                    ))
641                    .await;
642
643                let execution_result = match result {
644                    Ok(response) => {
645                        tracing::trace!(response = ?response, "received {} response", response_name);
646                        Ok(ExecutionResponse {
647                            inner: response,
648                            retries,
649                            address: address.clone(),
650                        })
651                    }
652                    Err(transport_error) => {
653                        tracing::debug!(error = ?transport_error, "received error: {transport_error}");
654                        Err(ExecutionError {
655                            inner: DapiClientError::Transport(transport_error),
656                            retries,
657                            address: Some(address.clone()),
658                        })
659                    }
660                };
661
662                update_address_ban_status::<R::Response, DapiClientError>(
663                    &self.address_list,
664                    &execution_result,
665                    &applied_settings,
666                );
667
668                match execution_result {
669                    Ok(response) => return Ok(response),
670                    Err(error) => {
671                        if error.can_retry() && retries < max_retries {
672                            // Store last transport error
673                            if let DapiClientError::Transport(ref te) = error.inner {
674                                last_transport_error = Some(te.clone());
675                            }
676
677                            retries += 1;
678                            tracing::warn!(
679                                ?error,
680                                "retrying error with sleeping {} secs",
681                                retry_delay.as_secs_f32()
682                            );
683                            transport::sleep(retry_delay).await;
684                            continue;
685                        }
686
687                        return Err(error);
688                    }
689                }
690            }
691        }
692        .instrument(tracing::info_span!("request routine"))
693        .await;
694
695        if let Err(error) = &result {
696            if !error.can_retry() {
697                tracing::error!(?error, "request failed");
698            }
699        }
700
701        // Dump request and response to disk if dump_dir is set:
702        #[cfg(feature = "dump")]
703        Self::dump_request_response(&dump_request, &result, dump_dir);
704
705        result
706    }
707}