1use crate::abci::app::CheckTxAbciApplication;
5use crate::abci::app::ConsensusAbciApplication;
6use crate::config::PlatformConfig;
7use crate::platform_types::platform::Platform;
8use crate::query::QueryService;
9use crate::rpc::core::DefaultCoreRPC;
10use std::sync::Arc;
11use tokio::runtime::Runtime;
12use tokio_util::sync::CancellationToken;
13
14pub fn start(
18 runtime: &Runtime,
19 platform: Arc<Platform<DefaultCoreRPC>>,
20 config: PlatformConfig,
21 cancel: CancellationToken,
22) {
23 let query_service = Arc::new(QueryService::new(Arc::clone(&platform)));
24
25 let drive_internal = Arc::clone(&query_service);
26
27 let check_tx_core_rpc = DefaultCoreRPC::open(
28 &config.core.check_tx_rpc.url(),
29 config.core.check_tx_rpc.username,
30 config.core.check_tx_rpc.password,
31 )
32 .expect("failed to open check tx core rpc");
33
34 let check_tx_service =
35 CheckTxAbciApplication::new(Arc::clone(&platform), Arc::new(check_tx_core_rpc));
36
37 let grpc_server = dapi_grpc::tonic::transport::Server::builder()
38 .add_service(
39 dapi_grpc::drive::v0::drive_internal_server::DriveInternalServer::from_arc(
40 drive_internal,
41 ),
42 )
43 .add_service(
44 dapi_grpc::platform::v0::platform_server::PlatformServer::from_arc(query_service),
45 )
46 .add_service(
47 tenderdash_abci::proto::abci::abci_application_server::AbciApplicationServer::new(
48 check_tx_service,
49 ),
50 );
51
52 let grpc_server_cancel = cancel.clone();
53
54 runtime.spawn(async move {
55 tracing::info!("gRPC server is listening on {}", &config.grpc_bind_address);
56
57 grpc_server
58 .serve_with_shutdown(
59 config
60 .grpc_bind_address
61 .parse()
62 .expect("invalid grpc address"),
63 grpc_server_cancel.cancelled(),
64 )
65 .await
66 .expect("gRPC server failed");
67
68 tracing::info!("gRPC server is stopped");
69 });
70
71 let app = ConsensusAbciApplication::new(platform.as_ref());
74
75 let server = tenderdash_abci::ServerBuilder::new(app, &config.abci.consensus_bind_address)
76 .with_cancel_token(cancel.clone())
77 .with_runtime(runtime.handle().clone())
78 .build()
79 .expect("failed to build ABCI server");
80
81 while !cancel.is_cancelled() {
82 tracing::info!(
83 "ABCI app is waiting for new connection on {}",
84 config.abci.consensus_bind_address
85 );
86 match server.next_client() {
87 Err(e) => tracing::error!("ABCI connection terminated: {:?}", e),
88 Ok(_) => tracing::info!("ABCI connection closed"),
89 }
90 }
91}