drive_abci/
server.rs

//! This module implements the Drive ABCI server.
2//!
3
4use crate::abci::app::CheckTxAbciApplication;
5use crate::abci::app::ConsensusAbciApplication;
6use crate::config::PlatformConfig;
7use crate::platform_types::platform::Platform;
8use crate::query::QueryService;
9use crate::rpc::core::DefaultCoreRPC;
10use std::sync::Arc;
11use tokio::runtime::Runtime;
12use tokio_util::sync::CancellationToken;
13
14/// Starts gRPC and ABCI servers to serve Query, CheckTx and Consensus applications
15///
16/// Should only return when server is stopped
17pub fn start(
18    runtime: &Runtime,
19    platform: Arc<Platform<DefaultCoreRPC>>,
20    config: PlatformConfig,
21    cancel: CancellationToken,
22) {
23    let query_service = Arc::new(QueryService::new(Arc::clone(&platform)));
24
25    let drive_internal = Arc::clone(&query_service);
26
27    let check_tx_core_rpc = DefaultCoreRPC::open(
28        &config.core.check_tx_rpc.url(),
29        config.core.check_tx_rpc.username,
30        config.core.check_tx_rpc.password,
31    )
32    .expect("failed to open check tx core rpc");
33
34    let check_tx_service =
35        CheckTxAbciApplication::new(Arc::clone(&platform), Arc::new(check_tx_core_rpc));
36
37    let grpc_server = dapi_grpc::tonic::transport::Server::builder()
38        .add_service(
39            dapi_grpc::drive::v0::drive_internal_server::DriveInternalServer::from_arc(
40                drive_internal,
41            ),
42        )
43        .add_service(
44            dapi_grpc::platform::v0::platform_server::PlatformServer::from_arc(query_service),
45        )
46        .add_service(
47            tenderdash_abci::proto::abci::abci_application_server::AbciApplicationServer::new(
48                check_tx_service,
49            ),
50        );
51
52    let grpc_server_cancel = cancel.clone();
53
54    runtime.spawn(async move {
55        tracing::info!("gRPC server is listening on {}", &config.grpc_bind_address);
56
57        grpc_server
58            .serve_with_shutdown(
59                config
60                    .grpc_bind_address
61                    .parse()
62                    .expect("invalid grpc address"),
63                grpc_server_cancel.cancelled(),
64            )
65            .await
66            .expect("gRPC server failed");
67
68        tracing::info!("gRPC server is stopped");
69    });
70
71    // Start blocking ABCI socket-server that process consensus requests sequentially
72
73    let app = ConsensusAbciApplication::new(platform.as_ref());
74
75    let server = tenderdash_abci::ServerBuilder::new(app, &config.abci.consensus_bind_address)
76        .with_cancel_token(cancel.clone())
77        .with_runtime(runtime.handle().clone())
78        .build()
79        .expect("failed to build ABCI server");
80
81    while !cancel.is_cancelled() {
82        tracing::info!(
83            "ABCI app is waiting for new connection on {}",
84            config.abci.consensus_bind_address
85        );
86        match server.next_client() {
87            Err(e) => tracing::error!("ABCI connection terminated: {:?}", e),
88            Ok(_) => tracing::info!("ABCI connection closed"),
89        }
90    }
91}