dash_sdk/core/
transaction.rs1use crate::platform::fetch_current_no_parameters::FetchCurrent;
2use crate::platform::types::epoch::Epoch;
3use crate::{Error, Sdk};
4use bip37_bloom_filter::{BloomFilter, BloomFilterData};
5use dapi_grpc::core::v0::{
6 transactions_with_proofs_request, transactions_with_proofs_response, GetTransactionRequest,
7 GetTransactionResponse, TransactionsWithProofsRequest, TransactionsWithProofsResponse,
8};
9use dpp::dashcore::consensus::Decodable;
10use dpp::dashcore::{Address, InstantLock, MerkleBlock, OutPoint, Transaction, Txid};
11use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProof;
12use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof;
13use dpp::prelude::AssetLockProof;
14
15use rs_dapi_client::{DapiRequestExecutor, IntoInner, RequestSettings};
16use std::time::Duration;
17use tokio::time::{sleep, timeout};
18
19impl Sdk {
20 pub async fn start_instant_send_lock_stream(
22 &self,
23 from_block_hash: Vec<u8>,
24 address: &Address,
25 ) -> Result<dapi_grpc::tonic::Streaming<TransactionsWithProofsResponse>, Error> {
26 let address_bytes = address.as_unchecked().payload_to_vec();
27
28 let bloom_filter = BloomFilter::builder(1, 0.001)
30 .expect("this FP rate allows up to 10000 items")
31 .add_element(&address_bytes)
32 .build();
33
34 let bloom_filter_proto = {
35 let BloomFilterData {
36 v_data,
37 n_hash_funcs,
38 n_tweak,
39 n_flags,
40 } = bloom_filter.into();
41 dapi_grpc::core::v0::BloomFilter {
42 v_data,
43 n_hash_funcs,
44 n_tweak,
45 n_flags,
46 }
47 };
48
49 let core_transactions_stream = TransactionsWithProofsRequest {
50 bloom_filter: Some(bloom_filter_proto),
51 count: 0, send_transaction_hashes: true,
53 from_block: Some(transactions_with_proofs_request::FromBlock::FromBlockHash(
54 from_block_hash,
55 )),
56 };
57 self.execute(core_transactions_stream, RequestSettings::default())
58 .await
59 .into_inner()
60 .map_err(|e| e.into())
61 }
62
63 pub async fn wait_for_asset_lock_proof_for_transaction(
65 &self,
66 mut stream: dapi_grpc::tonic::Streaming<TransactionsWithProofsResponse>,
67 transaction: &Transaction,
68 time_out: Option<Duration>,
69 ) -> Result<AssetLockProof, Error> {
70 let transaction_id = transaction.txid();
71
72 let _span = tracing::debug_span!(
73 "wait_for_asset_lock_proof_for_transaction",
74 transaction_id = transaction_id.to_string(),
75 )
76 .entered();
77
78 tracing::debug!("waiting for messages from stream");
79
80 let stream_processing = async {
82 loop {
83 let message = stream
85 .message()
86 .await
87 .map_err(|e| Error::Generic(format!("can't receive message: {e}")))?;
88
89 let Some(TransactionsWithProofsResponse { responses }) = message else {
90 return Err(Error::Generic("stream closed unexpectedly".to_string()));
91 };
92
93 match responses {
94 Some(
95 transactions_with_proofs_response::Responses::InstantSendLockMessages(
96 instant_send_lock_messages,
97 ),
98 ) => {
99 tracing::debug!(
100 "received {} instant lock message(s)",
101 instant_send_lock_messages.messages.len()
102 );
103
104 for instant_lock_bytes in instant_send_lock_messages.messages {
105 let instant_lock =
106 InstantLock::consensus_decode(&mut instant_lock_bytes.as_slice())
107 .map_err(|e| {
108 tracing::error!("invalid asset lock: {}", e);
109
110 Error::CoreError(e.into())
111 })?;
112
113 if instant_lock.txid == transaction_id {
114 let asset_lock_proof =
115 AssetLockProof::Instant(InstantAssetLockProof {
116 instant_lock,
117 transaction: transaction.clone(),
118 output_index: 0,
119 });
120
121 tracing::debug!(
122 ?asset_lock_proof,
123 "instant lock is matching to the broadcasted transaction, returning instant asset lock proof"
124 );
125
126 return Ok(asset_lock_proof);
127 } else {
128 tracing::debug!(
129 "instant lock is not matching, waiting for the next message"
130 );
131 }
132 }
133 }
134 Some(transactions_with_proofs_response::Responses::RawMerkleBlock(
135 raw_merkle_block,
136 )) => {
137 tracing::debug!("received merkle block");
138
139 let merkle_block =
140 MerkleBlock::consensus_decode(&mut raw_merkle_block.as_slice())
141 .map_err(|e| {
142 tracing::error!("can't decode merkle block: {}", e);
143
144 Error::CoreError(e.into())
145 })?;
146
147 let mut matches: Vec<Txid> = vec![];
148 let mut index: Vec<u32> = vec![];
149
150 merkle_block.extract_matches(&mut matches, &mut index)?;
151
152 if !matches.contains(&transaction_id) {
154 tracing::debug!(
155 "merkle block doesn't contain the transaction, waiting for the next message"
156 );
157
158 continue;
159 }
160
161 tracing::debug!(
162 "merkle block contains the transaction, obtaining core chain locked height"
163 );
164
165 let mut core_chain_locked_height;
170 loop {
171 let GetTransactionResponse {
172 height,
173 is_chain_locked,
174 ..
175 } = self
176 .execute(
177 GetTransactionRequest {
178 id: transaction_id.to_string(),
179 },
180 RequestSettings::default(),
181 )
182 .await .into_inner()?;
184
185 core_chain_locked_height = height;
186
187 if is_chain_locked {
188 break;
189 }
190
191 tracing::trace!("the transaction is on height {} but not chainlocked. try again in 1 sec", height);
192
193 sleep(Duration::from_secs(1)).await;
194 }
195
196 tracing::debug!(
197 "the transaction is chainlocked on height {}, waiting platform for reaching the same core height",
198 core_chain_locked_height
199 );
200
201 loop {
203 let (_epoch, metadata) =
204 Epoch::fetch_current_with_metadata(self).await?;
205
206 if metadata.core_chain_locked_height >= core_chain_locked_height {
207 break;
208 }
209
210 tracing::trace!(
211 "platform chain locked core height {} but we need {}. try again in 1 sec",
212 metadata.core_chain_locked_height,
213 core_chain_locked_height,
214 );
215
216 sleep(Duration::from_secs(1)).await;
217 }
218
219 let asset_lock_proof = AssetLockProof::Chain(ChainAssetLockProof {
220 core_chain_locked_height,
221 out_point: OutPoint {
222 txid: transaction.txid(),
223 vout: 0,
224 },
225 });
226
227 tracing::debug!(
228 ?asset_lock_proof,
229 "merkle block contains the broadcasted transaction, returning chain asset lock proof"
230 );
231
232 return Ok(asset_lock_proof);
233 }
234 Some(transactions_with_proofs_response::Responses::RawTransactions(_)) => {
235 tracing::trace!("received transaction(s), ignoring")
236 }
237 None => tracing::trace!(
238 "received empty response as a workaround for the bug in tonic, ignoring"
239 ),
240 }
241 }
242 };
243
244 match time_out {
246 Some(duration) => timeout(duration, stream_processing).await.map_err(|_| {
247 Error::TimeoutReached(duration, String::from("receiving asset lock proof"))
248 })?,
249 None => stream_processing.await,
250 }
251 }
252}