1use crate::error::query::QueryError;
2use crate::error::Error;
3use crate::metrics::{abci_response_code_metric_label, query_duration_metric};
4use crate::platform_types::platform::Platform;
5use crate::platform_types::platform_state::PlatformState;
6use crate::platform_types::platform_state::PlatformStateV0Methods;
7use crate::query::QueryValidationResult;
8use crate::rpc::core::DefaultCoreRPC;
9use crate::utils::spawn_blocking_task_with_name_if_supported;
10use async_trait::async_trait;
11use dapi_grpc::drive::v0::drive_internal_server::DriveInternal;
12use dapi_grpc::drive::v0::{GetProofsRequest, GetProofsResponse};
13use dapi_grpc::platform::v0::platform_server::Platform as PlatformService;
14use dapi_grpc::platform::v0::{
15 BroadcastStateTransitionRequest, BroadcastStateTransitionResponse, GetAddressInfoRequest,
16 GetAddressInfoResponse, GetAddressesBranchStateRequest, GetAddressesBranchStateResponse,
17 GetAddressesInfosRequest, GetAddressesInfosResponse, GetAddressesTrunkStateRequest,
18 GetAddressesTrunkStateResponse, GetConsensusParamsRequest, GetConsensusParamsResponse,
19 GetContestedResourceIdentityVotesRequest, GetContestedResourceIdentityVotesResponse,
20 GetContestedResourceVoteStateRequest, GetContestedResourceVoteStateResponse,
21 GetContestedResourceVotersForIdentityRequest, GetContestedResourceVotersForIdentityResponse,
22 GetContestedResourcesRequest, GetContestedResourcesResponse, GetCurrentQuorumsInfoRequest,
23 GetCurrentQuorumsInfoResponse, GetDataContractHistoryRequest, GetDataContractHistoryResponse,
24 GetDataContractRequest, GetDataContractResponse, GetDataContractsRequest,
25 GetDataContractsResponse, GetDocumentsRequest, GetDocumentsResponse, GetEpochsInfoRequest,
26 GetEpochsInfoResponse, GetEvonodesProposedEpochBlocksByIdsRequest,
27 GetEvonodesProposedEpochBlocksByRangeRequest, GetEvonodesProposedEpochBlocksResponse,
28 GetFinalizedEpochInfosRequest, GetFinalizedEpochInfosResponse, GetGroupActionSignersRequest,
29 GetGroupActionSignersResponse, GetGroupActionsRequest, GetGroupActionsResponse,
30 GetGroupInfoRequest, GetGroupInfoResponse, GetGroupInfosRequest, GetGroupInfosResponse,
31 GetIdentitiesBalancesRequest, GetIdentitiesBalancesResponse, GetIdentitiesContractKeysRequest,
32 GetIdentitiesContractKeysResponse, GetIdentitiesTokenBalancesRequest,
33 GetIdentitiesTokenBalancesResponse, GetIdentitiesTokenInfosRequest,
34 GetIdentitiesTokenInfosResponse, GetIdentityBalanceAndRevisionRequest,
35 GetIdentityBalanceAndRevisionResponse, GetIdentityBalanceRequest, GetIdentityBalanceResponse,
36 GetIdentityByNonUniquePublicKeyHashRequest, GetIdentityByNonUniquePublicKeyHashResponse,
37 GetIdentityByPublicKeyHashRequest, GetIdentityByPublicKeyHashResponse,
38 GetIdentityContractNonceRequest, GetIdentityContractNonceResponse, GetIdentityKeysRequest,
39 GetIdentityKeysResponse, GetIdentityNonceRequest, GetIdentityNonceResponse, GetIdentityRequest,
40 GetIdentityResponse, GetIdentityTokenBalancesRequest, GetIdentityTokenBalancesResponse,
41 GetIdentityTokenInfosRequest, GetIdentityTokenInfosResponse,
42 GetMostRecentShieldedAnchorRequest, GetMostRecentShieldedAnchorResponse,
43 GetNullifiersBranchStateRequest, GetNullifiersBranchStateResponse,
44 GetNullifiersTrunkStateRequest, GetNullifiersTrunkStateResponse, GetPathElementsRequest,
45 GetPathElementsResponse, GetPrefundedSpecializedBalanceRequest,
46 GetPrefundedSpecializedBalanceResponse, GetProtocolVersionUpgradeStateRequest,
47 GetProtocolVersionUpgradeStateResponse, GetProtocolVersionUpgradeVoteStatusRequest,
48 GetProtocolVersionUpgradeVoteStatusResponse, GetRecentAddressBalanceChangesRequest,
49 GetRecentAddressBalanceChangesResponse, GetRecentCompactedAddressBalanceChangesRequest,
50 GetRecentCompactedAddressBalanceChangesResponse, GetRecentCompactedNullifierChangesRequest,
51 GetRecentCompactedNullifierChangesResponse, GetRecentNullifierChangesRequest,
52 GetRecentNullifierChangesResponse, GetShieldedAnchorsRequest, GetShieldedAnchorsResponse,
53 GetShieldedEncryptedNotesRequest, GetShieldedEncryptedNotesResponse,
54 GetShieldedNullifiersRequest, GetShieldedNullifiersResponse, GetShieldedPoolStateRequest,
55 GetShieldedPoolStateResponse, GetStatusRequest, GetStatusResponse, GetTokenContractInfoRequest,
56 GetTokenContractInfoResponse, GetTokenDirectPurchasePricesRequest,
57 GetTokenDirectPurchasePricesResponse, GetTokenPerpetualDistributionLastClaimRequest,
58 GetTokenPerpetualDistributionLastClaimResponse, GetTokenPreProgrammedDistributionsRequest,
59 GetTokenPreProgrammedDistributionsResponse, GetTokenStatusesRequest, GetTokenStatusesResponse,
60 GetTokenTotalSupplyRequest, GetTokenTotalSupplyResponse, GetTotalCreditsInPlatformRequest,
61 GetTotalCreditsInPlatformResponse, GetVotePollsByEndDateRequest, GetVotePollsByEndDateResponse,
62 WaitForStateTransitionResultRequest, WaitForStateTransitionResultResponse,
63};
64use dapi_grpc::tonic::{Code, Request, Response, Status};
65use dpp::version::PlatformVersion;
66use std::fmt::Debug;
67use std::sync::atomic::Ordering;
68use std::sync::Arc;
69use std::thread::sleep;
70use std::time::Duration;
71use tracing::Instrument;
72
73pub struct QueryService {
75 platform: Arc<Platform<DefaultCoreRPC>>,
76}
77
78type QueryMethod<RQ, RS> = fn(
79 &Platform<DefaultCoreRPC>,
80 RQ,
81 &PlatformState,
82 &PlatformVersion,
83) -> Result<QueryValidationResult<RS>, Error>;
84
85impl QueryService {
86 pub fn new(platform: Arc<Platform<DefaultCoreRPC>>) -> Self {
88 Self { platform }
89 }
90
91 async fn handle_blocking_query<RQ, RS>(
92 &self,
93 request: Request<RQ>,
94 query_method: QueryMethod<RQ, RS>,
95 endpoint_name: &str,
96 ) -> Result<Response<RS>, Status>
97 where
98 RS: Clone + Send + 'static,
99 RQ: Debug + Send + Clone + 'static,
100 {
101 let mut response_duration_metric = query_duration_metric(endpoint_name);
102
103 let platform = Arc::clone(&self.platform);
104
105 let request_debug = format!("{:?}", &request);
106
107 let result = spawn_blocking_task_with_name_if_supported("query", move || {
108 let mut result;
109
110 let query_request = request.into_inner();
111
112 let mut query_counter = 0;
113
114 loop {
115 let platform_state = platform.state.load();
116
117 let platform_version = platform_state
118 .current_platform_version()
119 .map_err(|_| Status::unavailable("platform is not initialized"))?;
120
121 let mut needs_restart = false;
128
129 loop {
130 let committed_block_height_guard = platform
131 .committed_block_height_guard
132 .load(Ordering::Relaxed);
133 let mut counter = 0;
134 if platform_state.last_committed_block_height() == committed_block_height_guard
135 {
136 break;
137 } else {
138 counter += 1;
139 sleep(Duration::from_millis(10))
140 }
141
142 if counter >= 100 {
144 query_counter += 1;
145 needs_restart = true;
146 break;
147 }
148 }
149
150 if query_counter > 3 {
151 return Err(query_error_into_status(QueryError::NotServiceable(
152 "platform is saturated (did not attempt query)".to_string(),
153 )));
154 }
155
156 if needs_restart {
157 continue;
158 }
159
160 result = query_method(
161 &platform,
162 query_request.clone(),
163 &platform_state,
164 platform_version,
165 );
166
167 let committed_block_height_guard = platform
168 .committed_block_height_guard
169 .load(Ordering::Relaxed);
170
171 if platform_state.last_committed_block_height() == committed_block_height_guard {
172 break;
174 } else {
175 query_counter += 1;
176
177 if query_counter > 2 {
178 return Err(query_error_into_status(QueryError::NotServiceable(
180 "platform is saturated".to_string(),
181 )));
182 }
183 }
184 }
185
186 let mut query_result = result.map_err(error_into_status)?;
187
188 if query_result.is_valid() {
189 let response = query_result
190 .into_data()
191 .map_err(|error| error_into_status(error.into()))?;
192
193 Ok(Response::new(response))
194 } else {
195 let error = query_result.errors.swap_remove(0);
196
197 Err(query_error_into_status(error))
198 }
199 })?
200 .instrument(tracing::trace_span!("query", endpoint_name))
201 .await
202 .map_err(|error| Status::internal(format!("query thread failed: {}", error)))?;
203
204 let code = match &result {
206 Ok(_) => Code::Ok,
207 Err(status) => status.code(),
208 };
209
210 let code_label = format!("{:?}", code).to_lowercase();
211
212 let label = abci_response_code_metric_label(code);
214 response_duration_metric.add_label(label);
215
216 match code {
217 Code::Ok
219 | Code::InvalidArgument
220 | Code::NotFound
221 | Code::AlreadyExists
222 | Code::ResourceExhausted
223 | Code::PermissionDenied
224 | Code::Unavailable
225 | Code::Aborted
226 | Code::FailedPrecondition
227 | Code::OutOfRange
228 | Code::Cancelled
229 | Code::DeadlineExceeded
230 | Code::Unauthenticated => {
231 let elapsed_time = response_duration_metric.elapsed().as_secs_f64();
232
233 tracing::trace!(
234 request = request_debug,
235 elapsed_time,
236 endpoint_name,
237 code = code_label,
238 "query '{}' executed with code {:?} in {} secs",
239 endpoint_name,
240 code,
241 elapsed_time
242 );
243 }
244 Code::Unknown | Code::Unimplemented | Code::Internal | Code::DataLoss => {
246 tracing::error!(
247 request = request_debug,
248 endpoint_name,
249 code = code_label,
250 "query '{}' execution failed with code {:?}",
251 endpoint_name,
252 code
253 );
254 }
255 }
256
257 result
258 }
259}
260
261fn respond_with_unimplemented<RS>(name: &str) -> Result<Response<RS>, Status> {
262 tracing::error!("{} endpoint is called but it's not supported", name);
263
264 Err(Status::unimplemented("the endpoint is not supported"))
265}
266
267#[async_trait]
268impl PlatformService for QueryService {
269 async fn broadcast_state_transition(
270 &self,
271 _request: Request<BroadcastStateTransitionRequest>,
272 ) -> Result<Response<BroadcastStateTransitionResponse>, Status> {
273 respond_with_unimplemented("broadcast_state_transition")
274 }
275
276 async fn get_identity(
277 &self,
278 request: Request<GetIdentityRequest>,
279 ) -> Result<Response<GetIdentityResponse>, Status> {
280 self.handle_blocking_query(
281 request,
282 Platform::<DefaultCoreRPC>::query_identity,
283 "get_identity",
284 )
285 .await
286 }
287
288 async fn get_identities_contract_keys(
289 &self,
290 request: Request<GetIdentitiesContractKeysRequest>,
291 ) -> Result<Response<GetIdentitiesContractKeysResponse>, Status> {
292 self.handle_blocking_query(
293 request,
294 Platform::<DefaultCoreRPC>::query_identities_contract_keys,
295 "get_identities_contract_keys",
296 )
297 .await
298 }
299
300 async fn get_identity_keys(
301 &self,
302 request: Request<GetIdentityKeysRequest>,
303 ) -> Result<Response<GetIdentityKeysResponse>, Status> {
304 self.handle_blocking_query(
305 request,
306 Platform::<DefaultCoreRPC>::query_keys,
307 "get_identity_keys",
308 )
309 .await
310 }
311
312 async fn get_identity_nonce(
313 &self,
314 request: Request<GetIdentityNonceRequest>,
315 ) -> Result<Response<GetIdentityNonceResponse>, Status> {
316 self.handle_blocking_query(
317 request,
318 Platform::<DefaultCoreRPC>::query_identity_nonce,
319 "get_identity_nonce",
320 )
321 .await
322 }
323
324 async fn get_identity_contract_nonce(
325 &self,
326 request: Request<GetIdentityContractNonceRequest>,
327 ) -> Result<Response<GetIdentityContractNonceResponse>, Status> {
328 self.handle_blocking_query(
329 request,
330 Platform::<DefaultCoreRPC>::query_identity_contract_nonce,
331 "get_identity_contract_nonce",
332 )
333 .await
334 }
335
336 async fn get_identity_balance(
337 &self,
338 request: Request<GetIdentityBalanceRequest>,
339 ) -> Result<Response<GetIdentityBalanceResponse>, Status> {
340 self.handle_blocking_query(
341 request,
342 Platform::<DefaultCoreRPC>::query_balance,
343 "get_identity_balance",
344 )
345 .await
346 }
347
348 async fn get_identity_balance_and_revision(
349 &self,
350 request: Request<GetIdentityBalanceAndRevisionRequest>,
351 ) -> Result<Response<GetIdentityBalanceAndRevisionResponse>, Status> {
352 self.handle_blocking_query(
353 request,
354 Platform::<DefaultCoreRPC>::query_balance_and_revision,
355 "get_identity_balance_and_revision",
356 )
357 .await
358 }
359
360 async fn get_data_contract(
361 &self,
362 request: Request<GetDataContractRequest>,
363 ) -> Result<Response<GetDataContractResponse>, Status> {
364 self.handle_blocking_query(
365 request,
366 Platform::<DefaultCoreRPC>::query_data_contract,
367 "get_data_contract",
368 )
369 .await
370 }
371
372 async fn get_data_contract_history(
373 &self,
374 request: Request<GetDataContractHistoryRequest>,
375 ) -> Result<Response<GetDataContractHistoryResponse>, Status> {
376 self.handle_blocking_query(
377 request,
378 Platform::<DefaultCoreRPC>::query_data_contract_history,
379 "get_data_contract_history",
380 )
381 .await
382 }
383
384 async fn get_data_contracts(
385 &self,
386 request: Request<GetDataContractsRequest>,
387 ) -> Result<Response<GetDataContractsResponse>, Status> {
388 self.handle_blocking_query(
389 request,
390 Platform::<DefaultCoreRPC>::query_data_contracts,
391 "get_data_contracts",
392 )
393 .await
394 }
395
396 async fn get_documents(
397 &self,
398 request: Request<GetDocumentsRequest>,
399 ) -> Result<Response<GetDocumentsResponse>, Status> {
400 self.handle_blocking_query(
401 request,
402 Platform::<DefaultCoreRPC>::query_documents,
403 "get_documents",
404 )
405 .await
406 }
407
408 async fn get_identity_by_public_key_hash(
409 &self,
410 request: Request<GetIdentityByPublicKeyHashRequest>,
411 ) -> Result<Response<GetIdentityByPublicKeyHashResponse>, Status> {
412 self.handle_blocking_query(
413 request,
414 Platform::<DefaultCoreRPC>::query_identity_by_public_key_hash,
415 "get_identity_by_public_key_hash",
416 )
417 .await
418 }
419
420 async fn get_identity_by_non_unique_public_key_hash(
421 &self,
422 request: Request<GetIdentityByNonUniquePublicKeyHashRequest>,
423 ) -> Result<Response<GetIdentityByNonUniquePublicKeyHashResponse>, Status> {
424 self.handle_blocking_query(
425 request,
426 Platform::<DefaultCoreRPC>::query_identity_by_non_unique_public_key_hash,
427 "get_identity_by_non_unique_public_key_hash",
428 )
429 .await
430 }
431
432 async fn wait_for_state_transition_result(
433 &self,
434 _request: Request<WaitForStateTransitionResultRequest>,
435 ) -> Result<Response<WaitForStateTransitionResultResponse>, Status> {
436 respond_with_unimplemented("wait_for_state_transition_result")
437 }
438
439 async fn get_consensus_params(
440 &self,
441 _request: Request<GetConsensusParamsRequest>,
442 ) -> Result<Response<GetConsensusParamsResponse>, Status> {
443 respond_with_unimplemented("get_consensus_params")
444 }
445
446 async fn get_protocol_version_upgrade_state(
447 &self,
448 request: Request<GetProtocolVersionUpgradeStateRequest>,
449 ) -> Result<Response<GetProtocolVersionUpgradeStateResponse>, Status> {
450 self.handle_blocking_query(
451 request,
452 Platform::<DefaultCoreRPC>::query_version_upgrade_state,
453 "get_protocol_version_upgrade_state",
454 )
455 .await
456 }
457
458 async fn get_protocol_version_upgrade_vote_status(
459 &self,
460 request: Request<GetProtocolVersionUpgradeVoteStatusRequest>,
461 ) -> Result<Response<GetProtocolVersionUpgradeVoteStatusResponse>, Status> {
462 self.handle_blocking_query(
463 request,
464 Platform::<DefaultCoreRPC>::query_version_upgrade_vote_status,
465 "get_protocol_version_upgrade_vote_status",
466 )
467 .await
468 }
469
470 async fn get_epochs_info(
471 &self,
472 request: Request<GetEpochsInfoRequest>,
473 ) -> Result<Response<GetEpochsInfoResponse>, Status> {
474 self.handle_blocking_query(
475 request,
476 Platform::<DefaultCoreRPC>::query_epoch_infos,
477 "get_epochs_info",
478 )
479 .await
480 }
481
482 async fn get_path_elements(
483 &self,
484 request: Request<GetPathElementsRequest>,
485 ) -> Result<Response<GetPathElementsResponse>, Status> {
486 self.handle_blocking_query(
487 request,
488 Platform::<DefaultCoreRPC>::query_path_elements,
489 "get_path_elements",
490 )
491 .await
492 }
493
494 async fn get_contested_resources(
495 &self,
496 request: Request<GetContestedResourcesRequest>,
497 ) -> Result<Response<GetContestedResourcesResponse>, Status> {
498 self.handle_blocking_query(
499 request,
500 Platform::<DefaultCoreRPC>::query_contested_resources,
501 "get_contested_resources",
502 )
503 .await
504 }
505
506 async fn get_contested_resource_vote_state(
507 &self,
508 request: Request<GetContestedResourceVoteStateRequest>,
509 ) -> Result<Response<GetContestedResourceVoteStateResponse>, Status> {
510 self.handle_blocking_query(
511 request,
512 Platform::<DefaultCoreRPC>::query_contested_resource_vote_state,
513 "get_contested_resource_vote_state",
514 )
515 .await
516 }
517
518 async fn get_contested_resource_voters_for_identity(
519 &self,
520 request: Request<GetContestedResourceVotersForIdentityRequest>,
521 ) -> Result<Response<GetContestedResourceVotersForIdentityResponse>, Status> {
522 self.handle_blocking_query(
523 request,
524 Platform::<DefaultCoreRPC>::query_contested_resource_voters_for_identity,
525 "get_contested_resource_voters_for_identity",
526 )
527 .await
528 }
529
530 async fn get_contested_resource_identity_votes(
531 &self,
532 request: Request<GetContestedResourceIdentityVotesRequest>,
533 ) -> Result<Response<GetContestedResourceIdentityVotesResponse>, Status> {
534 self.handle_blocking_query(
535 request,
536 Platform::<DefaultCoreRPC>::query_contested_resource_identity_votes,
537 "get_contested_resource_identity_votes",
538 )
539 .await
540 }
541
542 async fn get_vote_polls_by_end_date(
543 &self,
544 request: Request<GetVotePollsByEndDateRequest>,
545 ) -> Result<Response<GetVotePollsByEndDateResponse>, Status> {
546 self.handle_blocking_query(
547 request,
548 Platform::<DefaultCoreRPC>::query_vote_polls_by_end_date_query,
549 "get_vote_polls_by_end_date",
550 )
551 .await
552 }
553
554 async fn get_prefunded_specialized_balance(
555 &self,
556 request: Request<GetPrefundedSpecializedBalanceRequest>,
557 ) -> Result<Response<GetPrefundedSpecializedBalanceResponse>, Status> {
558 self.handle_blocking_query(
559 request,
560 Platform::<DefaultCoreRPC>::query_prefunded_specialized_balance,
561 "get_prefunded_specialized_balance",
562 )
563 .await
564 }
565
566 async fn get_total_credits_in_platform(
567 &self,
568 request: Request<GetTotalCreditsInPlatformRequest>,
569 ) -> Result<Response<GetTotalCreditsInPlatformResponse>, Status> {
570 self.handle_blocking_query(
571 request,
572 Platform::<DefaultCoreRPC>::query_total_credits_in_platform,
573 "get_total_credits_in_platform",
574 )
575 .await
576 }
577
578 async fn get_identities_balances(
579 &self,
580 request: Request<GetIdentitiesBalancesRequest>,
581 ) -> Result<Response<GetIdentitiesBalancesResponse>, Status> {
582 self.handle_blocking_query(
583 request,
584 Platform::<DefaultCoreRPC>::query_identities_balances,
585 "get_identities_balances",
586 )
587 .await
588 }
589
590 async fn get_status(
591 &self,
592 request: Request<GetStatusRequest>,
593 ) -> Result<Response<GetStatusResponse>, Status> {
594 self.handle_blocking_query(
595 request,
596 Platform::<DefaultCoreRPC>::query_partial_status,
597 "query_partial_status",
598 )
599 .await
600 }
601
602 async fn get_evonodes_proposed_epoch_blocks_by_ids(
603 &self,
604 request: Request<GetEvonodesProposedEpochBlocksByIdsRequest>,
605 ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
606 self.handle_blocking_query(
607 request,
608 Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_evonode_ids,
609 "query_proposed_block_counts_by_evonode_ids",
610 )
611 .await
612 }
613
614 async fn get_evonodes_proposed_epoch_blocks_by_range(
615 &self,
616 request: Request<GetEvonodesProposedEpochBlocksByRangeRequest>,
617 ) -> Result<Response<GetEvonodesProposedEpochBlocksResponse>, Status> {
618 self.handle_blocking_query(
619 request,
620 Platform::<DefaultCoreRPC>::query_proposed_block_counts_by_range,
621 "query_proposed_block_counts_by_range",
622 )
623 .await
624 }
625
626 async fn get_current_quorums_info(
627 &self,
628 request: Request<GetCurrentQuorumsInfoRequest>,
629 ) -> Result<Response<GetCurrentQuorumsInfoResponse>, Status> {
630 self.handle_blocking_query(
631 request,
632 Platform::<DefaultCoreRPC>::query_current_quorums_info,
633 "query_current_quorums_info",
634 )
635 .await
636 }
637
638 async fn get_identity_token_balances(
639 &self,
640 request: Request<GetIdentityTokenBalancesRequest>,
641 ) -> Result<Response<GetIdentityTokenBalancesResponse>, Status> {
642 self.handle_blocking_query(
643 request,
644 Platform::<DefaultCoreRPC>::query_identity_token_balances,
645 "query_identity_token_balances",
646 )
647 .await
648 }
649
650 async fn get_identities_token_balances(
651 &self,
652 request: Request<GetIdentitiesTokenBalancesRequest>,
653 ) -> Result<Response<GetIdentitiesTokenBalancesResponse>, Status> {
654 self.handle_blocking_query(
655 request,
656 Platform::<DefaultCoreRPC>::query_identities_token_balances,
657 "query_identities_token_balances",
658 )
659 .await
660 }
661
662 async fn get_identity_token_infos(
663 &self,
664 request: Request<GetIdentityTokenInfosRequest>,
665 ) -> Result<Response<GetIdentityTokenInfosResponse>, Status> {
666 self.handle_blocking_query(
667 request,
668 Platform::<DefaultCoreRPC>::query_identity_token_infos,
669 "query_identity_token_infos",
670 )
671 .await
672 }
673
674 async fn get_identities_token_infos(
675 &self,
676 request: Request<GetIdentitiesTokenInfosRequest>,
677 ) -> Result<Response<GetIdentitiesTokenInfosResponse>, Status> {
678 self.handle_blocking_query(
679 request,
680 Platform::<DefaultCoreRPC>::query_identities_token_infos,
681 "query_identities_token_infos",
682 )
683 .await
684 }
685
686 async fn get_token_statuses(
687 &self,
688 request: Request<GetTokenStatusesRequest>,
689 ) -> Result<Response<GetTokenStatusesResponse>, Status> {
690 self.handle_blocking_query(
691 request,
692 Platform::<DefaultCoreRPC>::query_token_statuses,
693 "get_token_statuses",
694 )
695 .await
696 }
697
698 async fn get_token_pre_programmed_distributions(
699 &self,
700 request: Request<GetTokenPreProgrammedDistributionsRequest>,
701 ) -> Result<Response<GetTokenPreProgrammedDistributionsResponse>, Status> {
702 self.handle_blocking_query(
703 request,
704 Platform::<DefaultCoreRPC>::query_token_pre_programmed_distributions,
705 "get_token_pre_programmed_distributions",
706 )
707 .await
708 }
709
710 async fn get_token_total_supply(
711 &self,
712 request: Request<GetTokenTotalSupplyRequest>,
713 ) -> Result<Response<GetTokenTotalSupplyResponse>, Status> {
714 self.handle_blocking_query(
715 request,
716 Platform::<DefaultCoreRPC>::query_token_total_supply,
717 "get_token_total_supply",
718 )
719 .await
720 }
721
722 async fn get_group_info(
723 &self,
724 request: Request<GetGroupInfoRequest>,
725 ) -> Result<Response<GetGroupInfoResponse>, Status> {
726 self.handle_blocking_query(
727 request,
728 Platform::<DefaultCoreRPC>::query_group_info,
729 "get_group_info",
730 )
731 .await
732 }
733
734 async fn get_group_infos(
735 &self,
736 request: Request<GetGroupInfosRequest>,
737 ) -> Result<Response<GetGroupInfosResponse>, Status> {
738 self.handle_blocking_query(
739 request,
740 Platform::<DefaultCoreRPC>::query_group_infos,
741 "get_group_infos",
742 )
743 .await
744 }
745
746 async fn get_group_actions(
747 &self,
748 request: Request<GetGroupActionsRequest>,
749 ) -> Result<Response<GetGroupActionsResponse>, Status> {
750 self.handle_blocking_query(
751 request,
752 Platform::<DefaultCoreRPC>::query_group_actions,
753 "get_group_actions",
754 )
755 .await
756 }
757
758 async fn get_group_action_signers(
759 &self,
760 request: Request<GetGroupActionSignersRequest>,
761 ) -> Result<Response<GetGroupActionSignersResponse>, Status> {
762 self.handle_blocking_query(
763 request,
764 Platform::<DefaultCoreRPC>::query_group_action_signers,
765 "get_group_action_signers",
766 )
767 .await
768 }
769
770 async fn get_token_direct_purchase_prices(
771 &self,
772 request: Request<GetTokenDirectPurchasePricesRequest>,
773 ) -> Result<Response<GetTokenDirectPurchasePricesResponse>, Status> {
774 self.handle_blocking_query(
775 request,
776 Platform::<DefaultCoreRPC>::query_token_direct_purchase_prices,
777 "get_token_direct_purchase_prices",
778 )
779 .await
780 }
781
782 async fn get_token_contract_info(
783 &self,
784 request: Request<GetTokenContractInfoRequest>,
785 ) -> Result<Response<GetTokenContractInfoResponse>, Status> {
786 self.handle_blocking_query(
787 request,
788 Platform::<DefaultCoreRPC>::query_token_contract_info,
789 "get_token_contract_info",
790 )
791 .await
792 }
793
794 async fn get_token_perpetual_distribution_last_claim(
795 &self,
796 request: Request<GetTokenPerpetualDistributionLastClaimRequest>,
797 ) -> Result<Response<GetTokenPerpetualDistributionLastClaimResponse>, Status> {
798 self.handle_blocking_query(
799 request,
800 Platform::<DefaultCoreRPC>::query_token_perpetual_distribution_last_claim,
801 "get_token_perpetual_distribution_last_claim",
802 )
803 .await
804 }
805
806 async fn get_finalized_epoch_infos(
807 &self,
808 request: Request<GetFinalizedEpochInfosRequest>,
809 ) -> Result<Response<GetFinalizedEpochInfosResponse>, Status> {
810 self.handle_blocking_query(
811 request,
812 Platform::<DefaultCoreRPC>::query_finalized_epoch_infos,
813 "get_finalized_epoch_infos",
814 )
815 .await
816 }
817
818 async fn get_address_info(
819 &self,
820 request: Request<GetAddressInfoRequest>,
821 ) -> Result<Response<GetAddressInfoResponse>, Status> {
822 self.handle_blocking_query(
823 request,
824 Platform::<DefaultCoreRPC>::query_address_info,
825 "get_address_info",
826 )
827 .await
828 }
829
830 async fn get_addresses_infos(
831 &self,
832 request: Request<GetAddressesInfosRequest>,
833 ) -> Result<Response<GetAddressesInfosResponse>, Status> {
834 self.handle_blocking_query(
835 request,
836 Platform::<DefaultCoreRPC>::query_addresses_infos,
837 "get_addresses_infos",
838 )
839 .await
840 }
841
842 async fn get_addresses_trunk_state(
843 &self,
844 request: Request<GetAddressesTrunkStateRequest>,
845 ) -> Result<Response<GetAddressesTrunkStateResponse>, Status> {
846 self.handle_blocking_query(
847 request,
848 Platform::<DefaultCoreRPC>::query_addresses_trunk_state,
849 "get_addresses_trunk_state",
850 )
851 .await
852 }
853
854 async fn get_addresses_branch_state(
855 &self,
856 request: Request<GetAddressesBranchStateRequest>,
857 ) -> Result<Response<GetAddressesBranchStateResponse>, Status> {
858 self.handle_blocking_query(
859 request,
860 Platform::<DefaultCoreRPC>::query_addresses_branch_state,
861 "get_addresses_branch_state",
862 )
863 .await
864 }
865
866 async fn get_recent_address_balance_changes(
867 &self,
868 request: Request<GetRecentAddressBalanceChangesRequest>,
869 ) -> Result<Response<GetRecentAddressBalanceChangesResponse>, Status> {
870 self.handle_blocking_query(
871 request,
872 Platform::<DefaultCoreRPC>::query_recent_address_balance_changes,
873 "get_recent_address_balance_changes",
874 )
875 .await
876 }
877
878 async fn get_recent_compacted_address_balance_changes(
879 &self,
880 request: Request<GetRecentCompactedAddressBalanceChangesRequest>,
881 ) -> Result<Response<GetRecentCompactedAddressBalanceChangesResponse>, Status> {
882 self.handle_blocking_query(
883 request,
884 Platform::<DefaultCoreRPC>::query_recent_compacted_address_balance_changes,
885 "get_recent_compacted_address_balance_changes",
886 )
887 .await
888 }
889
890 async fn get_shielded_encrypted_notes(
891 &self,
892 request: Request<GetShieldedEncryptedNotesRequest>,
893 ) -> Result<Response<GetShieldedEncryptedNotesResponse>, Status> {
894 self.handle_blocking_query(
895 request,
896 Platform::<DefaultCoreRPC>::query_shielded_encrypted_notes,
897 "get_shielded_encrypted_notes",
898 )
899 .await
900 }
901
902 async fn get_shielded_anchors(
903 &self,
904 request: Request<GetShieldedAnchorsRequest>,
905 ) -> Result<Response<GetShieldedAnchorsResponse>, Status> {
906 self.handle_blocking_query(
907 request,
908 Platform::<DefaultCoreRPC>::query_shielded_anchors,
909 "get_shielded_anchors",
910 )
911 .await
912 }
913
914 async fn get_most_recent_shielded_anchor(
915 &self,
916 request: Request<GetMostRecentShieldedAnchorRequest>,
917 ) -> Result<Response<GetMostRecentShieldedAnchorResponse>, Status> {
918 self.handle_blocking_query(
919 request,
920 Platform::<DefaultCoreRPC>::query_most_recent_shielded_anchor,
921 "get_most_recent_shielded_anchor",
922 )
923 .await
924 }
925
926 async fn get_shielded_pool_state(
927 &self,
928 request: Request<GetShieldedPoolStateRequest>,
929 ) -> Result<Response<GetShieldedPoolStateResponse>, Status> {
930 self.handle_blocking_query(
931 request,
932 Platform::<DefaultCoreRPC>::query_shielded_pool_state,
933 "get_shielded_pool_state",
934 )
935 .await
936 }
937
938 async fn get_shielded_nullifiers(
939 &self,
940 request: Request<GetShieldedNullifiersRequest>,
941 ) -> Result<Response<GetShieldedNullifiersResponse>, Status> {
942 self.handle_blocking_query(
943 request,
944 Platform::<DefaultCoreRPC>::query_shielded_nullifiers,
945 "get_shielded_nullifiers",
946 )
947 .await
948 }
949
950 async fn get_nullifiers_trunk_state(
951 &self,
952 request: Request<GetNullifiersTrunkStateRequest>,
953 ) -> Result<Response<GetNullifiersTrunkStateResponse>, Status> {
954 self.handle_blocking_query(
955 request,
956 Platform::<DefaultCoreRPC>::query_nullifiers_trunk_state,
957 "get_nullifiers_trunk_state",
958 )
959 .await
960 }
961
962 async fn get_nullifiers_branch_state(
963 &self,
964 request: Request<GetNullifiersBranchStateRequest>,
965 ) -> Result<Response<GetNullifiersBranchStateResponse>, Status> {
966 self.handle_blocking_query(
967 request,
968 Platform::<DefaultCoreRPC>::query_nullifiers_branch_state,
969 "get_nullifiers_branch_state",
970 )
971 .await
972 }
973
974 async fn get_recent_nullifier_changes(
975 &self,
976 request: Request<GetRecentNullifierChangesRequest>,
977 ) -> Result<Response<GetRecentNullifierChangesResponse>, Status> {
978 self.handle_blocking_query(
979 request,
980 Platform::<DefaultCoreRPC>::query_recent_nullifier_changes,
981 "get_recent_nullifier_changes",
982 )
983 .await
984 }
985
986 async fn get_recent_compacted_nullifier_changes(
987 &self,
988 request: Request<GetRecentCompactedNullifierChangesRequest>,
989 ) -> Result<Response<GetRecentCompactedNullifierChangesResponse>, Status> {
990 self.handle_blocking_query(
991 request,
992 Platform::<DefaultCoreRPC>::query_recent_compacted_nullifier_changes,
993 "get_recent_compacted_nullifier_changes",
994 )
995 .await
996 }
997}
998
999#[async_trait]
1000impl DriveInternal for QueryService {
1001 async fn get_proofs(
1002 &self,
1003 request: Request<GetProofsRequest>,
1004 ) -> Result<Response<GetProofsResponse>, Status> {
1005 self.handle_blocking_query(
1006 request,
1007 Platform::<DefaultCoreRPC>::query_proofs,
1008 "get_proofs",
1009 )
1010 .await
1011 }
1012}
1013
1014fn query_error_into_status(error: QueryError) -> Status {
1015 match error {
1016 QueryError::NotFound(message) => Status::not_found(message),
1017 QueryError::InvalidArgument(message) => Status::invalid_argument(message),
1018 QueryError::Query(error) => Status::invalid_argument(error.to_string()),
1019 _ => {
1020 tracing::error!("unexpected query error: {:?}", error);
1021
1022 Status::unknown(error.to_string())
1023 }
1024 }
1025}
1026
1027fn error_into_status(error: Error) -> Status {
1028 Status::internal(format!("query: {}", error))
1029}