From 151a67cb2a413d2f839f52a1b11113a07fc64bee Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 6 Nov 2024 14:17:06 -0500 Subject: [PATCH 01/12] Unit test ReservedStateUpdateTaskExecutor only running the last task --- .../ReservedClusterStateServiceTests.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index efe3566064170..93d5b08ae0ff2 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -279,6 +279,37 @@ public void success(Runnable onPublicationSuccess) { verify(rerouteService, times(1)).reroute(anyString(), any(), any()); } + public void testLastUpdateIsApplied() throws Exception { + ClusterState state0 = ClusterState.builder(new ClusterName("test")).version(1000).build(); + ClusterState state1 = ClusterState.builder(new ClusterName("test")).version(1001).build(); + ClusterState state2 = ClusterState.builder(new ClusterName("test")).version(1002).build(); + ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( + "test", + null, + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + Map.of(), + Set.of(), + errorState -> fail("Unexpected error"), + ActionListener.noop() + ); + ReservedStateUpdateTask task1 = spy(realTask); + doReturn(state1).when(task1).execute(any()); + ReservedStateUpdateTask task2 = spy(realTask); + doReturn(state2).when(task2).execute(any()); + RerouteService rerouteService = mock(RerouteService.class); + ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService); + ClusterState newState = taskExecutor.execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, + List.of(new TestTaskContext<>(task1), new TestTaskContext<>(task2)), + () -> null) + ); + + assertThat("State should be the final state", newState, sameInstance(state2)); + // Only process the final task; the intermediate ones can be skipped + verify(task1, times(0)).execute(any()); + verify(task2, times(1)).execute(any()); + } + public void testUpdateErrorState() { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterState.builder(new ClusterName("test")).build(); @@ -400,7 +431,7 @@ public TransformState transform(Object source, TransformState prevState) throws var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, BuildVersion.current())); var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name()); - // We submit a task with two handler, one will cause an exception, the other will create a new state. + // We submit a task with two handlers, one will cause an exception, the other will create a new state. // When we fail to update the metadata because of version, we ensure that the returned state is equal to the // original state by pointer reference to avoid cluster state update task to run. ReservedStateUpdateTask task = new ReservedStateUpdateTask( From b508682e408caf7feee45744c9b12a034cae6eee Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 6 Nov 2024 14:17:32 -0500 Subject: [PATCH 02/12] Fix ReservedStateUpdateTaskExecutor to run only the last task --- .../ReservedStateUpdateTaskExecutor.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index eb81b94ba53ed..cba8da1b1b922 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -14,15 +14,14 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.SimpleBatchedExecutor; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; -import org.elasticsearch.core.Tuple; /** * Reserved cluster state update task executor */ -public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor { +public class ReservedStateUpdateTaskExecutor implements ClusterStateTaskExecutor { private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class); @@ -34,17 +33,27 @@ public ReservedStateUpdateTaskExecutor(RerouteService rerouteService) { } @Override - public Tuple executeTask(ReservedStateUpdateTask task, ClusterState clusterState) { - return Tuple.tuple(task.execute(clusterState), null); + public final ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception { + var initState = batchExecutionContext.initialState(); + var taskContexts = batchExecutionContext.taskContexts(); + if (taskContexts.isEmpty()) { + return initState; + } + // Only the last update is relevant; the others can be skipped + var taskContext = taskContexts.get(taskContexts.size() - 1); + ClusterState clusterState = initState; + try (var ignored = taskContext.captureResponseHeaders()) { + var task = taskContext.getTask(); + clusterState = task.execute(clusterState); + taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); + } catch (Exception e) { + taskContext.onFailure(e); + } + return clusterState; } @Override - public void taskSucceeded(ReservedStateUpdateTask task, Void unused) { - task.listener().onResponse(ActionResponse.Empty.INSTANCE); - } - - @Override - public void clusterStatePublished() { + public final void clusterStatePublished(ClusterState newClusterState) { rerouteService.reroute( "reroute after saving and reserving part of the cluster state", Priority.NORMAL, @@ -54,4 +63,5 @@ public void clusterStatePublished() { ) ); } + } From 8a3c21ba0138a179927cbda5b208b8eb55b93d4e Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 6 Nov 2024 14:25:58 -0500 Subject: [PATCH 03/12] Spotless --- .../service/ReservedClusterStateServiceTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 93d5b08ae0ff2..de3061619f661 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -299,9 +299,11 @@ public void testLastUpdateIsApplied() throws Exception { RerouteService rerouteService = mock(RerouteService.class); ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService); ClusterState newState = taskExecutor.execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, + new ClusterStateTaskExecutor.BatchExecutionContext<>( + state0, List.of(new TestTaskContext<>(task1), new TestTaskContext<>(task2)), - () -> null) + () -> null + ) ); assertThat("State should be the final state", newState, sameInstance(state2)); From fdbcbc13a54852c6626a9a420068b5a929277df7 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Mon, 11 Nov 2024 12:45:11 -0500 Subject: [PATCH 04/12] Use penultimate update if the last one failed --- .../ReservedStateUpdateTaskExecutor.java | 31 +++-- .../ReservedClusterStateServiceTests.java | 106 ++++++++++++++---- 2 files changed, 108 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index cba8da1b1b922..5cd086ca3fc95 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -39,17 +39,28 @@ public final ClusterState execute(BatchExecutionContext if (taskContexts.isEmpty()) { return initState; } - // Only the last update is relevant; the others can be skipped - var taskContext = taskContexts.get(taskContexts.size() - 1); - ClusterState clusterState = initState; - try (var ignored = taskContext.captureResponseHeaders()) { - var task = taskContext.getTask(); - clusterState = task.execute(clusterState); - taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); - } catch (Exception e) { - taskContext.onFailure(e); + + // Only the last update is relevant; the others can be skipped. + // However, if that last update task fails, we should fall back to the preceding one. + for (var iterator = taskContexts.listIterator(taskContexts.size()); iterator.hasPrevious();) { + var taskContext = iterator.previous(); + ClusterState clusterState = initState; + try (var ignored = taskContext.captureResponseHeaders()) { + var task = taskContext.getTask(); + clusterState = task.execute(clusterState); + taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); + logger.debug("Update task succeeded"); + return clusterState; + } catch (Exception e) { + taskContext.onFailure(e); + if (iterator.hasPrevious()) { + logger.warn("Update task failed; will try the previous update task"); + } + } } - return clusterState; + + logger.warn("All {} update tasks failed; returning initial state", taskContexts.size()); + return initState; } @Override diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index de3061619f661..65b372eec0416 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.reservedstate.service; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateAckListener; @@ -39,6 +40,7 @@ import org.mockito.ArgumentMatchers; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; @@ -63,7 +65,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -280,11 +284,9 @@ public void success(Runnable onPublicationSuccess) { } public void testLastUpdateIsApplied() throws Exception { - ClusterState state0 = ClusterState.builder(new ClusterName("test")).version(1000).build(); - ClusterState state1 = ClusterState.builder(new ClusterName("test")).version(1001).build(); - ClusterState state2 = ClusterState.builder(new ClusterName("test")).version(1002).build(); + ClusterName clusterName = new ClusterName("test"); ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( - "test", + clusterName.value(), null, ReservedStateVersionCheck.HIGHER_VERSION_ONLY, Map.of(), @@ -292,24 +294,90 @@ public void testLastUpdateIsApplied() throws Exception { errorState -> fail("Unexpected error"), ActionListener.noop() ); - ReservedStateUpdateTask task1 = spy(realTask); - doReturn(state1).when(task1).execute(any()); - ReservedStateUpdateTask task2 = spy(realTask); - doReturn(state2).when(task2).execute(any()); - RerouteService rerouteService = mock(RerouteService.class); - ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService); - ClusterState newState = taskExecutor.execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>( - state0, - List.of(new TestTaskContext<>(task1), new TestTaskContext<>(task2)), - () -> null - ) + var updates = mockUpdateSequence(2, clusterName, realTask); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) ); - assertThat("State should be the final state", newState, sameInstance(state2)); + assertThat("State should be the final state", newState, sameInstance(updates.states().get(updates.states().size() - 1))); + // Only process the final task; the intermediate ones can be skipped - verify(task1, times(0)).execute(any()); - verify(task2, times(1)).execute(any()); + verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(1), times(1)).execute(any()); + } + + public void testLastSuccessfulUpdateIsApplied() throws Exception { + ClusterName clusterName = new ClusterName("test"); + ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( + clusterName.value(), + null, + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + Map.of(), + Set.of(), + errorState -> fail("Unexpected error"), + ActionListener.noop() + ) { + @Override + ActionListener listener() { + var superListener = super.listener(); + return new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + superListener.onResponse(empty); + } + + @Override + public void onFailure(Exception e) { + superListener.onFailure(e); + } + }; + } + }; + + var updates = mockUpdateSequence(3, clusterName, realTask); + + // Inject an error in the last update + reset(updates.tasks().get(2)); + doThrow(UnsupportedOperationException.class).when(updates.tasks().get(2)).execute(any()); + + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + ); + + assertThat("State should be the last successful state", newState, sameInstance(updates.states().get(1))); + + // Only process the final task; the intermediate ones can be skipped + verify(updates.tasks().get(2), times(1)).execute(any()); // Tried the last one, it failed + verify(updates.tasks().get(1), times(1)).execute(any()); // Tried the second-last one, it succeeded + verify(updates.tasks().get(0), times(0)).execute(any()); // Didn't bother trying the first one + } + + /** + * @param tasks Mockito spies configured to return a specific state + * @param states the corresponding states returned by {@link #tasks} + */ + private record MockUpdateSequence(List tasks, List states) { + public List> taskContexts() { + return tasks.stream().map(TestTaskContext::new).toList(); + } + } + + /** + * @return a sequence of updates that bump the version starting from 1001. + */ + private MockUpdateSequence mockUpdateSequence(int quantity, ClusterName clusterName, ReservedStateUpdateTask realTask) { + List tasks = new ArrayList<>(quantity); + List states = new ArrayList<>(quantity); + for (int i = 0; i < quantity; i++) { + ClusterState state = ClusterState.builder(clusterName).version(1001 + i).build(); + ReservedStateUpdateTask task = spy(realTask); + doReturn(state).when(task).execute(any()); + tasks.add(task); + states.add(state); + } + return new MockUpdateSequence(tasks, states); } public void testUpdateErrorState() { From 71b3f29b6cc6a70051f2fcb285ae5b8230a81410 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 27 Nov 2024 15:46:48 -0500 Subject: [PATCH 05/12] Fix test method name to match what it does --- .../reservedstate/service/FileSettingsServiceIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 85f0e2cf7e3ff..382370143f17f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -357,7 +357,7 @@ private void assertClusterStateNotSaved(CountDownLatch savedClusterState, Atomic updateClusterSettings(Settings.builder().put("search.allow_expensive_queries", "false")); } - public void testErrorSaved() throws Exception { + public void testErrorNotSaved() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start data node / non master node"); String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); From f249c76e4d30c4b4372fb423c7a926102e74d9ed Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 27 Nov 2024 17:06:41 -0500 Subject: [PATCH 06/12] IT for testLastSettingsInBatchApplied --- .../service/FileSettingsServiceIT.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 382370143f17f..9df8f1b434b06 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -22,11 +22,13 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.io.IOException; @@ -34,6 +36,7 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -381,6 +384,40 @@ public void testErrorNotSaved() throws Exception { assertClusterStateNotSaved(savedClusterState.v1(), savedClusterState.v2()); } + public void testLastSettingsInBatchApplied() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + var savedClusterState = setupClusterStateListener(masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertTrue(masterFileSettingsService.watching()); + assertFalse(dataFileSettingsService.watching()); + + final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var barrier = new CyclicBarrier(2); + masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> { + safeAwait(barrier); + safeAwait(barrier); + batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); + return batchExecutionContext.initialState(); + }).submitTask("block", ESTestCase::fail, null); + + safeAwait(barrier); + writeJSONFile(masterNode, testJSON, versionCounter, logger); // Valid but skipped + writeJSONFile(masterNode, testJSON43mb, versionCounter, logger); // The last valid setting + safeAwait(barrier); + assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); + } + public void testErrorCanRecoverOnRestart() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start data node / non master node"); From dc19d66821d07c1ee53f7a4fa6ad09d330b2aed7 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Fri, 29 Nov 2024 14:50:45 -0500 Subject: [PATCH 07/12] Consider version numbers in upsdate batches --- .../service/ReservedStateUpdateTask.java | 11 ++ .../ReservedStateUpdateTaskExecutor.java | 44 ++++- .../ReservedClusterStateServiceTests.java | 165 +++++++++++------- 3 files changed, 147 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 90ae9923910d1..5d1145d158388 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -70,6 +70,10 @@ public ReservedStateUpdateTask( this.listener = listener; } + public boolean supersedes(ReservedStateUpdateTask prev) { + return versionCheck.test(prev.stateChunk.metadata().version(), this.stateChunk.metadata().version()); + } + @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -213,4 +217,11 @@ static boolean checkMetadataVersion( return false; } + @Override + public String toString() { + return "ReservedStateUpdateTask{" + + "namespace='" + namespace + '\'' + + ", metadata=" + stateChunk.metadata() + + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index 5cd086ca3fc95..92fa15fc35177 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -18,6 +18,8 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; +import java.util.ArrayList; + /** * Reserved cluster state update task executor */ @@ -40,21 +42,34 @@ public final ClusterState execute(BatchExecutionContext return initState; } - // Only the last update is relevant; the others can be skipped. - // However, if that last update task fails, we should fall back to the preceding one. - for (var iterator = taskContexts.listIterator(taskContexts.size()); iterator.hasPrevious();) { - var taskContext = iterator.previous(); + // In a given batch of update tasks, only one will actually take effect, + // and we want to execute only that task, because if we execute all the tasks + // one after another, that will require all the tasks to be capable of executing + // correctly even without prior state updates being applied. + // + // The correct task to run would be whichever one would take effect if we were to + // run the tasks one-per-batch. In effect, this is the task with the highest version number; + // if multiple tasks have the same version number, their ReservedStateVersionCheck fields + // will be used to break the tie. + // + // One wrinkle is: if the tasks fails, then we will know retroactively that it was + // not the task that actually took effect, and we must eliminate that one and try again. + + var candidates = new ArrayList<>(taskContexts); + while (candidates.isEmpty() == false) { + TaskContext taskContext = removeEffectiveTaskContext(candidates); + logger.info("Effective task: {}", taskContext.getTask()); ClusterState clusterState = initState; try (var ignored = taskContext.captureResponseHeaders()) { var task = taskContext.getTask(); clusterState = task.execute(clusterState); taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); - logger.debug("Update task succeeded"); + logger.debug("-> Update task succeeded"); return clusterState; } catch (Exception e) { taskContext.onFailure(e); - if (iterator.hasPrevious()) { - logger.warn("Update task failed; will try the previous update task"); + if (candidates.isEmpty() == false) { + logger.warn("-> Update task failed; will try the previous update task"); } } } @@ -63,6 +78,21 @@ public final ClusterState execute(BatchExecutionContext return initState; } + /** + * Removes and returns the {@link TaskContext} corresponding to the task that would take effect + * if the tasks were executed one after the other. + */ + private TaskContext removeEffectiveTaskContext(ArrayList> candidates) { + assert candidates.isEmpty() == false; + int winner = 0; + for (int candidate = 1; candidate < candidates.size(); candidate++) { + if (candidates.get(candidate).getTask().supersedes(candidates.get(winner).getTask())) { + winner = candidate; + } + } + return candidates.remove(winner); + } + @Override public final void clusterStatePublished(ClusterState newClusterState) { rerouteService.reroute( diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 65b372eec0416..1008554be764c 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.reservedstate.service; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateAckListener; @@ -50,6 +49,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higher; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higherOrSame; +import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION; +import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_VERSION_ONLY; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; @@ -175,7 +178,7 @@ public void testOperatorController() throws IOException { controller.process( "operator", parser, - randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + randomFrom(HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), x::set ); @@ -210,7 +213,7 @@ public void testOperatorController() throws IOException { controller.process( "operator", parser, - randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + randomFrom(HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), Assert::assertNull ); } @@ -254,7 +257,7 @@ public void testUpdateStateTasks() throws Exception { new ReservedStateUpdateTask( "test", null, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), Set.of(), errorState -> {}, @@ -283,18 +286,9 @@ public void success(Runnable onPublicationSuccess) { verify(rerouteService, times(1)).reroute(anyString(), any(), any()); } - public void testLastUpdateIsApplied() throws Exception { + public void testBatchLastUpdateIsApplied() throws Exception { ClusterName clusterName = new ClusterName("test"); - ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( - clusterName.value(), - null, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, - Map.of(), - Set.of(), - errorState -> fail("Unexpected error"), - ActionListener.noop() - ); - var updates = mockUpdateSequence(2, clusterName, realTask); + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1002))); ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) @@ -307,35 +301,10 @@ public void testLastUpdateIsApplied() throws Exception { verify(updates.tasks().get(1), times(1)).execute(any()); } - public void testLastSuccessfulUpdateIsApplied() throws Exception { + public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { ClusterName clusterName = new ClusterName("test"); - ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( - clusterName.value(), - null, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, - Map.of(), - Set.of(), - errorState -> fail("Unexpected error"), - ActionListener.noop() - ) { - @Override - ActionListener listener() { - var superListener = super.listener(); - return new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty empty) { - superListener.onResponse(empty); - } - - @Override - public void onFailure(Exception e) { - superListener.onFailure(e); - } - }; - } - }; - var updates = mockUpdateSequence(3, clusterName, realTask); + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1002), higher(1003))); // Inject an error in the last update reset(updates.tasks().get(2)); @@ -354,24 +323,88 @@ public void onFailure(Exception e) { verify(updates.tasks().get(0), times(0)).execute(any()); // Didn't bother trying the first one } + public void testBatchHigherVersionEarlierWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1003), higher(1002))); // Out of order + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + ); + + assertThat("State should be the highest-versioned state", newState, sameInstance(updates.states().get(1))); + + verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(1), times(1)).execute(any()); + verify(updates.tasks().get(2), times(0)).execute(any()); + } + + public void testBatchEqualVersionEarlierWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higher(1003), higher(1002))); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + ); + + assertThat("State should be the first instance of the highest-versioned state", newState, sameInstance(updates.states().get(0))); + + // Only process the highest-version task; the others can be skipped + verify(updates.tasks().get(0), times(1)).execute(any()); + verify(updates.tasks().get(1), times(0)).execute(any()); + verify(updates.tasks().get(2), times(0)).execute(any()); + } + + public void testBatchEqualVersionLaterWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higherOrSame(1003), higher(1002))); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + ); + + assertThat("State should be the last instance of the highest-versioned state", newState, sameInstance(updates.states().get(1))); + + // Only process the highest-version task; the others can be skipped + verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(1), times(1)).execute(any()); + verify(updates.tasks().get(2), times(0)).execute(any()); + } + + record MockUpdateSpec(long version, ReservedStateVersionCheck check){ + public static MockUpdateSpec higher(long version) { + return new MockUpdateSpec(version, HIGHER_VERSION_ONLY); + } + + public static MockUpdateSpec higherOrSame(long version) { + return new MockUpdateSpec(version, HIGHER_OR_SAME_VERSION); + } + } + /** * @param tasks Mockito spies configured to return a specific state * @param states the corresponding states returned by {@link #tasks} */ - private record MockUpdateSequence(List tasks, List states) { + record MockUpdateSequence(List tasks, List states) { public List> taskContexts() { return tasks.stream().map(TestTaskContext::new).toList(); } } - /** - * @return a sequence of updates that bump the version starting from 1001. - */ - private MockUpdateSequence mockUpdateSequence(int quantity, ClusterName clusterName, ReservedStateUpdateTask realTask) { - List tasks = new ArrayList<>(quantity); - List states = new ArrayList<>(quantity); - for (int i = 0; i < quantity; i++) { - ClusterState state = ClusterState.builder(clusterName).version(1001 + i).build(); + private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List specs) { + List tasks = new ArrayList<>(specs.size()); + List states = new ArrayList<>(specs.size()); + for (var spec : specs) { + var stateChunk = new ReservedStateChunk(Map.of(), new ReservedStateVersion(spec.version(), BuildVersion.current())); + ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( + clusterName.value(), + stateChunk, + spec.check(), + Map.of(), + Set.of(), + errorState -> fail("Unexpected error"), + ActionListener.noop() + ); + ClusterState state = ClusterState.builder(clusterName).version(spec.version()).build(); ReservedStateUpdateTask task = spy(realTask); doReturn(state).when(task).execute(any()); tasks.add(task); @@ -398,7 +431,7 @@ public void testUpdateErrorState() { ErrorState error = new ErrorState( "namespace", 2L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("error"), ReservedStateErrorMetadata.ErrorKind.TRANSIENT ); @@ -425,7 +458,7 @@ public void testUpdateErrorState() { ErrorState oldError = new ErrorState( "namespace", 1L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("old error"), ReservedStateErrorMetadata.ErrorKind.TRANSIENT ); @@ -443,7 +476,7 @@ public void testErrorStateTask() throws Exception { new ErrorState( "test", 1L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("some parse error", "some io error"), ReservedStateErrorMetadata.ErrorKind.PARSING ), @@ -491,11 +524,11 @@ public TransformState transform(Object source, TransformState prevState) throws Metadata metadata = Metadata.builder().put(operatorMetadata).build(); ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); - assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); - assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, HIGHER_VERSION_ONLY)); + assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L, HIGHER_VERSION_ONLY)); assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION)); - assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); - assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L, HIGHER_VERSION_ONLY)); + assertTrue(ReservedStateErrorTask.isNewError(null, 1L, HIGHER_VERSION_ONLY)); assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION)); var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, BuildVersion.current())); @@ -507,7 +540,7 @@ public TransformState transform(Object source, TransformState prevState) throws ReservedStateUpdateTask task = new ReservedStateUpdateTask( "namespace_one", chunk, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), orderedHandlers, errorState -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, errorState.version(), errorState.versionCheck())), @@ -559,7 +592,7 @@ public void testCheckMetadataVersion() { ReservedStateUpdateTask task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -569,7 +602,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -580,7 +613,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(123L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -601,7 +634,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(122L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -622,7 +655,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersionTests.increment(BuildVersion.current()))), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -726,11 +759,11 @@ public void testCheckAndReportError() { final var controller = spy(new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of())); - assertNull(controller.checkAndReportError("test", List.of(), null, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertNull(controller.checkAndReportError("test", List.of(), null, HIGHER_VERSION_ONLY)); verify(controller, times(0)).updateErrorState(any()); var version = new ReservedStateVersion(2L, BuildVersion.current()); - var error = controller.checkAndReportError("test", List.of("test error"), version, ReservedStateVersionCheck.HIGHER_VERSION_ONLY); + var error = controller.checkAndReportError("test", List.of("test error"), version, HIGHER_VERSION_ONLY); assertThat(error, instanceOf(IllegalStateException.class)); assertThat(error.getMessage(), is("Error processing state change request for test, errors: test error")); verify(controller, times(1)).updateErrorState(any()); From a85a1e1c63b9719106512c8a65f0061a52777c35 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Mon, 2 Dec 2024 16:17:46 -0500 Subject: [PATCH 08/12] Track and assert on task completion --- .../ReservedStateUpdateTaskExecutor.java | 2 + .../ReservedClusterStateServiceTests.java | 66 ++++++++++++++----- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index 92fa15fc35177..3a580aca5b878 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -65,6 +65,8 @@ public final ClusterState execute(BatchExecutionContext clusterState = task.execute(clusterState); taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); logger.debug("-> Update task succeeded"); + // All the others "succeeded" and then were conceptually superseded by the effective task + candidates.forEach(c -> c.success(() -> c.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE))); return clusterState; } catch (Exception e) { taskContext.onFailure(e); diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 1008554be764c..89749d2c901ee 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -51,6 +51,9 @@ import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higher; import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higherOrSame; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.FAILED; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.INCOMPLETE; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.SUCCEEDED; import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION; import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_VERSION_ONLY; import static org.hamcrest.Matchers.anyOf; @@ -84,13 +87,20 @@ private static MasterServiceTaskQueue mo return (MasterServiceTaskQueue) mock(MasterServiceTaskQueue.class); } - private static class TestTaskContext implements ClusterStateTaskExecutor.TaskContext { + enum TaskState { INCOMPLETE, SUCCEEDED, FAILED }; + + static class TestTaskContext implements ClusterStateTaskExecutor.TaskContext { private final T task; + private final AtomicReference state = new AtomicReference<>(INCOMPLETE); private TestTaskContext(T task) { this.task = task; } + public boolean isIncomplete() { + return state.get() == INCOMPLETE; + } + @Override public T getTask() { return task; @@ -98,25 +108,40 @@ public T getTask() { @Override public void success(Runnable onPublicationSuccess) { + assert state.get() == INCOMPLETE; onPublicationSuccess.run(); + setCompletedState(SUCCEEDED); } @Override - public void success(Consumer publishedStateConsumer) {} + public void success(Consumer publishedStateConsumer) { + setCompletedState(SUCCEEDED); + } @Override - public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) { + setCompletedState(SUCCEEDED); + } @Override - public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) { + setCompletedState(SUCCEEDED); + } @Override - public void onFailure(Exception failure) {} + public void onFailure(Exception failure) { + setCompletedState(FAILED); + } @Override public Releasable captureResponseHeaders() { return null; } + + private void setCompletedState(TaskState newState) { + var prev = state.getAndSet(newState); + assert prev == INCOMPLETE; + } } private static class TestStateHandler implements ReservedClusterStateHandler> { @@ -291,7 +316,7 @@ public void testBatchLastUpdateIsApplied() throws Exception { var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1002))); ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) ); assertThat("State should be the final state", newState, sameInstance(updates.states().get(updates.states().size() - 1))); @@ -299,6 +324,8 @@ public void testBatchLastUpdateIsApplied() throws Exception { // Only process the final task; the intermediate ones can be skipped verify(updates.tasks().get(0), times(0)).execute(any()); verify(updates.tasks().get(1), times(1)).execute(any()); + + assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { @@ -312,7 +339,7 @@ public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) ); assertThat("State should be the last successful state", newState, sameInstance(updates.states().get(1))); @@ -321,6 +348,8 @@ public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { verify(updates.tasks().get(2), times(1)).execute(any()); // Tried the last one, it failed verify(updates.tasks().get(1), times(1)).execute(any()); // Tried the second-last one, it succeeded verify(updates.tasks().get(0), times(0)).execute(any()); // Didn't bother trying the first one + + assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchHigherVersionEarlierWins() throws Exception { @@ -328,7 +357,7 @@ public void testBatchHigherVersionEarlierWins() throws Exception { var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1003), higher(1002))); // Out of order ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) ); assertThat("State should be the highest-versioned state", newState, sameInstance(updates.states().get(1))); @@ -336,6 +365,8 @@ public void testBatchHigherVersionEarlierWins() throws Exception { verify(updates.tasks().get(0), times(0)).execute(any()); verify(updates.tasks().get(1), times(1)).execute(any()); verify(updates.tasks().get(2), times(0)).execute(any()); + + assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchEqualVersionEarlierWins() throws Exception { @@ -343,7 +374,7 @@ public void testBatchEqualVersionEarlierWins() throws Exception { var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higher(1003), higher(1002))); ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) ); assertThat("State should be the first instance of the highest-versioned state", newState, sameInstance(updates.states().get(0))); @@ -352,15 +383,16 @@ public void testBatchEqualVersionEarlierWins() throws Exception { verify(updates.tasks().get(0), times(1)).execute(any()); verify(updates.tasks().get(1), times(0)).execute(any()); verify(updates.tasks().get(2), times(0)).execute(any()); + + assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchEqualVersionLaterWins() throws Exception { ClusterName clusterName = new ClusterName("test"); var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higherOrSame(1003), higher(1002))); ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); - ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( - new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.taskContexts(), () -> null) - ); + var batch = new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute(batch); assertThat("State should be the last instance of the highest-versioned state", newState, sameInstance(updates.states().get(1))); @@ -368,6 +400,8 @@ public void testBatchEqualVersionLaterWins() throws Exception { verify(updates.tasks().get(0), times(0)).execute(any()); verify(updates.tasks().get(1), times(1)).execute(any()); verify(updates.tasks().get(2), times(0)).execute(any()); + + assertEquals(List.of(), updates.incompleteTasks()); } record MockUpdateSpec(long version, ReservedStateVersionCheck check){ @@ -384,9 +418,9 @@ public static MockUpdateSpec higherOrSame(long version) { * @param tasks Mockito spies configured to return a specific state * @param states the corresponding states returned by {@link #tasks} */ - record MockUpdateSequence(List tasks, List states) { - public List> taskContexts() { - return tasks.stream().map(TestTaskContext::new).toList(); + record MockUpdateSequence(List> contexts, List tasks, List states) { + public List> incompleteTasks() { + return contexts.stream().filter(TestTaskContext::isIncomplete).toList(); } } @@ -410,7 +444,7 @@ private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List Date: Mon, 2 Dec 2024 16:28:48 -0500 Subject: [PATCH 09/12] Assert each task's state --- .../ReservedClusterStateServiceTests.java | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 89749d2c901ee..3e9ca2aae040c 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -87,7 +87,15 @@ private static MasterServiceTaskQueue mo return (MasterServiceTaskQueue) mock(MasterServiceTaskQueue.class); } - enum TaskState { INCOMPLETE, SUCCEEDED, FAILED }; + enum TaskState { + INCOMPLETE, + FAILED, + + /** + * Also used for when a task was skipped because another task takes precedence and that one succeeded. + */ + SUCCEEDED, + } static class TestTaskContext implements ClusterStateTaskExecutor.TaskContext { private final T task; @@ -97,8 +105,8 @@ private TestTaskContext(T task) { this.task = task; } - public boolean isIncomplete() { - return state.get() == INCOMPLETE; + public TaskState getState() { + return state.get(); } @Override @@ -325,7 +333,7 @@ public void testBatchLastUpdateIsApplied() throws Exception { verify(updates.tasks().get(0), times(0)).execute(any()); verify(updates.tasks().get(1), times(1)).execute(any()); - assertEquals(List.of(), updates.incompleteTasks()); + assertTaskStates(updates, SUCCEEDED, SUCCEEDED); } public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { @@ -344,12 +352,12 @@ public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { assertThat("State should be the last successful state", newState, sameInstance(updates.states().get(1))); + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, FAILED); + // Only process the final task; the intermediate ones can be skipped verify(updates.tasks().get(2), times(1)).execute(any()); // Tried the last one, it failed verify(updates.tasks().get(1), times(1)).execute(any()); // Tried the second-last one, it succeeded verify(updates.tasks().get(0), times(0)).execute(any()); // Didn't bother trying the first one - - assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchHigherVersionEarlierWins() throws Exception { @@ -362,11 +370,11 @@ public void testBatchHigherVersionEarlierWins() throws Exception { assertThat("State should be the highest-versioned state", newState, sameInstance(updates.states().get(1))); + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + verify(updates.tasks().get(0), times(0)).execute(any()); verify(updates.tasks().get(1), times(1)).execute(any()); - verify(updates.tasks().get(2), times(0)).execute(any()); - - assertEquals(List.of(), updates.incompleteTasks()); + verify(updates.tasks().get(2), times(0)).execute(any()); // Prior task had higher version } public void testBatchEqualVersionEarlierWins() throws Exception { @@ -379,12 +387,12 @@ public void testBatchEqualVersionEarlierWins() throws Exception { assertThat("State should be the first instance of the highest-versioned state", newState, sameInstance(updates.states().get(0))); + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + // Only process the highest-version task; the others can be skipped verify(updates.tasks().get(0), times(1)).execute(any()); - verify(updates.tasks().get(1), times(0)).execute(any()); + verify(updates.tasks().get(1), times(0)).execute(any()); // Prior task already had the same version verify(updates.tasks().get(2), times(0)).execute(any()); - - assertEquals(List.of(), updates.incompleteTasks()); } public void testBatchEqualVersionLaterWins() throws Exception { @@ -396,12 +404,12 @@ public void testBatchEqualVersionLaterWins() throws Exception { assertThat("State should be the last instance of the highest-versioned state", newState, sameInstance(updates.states().get(1))); + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + // Only process the highest-version task; the others can be skipped - verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(0), times(0)).execute(any()); // Next task has same version and uses higherOrSame verify(updates.tasks().get(1), times(1)).execute(any()); verify(updates.tasks().get(2), times(0)).execute(any()); - - assertEquals(List.of(), updates.incompleteTasks()); } record MockUpdateSpec(long version, ReservedStateVersionCheck check){ @@ -419,9 +427,6 @@ public static MockUpdateSpec higherOrSame(long version) { * @param states the corresponding states returned by {@link #tasks} */ record MockUpdateSequence(List> contexts, List tasks, List states) { - public List> incompleteTasks() { - return contexts.stream().filter(TestTaskContext::isIncomplete).toList(); - } } private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List specs) { @@ -447,6 +452,10 @@ private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List Date: Mon, 2 Dec 2024 16:45:41 -0500 Subject: [PATCH 10/12] Use sorting. This will be O(n log n) for long lists, while the original way will be O(mn) where m is the number of failing tasks. None of this matters much for performance because n is expected to be small; but it does simplify the code and make it smell less imperative. --- .../service/ReservedStateUpdateTask.java | 9 +++++++ .../ReservedStateUpdateTaskExecutor.java | 27 ++++++------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 5d1145d158388..5b95a75c72a27 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,6 +71,9 @@ public ReservedStateUpdateTask( this.listener = listener; } + /** + * @return true if {@code this} would overwrite the effects of {@code prev} assuming {@code this} is executed later. + */ public boolean supersedes(ReservedStateUpdateTask prev) { return versionCheck.test(prev.stateChunk.metadata().version(), this.stateChunk.metadata().version()); } @@ -224,4 +228,9 @@ public String toString() { ", metadata=" + stateChunk.metadata() + '}'; } + + /** + * x < y if x.{@linkplain #supersedes}(y) + */ + public static final Comparator SUPERSEDING_FIRST = (x,y) -> x.supersedes(y) ? -1 : y.supersedes(x)? 1 : 0; } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index 3a580aca5b878..3cc6fd5ac10c5 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -20,6 +20,9 @@ import java.util.ArrayList; +import static java.util.Comparator.comparing; +import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.SUPERSEDING_FIRST; + /** * Reserved cluster state update task executor */ @@ -56,8 +59,9 @@ public final ClusterState execute(BatchExecutionContext // not the task that actually took effect, and we must eliminate that one and try again. var candidates = new ArrayList<>(taskContexts); - while (candidates.isEmpty() == false) { - TaskContext taskContext = removeEffectiveTaskContext(candidates); + candidates.sort(comparing(TaskContext::getTask, SUPERSEDING_FIRST)); + for (var iter = candidates.iterator(); iter.hasNext(); ) { + TaskContext taskContext = iter.next(); logger.info("Effective task: {}", taskContext.getTask()); ClusterState clusterState = initState; try (var ignored = taskContext.captureResponseHeaders()) { @@ -65,8 +69,8 @@ public final ClusterState execute(BatchExecutionContext clusterState = task.execute(clusterState); taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); logger.debug("-> Update task succeeded"); - // All the others "succeeded" and then were conceptually superseded by the effective task - candidates.forEach(c -> c.success(() -> c.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE))); + // All the others conceptually "succeeded" and then were superseded by the effective task + iter.forEachRemaining(c -> c.success(() -> c.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE))); return clusterState; } catch (Exception e) { taskContext.onFailure(e); @@ -80,21 +84,6 @@ public final ClusterState execute(BatchExecutionContext return initState; } - /** - * Removes and returns the {@link TaskContext} corresponding to the task that would take effect - * if the tasks were executed one after the other. - */ - private TaskContext removeEffectiveTaskContext(ArrayList> candidates) { - assert candidates.isEmpty() == false; - int winner = 0; - for (int candidate = 1; candidate < candidates.size(); candidate++) { - if (candidates.get(candidate).getTask().supersedes(candidates.get(winner).getTask())) { - winner = candidate; - } - } - return candidates.remove(winner); - } - @Override public final void clusterStatePublished(ClusterState newClusterState) { rerouteService.reroute( From 70269ba7160b58c4571607e5500b0a88788ea3d9 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Mon, 2 Dec 2024 16:54:02 -0500 Subject: [PATCH 11/12] Spotless --- .../service/FileSettingsServiceIT.java | 2 +- .../service/ReservedStateUpdateTask.java | 7 ++----- .../ReservedStateUpdateTaskExecutor.java | 2 +- .../ReservedClusterStateServiceTests.java | 19 +++++++------------ 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 9df8f1b434b06..44a1b6b97bc5b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -21,8 +21,8 @@ import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 5b95a75c72a27..a5f3822ef803f 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -223,14 +223,11 @@ static boolean checkMetadataVersion( @Override public String toString() { - return "ReservedStateUpdateTask{" + - "namespace='" + namespace + '\'' + - ", metadata=" + stateChunk.metadata() + - '}'; + return "ReservedStateUpdateTask{" + "namespace='" + namespace + '\'' + ", metadata=" + stateChunk.metadata() + '}'; } /** * x < y if x.{@linkplain #supersedes}(y) */ - public static final Comparator SUPERSEDING_FIRST = (x,y) -> x.supersedes(y) ? -1 : y.supersedes(x)? 1 : 0; + public static final Comparator SUPERSEDING_FIRST = (x, y) -> x.supersedes(y) ? -1 : y.supersedes(x) ? 1 : 0; } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index 3cc6fd5ac10c5..c64d1acab2f36 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -60,7 +60,7 @@ public final ClusterState execute(BatchExecutionContext var candidates = new ArrayList<>(taskContexts); candidates.sort(comparing(TaskContext::getTask, SUPERSEDING_FIRST)); - for (var iter = candidates.iterator(); iter.hasNext(); ) { + for (var iter = candidates.iterator(); iter.hasNext();) { TaskContext taskContext = iter.next(); logger.info("Effective task: {}", taskContext.getTask()); ClusterState clusterState = initState; diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 3e9ca2aae040c..a333b36e2861e 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -287,15 +287,7 @@ public void testUpdateStateTasks() throws Exception { AtomicBoolean successCalled = new AtomicBoolean(false); ReservedStateUpdateTask task = spy( - new ReservedStateUpdateTask( - "test", - null, - HIGHER_VERSION_ONLY, - Map.of(), - Set.of(), - errorState -> {}, - ActionListener.noop() - ) + new ReservedStateUpdateTask("test", null, HIGHER_VERSION_ONLY, Map.of(), Set.of(), errorState -> {}, ActionListener.noop()) ); doReturn(state).when(task).execute(any()); @@ -412,7 +404,7 @@ public void testBatchEqualVersionLaterWins() throws Exception { verify(updates.tasks().get(2), times(0)).execute(any()); } - record MockUpdateSpec(long version, ReservedStateVersionCheck check){ + record MockUpdateSpec(long version, ReservedStateVersionCheck check) { public static MockUpdateSpec higher(long version) { return new MockUpdateSpec(version, HIGHER_VERSION_ONLY); } @@ -426,8 +418,11 @@ public static MockUpdateSpec higherOrSame(long version) { * @param tasks Mockito spies configured to return a specific state * @param states the corresponding states returned by {@link #tasks} */ - record MockUpdateSequence(List> contexts, List tasks, List states) { - } + record MockUpdateSequence( + List> contexts, + List tasks, + List states + ) {} private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List specs) { List tasks = new ArrayList<>(specs.size()); From c152d0547329b6d4b55f00265cde22ce08e5b49b Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Tue, 3 Dec 2024 09:58:23 -0500 Subject: [PATCH 12/12] Fixes and comments --- .../reservedstate/service/FileSettingsServiceIT.java | 4 ++-- .../service/ReservedStateUpdateTaskExecutor.java | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 44a1b6b97bc5b..7ac966271a995 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -412,8 +412,8 @@ public void testLastSettingsInBatchApplied() throws Exception { }).submitTask("block", ESTestCase::fail, null); safeAwait(barrier); - writeJSONFile(masterNode, testJSON, versionCounter, logger); // Valid but skipped - writeJSONFile(masterNode, testJSON43mb, versionCounter, logger); // The last valid setting + writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet()); // Valid but skipped + writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet()); // The last valid setting safeAwait(barrier); assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index c64d1acab2f36..2517b8113e3fb 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -55,8 +55,10 @@ public final ClusterState execute(BatchExecutionContext // if multiple tasks have the same version number, their ReservedStateVersionCheck fields // will be used to break the tie. // - // One wrinkle is: if the tasks fails, then we will know retroactively that it was - // not the task that actually took effect, and we must eliminate that one and try again. + // One wrinkle is: if the task fails, then we will know retroactively that it was + // not the task that actually took effect, and we must then identify which of the + // remaining tasks would have taken effect. We achieve this by sorting the tasks + // using the SUPERSEDING_FIRST comparator. var candidates = new ArrayList<>(taskContexts); candidates.sort(comparing(TaskContext::getTask, SUPERSEDING_FIRST));