Skip to content

Commit 598854d

Browse files
committed
[Discovery] accumulated improvements to ZenDiscovery
Merging the accumulated work from the feautre/improve_zen branch. Here are the highlights of the changes: __Testing infra__ - Networking: - all symmetric partitioning - dropping packets - hard disconnects - Jepsen Tests - Single node service disruptions: - Long GC / Halt - Slow cluster state updates - Discovery settings - Easy to setup unicast with partial host list __Zen Discovery__ - Pinging after master loss (no local elects) - Fixes the split brain issue: elastic#2488 - Batching join requests - More resilient joining process (wait on a publish from master) Closes elastic#7493
2 parents 889db1c + 34f4ca7 commit 598854d

File tree

63 files changed

+3891
-803
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+3891
-803
lines changed

pom.xml

Lines changed: 239 additions & 229 deletions
Large diffs are not rendered by default.

src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
137137
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
138138
}
139139

140+
@Override
141+
public void onNoLongerMaster(String source) {
142+
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
143+
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
144+
}
145+
140146
@Override
141147
public void onFailure(String source, Throwable t) {
142148
//if the reroute fails we only log

src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,12 @@ protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request
173173

174174
@Override
175175
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
176-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
176+
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
177177
}
178178

179179
@Override
180180
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
181-
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
181+
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
182182
}
183183

184184
static class ShardRecoveryRequest extends BroadcastShardOperationRequest {

src/main/java/org/elasticsearch/action/bench/BenchmarkService.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
6666
/**
6767
* Constructs a service component for running benchmarks
6868
*
69-
* @param settings Settings
70-
* @param clusterService Cluster service
71-
* @param threadPool Thread pool
72-
* @param client Client
73-
* @param transportService Transport service
69+
* @param settings Settings
70+
* @param clusterService Cluster service
71+
* @param threadPool Thread pool
72+
* @param client Client
73+
* @param transportService Transport service
7474
*/
7575
@Inject
7676
public BenchmarkService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
@@ -86,19 +86,22 @@ public BenchmarkService(Settings settings, ClusterService clusterService, Thread
8686
}
8787

8888
@Override
89-
protected void doStart() throws ElasticsearchException { }
89+
protected void doStart() throws ElasticsearchException {
90+
}
9091

9192
@Override
92-
protected void doStop() throws ElasticsearchException { }
93+
protected void doStop() throws ElasticsearchException {
94+
}
9395

9496
@Override
95-
protected void doClose() throws ElasticsearchException { }
97+
protected void doClose() throws ElasticsearchException {
98+
}
9699

97100
/**
98101
* Lists actively running benchmarks on the cluster
99102
*
100-
* @param request Status request
101-
* @param listener Response listener
103+
* @param request Status request
104+
* @param listener Response listener
102105
*/
103106
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {
104107

@@ -171,8 +174,8 @@ public void onFailure(Throwable t) {
171174
/**
172175
* Executes benchmarks on the cluster
173176
*
174-
* @param request Benchmark request
175-
* @param listener Response listener
177+
* @param request Benchmark request
178+
* @param listener Response listener
176179
*/
177180
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {
178181

@@ -228,7 +231,7 @@ public void onFailure(Throwable t) {
228231
listener.onFailure(t);
229232
}
230233
}, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) &&
231-
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
234+
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
232235
}
233236

234237
private final boolean isBenchmarkNode(DiscoveryNode node) {
@@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) {
403406
}
404407

405408
public abstract T newInstance();
409+
406410
protected abstract void sendResponse();
407411

408412
@Override
@@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) {
593597

594598
if (bmd != null) {
595599
for (BenchmarkMetaData.Entry entry : bmd.entries()) {
596-
if (request.benchmarkName().equals(entry.benchmarkId())){
600+
if (request.benchmarkName().equals(entry.benchmarkId())) {
597601
if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) {
598602
throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]");
599603
}
@@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList
648652
@Override
649653
protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
650654
BenchmarkMetaData.State state = entry.state();
651-
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
655+
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
652656
if (success) {
653657
return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS);
654658
} else {
@@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask {
661665
private final String[] patterns;
662666

663667
public AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) {
664-
super("abort_benchmark", null , listener);
668+
super("abort_benchmark", null, listener);
665669
this.patterns = patterns;
666670
}
667671

@@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
675679
}
676680
}
677681

678-
public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
682+
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {
679683

680684
private final String reason;
681685
protected final String benchmarkId;
@@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) {
702706
ImmutableList.Builder<BenchmarkMetaData.Entry> builder = new ImmutableList.Builder<BenchmarkMetaData.Entry>();
703707
for (BenchmarkMetaData.Entry e : bmd.entries()) {
704708
if (benchmarkId == null || match(e)) {
705-
e = process(e) ;
709+
e = process(e);
706710
instances.add(e);
707711
}
708712
// Don't keep finished benchmarks around in cluster state
@@ -741,7 +745,7 @@ public String reason() {
741745
}
742746
}
743747

744-
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
748+
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
745749
protected final R request;
746750

747751
public BenchmarkStateChangeAction(R request) {

src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
2929
* all the nodes have acknowledged a cluster state update request
3030
*/
31-
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
31+
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {
3232

3333
private final ActionListener<Response> listener;
3434
private final AckedRequest request;
@@ -40,6 +40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo
4040

4141
/**
4242
* Called to determine which nodes the acknowledgement is expected from
43+
*
4344
* @param discoveryNode a node
4445
* @return true if the node is expected to send ack back, false otherwise
4546
*/
@@ -50,6 +51,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
5051
/**
5152
* Called once all the nodes have acknowledged the cluster state update request. Must be
5253
* very lightweight execution, since it gets executed on the cluster service thread.
54+
*
5355
* @param t optional error that might have been thrown
5456
*/
5557
public void onAllNodesAcked(@Nullable Throwable t) {

src/main/java/org/elasticsearch/cluster/ClusterService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,5 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
110110
* Returns the tasks that are pending.
111111
*/
112112
List<PendingClusterTask> pendingTasks();
113+
113114
}

src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type
115115
}
116116

117117

118+
public static final long UNKNOWN_VERSION = -1;
119+
118120
private final long version;
119121

120122
private final RoutingTable routingTable;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster;
21+
22+
/**
23+
* This is a marker interface to indicate that the task should be executed
24+
* even if the current node is not a master.
25+
*/
26+
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
27+
28+
@Override
29+
public boolean runOnlyOnMaster() {
30+
return false;
31+
}
32+
}

src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,37 @@
1919

2020
package org.elasticsearch.cluster;
2121

22+
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
2225
/**
2326
* A task that can update the cluster state.
2427
*/
25-
public interface ClusterStateUpdateTask {
28+
abstract public class ClusterStateUpdateTask {
2629

2730
/**
2831
* Update the cluster state based on the current state. Return the *same instance* if no state
2932
* should be changed.
3033
*/
31-
ClusterState execute(ClusterState currentState) throws Exception;
34+
abstract public ClusterState execute(ClusterState currentState) throws Exception;
3235

3336
/**
3437
* A callback called when execute fails.
3538
*/
36-
void onFailure(String source, Throwable t);
39+
abstract public void onFailure(String source, @Nullable Throwable t);
40+
41+
42+
/**
43+
* indicates whether this task should only run if current node is master
44+
*/
45+
public boolean runOnlyOnMaster() {
46+
return true;
47+
}
48+
49+
/**
50+
* called when the task was rejected because the local node is no longer master
51+
*/
52+
public void onNoLongerMaster(String source) {
53+
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
54+
}
3755
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster;
20+
21+
/**
22+
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
23+
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
24+
*/
25+
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {
26+
27+
@Override
28+
public boolean runOnlyOnMaster() {
29+
return false;
30+
}
31+
}

src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
2424
* the cluster state update has been processed.
2525
*/
26-
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
26+
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
2727

2828
/**
2929
* Called when the result of the {@link #execute(ClusterState)} have been processed
3030
* properly by all listeners.
3131
*/
32-
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
32+
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
3333
}

src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
2626
* a timeout.
2727
*/
28-
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
28+
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
2929

3030
/**
3131
* If the cluster state update task wasn't processed by the provided timeout, call
3232
* {@link #onFailure(String, Throwable)}
3333
*/
34-
TimeValue timeout();
34+
abstract public TimeValue timeout();
3535
}

src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ public boolean hasGlobalBlock(ClusterBlock block) {
108108
return global.contains(block);
109109
}
110110

111+
public boolean hasGlobalBlock(int blockId) {
112+
for (ClusterBlock clusterBlock : global) {
113+
if (clusterBlock.id() == blockId) {
114+
return true;
115+
}
116+
}
117+
return false;
118+
}
119+
120+
public boolean hasGlobalBlock(ClusterBlockLevel level) {
121+
return global(level).size() > 0;
122+
}
123+
111124
/**
112125
* Is there a global block with the provided status?
113126
*/

src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,15 @@ public ClusterState execute(ClusterState currentState) {
149149
return ClusterState.builder(currentState).routingResult(routingResult).build();
150150
}
151151

152+
@Override
153+
public void onNoLongerMaster(String source) {
154+
// no biggie
155+
}
156+
152157
@Override
153158
public void onFailure(String source, Throwable t) {
154-
ClusterState state = clusterService.state();
155-
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
159+
ClusterState state = clusterService.state();
160+
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
156161
}
157162
});
158163
routingTableDirty = false;

0 commit comments

Comments
 (0)