dash_sdk/core/
transaction.rs

1use crate::platform::fetch_current_no_parameters::FetchCurrent;
2use crate::platform::types::epoch::Epoch;
3use crate::{Error, Sdk};
4use bip37_bloom_filter::{BloomFilter, BloomFilterData};
5use dapi_grpc::core::v0::{
6    transactions_with_proofs_request, transactions_with_proofs_response, GetTransactionRequest,
7    GetTransactionResponse, TransactionsWithProofsRequest, TransactionsWithProofsResponse,
8};
9use dpp::dashcore::consensus::Decodable;
10use dpp::dashcore::{Address, InstantLock, MerkleBlock, OutPoint, Transaction, Txid};
11use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProof;
12use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof;
13use dpp::prelude::AssetLockProof;
14
15use rs_dapi_client::{DapiRequestExecutor, IntoInner, RequestSettings};
16use std::time::Duration;
17use tokio::time::{sleep, timeout};
18
19impl Sdk {
20    /// Starts the stream to listen for instant send lock messages
21    pub async fn start_instant_send_lock_stream(
22        &self,
23        from_block_hash: Vec<u8>,
24        address: &Address,
25    ) -> Result<dapi_grpc::tonic::Streaming<TransactionsWithProofsResponse>, Error> {
26        let address_bytes = address.as_unchecked().payload_to_vec();
27
28        // create the bloom filter
29        let bloom_filter = BloomFilter::builder(1, 0.001)
30            .expect("this FP rate allows up to 10000 items")
31            .add_element(&address_bytes)
32            .build();
33
34        let bloom_filter_proto = {
35            let BloomFilterData {
36                v_data,
37                n_hash_funcs,
38                n_tweak,
39                n_flags,
40            } = bloom_filter.into();
41            dapi_grpc::core::v0::BloomFilter {
42                v_data,
43                n_hash_funcs,
44                n_tweak,
45                n_flags,
46            }
47        };
48
49        let core_transactions_stream = TransactionsWithProofsRequest {
50            bloom_filter: Some(bloom_filter_proto),
51            count: 0, // Subscribing to new transactions as well
52            send_transaction_hashes: true,
53            from_block: Some(transactions_with_proofs_request::FromBlock::FromBlockHash(
54                from_block_hash,
55            )),
56        };
57        self.execute(core_transactions_stream, RequestSettings::default())
58            .await
59            .into_inner()
60            .map_err(|e| e.into())
61    }
62
63    /// Waits for a response for the asset lock proof
64    pub async fn wait_for_asset_lock_proof_for_transaction(
65        &self,
66        mut stream: dapi_grpc::tonic::Streaming<TransactionsWithProofsResponse>,
67        transaction: &Transaction,
68        time_out: Option<Duration>,
69    ) -> Result<AssetLockProof, Error> {
70        let transaction_id = transaction.txid();
71
72        let _span = tracing::debug_span!(
73            "wait_for_asset_lock_proof_for_transaction",
74            transaction_id = transaction_id.to_string(),
75        )
76        .entered();
77
78        tracing::debug!("waiting for messages from stream");
79
80        // Define an inner async block to handle the stream processing.
81        let stream_processing = async {
82            loop {
83                // TODO: We should retry if Err is returned
84                let message = stream
85                    .message()
86                    .await
87                    .map_err(|e| Error::Generic(format!("can't receive message: {e}")))?;
88
89                let Some(TransactionsWithProofsResponse { responses }) = message else {
90                    return Err(Error::Generic("stream closed unexpectedly".to_string()));
91                };
92
93                match responses {
94                    Some(
95                        transactions_with_proofs_response::Responses::InstantSendLockMessages(
96                            instant_send_lock_messages,
97                        ),
98                    ) => {
99                        tracing::debug!(
100                            "received {} instant lock message(s)",
101                            instant_send_lock_messages.messages.len()
102                        );
103
104                        for instant_lock_bytes in instant_send_lock_messages.messages {
105                            let instant_lock =
106                                InstantLock::consensus_decode(&mut instant_lock_bytes.as_slice())
107                                    .map_err(|e| {
108                                    tracing::error!("invalid asset lock: {}", e);
109
110                                    Error::CoreError(e.into())
111                                })?;
112
113                            if instant_lock.txid == transaction_id {
114                                let asset_lock_proof =
115                                    AssetLockProof::Instant(InstantAssetLockProof {
116                                        instant_lock,
117                                        transaction: transaction.clone(),
118                                        output_index: 0,
119                                    });
120
121                                tracing::debug!(
122                                    ?asset_lock_proof,
123                                    "instant lock is matching to the broadcasted transaction, returning instant asset lock proof"
124                                );
125
126                                return Ok(asset_lock_proof);
127                            } else {
128                                tracing::debug!(
129                                    "instant lock is not matching, waiting for the next message"
130                                );
131                            }
132                        }
133                    }
134                    Some(transactions_with_proofs_response::Responses::RawMerkleBlock(
135                        raw_merkle_block,
136                    )) => {
137                        tracing::debug!("received merkle block");
138
139                        let merkle_block =
140                            MerkleBlock::consensus_decode(&mut raw_merkle_block.as_slice())
141                                .map_err(|e| {
142                                    tracing::error!("can't decode merkle block: {}", e);
143
144                                    Error::CoreError(e.into())
145                                })?;
146
147                        let mut matches: Vec<Txid> = vec![];
148                        let mut index: Vec<u32> = vec![];
149
150                        merkle_block.extract_matches(&mut matches, &mut index)?;
151
152                        // Continue receiving messages until we find the transaction
153                        if !matches.contains(&transaction_id) {
154                            tracing::debug!(
155                                "merkle block doesn't contain the transaction, waiting for the next message"
156                            );
157
158                            continue;
159                        }
160
161                        tracing::debug!(
162                            "merkle block contains the transaction, obtaining core chain locked height"
163                        );
164
165                        // TODO: This a temporary implementation until we have headers stream running in background
166                        //  so we can always get actual height and chain locks
167
168                        // Wait until the block is chainlocked
169                        let mut core_chain_locked_height;
170                        loop {
171                            let GetTransactionResponse {
172                                height,
173                                is_chain_locked,
174                                ..
175                            } = self
176                                .execute(
177                                    GetTransactionRequest {
178                                        id: transaction_id.to_string(),
179                                    },
180                                    RequestSettings::default(),
181                                )
182                                .await // TODO: We need better way to handle execution errors
183                                .into_inner()?;
184
185                            core_chain_locked_height = height;
186
187                            if is_chain_locked {
188                                break;
189                            }
190
191                            tracing::trace!("the transaction is on height {} but not chainlocked. try again in 1 sec", height);
192
193                            sleep(Duration::from_secs(1)).await;
194                        }
195
196                        tracing::debug!(
197                            "the transaction is chainlocked on height {}, waiting platform for reaching the same core height",
198                            core_chain_locked_height
199                        );
200
201                        // Wait until platform chain is on the block's chain locked height
202                        loop {
203                            let (_epoch, metadata) =
204                                Epoch::fetch_current_with_metadata(self).await?;
205
206                            if metadata.core_chain_locked_height >= core_chain_locked_height {
207                                break;
208                            }
209
210                            tracing::trace!(
211                                "platform chain locked core height {} but we need {}. try again in 1 sec",
212                                metadata.core_chain_locked_height,
213                                core_chain_locked_height,
214                            );
215
216                            sleep(Duration::from_secs(1)).await;
217                        }
218
219                        let asset_lock_proof = AssetLockProof::Chain(ChainAssetLockProof {
220                            core_chain_locked_height,
221                            out_point: OutPoint {
222                                txid: transaction.txid(),
223                                vout: 0,
224                            },
225                        });
226
227                        tracing::debug!(
228                                ?asset_lock_proof,
229                                "merkle block contains the broadcasted transaction, returning chain asset lock proof"
230                            );
231
232                        return Ok(asset_lock_proof);
233                    }
234                    Some(transactions_with_proofs_response::Responses::RawTransactions(_)) => {
235                        tracing::trace!("received transaction(s), ignoring")
236                    }
237                    None => tracing::trace!(
238                        "received empty response as a workaround for the bug in tonic, ignoring"
239                    ),
240                }
241            }
242        };
243
244        // Apply the timeout if `time_out_ms` is Some, otherwise just await the processing.
245        match time_out {
246            Some(duration) => timeout(duration, stream_processing).await.map_err(|_| {
247                Error::TimeoutReached(duration, String::from("receiving asset lock proof"))
248            })?,
249            None => stream_processing.await,
250        }
251    }
252}