1pub use dash_async::{block_on, AsyncError};
2
3use crate::error::Error;
4use rs_dapi_client::{
5 transport::sleep, update_address_ban_status, AddressList, CanRetry, ExecutionResult,
6 RequestSettings,
7};
8use std::future::Future;
9use std::time::Duration;
10
11impl From<AsyncError> for crate::Error {
12 fn from(error: AsyncError) -> Self {
13 Self::ContextProviderError(error.into())
14 }
15}
16
17pub async fn retry<Fut, FutureFactoryFn, R>(
74 address_list: &AddressList,
75 settings: RequestSettings,
76 mut future_factory_fn: FutureFactoryFn,
77) -> ExecutionResult<R, Error>
78where
79 Fut: Future<Output = ExecutionResult<R, Error>>,
80 FutureFactoryFn: FnMut(RequestSettings) -> Fut,
81 R: Send,
82{
83 let max_retries = settings.retries.unwrap_or_default();
84 let mut total_retries: usize = 0;
85 let mut current_settings = settings;
86
87 let mut last_meaningful_error: Option<rs_dapi_client::ExecutionError<Error>> = None;
90
91 loop {
92 let result = future_factory_fn(current_settings).await;
93
94 update_address_ban_status(address_list, &result, ¤t_settings.finalize());
96
97 match result {
98 Ok(response) => return Ok(response),
99 Err(error) => {
100 if error.is_no_available_addresses() {
102 if let Some(prev_error) = last_meaningful_error.take() {
103 tracing::error!(
104 retry = total_retries,
105 max_retries,
106 error = ?prev_error,
107 "no addresses available to retry"
108 );
109 return Err(rs_dapi_client::ExecutionError {
111 inner: Error::NoAvailableAddressesToRetry(Box::new(prev_error.inner)),
112 retries: total_retries,
113 address: prev_error.address,
114 });
115 }
116 return Err(error);
118 }
119
120 let requests_sent = error.retries + 1;
122 total_retries += requests_sent;
123
124 if !error.can_retry() {
125 let mut final_error = error;
127 final_error.retries = total_retries;
128 return Err(final_error);
129 }
130
131 if total_retries > max_retries {
132 tracing::error!(
134 retry = total_retries,
135 max_retries,
136 error = ?error,
137 "no more retries left, giving up"
138 );
139 let mut final_error = error;
140 final_error.retries = total_retries;
141 return Err(final_error);
142 }
143
144 tracing::warn!(
146 retry = total_retries,
147 max_retries,
148 error = ?error,
149 "retrying request"
150 );
151
152 current_settings.retries = Some(max_retries.saturating_sub(total_retries));
154
155 let delay = Duration::from_millis(10);
158 tracing::warn!(duration = ?delay, error = ?error, "request failed, retrying");
159
160 last_meaningful_error = Some(error);
162
163 sleep(delay).await;
164 }
165 }
166 }
167}
168
#[cfg(test)]
mod test {
    use super::*;
    use crate::error::StaleNodeError;
    use rs_dapi_client::{DapiClientError, ExecutionError};
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    /// Builds the stale-node error used by the tests as an example of a
    /// failure that is not "no available addresses".
    fn stale_height_error() -> Error {
        Error::StaleNode(StaleNodeError::Height {
            expected_height: 100,
            received_height: 50,
            tolerance_blocks: 1,
        })
    }

    /// Builds the execution error reporting that no addresses are available.
    fn no_addresses_failure() -> ExecutionError<Error> {
        ExecutionError {
            inner: Error::DapiClientError(DapiClientError::NoAvailableAddresses),
            retries: 0,
            address: None,
        }
    }

    /// Always-failing request used by `test_retry`.
    ///
    /// It reports as many internal retries as the counter currently holds,
    /// capped by `settings.retries`, and adds "reported retries + 1" to the
    /// counter.
    async fn retry_test_function(
        settings: RequestSettings,
        counter: Arc<AtomicUsize>,
    ) -> ExecutionResult<(), Error> {
        let allowed = settings.retries.unwrap_or_default();
        let retries = counter.load(Ordering::Relaxed).min(allowed);

        counter.fetch_add(1 + retries, Ordering::Relaxed);

        Err(ExecutionError {
            inner: stale_height_error(),
            retries,
            address: Some("http://localhost".parse().expect("valid address")),
        })
    }

    /// With a budget of `expected_requests - 1` retries, the counter must end
    /// up at exactly `expected_requests`.
    #[test_case::test_matrix([1,2,3,5,7,8,10,11,23,49, usize::MAX])]
    #[tokio::test]
    async fn test_retry(expected_requests: usize) {
        let counter = Arc::new(AtomicUsize::new(0));
        let address_list = AddressList::default();

        let mut global_settings = RequestSettings::default();
        global_settings.retries = Some(expected_requests - 1);

        let closure =
            |request_settings| retry_test_function(request_settings, Arc::clone(&counter));

        retry(&address_list, global_settings, closure)
            .await
            .expect_err("should fail");

        let observed_requests = counter.load(Ordering::Relaxed);
        assert_eq!(
            observed_requests, expected_requests,
            "test failed for expected {} requests",
            expected_requests
        );
    }

    /// A stale-node failure followed by "no available addresses" must surface
    /// the stale-node failure wrapped in `NoAvailableAddressesToRetry`.
    #[tokio::test]
    async fn test_retry_returns_last_meaningful_error_on_no_addresses() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let address_list = AddressList::default();

        let mut settings = RequestSettings::default();
        settings.retries = Some(5);

        let calls = Arc::clone(&call_count);
        let closure = move |_settings: RequestSettings| {
            let is_first_call = calls.fetch_add(1, Ordering::Relaxed) == 0;
            async move {
                let failure = if is_first_call {
                    ExecutionError {
                        inner: stale_height_error(),
                        retries: 0,
                        address: Some("http://localhost:1".parse().unwrap()),
                    }
                } else {
                    no_addresses_failure()
                };
                Err(failure)
            }
        };

        let result: ExecutionResult<(), Error> = retry(&address_list, settings, closure).await;

        let error = result.expect_err("should fail");
        if let Error::NoAvailableAddressesToRetry(inner) = &error.inner {
            assert!(
                matches!(**inner, Error::StaleNode(_)),
                "inner error should be StaleNode, got: {:?}",
                inner
            );
        } else {
            panic!(
                "expected NoAvailableAddresses error, got: {:?}",
                error.inner
            );
        }

        let observed_calls = call_count.load(Ordering::Relaxed);
        assert_eq!(observed_calls, 2, "should have called twice");
    }

    /// When the very first attempt reports "no available addresses", that
    /// error must be returned unwrapped.
    #[tokio::test]
    async fn test_retry_returns_no_addresses_if_no_previous_error() {
        let address_list = AddressList::default();

        let mut settings = RequestSettings::default();
        settings.retries = Some(5);

        let closure = move |_settings: RequestSettings| async move { Err(no_addresses_failure()) };

        let result: ExecutionResult<(), Error> = retry(&address_list, settings, closure).await;

        let error = result.expect_err("should fail");
        let is_plain_no_addresses = matches!(
            error.inner,
            Error::DapiClientError(DapiClientError::NoAvailableAddresses)
        );
        assert!(
            is_plain_no_addresses,
            "should return 'no available addresses' when there's no previous meaningful error, got: {:?}",
            error.inner
        );
    }
}