Skip to main content

drive_abci/query/
service.rs

1use crate::error::query::QueryError;
2use crate::error::Error;
3use crate::metrics::{abci_response_code_metric_label, query_duration_metric};
4use crate::platform_types::platform::Platform;
5use crate::platform_types::platform_state::PlatformState;
6use crate::platform_types::platform_state::PlatformStateV0Methods;
7use crate::query::QueryValidationResult;
8use crate::rpc::core::DefaultCoreRPC;
9use crate::utils::spawn_blocking_task_with_name_if_supported;
10use async_trait::async_trait;
11use dapi_grpc::drive::v0::drive_internal_server::DriveInternal;
12use dapi_grpc::drive::v0::{GetProofsRequest, GetProofsResponse};
13use dapi_grpc::platform::v0::platform_server::Platform as PlatformService;
14use dapi_grpc::platform::v0::{
15    BroadcastStateTransitionRequest, BroadcastStateTransitionResponse, GetAddressInfoRequest,
16    GetAddressInfoResponse, GetAddressesBranchStateRequest, GetAddressesBranchStateResponse,
17    GetAddressesInfosRequest, GetAddressesInfosResponse, GetAddressesTrunkStateRequest,
18    GetAddressesTrunkStateResponse, GetConsensusParamsRequest, GetConsensusParamsResponse,
19    GetContestedResourceIdentityVotesRequest, GetContestedResourceIdentityVotesResponse,
20    GetContestedResourceVoteStateRequest, GetContestedResourceVoteStateResponse,
21    GetContestedResourceVotersForIdentityRequest, GetContestedResourceVotersForIdentityResponse,
22    GetContestedResourcesRequest, GetContestedResourcesResponse, GetCurrentQuorumsInfoRequest,
23    GetCurrentQuorumsInfoResponse, GetDataContractHistoryRequest, GetDataContractHistoryResponse,
24    GetDataContractRequest, GetDataContractResponse, GetDataContractsRequest,
25    GetDataContractsResponse, GetDocumentHistoryRequest, GetDocumentHistoryResponse,
26    GetDocumentsRequest, GetDocumentsResponse, GetEpochsInfoRequest, GetEpochsInfoResponse,
27    GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest,
28    GetEvonodesProposedEpochBlocksResponse, GetFinalizedEpochInfosRequest,
29    GetFinalizedEpochInfosResponse, GetGroupActionSignersRequest, GetGroupActionSignersResponse,
30    GetGroupActionsRequest, GetGroupActionsResponse, GetGroupInfoRequest, GetGroupInfoResponse,
31    GetGroupInfosRequest, GetGroupInfosResponse, GetIdentitiesBalancesRequest,
32    GetIdentitiesBalancesResponse, GetIdentitiesContractKeysRequest,
33    GetIdentitiesContractKeysResponse, GetIdentitiesTokenBalancesRequest,
34    GetIdentitiesTokenBalancesResponse, GetIdentitiesTokenInfosRequest,
35    GetIdentitiesTokenInfosResponse, GetIdentityBalanceAndRevisionRequest,
36    GetIdentityBalanceAndRevisionResponse, GetIdentityBalanceRequest, GetIdentityBalanceResponse,
37    GetIdentityByNonUniquePublicKeyHashRequest, GetIdentityByNonUniquePublicKeyHashResponse,
38    GetIdentityByPublicKeyHashRequest, GetIdentityByPublicKeyHashResponse,
39    GetIdentityContractNonceRequest, GetIdentityContractNonceResponse, GetIdentityKeysRequest,
40    GetIdentityKeysResponse, GetIdentityNonceRequest, GetIdentityNonceResponse, GetIdentityRequest,
41    GetIdentityResponse, GetIdentityTokenBalancesRequest, GetIdentityTokenBalancesResponse,
42    GetIdentityTokenInfosRequest, GetIdentityTokenInfosResponse,
43    GetMostRecentShieldedAnchorRequest, GetMostRecentShieldedAnchorResponse,
44    GetNullifiersBranchStateRequest, GetNullifiersBranchStateResponse,
45    GetNullifiersTrunkStateRequest, GetNullifiersTrunkStateResponse, GetPathElementsRequest,
46    GetPathElementsResponse, GetPrefundedSpecializedBalanceRequest,
47    GetPrefundedSpecializedBalanceResponse, GetProtocolVersionUpgradeStateRequest,
48    GetProtocolVersionUpgradeStateResponse, GetProtocolVersionUpgradeVoteStatusRequest,
49    GetProtocolVersionUpgradeVoteStatusResponse, GetRecentAddressBalanceChangesRequest,
50    GetRecentAddressBalanceChangesResponse, GetRecentCompactedAddressBalanceChangesRequest,
51    GetRecentCompactedAddressBalanceChangesResponse, GetRecentCompactedNullifierChangesRequest,
52    GetRecentCompactedNullifierChangesResponse, GetRecentNullifierChangesRequest,
53    GetRecentNullifierChangesResponse, GetShieldedAnchorsRequest, GetShieldedAnchorsResponse,
54    GetShieldedEncryptedNotesRequest, GetShieldedEncryptedNotesResponse,
55    GetShieldedNotesCountRequest, GetShieldedNotesCountResponse, GetShieldedNullifiersRequest,
56    GetShieldedNullifiersResponse, GetShieldedPoolStateRequest, GetShieldedPoolStateResponse,
57    GetStatusRequest, GetStatusResponse, GetTokenContractInfoRequest, GetTokenContractInfoResponse,
58    GetTokenDirectPurchasePricesRequest, GetTokenDirectPurchasePricesResponse,
59    GetTokenPerpetualDistributionLastClaimRequest, GetTokenPerpetualDistributionLastClaimResponse,
60    GetTokenPreProgrammedDistributionsRequest, GetTokenPreProgrammedDistributionsResponse,
61    GetTokenStatusesRequest, GetTokenStatusesResponse, GetTokenTotalSupplyRequest,
62    GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest,
63    GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse,
64    WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse,
65};
66use dapi_grpc::tonic::{Code, Request, Response, Status};
67use dpp::version::PlatformVersion;
68use std::fmt::Debug;
69use std::sync::atomic::Ordering;
70use std::sync::Arc;
71use std::thread::sleep;
72use std::time::Duration;
73use tracing::Instrument;
74
75/// Service to handle platform queries
76pub struct QueryService {
77    platform: Arc<Platform<DefaultCoreRPC>>,
78}
79
80type QueryMethod<RQ, RS> = fn(
81    &Platform<DefaultCoreRPC>,
82    RQ,
83    &PlatformState,
84    &PlatformVersion,
85) -> Result<QueryValidationResult<RS>, Error>;
86
87impl QueryService {
88    /// Creates new QueryService
89    pub fn new(platform: Arc<Platform<DefaultCoreRPC>>) -> Self {
90        Self { platform }
91    }
92
93    async fn handle_blocking_query<RQ, RS>(
94        &self,
95        request: Request<RQ>,
96        query_method: QueryMethod<RQ, RS>,
97        endpoint_name: &str,
98    ) -> Result<Response<RS>, Status>
99    where
100        RS: Clone + Send + 'static,
101        RQ: Debug + Send + Clone + 'static,
102    {
103        let mut response_duration_metric = query_duration_metric(endpoint_name);
104
105        let platform = Arc::clone(&self.platform);
106
107        let request_debug = format!("{:?}", &request);
108
109        let result = spawn_blocking_task_with_name_if_supported("query", move || {
110            let mut result;
111
112            let query_request = request.into_inner();
113
114            let mut query_counter = 0;
115
116            loop {
117                let platform_state = platform.state.load();
118
119                let platform_version = platform_state
120                    .current_platform_version()
121                    .map_err(|_| Status::unavailable("platform is not initialized"))?;
122
123                // Query is using Platform execution state and Drive state to during the execution.
124                // They are updating every block in finalize block ABCI handler.
125                // The problem is that these two operations aren't atomic and some latency between
126                // them could lead to data races. `committed_block_height_guard` counter that represents
127                // the latest the height of latest committed Drive state and logic bellow ensures
128                // that query is executed only after/before both states are updated.
129                let mut needs_restart = false;
130
131                loop {
132                    let committed_block_height_guard = platform
133                        .committed_block_height_guard
134                        .load(Ordering::Relaxed);
135                    let mut counter = 0;
136                    if platform_state.last_committed_block_height() == committed_block_height_guard
137                    {
138                        break;
139                    } else {
140                        counter += 1;
141                        sleep(Duration::from_millis(10))
142                    }
143
144                    // We try for up to 1 second
145                    if counter >= 100 {
146                        query_counter += 1;
147                        needs_restart = true;
148                        break;
149                    }
150                }
151
152                if query_counter > 3 {
153                    return Err(query_error_into_status(QueryError::NotServiceable(
154                        "platform is saturated (did not attempt query)".to_string(),
155                    )));
156                }
157
158                if needs_restart {
159                    continue;
160                }
161
162                result = query_method(
163                    &platform,
164                    query_request.clone(),
165                    &platform_state,
166                    platform_version,
167                );
168
169                let committed_block_height_guard = platform
170                    .committed_block_height_guard
171                    .load(Ordering::Relaxed);
172
173                if platform_state.last_committed_block_height() == committed_block_height_guard {
174                    // in this case the query almost certainly executed correctly
175                    break;
176                } else {
177                    query_counter += 1;
178
179                    if query_counter > 2 {
180                        // This should never be possible
181                        return Err(query_error_into_status(QueryError::NotServiceable(
182                            "platform is saturated".to_string(),
183                        )));
184                    }
185                }
186            }
187
188            let mut query_result = result.map_err(error_into_status)?;
189
190            if query_result.is_valid() {
191                let response = query_result
192                    .into_data()
193                    .map_err(|error| error_into_status(error.into()))?;
194
195                Ok(Response::new(response))
196            } else {
197                let error = query_result.errors.swap_remove(0);
198
199                Err(query_error_into_status(error))
200            }
201        })?
202        .instrument(tracing::trace_span!("query", endpoint_name))
203        .await
204        .map_err(|error| Status::internal(format!("query thread failed: {}", error)))?;
205
206        // Query logging and metrics
207        let code = match &result {
208            Ok(_) => Code::Ok,
209            Err(status) => status.code(),
210        };
211
212        let code_label = format!("{:?}", code).to_lowercase();
213
214        // Add code to response duration metric
215        let label = abci_response_code_metric_label(code);
216        response_duration_metric.add_label(label);
217
218        match code {
219            // User errors
220            Code::Ok
221            | Code::InvalidArgument
222            | Code::NotFound
223            | Code::AlreadyExists
224            | Code::ResourceExhausted
225            | Code::PermissionDenied
226            | Code::Unavailable
227            | Code::Aborted
228            | Code::FailedPrecondition
229            | Code::OutOfRange
230            | Code::Cancelled
231            | Code::DeadlineExceeded
232            | Code::Unauthenticated => {
233                let elapsed_time = response_duration_metric.elapsed().as_secs_f64();
234
235                tracing::trace!(
236                    request = request_debug,
237                    elapsed_time,
238                    endpoint_name,
239                    code = code_label,
240                    "query '{}' executed with code {:?} in {} secs",
241                    endpoint_name,
242                    code,
243                    elapsed_time
244                );
245            }
246            // System errors
247            Code::Unknown | Code::Unimplemented | Code::Internal | Code::DataLoss => {
248                tracing::error!(
249                    request = request_debug,
250                    endpoint_name,
251                    code = code_label,
252                    "query '{}' execution failed with code {:?}",
253                    endpoint_name,
254                    code
255                );
256            }
257        }
258
259        result
260    }
261}
262
263fn respond_with_unimplemented<RS>(name: &str) -> Result<Response<RS>, Status> {
264    tracing::error!("{} endpoint is called but it's not supported", name);
265
266    Err(Status::unimplemented("the endpoint is not supported"))
267}
268
269#[async_trait]
270impl PlatformService for QueryService {
271    async fn broadcast_state_transition(
272        &self,
273        _request: Request<BroadcastStateTransitionRequest>,
274    ) -> Result<Response<BroadcastStateTransitionResponse>, Status> {
275        respond_with_unimplemented("broadcast_state_transition")
276    }
277
278    async fn get_identity(
279        &self,
280        request: Request<GetIdentityRequest>,
281    ) -> Result<Response<GetIdentityResponse>, Status> {
282        self.handle_blocking_query(
283            request,
284            Platform::<DefaultCoreRPC>::query_identity,
285            "get_identity",
286        )
287        .await
288    }
289
290    async fn get_identities_contract_keys(
291        &self,
292        request: Request<GetIdentitiesContractKeysRequest>,
293    ) -> Result<Response<GetIdentitiesContractKeysResponse>, Status> {
294        self.handle_blocking_query(
295            request,
296            Platform::<DefaultCoreRPC>::query_identities_contract_keys,
297            "get_identities_contract_keys",
298        )
299        .await
300    }
301
302    async fn get_identity_keys(
303        &self,
304        request: Request<GetIdentityKeysRequest>,
305    ) -> Result<Response<GetIdentityKeysResponse>, Status> {
306        self.handle_blocking_query(
307            request,
308            Platform::<DefaultCoreRPC>::query_keys,
309            "get_identity_keys",
310        )
311        .await
312    }
313
314    async fn get_identity_nonce(
315        &self,
316        request: Request<GetIdentityNonceRequest>,
317    ) -> Result<Response<GetIdentityNonceResponse>, Status> {
318        self.handle_blocking_query(
319            request,
320            Platform::<DefaultCoreRPC>::query_identity_nonce,
321            "get_identity_nonce",
322        )
323        .await
324    }
325
326    async fn get_identity_contract_nonce(
327        &self,
328        request: Request<GetIdentityContractNonceRequest>,
329    ) -> Result<Response<GetIdentityContractNonceResponse>, Status> {
330        self.handle_blocking_query(
331            request,
332            Platform::<DefaultCoreRPC>::query_identity_contract_nonce,
333            "get_identity_contract_nonce",
334        )
335        .await
336    }
337
338    async fn get_identity_balance(
339        &self,
340        request: Request<GetIdentityBalanceRequest>,
341    ) -> Result<Response<GetIdentityBalanceResponse>, Status> {
342        self.handle_blocking_query(
343            request,
344            Platform::<DefaultCoreRPC>::query_balance,
345            "get_identity_balance",
346        )
347        .await
348    }
349
350    async fn get_identity_balance_and_revision(
351        &self,
352        request: Request<GetIdentityBalanceAndRevisionRequest>,
353    ) -> Result<Response<GetIdentityBalanceAndRevisionResponse>, Status> {
354        self.handle_blocking_query(
355            request,
356            Platform::<DefaultCoreRPC>::query_balance_and_revision,
357            "get_identity_balance_and_revision",
358        )
359        .await
360    }
361
362    async fn get_data_contract(
363        &self,
364        request: Request<GetDataContractRequest>,
365    ) -> Result<Response<GetDataContractResponse>, Status> {
366        self.handle_blocking_query(
367            request,
368            Platform::<DefaultCoreRPC>::query_data_contract,
369            "get_data_contract",
370        )
371        .await
372    }
373
374    async fn get_data_contract_history(
375        &self,
376        request: Request<GetDataContractHistoryRequest>,
377    ) -> Result<Response<GetDataContractHistoryResponse>, Status> {
378        self.handle_blocking_query(
379            request,
380            Platform::<DefaultCoreRPC>::query_data_contract_history,
381            "get_data_contract_history",
382        )
383        .await
384    }
385
386    async fn get_data_contracts(
387        &self,
388        request: Request<GetDataContractsRequest>,
389    ) -> Result<Response<GetDataContractsResponse>, Status> {
390        self.handle_blocking_query(
391            request,
392            Platform::<DefaultCoreRPC>::query_data_contracts,
393            "get_data_contracts",
394        )
395        .await
396    }
397
398    async fn get_document_history(
399        &self,
400        request: Request<GetDocumentHistoryRequest>,
401    ) -> Result<Response<GetDocumentHistoryResponse>, Status> {
402        self.handle_blocking_query(
403            request,
404            Platform::<DefaultCoreRPC>::query_document_history,
405            "get_document_history",
406        )
407        .await
408    }
409
410    async fn get_documents(
411        &self,
412        request: Request<GetDocumentsRequest>,
413    ) -> Result<Response<GetDocumentsResponse>, Status> {
414        self.handle_blocking_query(
415            request,
416            Platform::<DefaultCoreRPC>::query_documents,
417            "get_documents",
418        )
419        .await
420    }
421
422    async fn get_identity_by_public_key_hash(
423        &self,
424        request: Request<GetIdentityByPublicKeyHashRequest>,
425    ) -> Result<Response<GetIdentityByPublicKeyHashResponse>, Status> {
426        self.handle_blocking_query(
427            request,
428            Platform::<DefaultCoreRPC>::query_identity_by_public_key_hash,
429            "get_identity_by_public_key_hash",
430        )
431        .await
432    }
433
434    async fn get_identity_by_non_unique_public_key_hash(
435        &self,
436        request: Request<GetIdentityByNonUniquePublicKeyHashRequest>,
437    ) -> Result<Response<GetIdentityByNonUniquePublicKeyHashResponse>, Status> {
438        self.handle_blocking_query(
439            request,
440            Platform::<DefaultCoreRPC>::query_identity_by_non_unique_public_key_hash,
441            "get_identity_by_non_unique_public_key_hash",
442        )
443        .await
444    }
445
446    async fn wait_for_state_transition_result(
447        &self,
448        _request: Request<WaitForStateTransitionResultRequest>,
449    ) -> Result<Response<WaitForStateTransitionResultResponse>, Status> {
450        respond_with_unimplemented("wait_for_state_transition_result")
451    }
452
453    async fn get_consensus_params(
454        &self,
455        _request: Request<GetConsensusParamsRequest>,
456    ) -> Result<Response<GetConsensusParamsResponse>, Status> {
457        respond_with_unimplemented("get_consensus_params")
458    }
459
460    async fn get_protocol_version_upgrade_state(
461        &self,
462        request: Request<GetProtocolVersionUpgradeStateRequest>,
463    ) -> Result<Response<GetProtocolVersionUpgradeStateResponse>, Status> {
464        self.handle_blocking_query(
465            request,
466            Platform::<DefaultCoreRPC>::query_version_upgrade_state,
467            "get_protocol_version_upgrade_state",
468        )
469        .await
470    }
471
472    async fn get_protocol_version_upgrade_vote_status(
473        &self,
474        request: Request<GetProtocolVersionUpgradeVoteStatusRequest>,
475    ) -> Result<Response<GetProtocolVersionUpgradeVoteStatusResponse>, Status> {
476        self.handle_blocking_query(
477            request,
478            Platform::<DefaultCoreRPC>::query_version_upgrade_vote_status,
479            "get_protocol_version_upgrade_vote_status",
480        )
481        .await
482    }
483
484    async fn get_epochs_info(
485        &self,
486        request: Request<GetEpochsInfoRequest>,
487    ) -> Result<Response<GetEpochsInfoResponse>, Status> {
488        self.handle_blocking_query(
489            request,
490            Platform::<DefaultCoreRPC>::query_epoch_infos,
491            "get_epochs_info",
492        )
493        .await
494    }
495
496    async fn get_path_elements(
497        &self,
498        request: Request<GetPathElementsRequest>,
499    ) -> Result<Response<GetPathElementsResponse>, Status> {
500        self.handle_blocking_query(
501            request,
502            Platform::<DefaultCoreRPC>::query_path_elements,
503            "get_path_elements",
504        )
505        .await
506    }
507
508    async fn get_contested_resources(
509        &self,
510        request: Request<GetContestedResourcesRequest>,
511    ) -> Result<Response<GetContestedResourcesResponse>, Status> {
512        self.handle_blocking_query(
513            request,
514            Platform::<DefaultCoreRPC>::query_contested_resources,
515            "get_contested_resources",
516        )
517        .await
518    }
519
520    async fn get_contested_resource_vote_state(
521        &self,
522        request: Request<GetContestedResourceVoteStateRequest>,
523    ) -> Result<Response<GetContestedResourceVoteStateResponse>, Status> {
524        self.handle_blocking_query(
525            request,
526            Platform::<DefaultCoreRPC>::query_contested_resource_vote_state,
527            "get_contested_resource_vote_state",
528        )
529        .await
530    }
531
532    async fn get_contested_resource_voters_for_identity(
533        &self,
534        request: Request<GetContestedResourceVotersForIdentityRequest>,
535    ) -> Result<Response<GetContestedResourceVotersForIdentityResponse>, Status> {
536        self.handle_blocking_query(
537            request,
538            Platform::<DefaultCoreRPC>::query_contested_resource_voters_for_identity,
539            "get_contested_resource_voters_for_identity",
540        )
541        .await
542    }
543
544    async fn get_contested_resource_identity_votes(
545        &self,
546        request: Request<GetContestedResourceIdentityVotesRequest>,
547    ) -> Result<Response<GetContestedResourceIdentityVotesResponse>, Status> {
548        self.handle_blocking_query(
549            request,
550            Platform::<DefaultCoreRPC>::query_contested_resource_identity_votes,
551            "get_contested_resource_identity_votes",
552        )
553        .await
554    }
555
556    async fn get_vote_polls_by_end_date(
557        &self,
558        request: Request<GetVotePollsByEndDateRequest>,
559    ) -> Result<Response<GetVotePollsByEndDateResponse>, Status> {
560        self.handle_blocking_query(
561            request,
562            Platform::<DefaultCoreRPC>::query_vote_polls_by_end_date_query,
563            "get_vote_polls_by_end_date",
564        )
565        .await
566    }
567
568    async fn get_prefunded_specialized_balance(
569        &self,
570        request: Request<GetPrefundedSpecializedBalanceRequest>,
571    ) -> Result<Response<GetPrefundedSpecializedBalanceResponse>, Status> {
572        self.handle_blocking_query(
573            request,
574            Platform::<DefaultCoreRPC>::query_prefunded_specialized_balance,
575            "get_prefunded_specialized_balance",
576        )
577        .await
578    }
579
580    async fn get_total_credits_in_platform(
581        &self,
582        request: Request<GetTotalCreditsInPlatformRequest>,
583    ) -> Result<Response<GetTotalCreditsInPlatformResponse>, Status> {
584        self.handle_blocking_query(
585            request,
586            Platform::<DefaultCoreRPC>::query_total_credits_in_platform,
587            "get_total_credits_in_platform",
588        )
589        .await
590    }
591
592    async fn get_identities_balances(
593        &self,
594        request: Request<GetIdentitiesBalancesRequest>,
595    ) -> Result<Response<GetIdentitiesBalancesResponse>, Status> {
596        self.handle_blocking_query(
597            request,
598            Platform::<DefaultCoreRPC>::query_identities_balances,
599            "get_identities_balances",
600        )
601        .await
602    }
603
604    async fn get_status(
605        &self,
606        request: Request<GetStatusRequest>,
607    ) -> Result<Response<GetStatusResponse>, Status> {
608        self.handle_blocking_query(
609            request,
610            Platform::<DefaultCoreRPC>::query_partial_status,
611            "query_partial_status",
612        )
613        .await
614    }
615
616    async fn get_evonodes_proposed_epoch_blocks_by_ids(
617        &self,
618        request: Request<GetEvonodesProposedEpochBlocksByIdsRequest>,
619    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
620        self.handle_blocking_query(
621            request,
622            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_evonode_ids,
623            "query_proposed_block_counts_by_evonode_ids",
624        )
625        .await
626    }
627
628    async fn get_evonodes_proposed_epoch_blocks_by_range(
629        &self,
630        request: Request<GetEvonodesProposedEpochBlocksByRangeRequest>,
631    ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
632        self.handle_blocking_query(
633            request,
634            Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_range,
635            "query_proposed_block_counts_by_range",
636        )
637        .await
638    }
639
640    async fn get_current_quorums_info(
641        &self,
642        request: Request<GetCurrentQuorumsInfoRequest>,
643    ) -> Result<Response<GetCurrentQuorumsInfoResponse>, Status> {
644        self.handle_blocking_query(
645            request,
646            Platform::<DefaultCoreRPC>::query_current_quorums_info,
647            "query_current_quorums_info",
648        )
649        .await
650    }
651
652    async fn get_identity_token_balances(
653        &self,
654        request: Request<GetIdentityTokenBalancesRequest>,
655    ) -> Result<Response<GetIdentityTokenBalancesResponse>, Status> {
656        self.handle_blocking_query(
657            request,
658            Platform::<DefaultCoreRPC>::query_identity_token_balances,
659            "query_identity_token_balances",
660        )
661        .await
662    }
663
664    async fn get_identities_token_balances(
665        &self,
666        request: Request<GetIdentitiesTokenBalancesRequest>,
667    ) -> Result<Response<GetIdentitiesTokenBalancesResponse>, Status> {
668        self.handle_blocking_query(
669            request,
670            Platform::<DefaultCoreRPC>::query_identities_token_balances,
671            "query_identities_token_balances",
672        )
673        .await
674    }
675
676    async fn get_identity_token_infos(
677        &self,
678        request: Request<GetIdentityTokenInfosRequest>,
679    ) -> Result<Response<GetIdentityTokenInfosResponse>, Status> {
680        self.handle_blocking_query(
681            request,
682            Platform::<DefaultCoreRPC>::query_identity_token_infos,
683            "query_identity_token_infos",
684        )
685        .await
686    }
687
688    async fn get_identities_token_infos(
689        &self,
690        request: Request<GetIdentitiesTokenInfosRequest>,
691    ) -> Result<Response<GetIdentitiesTokenInfosResponse>, Status> {
692        self.handle_blocking_query(
693            request,
694            Platform::<DefaultCoreRPC>::query_identities_token_infos,
695            "query_identities_token_infos",
696        )
697        .await
698    }
699
700    async fn get_token_statuses(
701        &self,
702        request: Request<GetTokenStatusesRequest>,
703    ) -> Result<Response<GetTokenStatusesResponse>, Status> {
704        self.handle_blocking_query(
705            request,
706            Platform::<DefaultCoreRPC>::query_token_statuses,
707            "get_token_statuses",
708        )
709        .await
710    }
711
712    async fn get_token_pre_programmed_distributions(
713        &self,
714        request: Request<GetTokenPreProgrammedDistributionsRequest>,
715    ) -> Result<Response<GetTokenPreProgrammedDistributionsResponse>, Status> {
716        self.handle_blocking_query(
717            request,
718            Platform::<DefaultCoreRPC>::query_token_pre_programmed_distributions,
719            "get_token_pre_programmed_distributions",
720        )
721        .await
722    }
723
724    async fn get_token_total_supply(
725        &self,
726        request: Request<GetTokenTotalSupplyRequest>,
727    ) -> Result<Response<GetTokenTotalSupplyResponse>, Status> {
728        self.handle_blocking_query(
729            request,
730            Platform::<DefaultCoreRPC>::query_token_total_supply,
731            "get_token_total_supply",
732        )
733        .await
734    }
735
736    async fn get_group_info(
737        &self,
738        request: Request<GetGroupInfoRequest>,
739    ) -> Result<Response<GetGroupInfoResponse>, Status> {
740        self.handle_blocking_query(
741            request,
742            Platform::<DefaultCoreRPC>::query_group_info,
743            "get_group_info",
744        )
745        .await
746    }
747
748    async fn get_group_infos(
749        &self,
750        request: Request<GetGroupInfosRequest>,
751    ) -> Result<Response<GetGroupInfosResponse>, Status> {
752        self.handle_blocking_query(
753            request,
754            Platform::<DefaultCoreRPC>::query_group_infos,
755            "get_group_infos",
756        )
757        .await
758    }
759
760    async fn get_group_actions(
761        &self,
762        request: Request<GetGroupActionsRequest>,
763    ) -> Result<Response<GetGroupActionsResponse>, Status> {
764        self.handle_blocking_query(
765            request,
766            Platform::<DefaultCoreRPC>::query_group_actions,
767            "get_group_actions",
768        )
769        .await
770    }
771
772    async fn get_group_action_signers(
773        &self,
774        request: Request<GetGroupActionSignersRequest>,
775    ) -> Result<Response<GetGroupActionSignersResponse>, Status> {
776        self.handle_blocking_query(
777            request,
778            Platform::<DefaultCoreRPC>::query_group_action_signers,
779            "get_group_action_signers",
780        )
781        .await
782    }
783
784    async fn get_token_direct_purchase_prices(
785        &self,
786        request: Request<GetTokenDirectPurchasePricesRequest>,
787    ) -> Result<Response<GetTokenDirectPurchasePricesResponse>, Status> {
788        self.handle_blocking_query(
789            request,
790            Platform::<DefaultCoreRPC>::query_token_direct_purchase_prices,
791            "get_token_direct_purchase_prices",
792        )
793        .await
794    }
795
796    async fn get_token_contract_info(
797        &self,
798        request: Request<GetTokenContractInfoRequest>,
799    ) -> Result<Response<GetTokenContractInfoResponse>, Status> {
800        self.handle_blocking_query(
801            request,
802            Platform::<DefaultCoreRPC>::query_token_contract_info,
803            "get_token_contract_info",
804        )
805        .await
806    }
807
808    async fn get_token_perpetual_distribution_last_claim(
809        &self,
810        request: Request<GetTokenPerpetualDistributionLastClaimRequest>,
811    ) -> Result<Response<GetTokenPerpetualDistributionLastClaimResponse>, Status> {
812        self.handle_blocking_query(
813            request,
814            Platform::<DefaultCoreRPC>::query_token_perpetual_distribution_last_claim,
815            "get_token_perpetual_distribution_last_claim",
816        )
817        .await
818    }
819
820    async fn get_finalized_epoch_infos(
821        &self,
822        request: Request<GetFinalizedEpochInfosRequest>,
823    ) -> Result<Response<GetFinalizedEpochInfosResponse>, Status> {
824        self.handle_blocking_query(
825            request,
826            Platform::<DefaultCoreRPC>::query_finalized_epoch_infos,
827            "get_finalized_epoch_infos",
828        )
829        .await
830    }
831
832    async fn get_address_info(
833        &self,
834        request: Request<GetAddressInfoRequest>,
835    ) -> Result<Response<GetAddressInfoResponse>, Status> {
836        self.handle_blocking_query(
837            request,
838            Platform::<DefaultCoreRPC>::query_address_info,
839            "get_address_info",
840        )
841        .await
842    }
843
844    async fn get_addresses_infos(
845        &self,
846        request: Request<GetAddressesInfosRequest>,
847    ) -> Result<Response<GetAddressesInfosResponse>, Status> {
848        self.handle_blocking_query(
849            request,
850            Platform::<DefaultCoreRPC>::query_addresses_infos,
851            "get_addresses_infos",
852        )
853        .await
854    }
855
856    async fn get_addresses_trunk_state(
857        &self,
858        request: Request<GetAddressesTrunkStateRequest>,
859    ) -> Result<Response<GetAddressesTrunkStateResponse>, Status> {
860        self.handle_blocking_query(
861            request,
862            Platform::<DefaultCoreRPC>::query_addresses_trunk_state,
863            "get_addresses_trunk_state",
864        )
865        .await
866    }
867
868    async fn get_addresses_branch_state(
869        &self,
870        request: Request<GetAddressesBranchStateRequest>,
871    ) -> Result<Response<GetAddressesBranchStateResponse>, Status> {
872        self.handle_blocking_query(
873            request,
874            Platform::<DefaultCoreRPC>::query_addresses_branch_state,
875            "get_addresses_branch_state",
876        )
877        .await
878    }
879
880    async fn get_recent_address_balance_changes(
881        &self,
882        request: Request<GetRecentAddressBalanceChangesRequest>,
883    ) -> Result<Response<GetRecentAddressBalanceChangesResponse>, Status> {
884        self.handle_blocking_query(
885            request,
886            Platform::<DefaultCoreRPC>::query_recent_address_balance_changes,
887            "get_recent_address_balance_changes",
888        )
889        .await
890    }
891
892    async fn get_recent_compacted_address_balance_changes(
893        &self,
894        request: Request<GetRecentCompactedAddressBalanceChangesRequest>,
895    ) -> Result<Response<GetRecentCompactedAddressBalanceChangesResponse>, Status> {
896        self.handle_blocking_query(
897            request,
898            Platform::<DefaultCoreRPC>::query_recent_compacted_address_balance_changes,
899            "get_recent_compacted_address_balance_changes",
900        )
901        .await
902    }
903
904    async fn get_shielded_encrypted_notes(
905        &self,
906        request: Request<GetShieldedEncryptedNotesRequest>,
907    ) -> Result<Response<GetShieldedEncryptedNotesResponse>, Status> {
908        self.handle_blocking_query(
909            request,
910            Platform::<DefaultCoreRPC>::query_shielded_encrypted_notes,
911            "get_shielded_encrypted_notes",
912        )
913        .await
914    }
915
916    async fn get_shielded_anchors(
917        &self,
918        request: Request<GetShieldedAnchorsRequest>,
919    ) -> Result<Response<GetShieldedAnchorsResponse>, Status> {
920        self.handle_blocking_query(
921            request,
922            Platform::<DefaultCoreRPC>::query_shielded_anchors,
923            "get_shielded_anchors",
924        )
925        .await
926    }
927
928    async fn get_most_recent_shielded_anchor(
929        &self,
930        request: Request<GetMostRecentShieldedAnchorRequest>,
931    ) -> Result<Response<GetMostRecentShieldedAnchorResponse>, Status> {
932        self.handle_blocking_query(
933            request,
934            Platform::<DefaultCoreRPC>::query_most_recent_shielded_anchor,
935            "get_most_recent_shielded_anchor",
936        )
937        .await
938    }
939
940    async fn get_shielded_pool_state(
941        &self,
942        request: Request<GetShieldedPoolStateRequest>,
943    ) -> Result<Response<GetShieldedPoolStateResponse>, Status> {
944        self.handle_blocking_query(
945            request,
946            Platform::<DefaultCoreRPC>::query_shielded_pool_state,
947            "get_shielded_pool_state",
948        )
949        .await
950    }
951
952    async fn get_shielded_notes_count(
953        &self,
954        request: Request<GetShieldedNotesCountRequest>,
955    ) -> Result<Response<GetShieldedNotesCountResponse>, Status> {
956        self.handle_blocking_query(
957            request,
958            Platform::<DefaultCoreRPC>::query_shielded_notes_count,
959            "get_shielded_notes_count",
960        )
961        .await
962    }
963
964    async fn get_shielded_nullifiers(
965        &self,
966        request: Request<GetShieldedNullifiersRequest>,
967    ) -> Result<Response<GetShieldedNullifiersResponse>, Status> {
968        self.handle_blocking_query(
969            request,
970            Platform::<DefaultCoreRPC>::query_shielded_nullifiers,
971            "get_shielded_nullifiers",
972        )
973        .await
974    }
975
976    async fn get_nullifiers_trunk_state(
977        &self,
978        request: Request<GetNullifiersTrunkStateRequest>,
979    ) -> Result<Response<GetNullifiersTrunkStateResponse>, Status> {
980        self.handle_blocking_query(
981            request,
982            Platform::<DefaultCoreRPC>::query_nullifiers_trunk_state,
983            "get_nullifiers_trunk_state",
984        )
985        .await
986    }
987
988    async fn get_nullifiers_branch_state(
989        &self,
990        request: Request<GetNullifiersBranchStateRequest>,
991    ) -> Result<Response<GetNullifiersBranchStateResponse>, Status> {
992        self.handle_blocking_query(
993            request,
994            Platform::<DefaultCoreRPC>::query_nullifiers_branch_state,
995            "get_nullifiers_branch_state",
996        )
997        .await
998    }
999
1000    async fn get_recent_nullifier_changes(
1001        &self,
1002        request: Request<GetRecentNullifierChangesRequest>,
1003    ) -> Result<Response<GetRecentNullifierChangesResponse>, Status> {
1004        self.handle_blocking_query(
1005            request,
1006            Platform::<DefaultCoreRPC>::query_recent_nullifier_changes,
1007            "get_recent_nullifier_changes",
1008        )
1009        .await
1010    }
1011
1012    async fn get_recent_compacted_nullifier_changes(
1013        &self,
1014        request: Request<GetRecentCompactedNullifierChangesRequest>,
1015    ) -> Result<Response<GetRecentCompactedNullifierChangesResponse>, Status> {
1016        self.handle_blocking_query(
1017            request,
1018            Platform::<DefaultCoreRPC>::query_recent_compacted_nullifier_changes,
1019            "get_recent_compacted_nullifier_changes",
1020        )
1021        .await
1022    }
1023}
1024
1025#[async_trait]
1026impl DriveInternal for QueryService {
1027    async fn get_proofs(
1028        &self,
1029        request: Request<GetProofsRequest>,
1030    ) -> Result<Response<GetProofsResponse>, Status> {
1031        self.handle_blocking_query(
1032            request,
1033            Platform::<DefaultCoreRPC>::query_proofs,
1034            "get_proofs",
1035        )
1036        .await
1037    }
1038}
1039
1040fn query_error_into_status(error: QueryError) -> Status {
1041    match error {
1042        QueryError::NotFound(message) => Status::not_found(message),
1043        QueryError::InvalidArgument(message) => Status::invalid_argument(message),
1044        QueryError::Query(error) => Status::invalid_argument(error.to_string()),
1045        _ => {
1046            tracing::error!("unexpected query error: {:?}", error);
1047
1048            Status::unknown(error.to_string())
1049        }
1050    }
1051}
1052
1053fn error_into_status(error: Error) -> Status {
1054    Status::internal(format!("query: {}", error))
1055}