drive_abci/query/
service.rs

1use crate::error::query::QueryError;
2use crate::error::Error;
3use crate::metrics::{abci_response_code_metric_label, query_duration_metric};
4use crate::platform_types::platform::Platform;
5use crate::platform_types::platform_state::PlatformState;
6use crate::platform_types::platform_state::PlatformStateV0Methods;
7use crate::query::QueryValidationResult;
8use crate::rpc::core::DefaultCoreRPC;
9use crate::utils::spawn_blocking_task_with_name_if_supported;
10use async_trait::async_trait;
11use dapi_grpc::drive::v0::drive_internal_server::DriveInternal;
12use dapi_grpc::drive::v0::{GetProofsRequest, GetProofsResponse};
13use dapi_grpc::platform::v0::platform_server::Platform as PlatformService;
14use dapi_grpc::platform::v0::{
15    BroadcastStateTransitionRequest, BroadcastStateTransitionResponse, GetAddressInfoRequest,
16    GetAddressInfoResponse, GetAddressesBranchStateRequest, GetAddressesBranchStateResponse,
17    GetAddressesInfosRequest, GetAddressesInfosResponse, GetAddressesTrunkStateRequest,
18    GetAddressesTrunkStateResponse, GetConsensusParamsRequest, GetConsensusParamsResponse,
19    GetContestedResourceIdentityVotesRequest, GetContestedResourceIdentityVotesResponse,
20    GetContestedResourceVoteStateRequest, GetContestedResourceVoteStateResponse,
21    GetContestedResourceVotersForIdentityRequest, GetContestedResourceVotersForIdentityResponse,
22    GetContestedResourcesRequest, GetContestedResourcesResponse, GetCurrentQuorumsInfoRequest,
23    GetCurrentQuorumsInfoResponse, GetDataContractHistoryRequest, GetDataContractHistoryResponse,
24    GetDataContractRequest, GetDataContractResponse, GetDataContractsRequest,
25    GetDataContractsResponse, GetDocumentsRequest, GetDocumentsResponse, GetEpochsInfoRequest,
26    GetEpochsInfoResponse, GetEvonodesProposedEpochBlocksByIdsRequest,
27    GetEvonodesProposedEpochBlocksByRangeRequest, GetEvonodesProposedEpochBlocksResponse,
28    GetFinalizedEpochInfosRequest, GetFinalizedEpochInfosResponse, GetGroupActionSignersRequest,
29    GetGroupActionSignersResponse, GetGroupActionsRequest, GetGroupActionsResponse,
30    GetGroupInfoRequest, GetGroupInfoResponse, GetGroupInfosRequest, GetGroupInfosResponse,
31    GetIdentitiesBalancesRequest, GetIdentitiesBalancesResponse, GetIdentitiesContractKeysRequest,
32    GetIdentitiesContractKeysResponse, GetIdentitiesTokenBalancesRequest,
33    GetIdentitiesTokenBalancesResponse, GetIdentitiesTokenInfosRequest,
34    GetIdentitiesTokenInfosResponse, GetIdentityBalanceAndRevisionRequest,
35    GetIdentityBalanceAndRevisionResponse, GetIdentityBalanceRequest, GetIdentityBalanceResponse,
36    GetIdentityByNonUniquePublicKeyHashRequest, GetIdentityByNonUniquePublicKeyHashResponse,
37    GetIdentityByPublicKeyHashRequest, GetIdentityByPublicKeyHashResponse,
38    GetIdentityContractNonceRequest, GetIdentityContractNonceResponse, GetIdentityKeysRequest,
39    GetIdentityKeysResponse, GetIdentityNonceRequest, GetIdentityNonceResponse, GetIdentityRequest,
40    GetIdentityResponse, GetIdentityTokenBalancesRequest, GetIdentityTokenBalancesResponse,
41    GetIdentityTokenInfosRequest, GetIdentityTokenInfosResponse,
42    GetMostRecentShieldedAnchorRequest, GetMostRecentShieldedAnchorResponse,
43    GetNullifiersBranchStateRequest, GetNullifiersBranchStateResponse,
44    GetNullifiersTrunkStateRequest, GetNullifiersTrunkStateResponse, GetPathElementsRequest,
45    GetPathElementsResponse, GetPrefundedSpecializedBalanceRequest,
46    GetPrefundedSpecializedBalanceResponse, GetProtocolVersionUpgradeStateRequest,
47    GetProtocolVersionUpgradeStateResponse, GetProtocolVersionUpgradeVoteStatusRequest,
48    GetProtocolVersionUpgradeVoteStatusResponse, GetRecentAddressBalanceChangesRequest,
49    GetRecentAddressBalanceChangesResponse, GetRecentCompactedAddressBalanceChangesRequest,
50    GetRecentCompactedAddressBalanceChangesResponse, GetRecentCompactedNullifierChangesRequest,
51    GetRecentCompactedNullifierChangesResponse, GetRecentNullifierChangesRequest,
52    GetRecentNullifierChangesResponse, GetShieldedAnchorsRequest, GetShieldedAnchorsResponse,
53    GetShieldedEncryptedNotesRequest, GetShieldedEncryptedNotesResponse,
54    GetShieldedNullifiersRequest, GetShieldedNullifiersResponse, GetShieldedPoolStateRequest,
55    GetShieldedPoolStateResponse, GetStatusRequest, GetStatusResponse, GetTokenContractInfoRequest,
56    GetTokenContractInfoResponse, GetTokenDirectPurchasePricesRequest,
57    GetTokenDirectPurchasePricesResponse, GetTokenPerpetualDistributionLastClaimRequest,
58    GetTokenPerpetualDistributionLastClaimResponse, GetTokenPreProgrammedDistributionsRequest,
59    GetTokenPreProgrammedDistributionsResponse, GetTokenStatusesRequest, GetTokenStatusesResponse,
60    GetTokenTotalSupplyRequest, GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest,
61    GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse,
62    WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse,
63};
64use dapi_grpc::tonic::{Code, Request, Response, Status};
65use dpp::version::PlatformVersion;
66use std::fmt::Debug;
67use std::sync::atomic::Ordering;
68use std::sync::Arc;
69use std::thread::sleep;
70use std::time::Duration;
71use tracing::Instrument;
72
/// Service to handle platform queries.
///
/// Holds a shared handle to the [`Platform`]; every query served by this
/// type is executed against that instance.
pub struct QueryService {
    /// Shared platform instance that queries are executed against.
    platform: Arc<Platform<DefaultCoreRPC>>,
}
77
/// Signature of a query handler that `QueryService::handle_blocking_query`
/// can execute.
///
/// The handler receives the platform, the request of type `RQ`, the platform
/// state the query runs against and the platform version. It returns either a
/// validation result wrapping a response of type `RS`, or an [`Error`].
type QueryMethod<RQ, RS> = fn(
    &Platform<DefaultCoreRPC>,
    RQ,
    &PlatformState,
    &PlatformVersion,
) -> Result<QueryValidationResult<RS>, Error>;
84
85impl QueryService {
86    /// Creates new QueryService
87    pub fn new(platform: Arc<Platform<DefaultCoreRPC>>) -> Self {
88        Self { platform }
89    }
90
91    async fn handle_blocking_query<RQ, RS>(
92        &self,
93        request: Request<RQ>,
94        query_method: QueryMethod<RQ, RS>,
95        endpoint_name: &str,
96    ) -> Result<Response<RS>, Status>
97    where
98        RS: Clone + Send + 'static,
99        RQ: Debug + Send + Clone + 'static,
100    {
101        let mut response_duration_metric = query_duration_metric(endpoint_name);
102
103        let platform = Arc::clone(&self.platform);
104
105        let request_debug = format!("{:?}", &request);
106
107        let result = spawn_blocking_task_with_name_if_supported("query", move || {
108            let mut result;
109
110            let query_request = request.into_inner();
111
112            let mut query_counter = 0;
113
114            loop {
115                let platform_state = platform.state.load();
116
117                let platform_version = platform_state
118                    .current_platform_version()
119                    .map_err(|_| Status::unavailable("platform is not initialized"))?;
120
121                // Query is using Platform execution state and Drive state to during the execution.
122                // They are updating every block in finalize block ABCI handler.
123                // The problem is that these two operations aren't atomic and some latency between
124                // them could lead to data races. `committed_block_height_guard` counter that represents
125                // the latest the height of latest committed Drive state and logic bellow ensures
126                // that query is executed only after/before both states are updated.
127                let mut needs_restart = false;
128
129                loop {
130                    let committed_block_height_guard = platform
131                        .committed_block_height_guard
132                        .load(Ordering::Relaxed);
133                    let mut counter = 0;
134                    if platform_state.last_committed_block_height() == committed_block_height_guard
135                    {
136                        break;
137                    } else {
138                        counter += 1;
139                        sleep(Duration::from_millis(10))
140                    }
141
142                    // We try for up to 1 second
143                    if counter >= 100 {
144                        query_counter += 1;
145                        needs_restart = true;
146                        break;
147                    }
148                }
149
150                if query_counter > 3 {
151                    return Err(query_error_into_status(QueryError::NotServiceable(
152                        "platform is saturated (did not attempt query)".to_string(),
153                    )));
154                }
155
156                if needs_restart {
157                    continue;
158                }
159
160                result = query_method(
161                    &platform,
162                    query_request.clone(),
163                    &platform_state,
164                    platform_version,
165                );
166
167                let committed_block_height_guard = platform
168                    .committed_block_height_guard
169                    .load(Ordering::Relaxed);
170
171                if platform_state.last_committed_block_height() == committed_block_height_guard {
172                    // in this case the query almost certainly executed correctly
173                    break;
174                } else {
175                    query_counter += 1;
176
177                    if query_counter > 2 {
178                        // This should never be possible
179                        return Err(query_error_into_status(QueryError::NotServiceable(
180                            "platform is saturated".to_string(),
181                        )));
182                    }
183                }
184            }
185
186            let mut query_result = result.map_err(error_into_status)?;
187
188            if query_result.is_valid() {
189                let response = query_result
190                    .into_data()
191                    .map_err(|error| error_into_status(error.into()))?;
192
193                Ok(Response::new(response))
194            } else {
195                let error = query_result.errors.swap_remove(0);
196
197                Err(query_error_into_status(error))
198            }
199        })?
200        .instrument(tracing::trace_span!("query", endpoint_name))
201        .await
202        .map_err(|error| Status::internal(format!("query thread failed: {}", error)))?;
203
204        // Query logging and metrics
205        let code = match &result {
206            Ok(_) => Code::Ok,
207            Err(status) => status.code(),
208        };
209
210        let code_label = format!("{:?}", code).to_lowercase();
211
212        // Add code to response duration metric
213        let label = abci_response_code_metric_label(code);
214        response_duration_metric.add_label(label);
215
216        match code {
217            // User errors
218            Code::Ok
219            | Code::InvalidArgument
220            | Code::NotFound
221            | Code::AlreadyExists
222            | Code::ResourceExhausted
223            | Code::PermissionDenied
224            | Code::Unavailable
225            | Code::Aborted
226            | Code::FailedPrecondition
227            | Code::OutOfRange
228            | Code::Cancelled
229            | Code::DeadlineExceeded
230            | Code::Unauthenticated => {
231                let elapsed_time = response_duration_metric.elapsed().as_secs_f64();
232
233                tracing::trace!(
234                    request = request_debug,
235                    elapsed_time,
236                    endpoint_name,
237                    code = code_label,
238                    "query '{}' executed with code {:?} in {} secs",
239                    endpoint_name,
240                    code,
241                    elapsed_time
242                );
243            }
244            // System errors
245            Code::Unknown | Code::Unimplemented | Code::Internal | Code::DataLoss => {
246                tracing::error!(
247                    request = request_debug,
248                    endpoint_name,
249                    code = code_label,
250                    "query '{}' execution failed with code {:?}",
251                    endpoint_name,
252                    code
253                );
254            }
255        }
256
257        result
258    }
259}
260
261fn respond_with_unimplemented<RS>(name: &str) -> Result<Response<RS>, Status> {
262    tracing::error!("{} endpoint is called but it's not supported", name);
263
264    Err(Status::unimplemented("the endpoint is not supported"))
265}
266
267#[async_trait]
268impl PlatformService for QueryService {
269    async fn broadcast_state_transition(
270        &self,
271        _request: Request<BroadcastStateTransitionRequest>,
272    ) -> Result<Response<BroadcastStateTransitionResponse>, Status> {
273        respond_with_unimplemented("broadcast_state_transition")
274    }
275
276    async fn get_identity(
277        &self,
278        request: Request<GetIdentityRequest>,
279    ) -> Result<Response<GetIdentityResponse>, Status> {
280        self.handle_blocking_query(
281            request,
282            Platform::<DefaultCoreRPC>::query_identity,
283            "get_identity",
284        )
285        .await
286    }
287
288    async fn get_identities_contract_keys(
289        &self,
290        request: Request<GetIdentitiesContractKeysRequest>,
291    ) -> Result<Response<GetIdentitiesContractKeysResponse>, Status> {
292        self.handle_blocking_query(
293            request,
294            Platform::<DefaultCoreRPC>::query_identities_contract_keys,
295            "get_identities_contract_keys",
296        )
297        .await
298    }
299
300    async fn get_identity_keys(
301        &self,
302        request: Request<GetIdentityKeysRequest>,
303    ) -> Result<Response<GetIdentityKeysResponse>, Status> {
304        self.handle_blocking_query(
305            request,
306            Platform::<DefaultCoreRPC>::query_keys,
307            "get_identity_keys",
308        )
309        .await
310    }
311
312    async fn get_identity_nonce(
313        &self,
314        request: Request<GetIdentityNonceRequest>,
315    ) -> Result<Response<GetIdentityNonceResponse>, Status> {
316        self.handle_blocking_query(
317            request,
318            Platform::<DefaultCoreRPC>::query_identity_nonce,
319            "get_identity_nonce",
320        )
321        .await
322    }
323
324    async fn get_identity_contract_nonce(
325        &self,
326        request: Request<GetIdentityContractNonceRequest>,
327    ) -> Result<Response<GetIdentityContractNonceResponse>, Status> {
328        self.handle_blocking_query(
329            request,
330            Platform::<DefaultCoreRPC>::query_identity_contract_nonce,
331            "get_identity_contract_nonce",
332        )
333        .await
334    }
335
336    async fn get_identity_balance(
337        &self,
338        request: Request<GetIdentityBalanceRequest>,
339    ) -> Result<Response<GetIdentityBalanceResponse>, Status> {
340        self.handle_blocking_query(
341            request,
342            Platform::<DefaultCoreRPC>::query_balance,
343            "get_identity_balance",
344        )
345        .await
346    }
347
348    async fn get_identity_balance_and_revision(
349        &self,
350        request: Request<GetIdentityBalanceAndRevisionRequest>,
351    ) -> Result<Response<GetIdentityBalanceAndRevisionResponse>, Status> {
352        self.handle_blocking_query(
353            request,
354            Platform::<DefaultCoreRPC>::query_balance_and_revision,
355            "get_identity_balance_and_revision",
356        )
357        .await
358    }
359
360    async fn get_data_contract(
361        &self,
362        request: Request<GetDataContractRequest>,
363    ) -> Result<Response<GetDataContractResponse>, Status> {
364        self.handle_blocking_query(
365            request,
366            Platform::<DefaultCoreRPC>::query_data_contract,
367            "get_data_contract",
368        )
369        .await
370    }
371
372    async fn get_data_contract_history(
373        &self,
374        request: Request<GetDataContractHistoryRequest>,
375    ) -> Result<Response<GetDataContractHistoryResponse>, Status> {
376        self.handle_blocking_query(
377            request,
378            Platform::<DefaultCoreRPC>::query_data_contract_history,
379            "get_data_contract_history",
380        )
381        .await
382    }
383
384    async fn get_data_contracts(
385        &self,
386        request: Request<GetDataContractsRequest>,
387    ) -> Result<Response<GetDataContractsResponse>, Status> {
388        self.handle_blocking_query(
389            request,
390            Platform::<DefaultCoreRPC>::query_data_contracts,
391            "get_data_contracts",
392        )
393        .await
394    }
395
396    async fn get_documents(
397        &self,
398        request: Request<GetDocumentsRequest>,
399    ) -> Result<Response<GetDocumentsResponse>, Status> {
400        self.handle_blocking_query(
401            request,
402            Platform::<DefaultCoreRPC>::query_documents,
403            "get_documents",
404        )
405        .await
406    }
407
408    async fn get_identity_by_public_key_hash(
409        &self,
410        request: Request<GetIdentityByPublicKeyHashRequest>,
411    ) -> Result<Response<GetIdentityByPublicKeyHashResponse>, Status> {
412        self.handle_blocking_query(
413            request,
414            Platform::<DefaultCoreRPC>::query_identity_by_public_key_hash,
415            "get_identity_by_public_key_hash",
416        )
417        .await
418    }
419
420    async fn get_identity_by_non_unique_public_key_hash(
421        &self,
422        request: Request<GetIdentityByNonUniquePublicKeyHashRequest>,
423    ) -> Result<Response<GetIdentityByNonUniquePublicKeyHashResponse>, Status> {
424        self.handle_blocking_query(
425            request,
426            Platform::<DefaultCoreRPC>::query_identity_by_non_unique_public_key_hash,
427            "get_identity_by_non_unique_public_key_hash",
428        )
429        .await
430    }
431
432    async fn wait_for_state_transition_result(
433        &self,
434        _request: Request<WaitForStateTransitionResultRequest>,
435    ) -> Result<Response<WaitForStateTransitionResultResponse>, Status> {
436        respond_with_unimplemented("wait_for_state_transition_result")
437    }
438
439    async fn get_consensus_params(
440        &self,
441        _request: Request<GetConsensusParamsRequest>,
442    ) -> Result<Response<GetConsensusParamsResponse>, Status> {
443        respond_with_unimplemented("get_consensus_params")
444    }
445
446    async fn get_protocol_version_upgrade_state(
447        &self,
448        request: Request<GetProtocolVersionUpgradeStateRequest>,
449    ) -> Result<Response<GetProtocolVersionUpgradeStateResponse>, Status> {
450        self.handle_blocking_query(
451            request,
452            Platform::<DefaultCoreRPC>::query_version_upgrade_state,
453            "get_protocol_version_upgrade_state",
454        )
455        .await
456    }
457
458    async fn get_protocol_version_upgrade_vote_status(
459        &self,
460        request: Request<GetProtocolVersionUpgradeVoteStatusRequest>,
461    ) -> Result<Response<GetProtocolVersionUpgradeVoteStatusResponse>, Status> {
462        self.handle_blocking_query(
463            request,
464            Platform::<DefaultCoreRPC>::query_version_upgrade_vote_status,
465            "get_protocol_version_upgrade_vote_status",
466        )
467        .await
468    }
469
470    async fn get_epochs_info(
471        &self,
472        request: Request<GetEpochsInfoRequest>,
473    ) -> Result<Response<GetEpochsInfoResponse>, Status> {
474        self.handle_blocking_query(
475            request,
476            Platform::<DefaultCoreRPC>::query_epoch_infos,
477            "get_epochs_info",
478        )
479        .await
480    }
481
482    async fn get_path_elements(
483        &self,
484        request: Request<GetPathElementsRequest>,
485    ) -> Result<Response<GetPathElementsResponse>, Status> {
486        self.handle_blocking_query(
487            request,
488            Platform::<DefaultCoreRPC>::query_path_elements,
489            "get_path_elements",
490        )
491        .await
492    }
493
494    async fn get_contested_resources(
495        &self,
496        request: Request<GetContestedResourcesRequest>,
497    ) -> Result<Response<GetContestedResourcesResponse>, Status> {
498        self.handle_blocking_query(
499            request,
500            Platform::<DefaultCoreRPC>::query_contested_resources,
501            "get_contested_resources",
502        )
503        .await
504    }
505
506    async fn get_contested_resource_vote_state(
507        &self,
508        request: Request<GetContestedResourceVoteStateRequest>,
509    ) -> Result<Response<GetContestedResourceVoteStateResponse>, Status> {
510        self.handle_blocking_query(
511            request,
512            Platform::<DefaultCoreRPC>::query_contested_resource_vote_state,
513            "get_contested_resource_vote_state",
514        )
515        .await
516    }
517
518    async fn get_contested_resource_voters_for_identity(
519        &self,
520        request: Request<GetContestedResourceVotersForIdentityRequest>,
521    ) -> Result<Response<GetContestedResourceVotersForIdentityResponse>, Status> {
522        self.handle_blocking_query(
523            request,
524            Platform::<DefaultCoreRPC>::query_contested_resource_voters_for_identity,
525            "get_contested_resource_voters_for_identity",
526        )
527        .await
528    }
529
530    async fn get_contested_resource_identity_votes(
531        &self,
532        request: Request<GetContestedResourceIdentityVotesRequest>,
533    ) -> Result<Response<GetContestedResourceIdentityVotesResponse>, Status> {
534        self.handle_blocking_query(
535            request,
536            Platform::<DefaultCoreRPC>::query_contested_resource_identity_votes,
537            "get_contested_resource_identity_votes",
538        )
539        .await
540    }
541
542    async fn get_vote_polls_by_end_date(
543        &self,
544        request: Request<GetVotePollsByEndDateRequest>,
545    ) -> Result<Response<GetVotePollsByEndDateResponse>, Status> {
546        self.handle_blocking_query(
547            request,
548            Platform::<DefaultCoreRPC>::query_vote_polls_by_end_date_query,
549            "get_vote_polls_by_end_date",
550        )
551        .await
552    }
553
554    async fn get_prefunded_specialized_balance(
555        &self,
556        request: Request<GetPrefundedSpecializedBalanceRequest>,
557    ) -> Result<Response<GetPrefundedSpecializedBalanceResponse>, Status> {
558        self.handle_blocking_query(
559            request,
560            Platform::<DefaultCoreRPC>::query_prefunded_specialized_balance,
561            "get_prefunded_specialized_balance",
562        )
563        .await
564    }
565
566    async fn get_total_credits_in_platform(
567        &self,
568        request: Request<GetTotalCreditsInPlatformRequest>,
569    ) -> Result<Response<GetTotalCreditsInPlatformResponse>, Status> {
570        self.handle_blocking_query(
571            request,
572            Platform::<DefaultCoreRPC>::query_total_credits_in_platform,
573            "get_total_credits_in_platform",
574        )
575        .await
576    }
577
578    async fn get_identities_balances(
579        &self,
580        request: Request<GetIdentitiesBalancesRequest>,
581    ) -> Result<Response<GetIdentitiesBalancesResponse>, Status> {
582        self.handle_blocking_query(
583            request,
584            Platform::<DefaultCoreRPC>::query_identities_balances,
585            "get_identities_balances",
586        )
587        .await
588    }
589
590    async fn get_status(
591        &self,
592        request: Request<GetStatusRequest>,
593    ) -> Result<Response<GetStatusResponse>, Status> {
594        self.handle_blocking_query(
595            request,
596            Platform::<DefaultCoreRPC>::query_partial_status,
597            "query_partial_status",
598        )
599        .await
600    }
601
602    async fn get_evonodes_proposed_epoch_blocks_by_ids(
603        &self,
604        request: Request<GetEvonodesProposedEpochBlocksByIdsRequest>,
605    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
606        self.handle_blocking_query(
607            request,
608            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_evonode_ids,
609            "query_proposed_block_counts_by_evonode_ids",
610        )
611        .await
612    }
613
614    async fn get_evonodes_proposed_epoch_blocks_by_range(
615        &self,
616        request: Request<GetEvonodesProposedEpochBlocksByRangeRequest>,
617    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
618        self.handle_blocking_query(
619            request,
620            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_range,
621            "query_proposed_block_counts_by_range",
622        )
623        .await
624    }
625
626    async fn get_current_quorums_info(
627        &self,
628        request: Request<GetCurrentQuorumsInfoRequest>,
629    ) -> Result<Response<GetCurrentQuorumsInfoResponse>, Status> {
630        self.handle_blocking_query(
631            request,
632            Platform::<DefaultCoreRPC>::query_current_quorums_info,
633            "query_current_quorums_info",
634        )
635        .await
636    }
637
638    async fn get_identity_token_balances(
639        &self,
640        request: Request<GetIdentityTokenBalancesRequest>,
641    ) -> Result<Response<GetIdentityTokenBalancesResponse>, Status> {
642        self.handle_blocking_query(
643            request,
644            Platform::<DefaultCoreRPC>::query_identity_token_balances,
645            "query_identity_token_balances",
646        )
647        .await
648    }
649
650    async fn get_identities_token_balances(
651        &self,
652        request: Request<GetIdentitiesTokenBalancesRequest>,
653    ) -> Result<Response<GetIdentitiesTokenBalancesResponse>, Status> {
654        self.handle_blocking_query(
655            request,
656            Platform::<DefaultCoreRPC>::query_identities_token_balances,
657            "query_identities_token_balances",
658        )
659        .await
660    }
661
662    async fn get_identity_token_infos(
663        &self,
664        request: Request<GetIdentityTokenInfosRequest>,
665    ) -> Result<Response<GetIdentityTokenInfosResponse>, Status> {
666        self.handle_blocking_query(
667            request,
668            Platform::<DefaultCoreRPC>::query_identity_token_infos,
669            "query_identity_token_infos",
670        )
671        .await
672    }
673
674    async fn get_identities_token_infos(
675        &self,
676        request: Request<GetIdentitiesTokenInfosRequest>,
677    ) -> Result<Response<GetIdentitiesTokenInfosResponse>, Status> {
678        self.handle_blocking_query(
679            request,
680            Platform::<DefaultCoreRPC>::query_identities_token_infos,
681            "query_identities_token_infos",
682        )
683        .await
684    }
685
686    async fn get_token_statuses(
687        &self,
688        request: Request<GetTokenStatusesRequest>,
689    ) -> Result<Response<GetTokenStatusesResponse>, Status> {
690        self.handle_blocking_query(
691            request,
692            Platform::<DefaultCoreRPC>::query_token_statuses,
693            "get_token_statuses",
694        )
695        .await
696    }
697
698    async fn get_token_pre_programmed_distributions(
699        &self,
700        request: Request<GetTokenPreProgrammedDistributionsRequest>,
701    ) -> Result<Response<GetTokenPreProgrammedDistributionsResponse>, Status> {
702        self.handle_blocking_query(
703            request,
704            Platform::<DefaultCoreRPC>::query_token_pre_programmed_distributions,
705            "get_token_pre_programmed_distributions",
706        )
707        .await
708    }
709
710    async fn get_token_total_supply(
711        &self,
712        request: Request<GetTokenTotalSupplyRequest>,
713    ) -> Result<Response<GetTokenTotalSupplyResponse>, Status> {
714        self.handle_blocking_query(
715            request,
716            Platform::<DefaultCoreRPC>::query_token_total_supply,
717            "get_token_total_supply",
718        )
719        .await
720    }
721
722    async fn get_group_info(
723        &self,
724        request: Request<GetGroupInfoRequest>,
725    ) -> Result<Response<GetGroupInfoResponse>, Status> {
726        self.handle_blocking_query(
727            request,
728            Platform::<DefaultCoreRPC>::query_group_info,
729            "get_group_info",
730        )
731        .await
732    }
733
734    async fn get_group_infos(
735        &self,
736        request: Request<GetGroupInfosRequest>,
737    ) -> Result<Response<GetGroupInfosResponse>, Status> {
738        self.handle_blocking_query(
739            request,
740            Platform::<DefaultCoreRPC>::query_group_infos,
741            "get_group_infos",
742        )
743        .await
744    }
745
746    async fn get_group_actions(
747        &self,
748        request: Request<GetGroupActionsRequest>,
749    ) -> Result<Response<GetGroupActionsResponse>, Status> {
750        self.handle_blocking_query(
751            request,
752            Platform::<DefaultCoreRPC>::query_group_actions,
753            "get_group_actions",
754        )
755        .await
756    }
757
758    async fn get_group_action_signers(
759        &self,
760        request: Request<GetGroupActionSignersRequest>,
761    ) -> Result<Response<GetGroupActionSignersResponse>, Status> {
762        self.handle_blocking_query(
763            request,
764            Platform::<DefaultCoreRPC>::query_group_action_signers,
765            "get_group_action_signers",
766        )
767        .await
768    }
769
770    async fn get_token_direct_purchase_prices(
771        &self,
772        request: Request<GetTokenDirectPurchasePricesRequest>,
773    ) -> Result<Response<GetTokenDirectPurchasePricesResponse>, Status> {
774        self.handle_blocking_query(
775            request,
776            Platform::<DefaultCoreRPC>::query_token_direct_purchase_prices,
777            "get_token_direct_purchase_prices",
778        )
779        .await
780    }
781
782    async fn get_token_contract_info(
783        &self,
784        request: Request<GetTokenContractInfoRequest>,
785    ) -> Result<Response<GetTokenContractInfoResponse>, Status> {
786        self.handle_blocking_query(
787            request,
788            Platform::<DefaultCoreRPC>::query_token_contract_info,
789            "get_token_contract_info",
790        )
791        .await
792    }
793
794    async fn get_token_perpetual_distribution_last_claim(
795        &self,
796        request: Request<GetTokenPerpetualDistributionLastClaimRequest>,
797    ) -> Result<Response<GetTokenPerpetualDistributionLastClaimResponse>, Status> {
798        self.handle_blocking_query(
799            request,
800            Platform::<DefaultCoreRPC>::query_token_perpetual_distribution_last_claim,
801            "get_token_perpetual_distribution_last_claim",
802        )
803        .await
804    }
805
806    async fn get_finalized_epoch_infos(
807        &self,
808        request: Request<GetFinalizedEpochInfosRequest>,
809    ) -> Result<Response<GetFinalizedEpochInfosResponse>, Status> {
810        self.handle_blocking_query(
811            request,
812            Platform::<DefaultCoreRPC>::query_finalized_epoch_infos,
813            "get_finalized_epoch_infos",
814        )
815        .await
816    }
817
818    async fn get_address_info(
819        &self,
820        request: Request<GetAddressInfoRequest>,
821    ) -> Result<Response<GetAddressInfoResponse>, Status> {
822        self.handle_blocking_query(
823            request,
824            Platform::<DefaultCoreRPC>::query_address_info,
825            "get_address_info",
826        )
827        .await
828    }
829
830    async fn get_addresses_infos(
831        &self,
832        request: Request<GetAddressesInfosRequest>,
833    ) -> Result<Response<GetAddressesInfosResponse>, Status> {
834        self.handle_blocking_query(
835            request,
836            Platform::<DefaultCoreRPC>::query_addresses_infos,
837            "get_addresses_infos",
838        )
839        .await
840    }
841
842    async fn get_addresses_trunk_state(
843        &self,
844        request: Request<GetAddressesTrunkStateRequest>,
845    ) -> Result<Response<GetAddressesTrunkStateResponse>, Status> {
846        self.handle_blocking_query(
847            request,
848            Platform::<DefaultCoreRPC>::query_addresses_trunk_state,
849            "get_addresses_trunk_state",
850        )
851        .await
852    }
853
854    async fn get_addresses_branch_state(
855        &self,
856        request: Request<GetAddressesBranchStateRequest>,
857    ) -> Result<Response<GetAddressesBranchStateResponse>, Status> {
858        self.handle_blocking_query(
859            request,
860            Platform::<DefaultCoreRPC>::query_addresses_branch_state,
861            "get_addresses_branch_state",
862        )
863        .await
864    }
865
866    async fn get_recent_address_balance_changes(
867        &self,
868        request: Request<GetRecentAddressBalanceChangesRequest>,
869    ) -> Result<Response<GetRecentAddressBalanceChangesResponse>, Status> {
870        self.handle_blocking_query(
871            request,
872            Platform::<DefaultCoreRPC>::query_recent_address_balance_changes,
873            "get_recent_address_balance_changes",
874        )
875        .await
876    }
877
878    async fn get_recent_compacted_address_balance_changes(
879        &self,
880        request: Request<GetRecentCompactedAddressBalanceChangesRequest>,
881    ) -> Result<Response<GetRecentCompactedAddressBalanceChangesResponse>, Status> {
882        self.handle_blocking_query(
883            request,
884            Platform::<DefaultCoreRPC>::query_recent_compacted_address_balance_changes,
885            "get_recent_compacted_address_balance_changes",
886        )
887        .await
888    }
889
890    async fn get_shielded_encrypted_notes(
891        &self,
892        request: Request<GetShieldedEncryptedNotesRequest>,
893    ) -> Result<Response<GetShieldedEncryptedNotesResponse>, Status> {
894        self.handle_blocking_query(
895            request,
896            Platform::<DefaultCoreRPC>::query_shielded_encrypted_notes,
897            "get_shielded_encrypted_notes",
898        )
899        .await
900    }
901
902    async fn get_shielded_anchors(
903        &self,
904        request: Request<GetShieldedAnchorsRequest>,
905    ) -> Result<Response<GetShieldedAnchorsResponse>, Status> {
906        self.handle_blocking_query(
907            request,
908            Platform::<DefaultCoreRPC>::query_shielded_anchors,
909            "get_shielded_anchors",
910        )
911        .await
912    }
913
914    async fn get_most_recent_shielded_anchor(
915        &self,
916        request: Request<GetMostRecentShieldedAnchorRequest>,
917    ) -> Result<Response<GetMostRecentShieldedAnchorResponse>, Status> {
918        self.handle_blocking_query(
919            request,
920            Platform::<DefaultCoreRPC>::query_most_recent_shielded_anchor,
921            "get_most_recent_shielded_anchor",
922        )
923        .await
924    }
925
926    async fn get_shielded_pool_state(
927        &self,
928        request: Request<GetShieldedPoolStateRequest>,
929    ) -> Result<Response<GetShieldedPoolStateResponse>, Status> {
930        self.handle_blocking_query(
931            request,
932            Platform::<DefaultCoreRPC>::query_shielded_pool_state,
933            "get_shielded_pool_state",
934        )
935        .await
936    }
937
938    async fn get_shielded_nullifiers(
939        &self,
940        request: Request<GetShieldedNullifiersRequest>,
941    ) -> Result<Response<GetShieldedNullifiersResponse>, Status> {
942        self.handle_blocking_query(
943            request,
944            Platform::<DefaultCoreRPC>::query_shielded_nullifiers,
945            "get_shielded_nullifiers",
946        )
947        .await
948    }
949
950    async fn get_nullifiers_trunk_state(
951        &self,
952        request: Request<GetNullifiersTrunkStateRequest>,
953    ) -> Result<Response<GetNullifiersTrunkStateResponse>, Status> {
954        self.handle_blocking_query(
955            request,
956            Platform::<DefaultCoreRPC>::query_nullifiers_trunk_state,
957            "get_nullifiers_trunk_state",
958        )
959        .await
960    }
961
962    async fn get_nullifiers_branch_state(
963        &self,
964        request: Request<GetNullifiersBranchStateRequest>,
965    ) -> Result<Response<GetNullifiersBranchStateResponse>, Status> {
966        self.handle_blocking_query(
967            request,
968            Platform::<DefaultCoreRPC>::query_nullifiers_branch_state,
969            "get_nullifiers_branch_state",
970        )
971        .await
972    }
973
974    async fn get_recent_nullifier_changes(
975        &self,
976        request: Request<GetRecentNullifierChangesRequest>,
977    ) -> Result<Response<GetRecentNullifierChangesResponse>, Status> {
978        self.handle_blocking_query(
979            request,
980            Platform::<DefaultCoreRPC>::query_recent_nullifier_changes,
981            "get_recent_nullifier_changes",
982        )
983        .await
984    }
985
986    async fn get_recent_compacted_nullifier_changes(
987        &self,
988        request: Request<GetRecentCompactedNullifierChangesRequest>,
989    ) -> Result<Response<GetRecentCompactedNullifierChangesResponse>, Status> {
990        self.handle_blocking_query(
991            request,
992            Platform::<DefaultCoreRPC>::query_recent_compacted_nullifier_changes,
993            "get_recent_compacted_nullifier_changes",
994        )
995        .await
996    }
997}
998
999#[async_trait]
1000impl DriveInternal for QueryService {
1001    async fn get_proofs(
1002        &self,
1003        request: Request<GetProofsRequest>,
1004    ) -> Result<Response<GetProofsResponse>, Status> {
1005        self.handle_blocking_query(
1006            request,
1007            Platform::<DefaultCoreRPC>::query_proofs,
1008            "get_proofs",
1009        )
1010        .await
1011    }
1012}
1013
1014fn query_error_into_status(error: QueryError) -> Status {
1015    match error {
1016        QueryError::NotFound(message) => Status::not_found(message),
1017        QueryError::InvalidArgument(message) => Status::invalid_argument(message),
1018        QueryError::Query(error) => Status::invalid_argument(error.to_string()),
1019        _ => {
1020            tracing::error!("unexpected query error: {:?}", error);
1021
1022            Status::unknown(error.to_string())
1023        }
1024    }
1025}
1026
1027fn error_into_status(error: Error) -> Status {
1028    Status::internal(format!("query: {}", error))
1029}