Skip to main content

rs_dapi_client/
dapi_client.rs

1//! [DapiClient] definition.
2
3use dapi_grpc::mock::Mockable;
4use dapi_grpc::tonic::async_trait;
5#[cfg(not(target_arch = "wasm32"))]
6use dapi_grpc::tonic::transport::Certificate;
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9use tracing::Instrument;
10
11use crate::address_list::AddressListError;
12use crate::connection_pool::ConnectionPool;
13use crate::request_settings::AppliedRequestSettings;
14use crate::transport::{self, TransportError};
15use crate::{
16    transport::{TransportClient, TransportRequest},
17    AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
18    RequestSettings,
19};
20
21/// General DAPI request error type.
22#[derive(Debug, thiserror::Error, Clone)]
23#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
24pub enum DapiClientError {
25    /// The error happened on transport layer
26    #[error("transport error: {0}")]
27    Transport(
28        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
29        TransportError,
30    ),
31    /// There are no valid DAPI addresses to use.
32    #[error("no available addresses to use")]
33    NoAvailableAddresses,
34    /// All available addresses have been exhausted (banned due to errors).
35    /// Contains the last meaningful error that caused addresses to be banned.
36    #[error("no available addresses to retry, last error: {0}")]
37    NoAvailableAddressesToRetry(
38        #[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39        Box<TransportError>,
40    ),
41    /// [AddressListError] errors
42    #[error("address list error: {0}")]
43    AddressList(AddressListError),
44
45    #[cfg(feature = "mocks")]
46    #[error("mock error: {0}")]
47    /// Error happened in mock client
48    Mock(#[from] crate::mock::MockError),
49}
50
51impl CanRetry for DapiClientError {
52    fn can_retry(&self) -> bool {
53        use DapiClientError::*;
54        match self {
55            NoAvailableAddresses => false,
56            NoAvailableAddressesToRetry(_) => false,
57            Transport(transport_error) => transport_error.can_retry(),
58            AddressList(_) => false,
59            #[cfg(feature = "mocks")]
60            Mock(_) => false,
61        }
62    }
63
64    fn is_no_available_addresses(&self) -> bool {
65        matches!(
66            self,
67            DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_)
68        )
69    }
70}
71
72/// Serialization of [DapiClientError].
73///
74/// We need to do manual serialization because of the generic type parameter which doesn't support serde derive.
75impl Mockable for DapiClientError {
76    #[cfg(feature = "mocks")]
77    fn mock_serialize(&self) -> Option<Vec<u8>> {
78        Some(serde_json::to_vec(self).expect("serialize DAPI client error"))
79    }
80
81    #[cfg(feature = "mocks")]
82    fn mock_deserialize(data: &[u8]) -> Option<Self> {
83        Some(serde_json::from_slice(data).expect("deserialize DAPI client error"))
84    }
85}
86
87/// Access point to DAPI.
88#[derive(Debug, Clone)]
89pub struct DapiClient {
90    address_list: AddressList,
91    settings: RequestSettings,
92    pool: ConnectionPool,
93    #[cfg(not(target_arch = "wasm32"))]
94    /// Certificate Authority certificate to use for verifying the server's certificate.
95    pub ca_certificate: Option<Certificate>,
96    #[cfg(feature = "dump")]
97    pub(crate) dump_dir: Option<std::path::PathBuf>,
98}
99
100impl DapiClient {
101    /// Initialize new [DapiClient] and optionally override default settings.
102    pub fn new(address_list: AddressList, settings: RequestSettings) -> Self {
103        // multiply by 3 as we need to store core and platform addresses, and we want some spare capacity just in case
104        let address_count = 3 * address_list.len();
105
106        Self {
107            address_list,
108            settings,
109            pool: ConnectionPool::new(address_count),
110            #[cfg(feature = "dump")]
111            dump_dir: None,
112            #[cfg(not(target_arch = "wasm32"))]
113            ca_certificate: None,
114        }
115    }
116
117    /// Set CA certificate to use when verifying the server's certificate.
118    ///
119    /// # Arguments
120    ///
121    /// * `pem_ca_cert` - CA certificate in PEM format.
122    ///
123    /// # Returns
124    /// [DapiClient] with CA certificate set.
125    #[cfg(not(target_arch = "wasm32"))]
126    pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
127        self.ca_certificate = Some(ca_cert);
128
129        self
130    }
131
132    /// Return the [DapiClient] address list.
133    pub fn address_list(&self) -> &AddressList {
134        &self.address_list
135    }
136
137    /// Get all non-banned addresses from the address list.
138    ///
139    /// Returns a vector of addresses that are not currently banned or whose ban period has expired.
140    /// This is useful for diagnostics, monitoring, or when you need to know which DAPI nodes are
141    /// currently available for making requests.
142    ///
143    /// # Examples
144    ///
145    /// ```no_run
146    /// use rs_dapi_client::{DapiClient, AddressList, RequestSettings};
147    ///
148    /// let address_list = "http://127.0.0.1:3000,http://127.0.0.1:3001".parse().unwrap();
149    /// let client = DapiClient::new(address_list, RequestSettings::default());
150    ///
151    /// // Get all currently available (non-banned) addresses
152    /// let live_addresses = client.get_live_addresses();
153    /// println!("Available DAPI nodes: {}", live_addresses.len());
154    /// ```
155    pub fn get_live_addresses(&self) -> Vec<crate::Address> {
156        self.address_list.get_live_addresses()
157    }
158}
159
160/// Ban address in case of retryable error or unban it
161/// if it was banned, and the request was successful.
162pub fn update_address_ban_status<R, E>(
163    address_list: &AddressList,
164    result: &ExecutionResult<R, E>,
165    applied_settings: &AppliedRequestSettings,
166) where
167    E: CanRetry + Display + Debug,
168{
169    match &result {
170        Ok(response) => {
171            // Unban the address if it was banned and node responded successfully this time
172            if address_list.is_banned(&response.address) {
173                if address_list.unban(&response.address) {
174                    tracing::debug!(address = ?response.address, "unban successfully responded address {}", response.address);
175                } else {
176                    // The address might be already removed from the list
177                    // by background process (i.e., SML update), and it's fine.
178                    tracing::debug!(
179                        address = ?response.address,
180                        "unable to unban address {} because it's not in the list anymore",
181                        response.address
182                    );
183                }
184            }
185        }
186        Err(error) => {
187            if error.can_retry() {
188                if let Some(address) = error.address.as_ref() {
189                    if applied_settings.ban_failed_address {
190                        if address_list.ban(address) {
191                            tracing::warn!(
192                                ?address,
193                                ?error,
194                                "ban address {address} due to error: {error}"
195                            );
196                        } else {
197                            // The address might be already removed from the list
198                            // by background process (i.e., SML update), and it's fine.
199                            tracing::debug!(
200                                ?address,
201                                ?error,
202                                "unable to ban address {address} because it's not in the list anymore"
203                            );
204                        }
205                    } else {
206                        tracing::debug!(
207                            ?error,
208                            ?address,
209                            "we should ban the address {address} due to the error but banning is disabled"
210                        );
211                    }
212                } else {
213                    tracing::debug!(
214                        ?error,
215                        "we should ban an address due to the error but address is absent"
216                    );
217                }
218            }
219        }
220    };
221}
222
// The test module sits in front of the `DapiRequestExecutor` impl, hence the allow.
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;

    /// Result type used by all ban-status tests.
    type TestResult = ExecutionResult<i32, DapiClientError>;

    /// The single node address most tests operate on.
    fn mock_address() -> crate::Address {
        "http://127.0.0.1:3000".parse().expect("valid address")
    }

    /// Applied settings with the ban switch set to `ban`.
    fn make_applied_settings(ban: bool) -> AppliedRequestSettings {
        AppliedRequestSettings {
            connect_timeout: None,
            timeout: Duration::from_secs(10),
            retries: 5,
            ban_failed_address: ban,
            max_decoding_message_size: None,
            #[cfg(not(target_arch = "wasm32"))]
            ca_certificate: None,
        }
    }

    /// gRPC `unavailable` transport error with the given message.
    fn unavailable(message: &str) -> TransportError {
        TransportError::Grpc(dapi_grpc::tonic::Status::unavailable(message))
    }

    /// Address list holding exactly `address`.
    fn list_containing(address: &crate::Address) -> AddressList {
        let mut address_list = AddressList::new();
        address_list.add(address.clone());
        address_list
    }

    /// Successful execution result that was answered by `address`.
    fn success_from(address: &crate::Address) -> TestResult {
        Ok(ExecutionResponse {
            inner: 42,
            retries: 0,
            address: address.clone(),
        })
    }

    /// Failed execution result with the given error and optional address.
    fn failure_with(inner: DapiClientError, address: Option<crate::Address>) -> TestResult {
        Err(ExecutionError {
            inner,
            retries: 0,
            address,
        })
    }

    /// Failed execution result carrying a retryable transport error.
    fn retryable_failure(address: Option<crate::Address>) -> TestResult {
        failure_with(DapiClientError::Transport(unavailable("temporary")), address)
    }

    /// Client built from a comma-separated list of addresses.
    fn client_for(addresses: &str) -> DapiClient {
        let address_list: AddressList = addresses.parse().unwrap();
        DapiClient::new(address_list, RequestSettings::default())
    }

    #[test]
    fn test_can_retry_no_available_addresses() {
        assert!(!DapiClientError::NoAvailableAddresses.can_retry());
    }

    #[test]
    fn test_can_retry_no_available_addresses_to_retry() {
        let err = DapiClientError::NoAvailableAddressesToRetry(Box::new(unavailable("gone")));
        assert!(!err.can_retry());
    }

    #[test]
    fn test_can_retry_transport_retryable() {
        let err = DapiClientError::Transport(unavailable("temporary"));
        assert!(err.can_retry());
    }

    #[test]
    fn test_can_retry_transport_non_retryable() {
        let status = dapi_grpc::tonic::Status::not_found("permanent");
        let err = DapiClientError::Transport(TransportError::Grpc(status));
        assert!(!err.can_retry());
    }

    #[test]
    fn test_can_retry_address_list_error() {
        let cause = AddressListError::InvalidAddressUri("bad".to_string());
        assert!(!DapiClientError::AddressList(cause).can_retry());
    }

    #[cfg(feature = "mocks")]
    #[test]
    fn test_can_retry_mock_error() {
        let cause = crate::mock::MockError::MockExpectationNotFound("test".to_string());
        assert!(!DapiClientError::Mock(cause).can_retry());
    }

    #[test]
    fn test_is_no_available_addresses() {
        assert!(DapiClientError::NoAvailableAddresses.is_no_available_addresses());

        let exhausted = DapiClientError::NoAvailableAddressesToRetry(Box::new(unavailable("gone")));
        assert!(exhausted.is_no_available_addresses());

        let transport = DapiClientError::Transport(unavailable("temporary"));
        assert!(!transport.is_no_available_addresses());
    }

    #[test]
    fn test_update_address_ban_status_success_unbans() {
        let addr = mock_address();
        let address_list = list_containing(&addr);
        address_list.ban(&addr);
        assert!(address_list.is_banned(&addr));

        let result = success_from(&addr);
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));

        assert!(!address_list.is_banned(&addr));
    }

    #[test]
    fn test_update_address_ban_status_success_on_unbanned_is_noop() {
        let addr = mock_address();
        let address_list = list_containing(&addr);

        let result = success_from(&addr);

        // Should not panic or change anything
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
        assert!(!address_list.is_banned(&addr));
    }

    #[test]
    fn test_update_address_ban_status_retryable_error_bans_address() {
        let addr = mock_address();
        let address_list = list_containing(&addr);

        let result = retryable_failure(Some(addr.clone()));
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));

        assert!(address_list.is_banned(&addr));
    }

    #[test]
    fn test_update_address_ban_status_retryable_error_ban_disabled() {
        let addr = mock_address();
        let address_list = list_containing(&addr);

        let result = retryable_failure(Some(addr.clone()));
        update_address_ban_status(&address_list, &result, &make_applied_settings(false));

        // With ban disabled, the address should NOT be banned
        assert!(!address_list.is_banned(&addr));
    }

    #[test]
    fn test_update_address_ban_status_non_retryable_error_does_not_ban() {
        let addr = mock_address();
        let address_list = list_containing(&addr);

        let result = failure_with(DapiClientError::NoAvailableAddresses, Some(addr.clone()));
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));

        assert!(!address_list.is_banned(&addr));
    }

    #[test]
    fn test_update_address_ban_status_retryable_error_no_address() {
        let address_list = AddressList::new();
        let result = retryable_failure(None);

        // Should not panic when address is None
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
    }

    #[test]
    fn test_update_address_ban_status_unban_removed_address() {
        let addr = mock_address();
        let mut address_list = list_containing(&addr);
        address_list.ban(&addr);

        // Remove the address
        address_list.remove(&addr);

        let result = success_from(&addr);

        // Should not panic when trying to unban a removed address
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
    }

    #[test]
    fn test_update_address_ban_status_ban_removed_address() {
        let address_list = AddressList::new();
        let result = retryable_failure(Some(mock_address()));

        // Should not panic when trying to ban an address not in the list
        update_address_ban_status(&address_list, &result, &make_applied_settings(true));
    }

    #[test]
    fn test_dapi_client_new() {
        let client = client_for("http://127.0.0.1:3000,http://127.0.0.1:3001");
        assert_eq!(client.address_list().len(), 2);
    }

    #[test]
    fn test_dapi_client_get_live_addresses() {
        let client = client_for("http://127.0.0.1:3000,http://127.0.0.1:3001");
        assert_eq!(client.get_live_addresses().len(), 2);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_dapi_client_with_ca_certificate() {
        let cert = dapi_grpc::tonic::transport::Certificate::from_pem("fake-pem-data");
        let client = client_for("http://127.0.0.1:3000").with_ca_certificate(cert);
        assert!(client.ca_certificate.is_some());
    }

    #[cfg(feature = "mocks")]
    #[test]
    fn test_dapi_client_error_mock_serialize_deserialize() {
        use dapi_grpc::mock::Mockable;

        let serialized = DapiClientError::NoAvailableAddresses
            .mock_serialize()
            .expect("should serialize");
        let deserialized =
            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");

        assert!(matches!(
            deserialized,
            DapiClientError::NoAvailableAddresses
        ));
    }

    #[cfg(feature = "mocks")]
    #[test]
    fn test_dapi_client_error_transport_mock_roundtrip() {
        use dapi_grpc::mock::Mockable;

        let serialized = DapiClientError::Transport(unavailable("test"))
            .mock_serialize()
            .expect("should serialize");
        let deserialized =
            DapiClientError::mock_deserialize(&serialized).expect("should deserialize");

        assert!(matches!(deserialized, DapiClientError::Transport(_)));
    }

    #[test]
    fn test_dapi_client_error_display() {
        let display = DapiClientError::NoAvailableAddresses.to_string();
        assert!(display.contains("no available addresses"));

        let display =
            DapiClientError::NoAvailableAddressesToRetry(Box::new(unavailable("gone"))).to_string();
        assert!(display.contains("no available addresses to retry"));

        let cause = AddressListError::InvalidAddressUri("bad".to_string());
        let display = DapiClientError::AddressList(cause).to_string();
        assert!(display.contains("address list error"));
    }
}
519
520#[async_trait]
521impl DapiRequestExecutor for DapiClient {
522    /// Execute the [DapiRequest](crate::DapiRequest).
523    async fn execute<R>(
524        &self,
525        request: R,
526        settings: RequestSettings,
527    ) -> ExecutionResult<R::Response, DapiClientError>
528    where
529        R: TransportRequest + Mockable,
530        R::Response: Mockable,
531        TransportError: Mockable,
532    {
533        // Join settings of different sources to get final version of the settings for this execution:
534        let applied_settings = self
535            .settings
536            .override_by(R::SETTINGS_OVERRIDES)
537            .override_by(settings)
538            .finalize();
539        #[cfg(not(target_arch = "wasm32"))]
540        let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
541
542        // Save dump dir for later use
543        #[cfg(feature = "dump")]
544        let dump_dir = self.dump_dir.clone();
545        #[cfg(feature = "dump")]
546        let dump_request = request.clone();
547
548        let max_retries = applied_settings.retries;
549        let retry_delay = Duration::from_millis(10);
550
551        let mut retries: usize = 0;
552        // Track the last transport error for when all addresses get exhausted
553        let mut last_transport_error: Option<TransportError> = None;
554
555        let result: ExecutionResult<R::Response, DapiClientError> = async {
556            loop {
557                // Try to get an address to initialize transport on:
558                let Some(address) = self.address_list.get_live_address() else {
559                    // No available addresses - wrap with last meaningful error if we have one
560                    let error = if let Some(transport_error) = last_transport_error.take() {
561                        tracing::debug!(
562                            "no addresses available, returning last transport error"
563                        );
564                        DapiClientError::NoAvailableAddressesToRetry(Box::new(
565                            transport_error,
566                        ))
567                    } else {
568                        DapiClientError::NoAvailableAddresses
569                    };
570
571                    return Err(ExecutionError {
572                        inner: error,
573                        retries,
574                        address: None,
575                    });
576                };
577
578                tracing::trace!(
579                    ?request,
580                    "calling {} with {} request",
581                    request.method_name(),
582                    request.request_name(),
583                );
584
585                let transport_request = request.clone();
586                let response_name = request.response_name();
587
588                // Try to create transport client
589                let transport_client_result = R::Client::with_uri_and_settings(
590                    address.uri().clone(),
591                    &applied_settings,
592                    &self.pool,
593                );
594
595                let mut transport_client = match transport_client_result {
596                    Ok(client) => client,
597                    Err(transport_error) => {
598                        let can_retry_error = transport_error.can_retry();
599
600                        // Clone error before moving it
601                        let cloned_error = transport_error.clone();
602
603                        let execution_error = ExecutionError {
604                            inner: DapiClientError::Transport(transport_error),
605                            retries,
606                            address: Some(address.clone()),
607                        };
608
609                        update_address_ban_status::<R::Response, DapiClientError>(
610                            &self.address_list,
611                            &Err(execution_error.clone()),
612                            &applied_settings,
613                        );
614
615                        if can_retry_error && retries < max_retries {
616                            // Store last transport error
617                            last_transport_error = Some(cloned_error);
618
619                            retries += 1;
620                            tracing::warn!(
621                                error = ?execution_error,
622                                "retrying error with sleeping {} secs",
623                                retry_delay.as_secs_f32()
624                            );
625                            transport::sleep(retry_delay).await;
626                            continue;
627                        }
628
629                        return Err(execution_error);
630                    }
631                };
632
633                // Execute the transport request
634                let result = transport_request
635                    .execute_transport(&mut transport_client, &applied_settings)
636                    .instrument(tracing::trace_span!(
637                        "execute_request",
638                        ?address,
639                        settings = ?applied_settings,
640                        method = request.method_name(),
641                    ))
642                    .await;
643
644                let execution_result = match result {
645                    Ok(response) => {
646                        tracing::trace!(response = ?response, "received {} response", response_name);
647                        Ok(ExecutionResponse {
648                            inner: response,
649                            retries,
650                            address: address.clone(),
651                        })
652                    }
653                    Err(transport_error) => {
654                        tracing::debug!(error = ?transport_error, "received error: {transport_error}");
655                        Err(ExecutionError {
656                            inner: DapiClientError::Transport(transport_error),
657                            retries,
658                            address: Some(address.clone()),
659                        })
660                    }
661                };
662
663                update_address_ban_status::<R::Response, DapiClientError>(
664                    &self.address_list,
665                    &execution_result,
666                    &applied_settings,
667                );
668
669                match execution_result {
670                    Ok(response) => return Ok(response),
671                    Err(error) => {
672                        if error.can_retry() && retries < max_retries {
673                            // Store last transport error
674                            if let DapiClientError::Transport(ref te) = error.inner {
675                                last_transport_error = Some(te.clone());
676                            }
677
678                            retries += 1;
679                            tracing::warn!(
680                                ?error,
681                                "retrying error with sleeping {} secs",
682                                retry_delay.as_secs_f32()
683                            );
684                            transport::sleep(retry_delay).await;
685                            continue;
686                        }
687
688                        return Err(error);
689                    }
690                }
691            }
692        }
693        .instrument(tracing::info_span!("request routine"))
694        .await;
695
696        if let Err(error) = &result {
697            if !error.can_retry() {
698                tracing::error!(?error, "request failed");
699            }
700        }
701
702        // Dump request and response to disk if dump_dir is set:
703        #[cfg(feature = "dump")]
704        Self::dump_request_response(&dump_request, &result, dump_dir);
705
706        result
707    }
708}