Skip to content

Include error messages for stale primary #1714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions driver-core/src/main/com/mongodb/MongoStalePrimaryException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb;

/**
* Exception thrown when a replica set primary is identified as a stale primary during Server Discovery and Monitoring (SDAM).
* This occurs when a new primary is discovered, causing the previously known primary to be marked stale, typically during network
* partitions or elections.
*
* @since 5.6
*/
public class MongoStalePrimaryException extends MongoException {

/**
* Construct an instance.
*
* @param message the exception message.
*/
public MongoStalePrimaryException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoException;
import com.mongodb.MongoStalePrimaryException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterId;
Expand Down Expand Up @@ -266,7 +267,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (isStalePrimary(newDescription)) {
invalidatePotentialPrimary(newDescription);
invalidatePotentialPrimary(newDescription, new MongoStalePrimaryException("Primary marked stale due to electionId/setVersion mismatch"));
return false;
}

Expand Down Expand Up @@ -297,12 +298,13 @@ private boolean isStalePrimary(final ServerDescription description) {
}
}

private void invalidatePotentialPrimary(final ServerDescription newDescription) {
private void invalidatePotentialPrimary(final ServerDescription newDescription, final MongoStalePrimaryException cause) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();

addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting(cause);
}

/**
Expand Down Expand Up @@ -377,7 +379,7 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress()));
}
serverTuple.server.invalidate();
serverTuple.server.invalidate(new MongoStalePrimaryException("Primary marked stale due to discovery of newer primary"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.connection;

import com.mongodb.MongoException;

import java.util.List;

import static java.util.Arrays.asList;
Expand All @@ -30,13 +32,13 @@ interface ClusterableServer extends Server {
/**
* Reset server description to connecting state
*/
void resetToConnecting();
void resetToConnecting(MongoException cause);

/**
* Invalidate the description of this server. Implementation of this method should not block, but rather trigger an asynchronous
* attempt to connect with the server in order to determine its current status.
*/
void invalidate();
void invalidate(MongoException cause);

/**
* <p>Closes the server. Instances that have been closed will no longer be available for use.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ void doMaintenance() {
try {
sdamProvider.optional().ifPresent(sdam -> {
if (!silentlyComplete.test(actualException)) {
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(actualException, sdam.context(newConnection)));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(actualException, sdam.context(newConnection)));
}
});
} catch (Exception suppressed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ final class DefaultSdamServerDescriptionManager implements SdamServerDescription
}

@Override
public void update(final ServerDescription candidateDescription) {
public void monitorUpdate(final ServerDescription candidateDescription) {
Copy link
Member Author

@vbabanin vbabanin May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original update() method performed two checks:

  1. Checks if the server is either a data-bearing connection or a direct connection. This aligns with the Connection Pool Management section of the SDAM spec.
  2. Checks if the ServerDescription contains an exception, and invalidates the connection pool accordingly. This corresponds to the Error Handling section under Server Monitoring.
    Although the spec specifies handling only network and command errors, the implementation currently checks for any Throwable.

These two behaviors conform to the spec requirements for server monitor behavior:

  1. The first check is applied after a successful server check.
  2. The second check applies when a server check fails.

The update() method was invoked from two places:

  • ServerMonitor, which is expected and directly supports the server monitor responsibilities outlined in the spec.
  • DefaultServer.resetToConnecting(), which is used in one specific case: when an RSPrimary is discovered with a stale electionId/setVersion. In that case, update() was called to transition the server to an UNKNOWN state. However, neither check (1 nor 2) was triggered, so the method merely updated the description.

This approach worked until the introduction of a new change where the RSPrimary with a stale electionId/setVersion case began passing an explicit exception, which unintentionally triggered a side effect of connection pool invalidation.

To address this and maintain separation of concerns, the logic has now been split:

monitorUpdate(): now encapsulates the server monitor-specific logic (checks 1 and 2).
update(): is simplified to only update the description to UNKNOWN (currently used for one specific case).

cluster.withLock(() -> {
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
return;
Expand All @@ -82,6 +82,17 @@ public void update(final ServerDescription candidateDescription) {
});
}

@Override
public void update(final ServerDescription candidateDescription) {
cluster.withLock(() -> {
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
return;
}

updateDescription(candidateDescription);
});
}

@Override
public void handleExceptionBeforeHandshake(final SdamIssue sdamIssue) {
cluster.withLock(() -> handleException(sdamIssue, true));
Expand Down Expand Up @@ -128,7 +139,7 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
} else if (sdamIssue.relatedToWriteConcern() || !sdamIssue.specific()) {
} else if (sdamIssue.relatedToWriteConcern() || sdamIssue.relatedToStalePrimary()) {
updateDescription(sdamIssue.serverDescription());
serverMonitor.connect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Connection getConnection(final OperationContext operationContext) {
try {
operationEnd();
if (e instanceof MongoException) {
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(e, exceptionContext));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(e, exceptionContext));
}
} catch (Exception suppressed) {
e.addSuppressed(suppressed);
Expand All @@ -118,7 +118,7 @@ public void getConnectionAsync(final OperationContext operationContext, final Si
if (t != null) {
try {
operationEnd();
sdam.handleExceptionBeforeHandshake(SdamIssue.specific(t, exceptionContext));
sdam.handleExceptionBeforeHandshake(SdamIssue.of(t, exceptionContext));
} catch (Exception suppressed) {
t.addSuppressed(suppressed);
} finally {
Expand Down Expand Up @@ -150,14 +150,14 @@ private void operationEnd() {
}

@Override
public void resetToConnecting() {
sdam.update(unknownConnectingServerDescription(serverId, null));
public void resetToConnecting(final MongoException cause) {
sdam.update(unknownConnectingServerDescription(serverId, cause));
}

@Override
public void invalidate() {
public void invalidate(final MongoException cause) {
if (!isClosed()) {
sdam.handleExceptionAfterHandshake(SdamIssue.unspecified(sdam.context()));
sdam.handleExceptionAfterHandshake(SdamIssue.of(cause, sdam.context()));
}
}

Expand Down Expand Up @@ -208,7 +208,7 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
.execute(connection);
} catch (MongoException e) {
try {
sdam.handleExceptionAfterHandshake(SdamIssue.specific(e, sdam.context(connection)));
sdam.handleExceptionAfterHandshake(SdamIssue.of(e, sdam.context(connection)));
} catch (Exception suppressed) {
e.addSuppressed(suppressed);
}
Expand All @@ -231,7 +231,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
.executeAsync(connection, errorHandlingCallback((result, t) -> {
if (t != null) {
try {
sdam.handleExceptionAfterHandshake(SdamIssue.specific(t, sdam.context(connection)));
sdam.handleExceptionAfterHandshake(SdamIssue.of(t, sdam.context(connection)));
} catch (Exception suppressed) {
t.addSuppressed(suppressed);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import static com.mongodb.internal.connection.CommandHelper.HELLO;
import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO;
import static com.mongodb.internal.connection.CommandHelper.executeCommand;
import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription;
import static com.mongodb.internal.connection.ServerDescriptionHelper.unknownConnectingServerDescription;
import static com.mongodb.internal.event.EventListenerHelper.singleServerMonitorListener;
import static java.lang.String.format;
Expand Down Expand Up @@ -190,7 +189,7 @@ public void run() {
}

logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);
sdamProvider.get().monitorUpdate(currentServerDescription);

if ((shouldStreamResponses && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
Expand Down Expand Up @@ -259,8 +258,9 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
elapsedTimeNanos, shouldStreamResponses));

return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(),
roundTripTimeSampler.getMin());
throw new Throwable("WELL WELL WELL");
// return createServerDescription(serverId.getAddress(), helloResult, roundTripTimeSampler.getAverage(),
// roundTripTimeSampler.getMin());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public LoadBalancedServer(final ServerId serverId, final ConnectionPool connecti
}

@Override
public void resetToConnecting() {
public void resetToConnecting(final MongoException cause) {
// no op
}

@Override
public void invalidate() {
public void invalidate(final MongoException cause) {
// no op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.MongoSecurityException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.MongoStalePrimaryException;
import com.mongodb.annotations.Immutable;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ServerDescription;
Expand All @@ -43,6 +44,15 @@
*/
@ThreadSafe
interface SdamServerDescriptionManager {
/**
* Receives candidate {@link ServerDescription} from the monitoring activity.
*
* @param candidateDescription A {@link ServerDescription} that may or may not be applied
* {@linkplain TopologyVersionHelper#newer(TopologyVersion, TopologyVersion) depending on}
* its {@link ServerDescription#getTopologyVersion() topology version}.
*/
void monitorUpdate(ServerDescription candidateDescription);

/**
* @param candidateDescription A {@link ServerDescription} that may or may not be applied
* {@linkplain TopologyVersionHelper#newer(TopologyVersion, TopologyVersion) depending on}
Expand Down Expand Up @@ -84,34 +94,17 @@ private SdamIssue(@Nullable final Throwable exception, final Context context) {
this.context = assertNotNull(context);
}

/**
* @see #unspecified(Context)
*/
static SdamIssue specific(final Throwable exception, final Context context) {
static SdamIssue of(final Throwable exception, final Context context) {
return new SdamIssue(assertNotNull(exception), assertNotNull(context));
}

/**
* @see #specific(Throwable, Context)
*/
static SdamIssue unspecified(final Context context) {
return new SdamIssue(null, assertNotNull(context));
}
Comment on lines -95 to -99
Copy link
Member Author

@vbabanin vbabanin May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, an unspecified()was used in a case where the primary was marked as stale due to the discovery of a newer primary. This behavior was handled in SdamDescriptionManager by updating the server’s description to UNKNOWN and initiating a reconnect:

} else if (sdamIssue.relatedToWriteConcern() || !sdamIssue.specific()) {
updateDescription(sdamIssue.serverDescription());
serverMonitor.connect();
}

This behavior aligns with the following section of the SDAM specification:

“A note on invalidating the old primary: when a new primary is discovered, the client finds the previous primary (there should be none or one) and replaces its description with a default ServerDescription of type Unknown. Additionally, the error field of the new ServerDescription object MUST include a descriptive error explaining that it was invalidated because the primary was determined to be stale. A multi-threaded client MUST request an immediate check for that server as soon as possible.”

The unspecified() method previously implied a transition to UNKNOWN without providing an error. As this is no longer consistent with the current spec - which requires an explanatory error in such transitions - the method has been removed.

Relevant spec changes:
SDAM Error Field Documentation
Discussion in Spec PR #1729


/**
* @return An exception if and only if this {@link SdamIssue} is {@linkplain #specific()}.
* @return An exception that caused this {@link SdamIssue}.
*/
Optional<Throwable> exception() {
return Optional.ofNullable(exception);
}

/**
* @return {@code true} if and only if this {@link SdamIssue} has an exception {@linkplain #specific(Throwable, Context) specified}.
*/
boolean specific() {
return exception != null;
}

ServerDescription serverDescription() {
return unknownConnectingServerDescription(context.serverId(), exception);
}
Expand All @@ -127,6 +120,10 @@ boolean relatedToStateChange() {
return exception instanceof MongoNotPrimaryException || exception instanceof MongoNodeIsRecoveringException;
}

boolean relatedToStalePrimary() {
return exception instanceof MongoStalePrimaryException;
}

/**
* Represents a subset of {@link #relatedToStateChange()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ class ServerMonitorSpecification extends OperationFunctionalSpecification {

def initializeServerMonitor(ServerAddress address) {
SdamServerDescriptionManager sdam = new SdamServerDescriptionManager() {
@Override
void monitorUpdate(final ServerDescription candidateDescription) {
assert candidateDescription != null
newDescription = candidateDescription
latch.countDown()
}

@Override
void update(final ServerDescription candidateDescription) {
assert candidateDescription != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ protected void applyApplicationError(final BsonDocument applicationError) {
switch (when) {
case "beforeHandshakeCompletes":
server.sdamServerDescriptionManager().handleExceptionBeforeHandshake(
SdamIssue.specific(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
SdamIssue.of(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
break;
case "afterHandshakeCompletes":
server.sdamServerDescriptionManager().handleExceptionAfterHandshake(
SdamIssue.specific(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
SdamIssue.of(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
break;
default:
throw new UnsupportedOperationException("Unsupported `when` value: " + when);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class DefaultServerMonitorSpecification extends Specification {
given:
def stateChanged = false
def sdam = new SdamServerDescriptionManager() {
@Override
void monitorUpdate(final ServerDescription candidateDescription) {
assert candidateDescription != null
stateChanged = true
}

@Override
void update(final ServerDescription candidateDescription) {
assert candidateDescription != null
Expand Down
Loading