Skip to main content

dash_sdk/
sync.rs

1pub use dash_async::{block_on, AsyncError};
2
3use crate::error::Error;
4use rs_dapi_client::{
5    transport::sleep, update_address_ban_status, AddressList, CanRetry, ExecutionResult,
6    RequestSettings,
7};
8use std::future::Future;
9use std::time::Duration;
10
11impl From<AsyncError> for crate::Error {
12    fn from(error: AsyncError) -> Self {
13        Self::ContextProviderError(error.into())
14    }
15}
16
17/// Retry the provided closure.
18///
19/// This function is used to retry async code. It takes into account number of retries already executed by lower
20/// layers and stops retrying once the maximum number of retries is reached.
21///
22/// The `settings` should contain maximum number of retries that should be executed. In case of failure, total number of
23/// requests sent is expected to be at least `settings.retries + 1` (initial request + `retries` configured in settings).
24/// The actual number of requests sent can be higher, as the lower layers can retry the request multiple times.
25///
26/// `future_factory_fn` should be a `FnMut()` closure that returns a future that should be retried.
27/// It takes [`RequestSettings`] as an argument and returns [`ExecutionResult`].
28/// Retry mechanism can change [`RequestSettings`] between invocations of the `future_factory_fn` closure
29/// to limit the number of retries for lower layers.
30///
31/// ## Parameters
32///
33/// - `address_list` - list of addresses to be used for the requests.
34/// - `settings` - global settings with any request-specific settings overrides applied.
35/// - `future_factory_fn` - closure that returns a future that should be retried. It should take [`RequestSettings`] as
36///   an argument and return [`ExecutionResult`].
37///
38/// ## Returns
39///
40/// Returns future that resolves to [`ExecutionResult`].
41///
42/// ## Example
43///
44/// ```rust
45/// # use dash_sdk::RequestSettings;
46/// # use dash_sdk::error::{Error,StaleNodeError};
47/// # use rs_dapi_client::{ExecutionResult, ExecutionError};
48/// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error> {
49/// // do something
50///     Err(ExecutionError {
51///         inner: Error::StaleNode(StaleNodeError::Height{
52///             expected_height: 10,
53///             received_height: 3,
54///             tolerance_blocks: 1,
55///         }),
56///        retries: 0,
57///       address: None,
58///    })
59/// }
60/// #[tokio::main]
61///     async fn main() {
62///     let address_list = rs_dapi_client::AddressList::default();
63///     let global_settings = RequestSettings::default();
64///     dash_sdk::sync::retry(&address_list, global_settings, retry_test_function).await.expect_err("should fail");
65/// }
66/// ```
67///
68/// ## Troubleshooting
69///
70/// Compiler error: `no method named retry found for closure`:
71/// - ensure returned value is [`ExecutionResult`].
72/// - consider adding `.await` at the end of the closure.
73pub async fn retry<Fut, FutureFactoryFn, R>(
74    address_list: &AddressList,
75    settings: RequestSettings,
76    mut future_factory_fn: FutureFactoryFn,
77) -> ExecutionResult<R, Error>
78where
79    Fut: Future<Output = ExecutionResult<R, Error>>,
80    FutureFactoryFn: FnMut(RequestSettings) -> Fut,
81    R: Send,
82{
83    let max_retries = settings.retries.unwrap_or_default();
84    let mut total_retries: usize = 0;
85    let mut current_settings = settings;
86
87    // Store the last meaningful error (not "no available addresses")
88    // so we can return it if we exhaust all addresses
89    let mut last_meaningful_error: Option<rs_dapi_client::ExecutionError<Error>> = None;
90
91    loop {
92        let result = future_factory_fn(current_settings).await;
93
94        // Ban or unban the address based on the result
95        update_address_ban_status(address_list, &result, &current_settings.finalize());
96
97        match result {
98            Ok(response) => return Ok(response),
99            Err(error) => {
100                // Check if this is a "no available addresses" error and we have a previous meaningful error
101                if error.is_no_available_addresses() {
102                    if let Some(prev_error) = last_meaningful_error.take() {
103                        tracing::error!(
104                            retry = total_retries,
105                            max_retries,
106                            error = ?prev_error,
107                            "no addresses available to retry"
108                        );
109                        // Wrap the last meaningful error in NoAvailableAddresses
110                        return Err(rs_dapi_client::ExecutionError {
111                            inner: Error::NoAvailableAddressesToRetry(Box::new(prev_error.inner)),
112                            retries: total_retries,
113                            address: prev_error.address,
114                        });
115                    }
116                    // No previous error, return the "no available addresses" error as-is
117                    return Err(error);
118                }
119
120                // Count requests sent in this attempt
121                let requests_sent = error.retries + 1;
122                total_retries += requests_sent;
123
124                if !error.can_retry() {
125                    // Non-retryable error, return immediately
126                    let mut final_error = error;
127                    final_error.retries = total_retries;
128                    return Err(final_error);
129                }
130
131                if total_retries > max_retries {
132                    // Exceeded max retries
133                    tracing::error!(
134                        retry = total_retries,
135                        max_retries,
136                        error = ?error,
137                        "no more retries left, giving up"
138                    );
139                    let mut final_error = error;
140                    final_error.retries = total_retries;
141                    return Err(final_error);
142                }
143
144                // Log retry decision (matches original `when()` callback)
145                tracing::warn!(
146                    retry = total_retries,
147                    max_retries,
148                    error = ?error,
149                    "retrying request"
150                );
151
152                // Update settings for next retry - limit retries for lower layer
153                current_settings.retries = Some(max_retries.saturating_sub(total_retries));
154
155                // Small delay to avoid spamming (we use different server, so no real delay needed)
156                // Log before sleep (matches original `notify()` callback)
157                let delay = Duration::from_millis(10);
158                tracing::warn!(duration = ?delay, error = ?error, "request failed, retrying");
159
160                // Store this as the last meaningful error before retrying
161                last_meaningful_error = Some(error);
162
163                sleep(delay).await;
164            }
165        }
166    }
167}
168
169#[cfg(test)]
170mod test {
171    use super::*;
172    use rs_dapi_client::ExecutionError;
173    use std::sync::{
174        atomic::{AtomicUsize, Ordering},
175        Arc,
176    };
177
178    use crate::error::StaleNodeError;
179    use rs_dapi_client::DapiClientError;
180
181    async fn retry_test_function(
182        settings: RequestSettings,
183        counter: Arc<AtomicUsize>,
184    ) -> ExecutionResult<(), Error> {
185        // num or retries increases with each call
186        let retries = counter.load(Ordering::Relaxed);
187        let retries = if settings.retries.unwrap_or_default() < retries {
188            settings.retries.unwrap_or_default()
189        } else {
190            retries
191        };
192
193        // we sent 1 initial request plus `retries` retries
194        counter.fetch_add(1 + retries, Ordering::Relaxed);
195
196        Err(ExecutionError {
197            inner: Error::StaleNode(StaleNodeError::Height {
198                expected_height: 100,
199                received_height: 50,
200                tolerance_blocks: 1,
201            }),
202            retries,
203            address: Some("http://localhost".parse().expect("valid address")),
204        })
205    }
206
207    #[test_case::test_matrix([1,2,3,5,7,8,10,11,23,49, usize::MAX])]
208    #[tokio::test]
209    async fn test_retry(expected_requests: usize) {
210        for _ in 0..1 {
211            let counter = Arc::new(AtomicUsize::new(0));
212
213            let address_list = AddressList::default();
214
215            // we retry 5 times, and expect 5 retries + 1 initial request
216            let mut global_settings = RequestSettings::default();
217            global_settings.retries = Some(expected_requests - 1);
218
219            let closure = |s| {
220                let counter = counter.clone();
221                retry_test_function(s, counter)
222            };
223
224            retry(&address_list, global_settings, closure)
225                .await
226                .expect_err("should fail");
227
228            assert_eq!(
229                counter.load(Ordering::Relaxed),
230                expected_requests,
231                "test failed for expected {} requests",
232                expected_requests
233            );
234        }
235    }
236
237    /// Test that when we get "no available addresses" error, we return the last meaningful error
238    /// wrapped in NoAvailableAddresses.
239    #[tokio::test]
240    async fn test_retry_returns_last_meaningful_error_on_no_addresses() {
241        let call_count = Arc::new(AtomicUsize::new(0));
242        let address_list = AddressList::default();
243
244        let mut settings = RequestSettings::default();
245        settings.retries = Some(5);
246
247        let call_count_clone = call_count.clone();
248        let closure = move |_settings: RequestSettings| {
249            let count = call_count_clone.fetch_add(1, Ordering::Relaxed);
250            async move {
251                if count == 0 {
252                    Err(ExecutionError {
253                        inner: Error::StaleNode(StaleNodeError::Height {
254                            expected_height: 100,
255                            received_height: 50,
256                            tolerance_blocks: 1,
257                        }),
258                        retries: 0,
259                        address: Some("http://localhost:1".parse().unwrap()),
260                    })
261                } else {
262                    Err(ExecutionError {
263                        inner: Error::DapiClientError(DapiClientError::NoAvailableAddresses),
264                        retries: 0,
265                        address: None,
266                    })
267                }
268            }
269        };
270
271        let result: ExecutionResult<(), Error> = retry(&address_list, settings, closure).await;
272
273        let error = result.expect_err("should fail");
274        match &error.inner {
275            Error::NoAvailableAddressesToRetry(inner) => {
276                assert!(
277                    matches!(**inner, Error::StaleNode(_)),
278                    "inner error should be StaleNode, got: {:?}",
279                    inner
280                );
281            }
282            _ => panic!(
283                "expected NoAvailableAddresses error, got: {:?}",
284                error.inner
285            ),
286        }
287        assert_eq!(
288            call_count.load(Ordering::Relaxed),
289            2,
290            "should have called twice"
291        );
292    }
293
294    /// Test that if we get "no available addresses" on the first call (no previous error),
295    /// we still return it as-is (not wrapped).
296    #[tokio::test]
297    async fn test_retry_returns_no_addresses_if_no_previous_error() {
298        let address_list = AddressList::default();
299
300        let mut settings = RequestSettings::default();
301        settings.retries = Some(5);
302
303        let closure = move |_settings: RequestSettings| async move {
304            Err(ExecutionError {
305                inner: Error::DapiClientError(DapiClientError::NoAvailableAddresses),
306                retries: 0,
307                address: None,
308            })
309        };
310
311        let result: ExecutionResult<(), Error> = retry(&address_list, settings, closure).await;
312
313        let error = result.expect_err("should fail");
314        assert!(
315            matches!(
316                error.inner,
317                Error::DapiClientError(DapiClientError::NoAvailableAddresses)
318            ),
319            "should return 'no available addresses' when there's no previous meaningful error, got: {:?}",
320            error.inner
321        );
322    }
323}