Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

node: fix shutdown #1308

Merged
merged 8 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions core/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,15 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
}
};

runtime.spawn(start_aura(
let _ = runtime.block_on(start_aura(
slot_duration,
local_key,
client,
block_import,
env,
sync_oracle,
on_exit,
));

runtime.block_on(on_exit).expect("Exit future should not fail");
});
}

Expand All @@ -200,6 +199,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
block_import: Arc<I>,
env: Arc<E>,
sync_oracle: SO,
on_exit: impl Future<Item=(),Error=()>,
) -> impl Future<Item=(),Error=()> where
B: Block,
C: Authorities<B> + ChainHead<B>,
Expand Down Expand Up @@ -352,7 +352,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
})
};

future::loop_fn((), move |()| {
let work = future::loop_fn((), move |()| {
let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
authorship_task.catch_unwind().then(|res| {
match res {
Expand All @@ -369,7 +369,9 @@ pub fn start_aura<B, C, E, I, SO, Error>(

Ok(future::Loop::Continue(()))
})
})
});

work.select(on_exit).then(|_| Ok(()))
}

// a header which has been checked
Expand Down Expand Up @@ -760,6 +762,7 @@ mod tests {
client,
environ.clone(),
DummyOracle,
futures::empty(),
);

runtime.spawn(aura);
Expand Down
3 changes: 2 additions & 1 deletion core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
config: Config,
link: LinkHalf<B, E, Block, RA>,
network: N,
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
Expand Down Expand Up @@ -1312,5 +1313,5 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
}))
}).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));

Ok(voter_work)
Ok(voter_work.select(on_exit).then(|_| Ok(())))
}
3 changes: 3 additions & 0 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ fn finalize_3_voters_no_observers() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");

assert_send(&voter);
Expand Down Expand Up @@ -436,6 +437,7 @@ fn finalize_3_voters_1_observer() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");

runtime.spawn(voter);
Expand Down Expand Up @@ -592,6 +594,7 @@ fn transition_3_voters_twice_1_observer() {
},
link,
MessageRouting::new(net.clone(), peer_id),
futures::empty(),
).expect("all in order with client and network");

runtime.spawn(voter);
Expand Down
10 changes: 7 additions & 3 deletions core/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct Service<Components: components::Components> {
/// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>,
_rpc: Box<::std::any::Any + Send + Sync>,
_telemetry: Option<tel::Telemetry>,
_telemetry: Option<Arc<tel::Telemetry>>,
}

/// Creates bare client without any networking.
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<Components: components::Components> Service<Components> {
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
Some(tel::init_telemetry(tel::TelemetryConfig {
Some(Arc::new(tel::init_telemetry(tel::TelemetryConfig {
url: url,
on_connect: Box::new(move || {
telemetry!("system.connected";
Expand All @@ -276,7 +276,7 @@ impl<Components: components::Components> Service<Components> {
"authority" => is_authority
);
}),
}))
})))
},
None => None,
};
Expand Down Expand Up @@ -306,6 +306,10 @@ impl<Components: components::Components> Service<Components> {
None
}
}

pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> {
self._telemetry.as_ref().map(|t| t.clone())
}
}

impl<Components> Service<Components> where Components: components::Components {
Expand Down
12 changes: 8 additions & 4 deletions core/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::iter;
use std::sync::Arc;
use std::net::Ipv4Addr;
use std::time::Duration;
use futures::Stream;
use futures::{Future, Stream};
use tempdir::TempDir;
use tokio::runtime::Runtime;
use tokio::timer::Interval;
Expand Down Expand Up @@ -188,7 +188,7 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
const NUM_NODES: u32 = 10;
{
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
{
let runtime = {
let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES, 0, vec![], 30400);
info!("Checking star topology");
let first_address = network.full_nodes[0].1.network().node_id().expect("No node address");
Expand All @@ -198,13 +198,17 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
network.run_until_all_full(|_index, service|
service.network().status().num_peers == NUM_NODES as usize - 1
);
}
network.runtime
};

runtime.shutdown_on_idle().wait().expect("Error shutting down runtime");

temp.close().expect("Error removing temp dir");
}
{
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
{
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30500);
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30400);
info!("Checking linked topology");
let mut address = network.full_nodes[0].1.network().node_id().expect("No node address");
for (_, service) in network.full_nodes.iter().skip(1) {
Expand Down
16 changes: 13 additions & 3 deletions node/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub mod chain_spec;
mod service;
mod params;

use tokio::prelude::Future;
use tokio::runtime::Runtime;
pub use cli::{VersionInfo, IntoExit};
use substrate_service::{ServiceFactory, Roles as ServiceRoles};
Expand Down Expand Up @@ -136,16 +137,16 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul
let mut runtime = Runtime::new()?;
let executor = runtime.executor();
match config.roles == ServiceRoles::LIGHT {
true => run_until_exit(&mut runtime, service::Factory::new_light(config, executor)?, exit)?,
false => run_until_exit(&mut runtime, service::Factory::new_full(config, executor)?, exit)?,
true => run_until_exit(runtime, service::Factory::new_light(config, executor)?, exit)?,
false => run_until_exit(runtime, service::Factory::new_full(config, executor)?, exit)?,
}
}
}
Ok(())
}

fn run_until_exit<T, C, E>(
runtime: &mut Runtime,
mut runtime: Runtime,
service: T,
e: E,
) -> error::Result<()>
Expand All @@ -161,5 +162,14 @@ fn run_until_exit<T, C, E>(

let _ = runtime.block_on(e.into_exit());
exit_send.fire();

// we eagerly drop the service so that the internal exit future is fired,
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
drop(service);

// TODO [andre]: timeout this future #1318
let _ = runtime.shutdown_on_idle().wait();

Ok(())
}
25 changes: 13 additions & 12 deletions node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use std::sync::Arc;
use transaction_pool::{self, txpool::{Pool as TransactionPool}};
use node_runtime::{GenesisConfig, RuntimeApi};
use std::time::Duration;

use client;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
use grandpa;
use node_executor;
use primitives::ed25519::Pair;
use node_primitives::{Block, InherentData};
use node_runtime::{GenesisConfig, RuntimeApi};
use substrate_service::{
FactoryFullConfiguration, LightComponents, FullComponents, FullBackend,
FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, TaskExecutor
};
use node_executor;
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
use primitives::ed25519::Pair;
use client;
use std::time::Duration;
use grandpa;
use transaction_pool::{self, txpool::{Pool as TransactionPool}};

construct_simple_protocol! {
/// Demo protocol attachment for substrate.
Expand Down Expand Up @@ -89,22 +90,22 @@ construct_service_factory! {
block_import.clone(),
proposer,
service.network(),
service.on_exit(),
));

info!("Running Grandpa session as Authority {}", key.public());
}

let voter = grandpa::run_grandpa(
executor.spawn(grandpa::run_grandpa(
grandpa::Config {
local_key,
gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec?
name: Some(service.config.name.clone())
},
link_half,
grandpa::NetworkBridge::new(service.network()),
)?;

executor.spawn(voter);
service.on_exit(),
)?);

Ok(service)
}
Expand Down