Skip to content

Commit b74ddac

Browse files
andresilvaMTDK1
authored and committed
node: fix shutdown (paritytech#1308)
* node: remove grandpa authority flags
* node: exit-guard grandpa and aura spawned futures
* node: wait for futures to stop running on shutdown
* core: run connectivity tests on same ports
* core: pass on_exit future when starting aura and grandpa
* node: add issue number to todo
* core: fix aura and grandpa tests
1 parent 598551d commit b74ddac

File tree

7 files changed

+54
-28
lines changed

7 files changed

+54
-28
lines changed

core/consensus/aura/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -179,16 +179,15 @@ pub fn start_aura_thread<B, C, E, I, SO, Error>(
179179
}
180180
};
181181

182-
runtime.spawn(start_aura(
182+
let _ = runtime.block_on(start_aura(
183183
slot_duration,
184184
local_key,
185185
client,
186186
block_import,
187187
env,
188188
sync_oracle,
189+
on_exit,
189190
));
190-
191-
runtime.block_on(on_exit).expect("Exit future should not fail");
192191
});
193192
}
194193

@@ -200,6 +199,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
200199
block_import: Arc<I>,
201200
env: Arc<E>,
202201
sync_oracle: SO,
202+
on_exit: impl Future<Item=(),Error=()>,
203203
) -> impl Future<Item=(),Error=()> where
204204
B: Block,
205205
C: Authorities<B> + ChainHead<B>,
@@ -352,7 +352,7 @@ pub fn start_aura<B, C, E, I, SO, Error>(
352352
})
353353
};
354354

355-
future::loop_fn((), move |()| {
355+
let work = future::loop_fn((), move |()| {
356356
let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
357357
authorship_task.catch_unwind().then(|res| {
358358
match res {
@@ -369,7 +369,9 @@ pub fn start_aura<B, C, E, I, SO, Error>(
369369

370370
Ok(future::Loop::Continue(()))
371371
})
372-
})
372+
});
373+
374+
work.select(on_exit).then(|_| Ok(()))
373375
}
374376

375377
// a header which has been checked
@@ -760,6 +762,7 @@ mod tests {
760762
client,
761763
environ.clone(),
762764
DummyOracle,
765+
futures::empty(),
763766
);
764767

765768
runtime.spawn(aura);

core/finality-grandpa/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1186,6 +1186,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
11861186
config: Config,
11871187
link: LinkHalf<B, E, Block, RA>,
11881188
network: N,
1189+
on_exit: impl Future<Item=(),Error=()> + Send + 'static,
11891190
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
11901191
Block::Hash: Ord,
11911192
B: Backend<Block, Blake2Hasher> + 'static,
@@ -1312,5 +1313,5 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
13121313
}))
13131314
}).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
13141315

1315-
Ok(voter_work)
1316+
Ok(voter_work.select(on_exit).then(|_| Ok(())))
13161317
}

core/finality-grandpa/src/tests.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -376,6 +376,7 @@ fn finalize_3_voters_no_observers() {
376376
},
377377
link,
378378
MessageRouting::new(net.clone(), peer_id),
379+
futures::empty(),
379380
).expect("all in order with client and network");
380381

381382
assert_send(&voter);
@@ -436,6 +437,7 @@ fn finalize_3_voters_1_observer() {
436437
},
437438
link,
438439
MessageRouting::new(net.clone(), peer_id),
440+
futures::empty(),
439441
).expect("all in order with client and network");
440442

441443
runtime.spawn(voter);
@@ -592,6 +594,7 @@ fn transition_3_voters_twice_1_observer() {
592594
},
593595
link,
594596
MessageRouting::new(net.clone(), peer_id),
597+
futures::empty(),
595598
).expect("all in order with client and network");
596599

597600
runtime.spawn(voter);

core/service/src/lib.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -107,7 +107,7 @@ pub struct Service<Components: components::Components> {
107107
/// Configuration of this Service
108108
pub config: FactoryFullConfiguration<Components::Factory>,
109109
_rpc: Box<::std::any::Any + Send + Sync>,
110-
_telemetry: Option<tel::Telemetry>,
110+
_telemetry: Option<Arc<tel::Telemetry>>,
111111
}
112112

113113
/// Creates bare client without any networking.
@@ -263,7 +263,7 @@ impl<Components: components::Components> Service<Components> {
263263
let impl_name = config.impl_name.to_owned();
264264
let version = version.clone();
265265
let chain_name = config.chain_spec.name().to_owned();
266-
Some(tel::init_telemetry(tel::TelemetryConfig {
266+
Some(Arc::new(tel::init_telemetry(tel::TelemetryConfig {
267267
url: url,
268268
on_connect: Box::new(move || {
269269
telemetry!("system.connected";
@@ -276,7 +276,7 @@ impl<Components: components::Components> Service<Components> {
276276
"authority" => is_authority
277277
);
278278
}),
279-
}))
279+
})))
280280
},
281281
None => None,
282282
};
@@ -306,6 +306,10 @@ impl<Components: components::Components> Service<Components> {
306306
None
307307
}
308308
}
309+
310+
pub fn telemetry(&self) -> Option<Arc<tel::Telemetry>> {
311+
self._telemetry.as_ref().map(|t| t.clone())
312+
}
309313
}
310314

311315
impl<Components> Service<Components> where Components: components::Components {

core/service/test/src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ use std::iter;
3333
use std::sync::Arc;
3434
use std::net::Ipv4Addr;
3535
use std::time::Duration;
36-
use futures::Stream;
36+
use futures::{Future, Stream};
3737
use tempdir::TempDir;
3838
use tokio::runtime::Runtime;
3939
use tokio::timer::Interval;
@@ -188,7 +188,7 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
188188
const NUM_NODES: u32 = 10;
189189
{
190190
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
191-
{
191+
let runtime = {
192192
let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES, 0, vec![], 30400);
193193
info!("Checking star topology");
194194
let first_address = network.full_nodes[0].1.network().node_id().expect("No node address");
@@ -198,13 +198,17 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
198198
network.run_until_all_full(|_index, service|
199199
service.network().status().num_peers == NUM_NODES as usize - 1
200200
);
201-
}
201+
network.runtime
202+
};
203+
204+
runtime.shutdown_on_idle().wait().expect("Error shutting down runtime");
205+
202206
temp.close().expect("Error removing temp dir");
203207
}
204208
{
205209
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
206210
{
207-
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30500);
211+
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30400);
208212
info!("Checking linked topology");
209213
let mut address = network.full_nodes[0].1.network().node_id().expect("No node address");
210214
for (_, service) in network.full_nodes.iter().skip(1) {

node/cli/src/lib.rs

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,7 @@ pub mod chain_spec;
5050
mod service;
5151
mod params;
5252

53+
use tokio::prelude::Future;
5354
use tokio::runtime::Runtime;
5455
pub use cli::{VersionInfo, IntoExit};
5556
use substrate_service::{ServiceFactory, Roles as ServiceRoles};
@@ -136,16 +137,16 @@ pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Resul
136137
let mut runtime = Runtime::new()?;
137138
let executor = runtime.executor();
138139
match config.roles == ServiceRoles::LIGHT {
139-
true => run_until_exit(&mut runtime, service::Factory::new_light(config, executor)?, exit)?,
140-
false => run_until_exit(&mut runtime, service::Factory::new_full(config, executor)?, exit)?,
140+
true => run_until_exit(runtime, service::Factory::new_light(config, executor)?, exit)?,
141+
false => run_until_exit(runtime, service::Factory::new_full(config, executor)?, exit)?,
141142
}
142143
}
143144
}
144145
Ok(())
145146
}
146147

147148
fn run_until_exit<T, C, E>(
148-
runtime: &mut Runtime,
149+
mut runtime: Runtime,
149150
service: T,
150151
e: E,
151152
) -> error::Result<()>
@@ -161,5 +162,14 @@ fn run_until_exit<T, C, E>(
161162

162163
let _ = runtime.block_on(e.into_exit());
163164
exit_send.fire();
165+
166+
// we eagerly drop the service so that the internal exit future is fired,
167+
// but we need to keep holding a reference to the global telemetry guard
168+
let _telemetry = service.telemetry();
169+
drop(service);
170+
171+
// TODO [andre]: timeout this future #1318
172+
let _ = runtime.shutdown_on_idle().wait();
173+
164174
Ok(())
165175
}

node/cli/src/service.rs

Lines changed: 13 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -19,19 +19,20 @@
1919
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2020
2121
use std::sync::Arc;
22-
use transaction_pool::{self, txpool::{Pool as TransactionPool}};
23-
use node_runtime::{GenesisConfig, RuntimeApi};
22+
use std::time::Duration;
23+
24+
use client;
25+
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
26+
use grandpa;
27+
use node_executor;
28+
use primitives::ed25519::Pair;
2429
use node_primitives::{Block, InherentData};
30+
use node_runtime::{GenesisConfig, RuntimeApi};
2531
use substrate_service::{
2632
FactoryFullConfiguration, LightComponents, FullComponents, FullBackend,
2733
FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, TaskExecutor
2834
};
29-
use node_executor;
30-
use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
31-
use primitives::ed25519::Pair;
32-
use client;
33-
use std::time::Duration;
34-
use grandpa;
35+
use transaction_pool::{self, txpool::{Pool as TransactionPool}};
3536

3637
construct_simple_protocol! {
3738
/// Demo protocol attachment for substrate.
@@ -89,22 +90,22 @@ construct_service_factory! {
8990
block_import.clone(),
9091
proposer,
9192
service.network(),
93+
service.on_exit(),
9294
));
9395

9496
info!("Running Grandpa session as Authority {}", key.public());
9597
}
9698

97-
let voter = grandpa::run_grandpa(
99+
executor.spawn(grandpa::run_grandpa(
98100
grandpa::Config {
99101
local_key,
100102
gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec?
101103
name: Some(service.config.name.clone())
102104
},
103105
link_half,
104106
grandpa::NetworkBridge::new(service.network()),
105-
)?;
106-
107-
executor.spawn(voter);
107+
service.on_exit(),
108+
)?);
108109

109110
Ok(service)
110111
}

0 commit comments

Comments
 (0)