1use crate::error::query::QueryError;
2use crate::error::Error;
3use crate::metrics::{abci_response_code_metric_label, query_duration_metric};
4use crate::platform_types::platform::Platform;
5use crate::platform_types::platform_state::PlatformState;
6use crate::platform_types::platform_state::PlatformStateV0Methods;
7use crate::query::QueryValidationResult;
8use crate::rpc::core::DefaultCoreRPC;
9use crate::utils::spawn_blocking_task_with_name_if_supported;
10use async_trait::async_trait;
11use dapi_grpc::drive::v0::drive_internal_server::DriveInternal;
12use dapi_grpc::drive::v0::{GetProofsRequest, GetProofsResponse};
13use dapi_grpc::platform::v0::platform_server::Platform as PlatformService;
14use dapi_grpc::platform::v0::{
15 BroadcastStateTransitionRequest, BroadcastStateTransitionResponse, GetAddressInfoRequest,
16 GetAddressInfoResponse, GetAddressesBranchStateRequest, GetAddressesBranchStateResponse,
17 GetAddressesInfosRequest, GetAddressesInfosResponse, GetAddressesTrunkStateRequest,
18 GetAddressesTrunkStateResponse, GetConsensusParamsRequest, GetConsensusParamsResponse,
19 GetContestedResourceIdentityVotesRequest, GetContestedResourceIdentityVotesResponse,
20 GetContestedResourceVoteStateRequest, GetContestedResourceVoteStateResponse,
21 GetContestedResourceVotersForIdentityRequest, GetContestedResourceVotersForIdentityResponse,
22 GetContestedResourcesRequest, GetContestedResourcesResponse, GetCurrentQuorumsInfoRequest,
23 GetCurrentQuorumsInfoResponse, GetDataContractHistoryRequest, GetDataContractHistoryResponse,
24 GetDataContractRequest, GetDataContractResponse, GetDataContractsRequest,
25 GetDataContractsResponse, GetDocumentHistoryRequest, GetDocumentHistoryResponse,
26 GetDocumentsRequest, GetDocumentsResponse, GetEpochsInfoRequest, GetEpochsInfoResponse,
27 GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest,
28 GetEvonodesProposedEpochBlocksResponse, GetFinalizedEpochInfosRequest,
29 GetFinalizedEpochInfosResponse, GetGroupActionSignersRequest, GetGroupActionSignersResponse,
30 GetGroupActionsRequest, GetGroupActionsResponse, GetGroupInfoRequest, GetGroupInfoResponse,
31 GetGroupInfosRequest, GetGroupInfosResponse, GetIdentitiesBalancesRequest,
32 GetIdentitiesBalancesResponse, GetIdentitiesContractKeysRequest,
33 GetIdentitiesContractKeysResponse, GetIdentitiesTokenBalancesRequest,
34 GetIdentitiesTokenBalancesResponse, GetIdentitiesTokenInfosRequest,
35 GetIdentitiesTokenInfosResponse, GetIdentityBalanceAndRevisionRequest,
36 GetIdentityBalanceAndRevisionResponse, GetIdentityBalanceRequest, GetIdentityBalanceResponse,
37 GetIdentityByNonUniquePublicKeyHashRequest, GetIdentityByNonUniquePublicKeyHashResponse,
38 GetIdentityByPublicKeyHashRequest, GetIdentityByPublicKeyHashResponse,
39 GetIdentityContractNonceRequest, GetIdentityContractNonceResponse, GetIdentityKeysRequest,
40 GetIdentityKeysResponse, GetIdentityNonceRequest, GetIdentityNonceResponse, GetIdentityRequest,
41 GetIdentityResponse, GetIdentityTokenBalancesRequest, GetIdentityTokenBalancesResponse,
42 GetIdentityTokenInfosRequest, GetIdentityTokenInfosResponse,
43 GetMostRecentShieldedAnchorRequest, GetMostRecentShieldedAnchorResponse,
44 GetPathElementsRequest, GetPathElementsResponse, GetPrefundedSpecializedBalanceRequest,
45 GetPrefundedSpecializedBalanceResponse, GetProtocolVersionUpgradeStateRequest,
46 GetProtocolVersionUpgradeStateResponse, GetProtocolVersionUpgradeVoteStatusRequest,
47 GetProtocolVersionUpgradeVoteStatusResponse, GetRecentAddressBalanceChangesRequest,
48 GetRecentAddressBalanceChangesResponse, GetRecentCompactedAddressBalanceChangesRequest,
49 GetRecentCompactedAddressBalanceChangesResponse, GetShieldedAnchorsRequest,
50 GetShieldedAnchorsResponse, GetShieldedEncryptedNotesRequest,
51 GetShieldedEncryptedNotesResponse, GetShieldedNotesCountRequest, GetShieldedNotesCountResponse,
52 GetShieldedNullifiersRequest, GetShieldedNullifiersResponse, GetShieldedPoolStateRequest,
53 GetShieldedPoolStateResponse, GetStatusRequest, GetStatusResponse, GetTokenContractInfoRequest,
54 GetTokenContractInfoResponse, GetTokenDirectPurchasePricesRequest,
55 GetTokenDirectPurchasePricesResponse, GetTokenPerpetualDistributionLastClaimRequest,
56 GetTokenPerpetualDistributionLastClaimResponse, GetTokenPreProgrammedDistributionsRequest,
57 GetTokenPreProgrammedDistributionsResponse, GetTokenStatusesRequest, GetTokenStatusesResponse,
58 GetTokenTotalSupplyRequest, GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest,
59 GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse,
60 WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse,
61};
62use dapi_grpc::tonic::{Code, Request, Response, Status};
63use dpp::version::PlatformVersion;
64use std::fmt::Debug;
65use std::sync::atomic::Ordering;
66use std::sync::Arc;
67use std::thread::sleep;
68use std::time::Duration;
69use tracing::Instrument;
70
/// Query service that implements the `PlatformService` and `DriveInternal`
/// gRPC traits on top of a shared [`Platform`] instance.
pub struct QueryService {
    /// Shared handle to the platform that every query is executed against.
    platform: Arc<Platform<DefaultCoreRPC>>,
}
75
/// Signature of a query handler accepted by `handle_blocking_query`.
///
/// The handler receives the platform, the decoded request of type `RQ`, the
/// platform state snapshot and the platform version, and returns either a
/// validation result wrapping the response `RS` or an [`Error`].
type QueryMethod<RQ, RS> = fn(
    &Platform<DefaultCoreRPC>,
    RQ,
    &PlatformState,
    &PlatformVersion,
) -> Result<QueryValidationResult<RS>, Error>;
82
83impl QueryService {
84 pub fn new(platform: Arc<Platform<DefaultCoreRPC>>) -> Self {
86 Self { platform }
87 }
88
89 async fn handle_blocking_query<RQ, RS>(
90 &self,
91 request: Request<RQ>,
92 query_method: QueryMethod<RQ, RS>,
93 endpoint_name: &str,
94 ) -> Result<Response<RS>, Status>
95 where
96 RS: Clone + Send + 'static,
97 RQ: Debug + Send + Clone + 'static,
98 {
99 let mut response_duration_metric = query_duration_metric(endpoint_name);
100
101 let platform = Arc::clone(&self.platform);
102
103 let request_debug = format!("{:?}", &request);
104
105 let result = spawn_blocking_task_with_name_if_supported("query", move || {
106 let mut result;
107
108 let query_request = request.into_inner();
109
110 let mut query_counter = 0;
111
112 loop {
113 let platform_state = platform.state.load();
114
115 let platform_version = platform_state
116 .current_platform_version()
117 .map_err(|_| Status::unavailable("platform is not initialized"))?;
118
119 let mut needs_restart = false;
126
127 loop {
128 let committed_block_height_guard = platform
129 .committed_block_height_guard
130 .load(Ordering::Relaxed);
131 let mut counter = 0;
132 if platform_state.last_committed_block_height() == committed_block_height_guard
133 {
134 break;
135 } else {
136 counter += 1;
137 sleep(Duration::from_millis(10))
138 }
139
140 if counter >= 100 {
142 query_counter += 1;
143 needs_restart = true;
144 break;
145 }
146 }
147
148 if query_counter > 3 {
149 return Err(query_error_into_status(QueryError::NotServiceable(
150 "platform is saturated (did not attempt query)".to_string(),
151 )));
152 }
153
154 if needs_restart {
155 continue;
156 }
157
158 result = query_method(
159 &platform,
160 query_request.clone(),
161 &platform_state,
162 platform_version,
163 );
164
165 let committed_block_height_guard = platform
166 .committed_block_height_guard
167 .load(Ordering::Relaxed);
168
169 if platform_state.last_committed_block_height() == committed_block_height_guard {
170 break;
172 } else {
173 query_counter += 1;
174
175 if query_counter > 2 {
176 return Err(query_error_into_status(QueryError::NotServiceable(
178 "platform is saturated".to_string(),
179 )));
180 }
181 }
182 }
183
184 let mut query_result = result.map_err(error_into_status)?;
185
186 if query_result.is_valid() {
187 let response = query_result
188 .into_data()
189 .map_err(|error| error_into_status(error.into()))?;
190
191 Ok(Response::new(response))
192 } else {
193 let error = query_result.errors.swap_remove(0);
194
195 Err(query_error_into_status(error))
196 }
197 })?
198 .instrument(tracing::trace_span!("query", endpoint_name))
199 .await
200 .map_err(|error| Status::internal(format!("query thread failed: {}", error)))?;
201
202 let code = match &result {
204 Ok(_) => Code::Ok,
205 Err(status) => status.code(),
206 };
207
208 let code_label = format!("{:?}", code).to_lowercase();
209
210 let label = abci_response_code_metric_label(code);
212 response_duration_metric.add_label(label);
213
214 match code {
215 Code::Ok
217 | Code::InvalidArgument
218 | Code::NotFound
219 | Code::AlreadyExists
220 | Code::ResourceExhausted
221 | Code::PermissionDenied
222 | Code::Unavailable
223 | Code::Aborted
224 | Code::FailedPrecondition
225 | Code::OutOfRange
226 | Code::Cancelled
227 | Code::DeadlineExceeded
228 | Code::Unauthenticated => {
229 let elapsed_time = response_duration_metric.elapsed().as_secs_f64();
230
231 tracing::trace!(
232 request = request_debug,
233 elapsed_time,
234 endpoint_name,
235 code = code_label,
236 "query '{}' executed with code {:?} in {} secs",
237 endpoint_name,
238 code,
239 elapsed_time
240 );
241 }
242 Code::Unknown | Code::Unimplemented | Code::Internal | Code::DataLoss => {
244 tracing::error!(
245 request = request_debug,
246 endpoint_name,
247 code = code_label,
248 "query '{}' execution failed with code {:?}",
249 endpoint_name,
250 code
251 );
252 }
253 }
254
255 result
256 }
257}
258
259fn respond_with_unimplemented<RS>(name: &str) -> Result<Response<RS>, Status> {
260 tracing::error!("{} endpoint is called but it's not supported", name);
261
262 Err(Status::unimplemented("the endpoint is not supported"))
263}
264
265#[async_trait]
266impl PlatformService for QueryService {
267 async fn broadcast_state_transition(
268 &self,
269 _request: Request<BroadcastStateTransitionRequest>,
270 ) -> Result<Response<BroadcastStateTransitionResponse>, Status> {
271 respond_with_unimplemented("broadcast_state_transition")
272 }
273
274 async fn get_identity(
275 &self,
276 request: Request<GetIdentityRequest>,
277 ) -> Result<Response<GetIdentityResponse>, Status> {
278 self.handle_blocking_query(
279 request,
280 Platform::<DefaultCoreRPC>::query_identity,
281 "get_identity",
282 )
283 .await
284 }
285
286 async fn get_identities_contract_keys(
287 &self,
288 request: Request<GetIdentitiesContractKeysRequest>,
289 ) -> Result<Response<GetIdentitiesContractKeysResponse>, Status> {
290 self.handle_blocking_query(
291 request,
292 Platform::<DefaultCoreRPC>::query_identities_contract_keys,
293 "get_identities_contract_keys",
294 )
295 .await
296 }
297
298 async fn get_identity_keys(
299 &self,
300 request: Request<GetIdentityKeysRequest>,
301 ) -> Result<Response<GetIdentityKeysResponse>, Status> {
302 self.handle_blocking_query(
303 request,
304 Platform::<DefaultCoreRPC>::query_keys,
305 "get_identity_keys",
306 )
307 .await
308 }
309
310 async fn get_identity_nonce(
311 &self,
312 request: Request<GetIdentityNonceRequest>,
313 ) -> Result<Response<GetIdentityNonceResponse>, Status> {
314 self.handle_blocking_query(
315 request,
316 Platform::<DefaultCoreRPC>::query_identity_nonce,
317 "get_identity_nonce",
318 )
319 .await
320 }
321
322 async fn get_identity_contract_nonce(
323 &self,
324 request: Request<GetIdentityContractNonceRequest>,
325 ) -> Result<Response<GetIdentityContractNonceResponse>, Status> {
326 self.handle_blocking_query(
327 request,
328 Platform::<DefaultCoreRPC>::query_identity_contract_nonce,
329 "get_identity_contract_nonce",
330 )
331 .await
332 }
333
334 async fn get_identity_balance(
335 &self,
336 request: Request<GetIdentityBalanceRequest>,
337 ) -> Result<Response<GetIdentityBalanceResponse>, Status> {
338 self.handle_blocking_query(
339 request,
340 Platform::<DefaultCoreRPC>::query_balance,
341 "get_identity_balance",
342 )
343 .await
344 }
345
346 async fn get_identity_balance_and_revision(
347 &self,
348 request: Request<GetIdentityBalanceAndRevisionRequest>,
349 ) -> Result<Response<GetIdentityBalanceAndRevisionResponse>, Status> {
350 self.handle_blocking_query(
351 request,
352 Platform::<DefaultCoreRPC>::query_balance_and_revision,
353 "get_identity_balance_and_revision",
354 )
355 .await
356 }
357
358 async fn get_data_contract(
359 &self,
360 request: Request<GetDataContractRequest>,
361 ) -> Result<Response<GetDataContractResponse>, Status> {
362 self.handle_blocking_query(
363 request,
364 Platform::<DefaultCoreRPC>::query_data_contract,
365 "get_data_contract",
366 )
367 .await
368 }
369
370 async fn get_data_contract_history(
371 &self,
372 request: Request<GetDataContractHistoryRequest>,
373 ) -> Result<Response<GetDataContractHistoryResponse>, Status> {
374 self.handle_blocking_query(
375 request,
376 Platform::<DefaultCoreRPC>::query_data_contract_history,
377 "get_data_contract_history",
378 )
379 .await
380 }
381
382 async fn get_data_contracts(
383 &self,
384 request: Request<GetDataContractsRequest>,
385 ) -> Result<Response<GetDataContractsResponse>, Status> {
386 self.handle_blocking_query(
387 request,
388 Platform::<DefaultCoreRPC>::query_data_contracts,
389 "get_data_contracts",
390 )
391 .await
392 }
393
394 async fn get_document_history(
395 &self,
396 request: Request<GetDocumentHistoryRequest>,
397 ) -> Result<Response<GetDocumentHistoryResponse>, Status> {
398 self.handle_blocking_query(
399 request,
400 Platform::<DefaultCoreRPC>::query_document_history,
401 "get_document_history",
402 )
403 .await
404 }
405
406 async fn get_documents(
407 &self,
408 request: Request<GetDocumentsRequest>,
409 ) -> Result<Response<GetDocumentsResponse>, Status> {
410 self.handle_blocking_query(
411 request,
412 Platform::<DefaultCoreRPC>::query_documents,
413 "get_documents",
414 )
415 .await
416 }
417
418 async fn get_identity_by_public_key_hash(
419 &self,
420 request: Request<GetIdentityByPublicKeyHashRequest>,
421 ) -> Result<Response<GetIdentityByPublicKeyHashResponse>, Status> {
422 self.handle_blocking_query(
423 request,
424 Platform::<DefaultCoreRPC>::query_identity_by_public_key_hash,
425 "get_identity_by_public_key_hash",
426 )
427 .await
428 }
429
430 async fn get_identity_by_non_unique_public_key_hash(
431 &self,
432 request: Request<GetIdentityByNonUniquePublicKeyHashRequest>,
433 ) -> Result<Response<GetIdentityByNonUniquePublicKeyHashResponse>, Status> {
434 self.handle_blocking_query(
435 request,
436 Platform::<DefaultCoreRPC>::query_identity_by_non_unique_public_key_hash,
437 "get_identity_by_non_unique_public_key_hash",
438 )
439 .await
440 }
441
442 async fn wait_for_state_transition_result(
443 &self,
444 _request: Request<WaitForStateTransitionResultRequest>,
445 ) -> Result<Response<WaitForStateTransitionResultResponse>, Status> {
446 respond_with_unimplemented("wait_for_state_transition_result")
447 }
448
449 async fn get_consensus_params(
450 &self,
451 _request: Request<GetConsensusParamsRequest>,
452 ) -> Result<Response<GetConsensusParamsResponse>, Status> {
453 respond_with_unimplemented("get_consensus_params")
454 }
455
456 async fn get_protocol_version_upgrade_state(
457 &self,
458 request: Request<GetProtocolVersionUpgradeStateRequest>,
459 ) -> Result<Response<GetProtocolVersionUpgradeStateResponse>, Status> {
460 self.handle_blocking_query(
461 request,
462 Platform::<DefaultCoreRPC>::query_version_upgrade_state,
463 "get_protocol_version_upgrade_state",
464 )
465 .await
466 }
467
468 async fn get_protocol_version_upgrade_vote_status(
469 &self,
470 request: Request<GetProtocolVersionUpgradeVoteStatusRequest>,
471 ) -> Result<Response<GetProtocolVersionUpgradeVoteStatusResponse>, Status> {
472 self.handle_blocking_query(
473 request,
474 Platform::<DefaultCoreRPC>::query_version_upgrade_vote_status,
475 "get_protocol_version_upgrade_vote_status",
476 )
477 .await
478 }
479
480 async fn get_epochs_info(
481 &self,
482 request: Request<GetEpochsInfoRequest>,
483 ) -> Result<Response<GetEpochsInfoResponse>, Status> {
484 self.handle_blocking_query(
485 request,
486 Platform::<DefaultCoreRPC>::query_epoch_infos,
487 "get_epochs_info",
488 )
489 .await
490 }
491
492 async fn get_path_elements(
493 &self,
494 request: Request<GetPathElementsRequest>,
495 ) -> Result<Response<GetPathElementsResponse>, Status> {
496 self.handle_blocking_query(
497 request,
498 Platform::<DefaultCoreRPC>::query_path_elements,
499 "get_path_elements",
500 )
501 .await
502 }
503
504 async fn get_contested_resources(
505 &self,
506 request: Request<GetContestedResourcesRequest>,
507 ) -> Result<Response<GetContestedResourcesResponse>, Status> {
508 self.handle_blocking_query(
509 request,
510 Platform::<DefaultCoreRPC>::query_contested_resources,
511 "get_contested_resources",
512 )
513 .await
514 }
515
516 async fn get_contested_resource_vote_state(
517 &self,
518 request: Request<GetContestedResourceVoteStateRequest>,
519 ) -> Result<Response<GetContestedResourceVoteStateResponse>, Status> {
520 self.handle_blocking_query(
521 request,
522 Platform::<DefaultCoreRPC>::query_contested_resource_vote_state,
523 "get_contested_resource_vote_state",
524 )
525 .await
526 }
527
528 async fn get_contested_resource_voters_for_identity(
529 &self,
530 request: Request<GetContestedResourceVotersForIdentityRequest>,
531 ) -> Result<Response<GetContestedResourceVotersForIdentityResponse>, Status> {
532 self.handle_blocking_query(
533 request,
534 Platform::<DefaultCoreRPC>::query_contested_resource_voters_for_identity,
535 "get_contested_resource_voters_for_identity",
536 )
537 .await
538 }
539
540 async fn get_contested_resource_identity_votes(
541 &self,
542 request: Request<GetContestedResourceIdentityVotesRequest>,
543 ) -> Result<Response<GetContestedResourceIdentityVotesResponse>, Status> {
544 self.handle_blocking_query(
545 request,
546 Platform::<DefaultCoreRPC>::query_contested_resource_identity_votes,
547 "get_contested_resource_identity_votes",
548 )
549 .await
550 }
551
552 async fn get_vote_polls_by_end_date(
553 &self,
554 request: Request<GetVotePollsByEndDateRequest>,
555 ) -> Result<Response<GetVotePollsByEndDateResponse>, Status> {
556 self.handle_blocking_query(
557 request,
558 Platform::<DefaultCoreRPC>::query_vote_polls_by_end_date_query,
559 "get_vote_polls_by_end_date",
560 )
561 .await
562 }
563
564 async fn get_prefunded_specialized_balance(
565 &self,
566 request: Request<GetPrefundedSpecializedBalanceRequest>,
567 ) -> Result<Response<GetPrefundedSpecializedBalanceResponse>, Status> {
568 self.handle_blocking_query(
569 request,
570 Platform::<DefaultCoreRPC>::query_prefunded_specialized_balance,
571 "get_prefunded_specialized_balance",
572 )
573 .await
574 }
575
576 async fn get_total_credits_in_platform(
577 &self,
578 request: Request<GetTotalCreditsInPlatformRequest>,
579 ) -> Result<Response<GetTotalCreditsInPlatformResponse>, Status> {
580 self.handle_blocking_query(
581 request,
582 Platform::<DefaultCoreRPC>::query_total_credits_in_platform,
583 "get_total_credits_in_platform",
584 )
585 .await
586 }
587
588 async fn get_identities_balances(
589 &self,
590 request: Request<GetIdentitiesBalancesRequest>,
591 ) -> Result<Response<GetIdentitiesBalancesResponse>, Status> {
592 self.handle_blocking_query(
593 request,
594 Platform::<DefaultCoreRPC>::query_identities_balances,
595 "get_identities_balances",
596 )
597 .await
598 }
599
600 async fn get_status(
601 &self,
602 request: Request<GetStatusRequest>,
603 ) -> Result<Response<GetStatusResponse>, Status> {
604 self.handle_blocking_query(
605 request,
606 Platform::<DefaultCoreRPC>::query_partial_status,
607 "query_partial_status",
608 )
609 .await
610 }
611
612 async fn get_evonodes_proposed_epoch_blocks_by_ids(
613 &self,
614 request: Request<GetEvonodesProposedEpochBlocksByIdsRequest>,
615 ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
616 self.handle_blocking_query(
617 request,
618 Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_evonode_ids,
619 "query_proposed_block_counts_by_evonode_ids",
620 )
621 .await
622 }
623
624 async fn get_evonodes_proposed_epoch_blocks_by_range(
625 &self,
626 request: Request<GetEvonodesProposedEpochBlocksByRangeRequest>,
627 ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
628 self.handle_blocking_query(
629 request,
630 Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_range,
631 "query_proposed_block_counts_by_range",
632 )
633 .await
634 }
635
636 async fn get_current_quorums_info(
637 &self,
638 request: Request<GetCurrentQuorumsInfoRequest>,
639 ) -> Result<Response<GetCurrentQuorumsInfoResponse>, Status> {
640 self.handle_blocking_query(
641 request,
642 Platform::<DefaultCoreRPC>::query_current_quorums_info,
643 "query_current_quorums_info",
644 )
645 .await
646 }
647
648 async fn get_identity_token_balances(
649 &self,
650 request: Request<GetIdentityTokenBalancesRequest>,
651 ) -> Result<Response<GetIdentityTokenBalancesResponse>, Status> {
652 self.handle_blocking_query(
653 request,
654 Platform::<DefaultCoreRPC>::query_identity_token_balances,
655 "query_identity_token_balances",
656 )
657 .await
658 }
659
660 async fn get_identities_token_balances(
661 &self,
662 request: Request<GetIdentitiesTokenBalancesRequest>,
663 ) -> Result<Response<GetIdentitiesTokenBalancesResponse>, Status> {
664 self.handle_blocking_query(
665 request,
666 Platform::<DefaultCoreRPC>::query_identities_token_balances,
667 "query_identities_token_balances",
668 )
669 .await
670 }
671
672 async fn get_identity_token_infos(
673 &self,
674 request: Request<GetIdentityTokenInfosRequest>,
675 ) -> Result<Response<GetIdentityTokenInfosResponse>, Status> {
676 self.handle_blocking_query(
677 request,
678 Platform::<DefaultCoreRPC>::query_identity_token_infos,
679 "query_identity_token_infos",
680 )
681 .await
682 }
683
684 async fn get_identities_token_infos(
685 &self,
686 request: Request<GetIdentitiesTokenInfosRequest>,
687 ) -> Result<Response<GetIdentitiesTokenInfosResponse>, Status> {
688 self.handle_blocking_query(
689 request,
690 Platform::<DefaultCoreRPC>::query_identities_token_infos,
691 "query_identities_token_infos",
692 )
693 .await
694 }
695
696 async fn get_token_statuses(
697 &self,
698 request: Request<GetTokenStatusesRequest>,
699 ) -> Result<Response<GetTokenStatusesResponse>, Status> {
700 self.handle_blocking_query(
701 request,
702 Platform::<DefaultCoreRPC>::query_token_statuses,
703 "get_token_statuses",
704 )
705 .await
706 }
707
708 async fn get_token_pre_programmed_distributions(
709 &self,
710 request: Request<GetTokenPreProgrammedDistributionsRequest>,
711 ) -> Result<Response<GetTokenPreProgrammedDistributionsResponse>, Status> {
712 self.handle_blocking_query(
713 request,
714 Platform::<DefaultCoreRPC>::query_token_pre_programmed_distributions,
715 "get_token_pre_programmed_distributions",
716 )
717 .await
718 }
719
720 async fn get_token_total_supply(
721 &self,
722 request: Request<GetTokenTotalSupplyRequest>,
723 ) -> Result<Response<GetTokenTotalSupplyResponse>, Status> {
724 self.handle_blocking_query(
725 request,
726 Platform::<DefaultCoreRPC>::query_token_total_supply,
727 "get_token_total_supply",
728 )
729 .await
730 }
731
732 async fn get_group_info(
733 &self,
734 request: Request<GetGroupInfoRequest>,
735 ) -> Result<Response<GetGroupInfoResponse>, Status> {
736 self.handle_blocking_query(
737 request,
738 Platform::<DefaultCoreRPC>::query_group_info,
739 "get_group_info",
740 )
741 .await
742 }
743
744 async fn get_group_infos(
745 &self,
746 request: Request<GetGroupInfosRequest>,
747 ) -> Result<Response<GetGroupInfosResponse>, Status> {
748 self.handle_blocking_query(
749 request,
750 Platform::<DefaultCoreRPC>::query_group_infos,
751 "get_group_infos",
752 )
753 .await
754 }
755
756 async fn get_group_actions(
757 &self,
758 request: Request<GetGroupActionsRequest>,
759 ) -> Result<Response<GetGroupActionsResponse>, Status> {
760 self.handle_blocking_query(
761 request,
762 Platform::<DefaultCoreRPC>::query_group_actions,
763 "get_group_actions",
764 )
765 .await
766 }
767
768 async fn get_group_action_signers(
769 &self,
770 request: Request<GetGroupActionSignersRequest>,
771 ) -> Result<Response<GetGroupActionSignersResponse>, Status> {
772 self.handle_blocking_query(
773 request,
774 Platform::<DefaultCoreRPC>::query_group_action_signers,
775 "get_group_action_signers",
776 )
777 .await
778 }
779
780 async fn get_token_direct_purchase_prices(
781 &self,
782 request: Request<GetTokenDirectPurchasePricesRequest>,
783 ) -> Result<Response<GetTokenDirectPurchasePricesResponse>, Status> {
784 self.handle_blocking_query(
785 request,
786 Platform::<DefaultCoreRPC>::query_token_direct_purchase_prices,
787 "get_token_direct_purchase_prices",
788 )
789 .await
790 }
791
792 async fn get_token_contract_info(
793 &self,
794 request: Request<GetTokenContractInfoRequest>,
795 ) -> Result<Response<GetTokenContractInfoResponse>, Status> {
796 self.handle_blocking_query(
797 request,
798 Platform::<DefaultCoreRPC>::query_token_contract_info,
799 "get_token_contract_info",
800 )
801 .await
802 }
803
804 async fn get_token_perpetual_distribution_last_claim(
805 &self,
806 request: Request<GetTokenPerpetualDistributionLastClaimRequest>,
807 ) -> Result<Response<GetTokenPerpetualDistributionLastClaimResponse>, Status> {
808 self.handle_blocking_query(
809 request,
810 Platform::<DefaultCoreRPC>::query_token_perpetual_distribution_last_claim,
811 "get_token_perpetual_distribution_last_claim",
812 )
813 .await
814 }
815
816 async fn get_finalized_epoch_infos(
817 &self,
818 request: Request<GetFinalizedEpochInfosRequest>,
819 ) -> Result<Response<GetFinalizedEpochInfosResponse>, Status> {
820 self.handle_blocking_query(
821 request,
822 Platform::<DefaultCoreRPC>::query_finalized_epoch_infos,
823 "get_finalized_epoch_infos",
824 )
825 .await
826 }
827
828 async fn get_address_info(
829 &self,
830 request: Request<GetAddressInfoRequest>,
831 ) -> Result<Response<GetAddressInfoResponse>, Status> {
832 self.handle_blocking_query(
833 request,
834 Platform::<DefaultCoreRPC>::query_address_info,
835 "get_address_info",
836 )
837 .await
838 }
839
840 async fn get_addresses_infos(
841 &self,
842 request: Request<GetAddressesInfosRequest>,
843 ) -> Result<Response<GetAddressesInfosResponse>, Status> {
844 self.handle_blocking_query(
845 request,
846 Platform::<DefaultCoreRPC>::query_addresses_infos,
847 "get_addresses_infos",
848 )
849 .await
850 }
851
852 async fn get_addresses_trunk_state(
853 &self,
854 request: Request<GetAddressesTrunkStateRequest>,
855 ) -> Result<Response<GetAddressesTrunkStateResponse>, Status> {
856 self.handle_blocking_query(
857 request,
858 Platform::<DefaultCoreRPC>::query_addresses_trunk_state,
859 "get_addresses_trunk_state",
860 )
861 .await
862 }
863
864 async fn get_addresses_branch_state(
865 &self,
866 request: Request<GetAddressesBranchStateRequest>,
867 ) -> Result<Response<GetAddressesBranchStateResponse>, Status> {
868 self.handle_blocking_query(
869 request,
870 Platform::<DefaultCoreRPC>::query_addresses_branch_state,
871 "get_addresses_branch_state",
872 )
873 .await
874 }
875
876 async fn get_recent_address_balance_changes(
877 &self,
878 request: Request<GetRecentAddressBalanceChangesRequest>,
879 ) -> Result<Response<GetRecentAddressBalanceChangesResponse>, Status> {
880 self.handle_blocking_query(
881 request,
882 Platform::<DefaultCoreRPC>::query_recent_address_balance_changes,
883 "get_recent_address_balance_changes",
884 )
885 .await
886 }
887
888 async fn get_recent_compacted_address_balance_changes(
889 &self,
890 request: Request<GetRecentCompactedAddressBalanceChangesRequest>,
891 ) -> Result<Response<GetRecentCompactedAddressBalanceChangesResponse>, Status> {
892 self.handle_blocking_query(
893 request,
894 Platform::<DefaultCoreRPC>::query_recent_compacted_address_balance_changes,
895 "get_recent_compacted_address_balance_changes",
896 )
897 .await
898 }
899
900 async fn get_shielded_encrypted_notes(
901 &self,
902 request: Request<GetShieldedEncryptedNotesRequest>,
903 ) -> Result<Response<GetShieldedEncryptedNotesResponse>, Status> {
904 self.handle_blocking_query(
905 request,
906 Platform::<DefaultCoreRPC>::query_shielded_encrypted_notes,
907 "get_shielded_encrypted_notes",
908 )
909 .await
910 }
911
912 async fn get_shielded_anchors(
913 &self,
914 request: Request<GetShieldedAnchorsRequest>,
915 ) -> Result<Response<GetShieldedAnchorsResponse>, Status> {
916 self.handle_blocking_query(
917 request,
918 Platform::<DefaultCoreRPC>::query_shielded_anchors,
919 "get_shielded_anchors",
920 )
921 .await
922 }
923
924 async fn get_most_recent_shielded_anchor(
925 &self,
926 request: Request<GetMostRecentShieldedAnchorRequest>,
927 ) -> Result<Response<GetMostRecentShieldedAnchorResponse>, Status> {
928 self.handle_blocking_query(
929 request,
930 Platform::<DefaultCoreRPC>::query_most_recent_shielded_anchor,
931 "get_most_recent_shielded_anchor",
932 )
933 .await
934 }
935
936 async fn get_shielded_pool_state(
937 &self,
938 request: Request<GetShieldedPoolStateRequest>,
939 ) -> Result<Response<GetShieldedPoolStateResponse>, Status> {
940 self.handle_blocking_query(
941 request,
942 Platform::<DefaultCoreRPC>::query_shielded_pool_state,
943 "get_shielded_pool_state",
944 )
945 .await
946 }
947
948 async fn get_shielded_notes_count(
949 &self,
950 request: Request<GetShieldedNotesCountRequest>,
951 ) -> Result<Response<GetShieldedNotesCountResponse>, Status> {
952 self.handle_blocking_query(
953 request,
954 Platform::<DefaultCoreRPC>::query_shielded_notes_count,
955 "get_shielded_notes_count",
956 )
957 .await
958 }
959
960 async fn get_shielded_nullifiers(
961 &self,
962 request: Request<GetShieldedNullifiersRequest>,
963 ) -> Result<Response<GetShieldedNullifiersResponse>, Status> {
964 self.handle_blocking_query(
965 request,
966 Platform::<DefaultCoreRPC>::query_shielded_nullifiers,
967 "get_shielded_nullifiers",
968 )
969 .await
970 }
971}
972
973#[async_trait]
974impl DriveInternal for QueryService {
975 async fn get_proofs(
976 &self,
977 request: Request<GetProofsRequest>,
978 ) -> Result<Response<GetProofsResponse>, Status> {
979 self.handle_blocking_query(
980 request,
981 Platform::<DefaultCoreRPC>::query_proofs,
982 "get_proofs",
983 )
984 .await
985 }
986}
987
988fn query_error_into_status(error: QueryError) -> Status {
989 match error {
990 QueryError::NotFound(message) => Status::not_found(message),
991 QueryError::InvalidArgument(message) => Status::invalid_argument(message),
992 QueryError::Query(error) => Status::invalid_argument(error.to_string()),
993 _ => {
994 tracing::error!("unexpected query error: {:?}", error);
995
996 Status::unknown(error.to_string())
997 }
998 }
999}
1000
1001fn error_into_status(error: Error) -> Status {
1002 Status::internal(format!("query: {}", error))
1003}