Skip to main content

drive_abci/query/
service.rs

1use crate::error::query::QueryError;
2use crate::error::Error;
3use crate::metrics::{abci_response_code_metric_label, query_duration_metric};
4use crate::platform_types::platform::Platform;
5use crate::platform_types::platform_state::PlatformState;
6use crate::platform_types::platform_state::PlatformStateV0Methods;
7use crate::query::QueryValidationResult;
8use crate::rpc::core::DefaultCoreRPC;
9use crate::utils::spawn_blocking_task_with_name_if_supported;
10use async_trait::async_trait;
11use dapi_grpc::drive::v0::drive_internal_server::DriveInternal;
12use dapi_grpc::drive::v0::{GetProofsRequest, GetProofsResponse};
13use dapi_grpc::platform::v0::platform_server::Platform as PlatformService;
14use dapi_grpc::platform::v0::{
15    BroadcastStateTransitionRequest, BroadcastStateTransitionResponse, GetAddressInfoRequest,
16    GetAddressInfoResponse, GetAddressesBranchStateRequest, GetAddressesBranchStateResponse,
17    GetAddressesInfosRequest, GetAddressesInfosResponse, GetAddressesTrunkStateRequest,
18    GetAddressesTrunkStateResponse, GetConsensusParamsRequest, GetConsensusParamsResponse,
19    GetContestedResourceIdentityVotesRequest, GetContestedResourceIdentityVotesResponse,
20    GetContestedResourceVoteStateRequest, GetContestedResourceVoteStateResponse,
21    GetContestedResourceVotersForIdentityRequest, GetContestedResourceVotersForIdentityResponse,
22    GetContestedResourcesRequest, GetContestedResourcesResponse, GetCurrentQuorumsInfoRequest,
23    GetCurrentQuorumsInfoResponse, GetDataContractHistoryRequest, GetDataContractHistoryResponse,
24    GetDataContractRequest, GetDataContractResponse, GetDataContractsRequest,
25    GetDataContractsResponse, GetDocumentHistoryRequest, GetDocumentHistoryResponse,
26    GetDocumentsRequest, GetDocumentsResponse, GetEpochsInfoRequest, GetEpochsInfoResponse,
27    GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest,
28    GetEvonodesProposedEpochBlocksResponse, GetFinalizedEpochInfosRequest,
29    GetFinalizedEpochInfosResponse, GetGroupActionSignersRequest, GetGroupActionSignersResponse,
30    GetGroupActionsRequest, GetGroupActionsResponse, GetGroupInfoRequest, GetGroupInfoResponse,
31    GetGroupInfosRequest, GetGroupInfosResponse, GetIdentitiesBalancesRequest,
32    GetIdentitiesBalancesResponse, GetIdentitiesContractKeysRequest,
33    GetIdentitiesContractKeysResponse, GetIdentitiesTokenBalancesRequest,
34    GetIdentitiesTokenBalancesResponse, GetIdentitiesTokenInfosRequest,
35    GetIdentitiesTokenInfosResponse, GetIdentityBalanceAndRevisionRequest,
36    GetIdentityBalanceAndRevisionResponse, GetIdentityBalanceRequest, GetIdentityBalanceResponse,
37    GetIdentityByNonUniquePublicKeyHashRequest, GetIdentityByNonUniquePublicKeyHashResponse,
38    GetIdentityByPublicKeyHashRequest, GetIdentityByPublicKeyHashResponse,
39    GetIdentityContractNonceRequest, GetIdentityContractNonceResponse, GetIdentityKeysRequest,
40    GetIdentityKeysResponse, GetIdentityNonceRequest, GetIdentityNonceResponse, GetIdentityRequest,
41    GetIdentityResponse, GetIdentityTokenBalancesRequest, GetIdentityTokenBalancesResponse,
42    GetIdentityTokenInfosRequest, GetIdentityTokenInfosResponse,
43    GetMostRecentShieldedAnchorRequest, GetMostRecentShieldedAnchorResponse,
44    GetPathElementsRequest, GetPathElementsResponse, GetPrefundedSpecializedBalanceRequest,
45    GetPrefundedSpecializedBalanceResponse, GetProtocolVersionUpgradeStateRequest,
46    GetProtocolVersionUpgradeStateResponse, GetProtocolVersionUpgradeVoteStatusRequest,
47    GetProtocolVersionUpgradeVoteStatusResponse, GetRecentAddressBalanceChangesRequest,
48    GetRecentAddressBalanceChangesResponse, GetRecentCompactedAddressBalanceChangesRequest,
49    GetRecentCompactedAddressBalanceChangesResponse, GetShieldedAnchorsRequest,
50    GetShieldedAnchorsResponse, GetShieldedEncryptedNotesRequest,
51    GetShieldedEncryptedNotesResponse, GetShieldedNotesCountRequest, GetShieldedNotesCountResponse,
52    GetShieldedNullifiersRequest, GetShieldedNullifiersResponse, GetShieldedPoolStateRequest,
53    GetShieldedPoolStateResponse, GetStatusRequest, GetStatusResponse, GetTokenContractInfoRequest,
54    GetTokenContractInfoResponse, GetTokenDirectPurchasePricesRequest,
55    GetTokenDirectPurchasePricesResponse, GetTokenPerpetualDistributionLastClaimRequest,
56    GetTokenPerpetualDistributionLastClaimResponse, GetTokenPreProgrammedDistributionsRequest,
57    GetTokenPreProgrammedDistributionsResponse, GetTokenStatusesRequest, GetTokenStatusesResponse,
58    GetTokenTotalSupplyRequest, GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest,
59    GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse,
60    WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse,
61};
62use dapi_grpc::tonic::{Code, Request, Response, Status};
63use dpp::version::PlatformVersion;
64use std::fmt::Debug;
65use std::sync::atomic::Ordering;
66use std::sync::Arc;
67use std::thread::sleep;
68use std::time::Duration;
69use tracing::Instrument;
70
71/// Service to handle platform queries
72pub struct QueryService {
73    platform: Arc<Platform<DefaultCoreRPC>>,
74}
75
76type QueryMethod<RQ, RS> = fn(
77    &Platform<DefaultCoreRPC>,
78    RQ,
79    &PlatformState,
80    &PlatformVersion,
81) -> Result<QueryValidationResult<RS>, Error>;
82
83impl QueryService {
84    /// Creates new QueryService
85    pub fn new(platform: Arc<Platform<DefaultCoreRPC>>) -> Self {
86        Self { platform }
87    }
88
89    async fn handle_blocking_query<RQ, RS>(
90        &self,
91        request: Request<RQ>,
92        query_method: QueryMethod<RQ, RS>,
93        endpoint_name: &str,
94    ) -> Result<Response<RS>, Status>
95    where
96        RS: Clone + Send + 'static,
97        RQ: Debug + Send + Clone + 'static,
98    {
99        let mut response_duration_metric = query_duration_metric(endpoint_name);
100
101        let platform = Arc::clone(&self.platform);
102
103        let request_debug = format!("{:?}", &request);
104
105        let result = spawn_blocking_task_with_name_if_supported("query", move || {
106            let mut result;
107
108            let query_request = request.into_inner();
109
110            let mut query_counter = 0;
111
112            loop {
113                let platform_state = platform.state.load();
114
115                let platform_version = platform_state
116                    .current_platform_version()
117                    .map_err(|_| Status::unavailable("platform is not initialized"))?;
118
119                // Query is using Platform execution state and Drive state to during the execution.
120                // They are updating every block in finalize block ABCI handler.
121                // The problem is that these two operations aren't atomic and some latency between
122                // them could lead to data races. `committed_block_height_guard` counter that represents
123                // the latest the height of latest committed Drive state and logic bellow ensures
124                // that query is executed only after/before both states are updated.
125                let mut needs_restart = false;
126
127                loop {
128                    let committed_block_height_guard = platform
129                        .committed_block_height_guard
130                        .load(Ordering::Relaxed);
131                    let mut counter = 0;
132                    if platform_state.last_committed_block_height() == committed_block_height_guard
133                    {
134                        break;
135                    } else {
136                        counter += 1;
137                        sleep(Duration::from_millis(10))
138                    }
139
140                    // We try for up to 1 second
141                    if counter >= 100 {
142                        query_counter += 1;
143                        needs_restart = true;
144                        break;
145                    }
146                }
147
148                if query_counter > 3 {
149                    return Err(query_error_into_status(QueryError::NotServiceable(
150                        "platform is saturated (did not attempt query)".to_string(),
151                    )));
152                }
153
154                if needs_restart {
155                    continue;
156                }
157
158                result = query_method(
159                    &platform,
160                    query_request.clone(),
161                    &platform_state,
162                    platform_version,
163                );
164
165                let committed_block_height_guard = platform
166                    .committed_block_height_guard
167                    .load(Ordering::Relaxed);
168
169                if platform_state.last_committed_block_height() == committed_block_height_guard {
170                    // in this case the query almost certainly executed correctly
171                    break;
172                } else {
173                    query_counter += 1;
174
175                    if query_counter > 2 {
176                        // This should never be possible
177                        return Err(query_error_into_status(QueryError::NotServiceable(
178                            "platform is saturated".to_string(),
179                        )));
180                    }
181                }
182            }
183
184            let mut query_result = result.map_err(error_into_status)?;
185
186            if query_result.is_valid() {
187                let response = query_result
188                    .into_data()
189                    .map_err(|error| error_into_status(error.into()))?;
190
191                Ok(Response::new(response))
192            } else {
193                let error = query_result.errors.swap_remove(0);
194
195                Err(query_error_into_status(error))
196            }
197        })?
198        .instrument(tracing::trace_span!("query", endpoint_name))
199        .await
200        .map_err(|error| Status::internal(format!("query thread failed: {}", error)))?;
201
202        // Query logging and metrics
203        let code = match &result {
204            Ok(_) => Code::Ok,
205            Err(status) => status.code(),
206        };
207
208        let code_label = format!("{:?}", code).to_lowercase();
209
210        // Add code to response duration metric
211        let label = abci_response_code_metric_label(code);
212        response_duration_metric.add_label(label);
213
214        match code {
215            // User errors
216            Code::Ok
217            | Code::InvalidArgument
218            | Code::NotFound
219            | Code::AlreadyExists
220            | Code::ResourceExhausted
221            | Code::PermissionDenied
222            | Code::Unavailable
223            | Code::Aborted
224            | Code::FailedPrecondition
225            | Code::OutOfRange
226            | Code::Cancelled
227            | Code::DeadlineExceeded
228            | Code::Unauthenticated => {
229                let elapsed_time = response_duration_metric.elapsed().as_secs_f64();
230
231                tracing::trace!(
232                    request = request_debug,
233                    elapsed_time,
234                    endpoint_name,
235                    code = code_label,
236                    "query '{}' executed with code {:?} in {} secs",
237                    endpoint_name,
238                    code,
239                    elapsed_time
240                );
241            }
242            // System errors
243            Code::Unknown | Code::Unimplemented | Code::Internal | Code::DataLoss => {
244                tracing::error!(
245                    request = request_debug,
246                    endpoint_name,
247                    code = code_label,
248                    "query '{}' execution failed with code {:?}",
249                    endpoint_name,
250                    code
251                );
252            }
253        }
254
255        result
256    }
257}
258
259fn respond_with_unimplemented<RS>(name: &str) -> Result<Response<RS>, Status> {
260    tracing::error!("{} endpoint is called but it's not supported", name);
261
262    Err(Status::unimplemented("the endpoint is not supported"))
263}
264
265#[async_trait]
266impl PlatformService for QueryService {
267    async fn broadcast_state_transition(
268        &self,
269        _request: Request<BroadcastStateTransitionRequest>,
270    ) -> Result<Response<BroadcastStateTransitionResponse>, Status> {
271        respond_with_unimplemented("broadcast_state_transition")
272    }
273
274    async fn get_identity(
275        &self,
276        request: Request<GetIdentityRequest>,
277    ) -> Result<Response<GetIdentityResponse>, Status> {
278        self.handle_blocking_query(
279            request,
280            Platform::<DefaultCoreRPC>::query_identity,
281            "get_identity",
282        )
283        .await
284    }
285
286    async fn get_identities_contract_keys(
287        &self,
288        request: Request<GetIdentitiesContractKeysRequest>,
289    ) -> Result<Response<GetIdentitiesContractKeysResponse>, Status> {
290        self.handle_blocking_query(
291            request,
292            Platform::<DefaultCoreRPC>::query_identities_contract_keys,
293            "get_identities_contract_keys",
294        )
295        .await
296    }
297
298    async fn get_identity_keys(
299        &self,
300        request: Request<GetIdentityKeysRequest>,
301    ) -> Result<Response<GetIdentityKeysResponse>, Status> {
302        self.handle_blocking_query(
303            request,
304            Platform::<DefaultCoreRPC>::query_keys,
305            "get_identity_keys",
306        )
307        .await
308    }
309
310    async fn get_identity_nonce(
311        &self,
312        request: Request<GetIdentityNonceRequest>,
313    ) -> Result<Response<GetIdentityNonceResponse>, Status> {
314        self.handle_blocking_query(
315            request,
316            Platform::<DefaultCoreRPC>::query_identity_nonce,
317            "get_identity_nonce",
318        )
319        .await
320    }
321
322    async fn get_identity_contract_nonce(
323        &self,
324        request: Request<GetIdentityContractNonceRequest>,
325    ) -> Result<Response<GetIdentityContractNonceResponse>, Status> {
326        self.handle_blocking_query(
327            request,
328            Platform::<DefaultCoreRPC>::query_identity_contract_nonce,
329            "get_identity_contract_nonce",
330        )
331        .await
332    }
333
334    async fn get_identity_balance(
335        &self,
336        request: Request<GetIdentityBalanceRequest>,
337    ) -> Result<Response<GetIdentityBalanceResponse>, Status> {
338        self.handle_blocking_query(
339            request,
340            Platform::<DefaultCoreRPC>::query_balance,
341            "get_identity_balance",
342        )
343        .await
344    }
345
346    async fn get_identity_balance_and_revision(
347        &self,
348        request: Request<GetIdentityBalanceAndRevisionRequest>,
349    ) -> Result<Response<GetIdentityBalanceAndRevisionResponse>, Status> {
350        self.handle_blocking_query(
351            request,
352            Platform::<DefaultCoreRPC>::query_balance_and_revision,
353            "get_identity_balance_and_revision",
354        )
355        .await
356    }
357
358    async fn get_data_contract(
359        &self,
360        request: Request<GetDataContractRequest>,
361    ) -> Result<Response<GetDataContractResponse>, Status> {
362        self.handle_blocking_query(
363            request,
364            Platform::<DefaultCoreRPC>::query_data_contract,
365            "get_data_contract",
366        )
367        .await
368    }
369
370    async fn get_data_contract_history(
371        &self,
372        request: Request<GetDataContractHistoryRequest>,
373    ) -> Result<Response<GetDataContractHistoryResponse>, Status> {
374        self.handle_blocking_query(
375            request,
376            Platform::<DefaultCoreRPC>::query_data_contract_history,
377            "get_data_contract_history",
378        )
379        .await
380    }
381
382    async fn get_data_contracts(
383        &self,
384        request: Request<GetDataContractsRequest>,
385    ) -> Result<Response<GetDataContractsResponse>, Status> {
386        self.handle_blocking_query(
387            request,
388            Platform::<DefaultCoreRPC>::query_data_contracts,
389            "get_data_contracts",
390        )
391        .await
392    }
393
394    async fn get_document_history(
395        &self,
396        request: Request<GetDocumentHistoryRequest>,
397    ) -> Result<Response<GetDocumentHistoryResponse>, Status> {
398        self.handle_blocking_query(
399            request,
400            Platform::<DefaultCoreRPC>::query_document_history,
401            "get_document_history",
402        )
403        .await
404    }
405
406    async fn get_documents(
407        &self,
408        request: Request<GetDocumentsRequest>,
409    ) -> Result<Response<GetDocumentsResponse>, Status> {
410        self.handle_blocking_query(
411            request,
412            Platform::<DefaultCoreRPC>::query_documents,
413            "get_documents",
414        )
415        .await
416    }
417
418    async fn get_identity_by_public_key_hash(
419        &self,
420        request: Request<GetIdentityByPublicKeyHashRequest>,
421    ) -> Result<Response<GetIdentityByPublicKeyHashResponse>, Status> {
422        self.handle_blocking_query(
423            request,
424            Platform::<DefaultCoreRPC>::query_identity_by_public_key_hash,
425            "get_identity_by_public_key_hash",
426        )
427        .await
428    }
429
430    async fn get_identity_by_non_unique_public_key_hash(
431        &self,
432        request: Request<GetIdentityByNonUniquePublicKeyHashRequest>,
433    ) -> Result<Response<GetIdentityByNonUniquePublicKeyHashResponse>, Status> {
434        self.handle_blocking_query(
435            request,
436            Platform::<DefaultCoreRPC>::query_identity_by_non_unique_public_key_hash,
437            "get_identity_by_non_unique_public_key_hash",
438        )
439        .await
440    }
441
442    async fn wait_for_state_transition_result(
443        &self,
444        _request: Request<WaitForStateTransitionResultRequest>,
445    ) -> Result<Response<WaitForStateTransitionResultResponse>, Status> {
446        respond_with_unimplemented("wait_for_state_transition_result")
447    }
448
449    async fn get_consensus_params(
450        &self,
451        _request: Request<GetConsensusParamsRequest>,
452    ) -> Result<Response<GetConsensusParamsResponse>, Status> {
453        respond_with_unimplemented("get_consensus_params")
454    }
455
456    async fn get_protocol_version_upgrade_state(
457        &self,
458        request: Request<GetProtocolVersionUpgradeStateRequest>,
459    ) -> Result<Response<GetProtocolVersionUpgradeStateResponse>, Status> {
460        self.handle_blocking_query(
461            request,
462            Platform::<DefaultCoreRPC>::query_version_upgrade_state,
463            "get_protocol_version_upgrade_state",
464        )
465        .await
466    }
467
468    async fn get_protocol_version_upgrade_vote_status(
469        &self,
470        request: Request<GetProtocolVersionUpgradeVoteStatusRequest>,
471    ) -> Result<Response<GetProtocolVersionUpgradeVoteStatusResponse>, Status> {
472        self.handle_blocking_query(
473            request,
474            Platform::<DefaultCoreRPC>::query_version_upgrade_vote_status,
475            "get_protocol_version_upgrade_vote_status",
476        )
477        .await
478    }
479
480    async fn get_epochs_info(
481        &self,
482        request: Request<GetEpochsInfoRequest>,
483    ) -> Result<Response<GetEpochsInfoResponse>, Status> {
484        self.handle_blocking_query(
485            request,
486            Platform::<DefaultCoreRPC>::query_epoch_infos,
487            "get_epochs_info",
488        )
489        .await
490    }
491
492    async fn get_path_elements(
493        &self,
494        request: Request<GetPathElementsRequest>,
495    ) -> Result<Response<GetPathElementsResponse>, Status> {
496        self.handle_blocking_query(
497            request,
498            Platform::<DefaultCoreRPC>::query_path_elements,
499            "get_path_elements",
500        )
501        .await
502    }
503
504    async fn get_contested_resources(
505        &self,
506        request: Request<GetContestedResourcesRequest>,
507    ) -> Result<Response<GetContestedResourcesResponse>, Status> {
508        self.handle_blocking_query(
509            request,
510            Platform::<DefaultCoreRPC>::query_contested_resources,
511            "get_contested_resources",
512        )
513        .await
514    }
515
516    async fn get_contested_resource_vote_state(
517        &self,
518        request: Request<GetContestedResourceVoteStateRequest>,
519    ) -> Result<Response<GetContestedResourceVoteStateResponse>, Status> {
520        self.handle_blocking_query(
521            request,
522            Platform::<DefaultCoreRPC>::query_contested_resource_vote_state,
523            "get_contested_resource_vote_state",
524        )
525        .await
526    }
527
528    async fn get_contested_resource_voters_for_identity(
529        &self,
530        request: Request<GetContestedResourceVotersForIdentityRequest>,
531    ) -> Result<Response<GetContestedResourceVotersForIdentityResponse>, Status> {
532        self.handle_blocking_query(
533            request,
534            Platform::<DefaultCoreRPC>::query_contested_resource_voters_for_identity,
535            "get_contested_resource_voters_for_identity",
536        )
537        .await
538    }
539
540    async fn get_contested_resource_identity_votes(
541        &self,
542        request: Request<GetContestedResourceIdentityVotesRequest>,
543    ) -> Result<Response<GetContestedResourceIdentityVotesResponse>, Status> {
544        self.handle_blocking_query(
545            request,
546            Platform::<DefaultCoreRPC>::query_contested_resource_identity_votes,
547            "get_contested_resource_identity_votes",
548        )
549        .await
550    }
551
552    async fn get_vote_polls_by_end_date(
553        &self,
554        request: Request<GetVotePollsByEndDateRequest>,
555    ) -> Result<Response<GetVotePollsByEndDateResponse>, Status> {
556        self.handle_blocking_query(
557            request,
558            Platform::<DefaultCoreRPC>::query_vote_polls_by_end_date_query,
559            "get_vote_polls_by_end_date",
560        )
561        .await
562    }
563
564    async fn get_prefunded_specialized_balance(
565        &self,
566        request: Request<GetPrefundedSpecializedBalanceRequest>,
567    ) -> Result<Response<GetPrefundedSpecializedBalanceResponse>, Status> {
568        self.handle_blocking_query(
569            request,
570            Platform::<DefaultCoreRPC>::query_prefunded_specialized_balance,
571            "get_prefunded_specialized_balance",
572        )
573        .await
574    }
575
576    async fn get_total_credits_in_platform(
577        &self,
578        request: Request<GetTotalCreditsInPlatformRequest>,
579    ) -> Result<Response<GetTotalCreditsInPlatformResponse>, Status> {
580        self.handle_blocking_query(
581            request,
582            Platform::<DefaultCoreRPC>::query_total_credits_in_platform,
583            "get_total_credits_in_platform",
584        )
585        .await
586    }
587
588    async fn get_identities_balances(
589        &self,
590        request: Request<GetIdentitiesBalancesRequest>,
591    ) -> Result<Response<GetIdentitiesBalancesResponse>, Status> {
592        self.handle_blocking_query(
593            request,
594            Platform::<DefaultCoreRPC>::query_identities_balances,
595            "get_identities_balances",
596        )
597        .await
598    }
599
600    async fn get_status(
601        &self,
602        request: Request<GetStatusRequest>,
603    ) -> Result<Response<GetStatusResponse>, Status> {
604        self.handle_blocking_query(
605            request,
606            Platform::<DefaultCoreRPC>::query_partial_status,
607            "query_partial_status",
608        )
609        .await
610    }
611
612    async fn get_evonodes_proposed_epoch_blocks_by_ids(
613        &self,
614        request: Request<GetEvonodesProposedEpochBlocksByIdsRequest>,
615    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
616        self.handle_blocking_query(
617            request,
618            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_evonode_ids,
619            "query_proposed_block_counts_by_evonode_ids",
620        )
621        .await
622    }
623
624    async fn get_evonodes_proposed_epoch_blocks_by_range(
625        &self,
626        request: Request<GetEvonodesProposedEpochBlocksByRangeRequest>,
627    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
628        self.handle_blocking_query(
629            request,
630            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_range,
631            "query_proposed_block_counts_by_range",
632        )
633        .await
634    }
635
636    async fn get_current_quorums_info(
637        &self,
638        request: Request<GetCurrentQuorumsInfoRequest>,
639    ) -> Result<Response<GetCurrentQuorumsInfoResponse>, Status> {
640        self.handle_blocking_query(
641            request,
642            Platform::<DefaultCoreRPC>::query_current_quorums_info,
643            "query_current_quorums_info",
644        )
645        .await
646    }
647
648    async fn get_identity_token_balances(
649        &self,
650        request: Request<GetIdentityTokenBalancesRequest>,
651    ) -> Result<Response<GetIdentityTokenBalancesResponse>, Status> {
652        self.handle_blocking_query(
653            request,
654            Platform::<DefaultCoreRPC>::query_identity_token_balances,
655            "query_identity_token_balances",
656        )
657        .await
658    }
659
660    async fn get_identities_token_balances(
661        &self,
662        request: Request<GetIdentitiesTokenBalancesRequest>,
663    ) -> Result<Response<GetIdentitiesTokenBalancesResponse>, Status> {
664        self.handle_blocking_query(
665            request,
666            Platform::<DefaultCoreRPC>::query_identities_token_balances,
667            "query_identities_token_balances",
668        )
669        .await
670    }
671
672    async fn get_identity_token_infos(
673        &self,
674        request: Request<GetIdentityTokenInfosRequest>,
675    ) -> Result<Response<GetIdentityTokenInfosResponse>, Status> {
676        self.handle_blocking_query(
677            request,
678            Platform::<DefaultCoreRPC>::query_identity_token_infos,
679            "query_identity_token_infos",
680        )
681        .await
682    }
683
684    async fn get_identities_token_infos(
685        &self,
686        request: Request<GetIdentitiesTokenInfosRequest>,
687    ) -> Result<Response<GetIdentitiesTokenInfosResponse>, Status> {
688        self.handle_blocking_query(
689            request,
690            Platform::<DefaultCoreRPC>::query_identities_token_infos,
691            "query_identities_token_infos",
692        )
693        .await
694    }
695
696    async fn get_token_statuses(
697        &self,
698        request: Request<GetTokenStatusesRequest>,
699    ) -> Result<Response<GetTokenStatusesResponse>, Status> {
700        self.handle_blocking_query(
701            request,
702            Platform::<DefaultCoreRPC>::query_token_statuses,
703            "get_token_statuses",
704        )
705        .await
706    }
707
708    async fn get_token_pre_programmed_distributions(
709        &self,
710        request: Request<GetTokenPreProgrammedDistributionsRequest>,
711    ) -> Result<Response<GetTokenPreProgrammedDistributionsResponse>, Status> {
712        self.handle_blocking_query(
713            request,
714            Platform::<DefaultCoreRPC>::query_token_pre_programmed_distributions,
715            "get_token_pre_programmed_distributions",
716        )
717        .await
718    }
719
720    async fn get_token_total_supply(
721        &self,
722        request: Request<GetTokenTotalSupplyRequest>,
723    ) -> Result<Response<GetTokenTotalSupplyResponse>, Status> {
724        self.handle_blocking_query(
725            request,
726            Platform::<DefaultCoreRPC>::query_token_total_supply,
727            "get_token_total_supply",
728        )
729        .await
730    }
731
732    async fn get_group_info(
733        &self,
734        request: Request<GetGroupInfoRequest>,
735    ) -> Result<Response<GetGroupInfoResponse>, Status> {
736        self.handle_blocking_query(
737            request,
738            Platform::<DefaultCoreRPC>::query_group_info,
739            "get_group_info",
740        )
741        .await
742    }
743
744    async fn get_group_infos(
745        &self,
746        request: Request<GetGroupInfosRequest>,
747    ) -> Result<Response<GetGroupInfosResponse>, Status> {
748        self.handle_blocking_query(
749            request,
750            Platform::<DefaultCoreRPC>::query_group_infos,
751            "get_group_infos",
752        )
753        .await
754    }
755
756    async fn get_group_actions(
757        &self,
758        request: Request<GetGroupActionsRequest>,
759    ) -> Result<Response<GetGroupActionsResponse>, Status> {
760        self.handle_blocking_query(
761            request,
762            Platform::<DefaultCoreRPC>::query_group_actions,
763            "get_group_actions",
764        )
765        .await
766    }
767
768    async fn get_group_action_signers(
769        &self,
770        request: Request<GetGroupActionSignersRequest>,
771    ) -> Result<Response<GetGroupActionSignersResponse>, Status> {
772        self.handle_blocking_query(
773            request,
774            Platform::<DefaultCoreRPC>::query_group_action_signers,
775            "get_group_action_signers",
776        )
777        .await
778    }
779
780    async fn get_token_direct_purchase_prices(
781        &self,
782        request: Request<GetTokenDirectPurchasePricesRequest>,
783    ) -> Result<Response<GetTokenDirectPurchasePricesResponse>, Status> {
784        self.handle_blocking_query(
785            request,
786            Platform::<DefaultCoreRPC>::query_token_direct_purchase_prices,
787            "get_token_direct_purchase_prices",
788        )
789        .await
790    }
791
792    async fn get_token_contract_info(
793        &self,
794        request: Request<GetTokenContractInfoRequest>,
795    ) -> Result<Response<GetTokenContractInfoResponse>, Status> {
796        self.handle_blocking_query(
797            request,
798            Platform::<DefaultCoreRPC>::query_token_contract_info,
799            "get_token_contract_info",
800        )
801        .await
802    }
803
804    async fn get_token_perpetual_distribution_last_claim(
805        &self,
806        request: Request<GetTokenPerpetualDistributionLastClaimRequest>,
807    ) -> Result<Response<GetTokenPerpetualDistributionLastClaimResponse>, Status> {
808        self.handle_blocking_query(
809            request,
810            Platform::<DefaultCoreRPC>::query_token_perpetual_distribution_last_claim,
811            "get_token_perpetual_distribution_last_claim",
812        )
813        .await
814    }
815
816    async fn get_finalized_epoch_infos(
817        &self,
818        request: Request<GetFinalizedEpochInfosRequest>,
819    ) -> Result<Response<GetFinalizedEpochInfosResponse>, Status> {
820        self.handle_blocking_query(
821            request,
822            Platform::<DefaultCoreRPC>::query_finalized_epoch_infos,
823            "get_finalized_epoch_infos",
824        )
825        .await
826    }
827
828    async fn get_address_info(
829        &self,
830        request: Request<GetAddressInfoRequest>,
831    ) -> Result<Response<GetAddressInfoResponse>, Status> {
832        self.handle_blocking_query(
833            request,
834            Platform::<DefaultCoreRPC>::query_address_info,
835            "get_address_info",
836        )
837        .await
838    }
839
840    async fn get_addresses_infos(
841        &self,
842        request: Request<GetAddressesInfosRequest>,
843    ) -> Result<Response<GetAddressesInfosResponse>, Status> {
844        self.handle_blocking_query(
845            request,
846            Platform::<DefaultCoreRPC>::query_addresses_infos,
847            "get_addresses_infos",
848        )
849        .await
850    }
851
852    async fn get_addresses_trunk_state(
853        &self,
854        request: Request<GetAddressesTrunkStateRequest>,
855    ) -> Result<Response<GetAddressesTrunkStateResponse>, Status> {
856        self.handle_blocking_query(
857            request,
858            Platform::<DefaultCoreRPC>::query_addresses_trunk_state,
859            "get_addresses_trunk_state",
860        )
861        .await
862    }
863
864    async fn get_addresses_branch_state(
865        &self,
866        request: Request<GetAddressesBranchStateRequest>,
867    ) -> Result<Response<GetAddressesBranchStateResponse>, Status> {
868        self.handle_blocking_query(
869            request,
870            Platform::<DefaultCoreRPC>::query_addresses_branch_state,
871            "get_addresses_branch_state",
872        )
873        .await
874    }
875
876    async fn get_recent_address_balance_changes(
877        &self,
878        request: Request<GetRecentAddressBalanceChangesRequest>,
879    ) -> Result<Response<GetRecentAddressBalanceChangesResponse>, Status> {
880        self.handle_blocking_query(
881            request,
882            Platform::<DefaultCoreRPC>::query_recent_address_balance_changes,
883            "get_recent_address_balance_changes",
884        )
885        .await
886    }
887
888    async fn get_recent_compacted_address_balance_changes(
889        &self,
890        request: Request<GetRecentCompactedAddressBalanceChangesRequest>,
891    ) -> Result<Response<GetRecentCompactedAddressBalanceChangesResponse>, Status> {
892        self.handle_blocking_query(
893            request,
894            Platform::<DefaultCoreRPC>::query_recent_compacted_address_balance_changes,
895            "get_recent_compacted_address_balance_changes",
896        )
897        .await
898    }
899
900    async fn get_shielded_encrypted_notes(
901        &self,
902        request: Request<GetShieldedEncryptedNotesRequest>,
903    ) -> Result<Response<GetShieldedEncryptedNotesResponse>, Status> {
904        self.handle_blocking_query(
905            request,
906            Platform::<DefaultCoreRPC>::query_shielded_encrypted_notes,
907            "get_shielded_encrypted_notes",
908        )
909        .await
910    }
911
912    async fn get_shielded_anchors(
913        &self,
914        request: Request<GetShieldedAnchorsRequest>,
915    ) -> Result<Response<GetShieldedAnchorsResponse>, Status> {
916        self.handle_blocking_query(
917            request,
918            Platform::<DefaultCoreRPC>::query_shielded_anchors,
919            "get_shielded_anchors",
920        )
921        .await
922    }
923
924    async fn get_most_recent_shielded_anchor(
925        &self,
926        request: Request<GetMostRecentShieldedAnchorRequest>,
927    ) -> Result<Response<GetMostRecentShieldedAnchorResponse>, Status> {
928        self.handle_blocking_query(
929            request,
930            Platform::<DefaultCoreRPC>::query_most_recent_shielded_anchor,
931            "get_most_recent_shielded_anchor",
932        )
933        .await
934    }
935
936    async fn get_shielded_pool_state(
937        &self,
938        request: Request<GetShieldedPoolStateRequest>,
939    ) -> Result<Response<GetShieldedPoolStateResponse>, Status> {
940        self.handle_blocking_query(
941            request,
942            Platform::<DefaultCoreRPC>::query_shielded_pool_state,
943            "get_shielded_pool_state",
944        )
945        .await
946    }
947
948    async fn get_shielded_notes_count(
949        &self,
950        request: Request<GetShieldedNotesCountRequest>,
951    ) -> Result<Response<GetShieldedNotesCountResponse>, Status> {
952        self.handle_blocking_query(
953            request,
954            Platform::<DefaultCoreRPC>::query_shielded_notes_count,
955            "get_shielded_notes_count",
956        )
957        .await
958    }
959
960    async fn get_shielded_nullifiers(
961        &self,
962        request: Request<GetShieldedNullifiersRequest>,
963    ) -> Result<Response<GetShieldedNullifiersResponse>, Status> {
964        self.handle_blocking_query(
965            request,
966            Platform::<DefaultCoreRPC>::query_shielded_nullifiers,
967            "get_shielded_nullifiers",
968        )
969        .await
970    }
971}
972
973#[async_trait]
974impl DriveInternal for QueryService {
975    async fn get_proofs(
976        &self,
977        request: Request<GetProofsRequest>,
978    ) -> Result<Response<GetProofsResponse>, Status> {
979        self.handle_blocking_query(
980            request,
981            Platform::<DefaultCoreRPC>::query_proofs,
982            "get_proofs",
983        )
984        .await
985    }
986}
987
988fn query_error_into_status(error: QueryError) -> Status {
989    match error {
990        QueryError::NotFound(message) => Status::not_found(message),
991        QueryError::InvalidArgument(message) => Status::invalid_argument(message),
992        QueryError::Query(error) => Status::invalid_argument(error.to_string()),
993        _ => {
994            tracing::error!("unexpected query error: {:?}", error);
995
996            Status::unknown(error.to_string())
997        }
998    }
999}
1000
1001fn error_into_status(error: Error) -> Status {
1002    Status::internal(format!("query: {}", error))
1003}