@@ -1203,6 +1203,10 @@ def check_version(self, timeout=2, strict=False, topics=[]):
1203
1203
stashed [key ] = self .config [key ]
1204
1204
self .config [key ] = override_config [key ]
1205
1205
1206
+ def reset_override_configs ():
1207
+ for key in stashed :
1208
+ self .config [key ] = stashed [key ]
1209
+
1206
1210
# kafka kills the connection when it doesn't recognize an API request
1207
1211
# so we can send a test request and then follow immediately with a
1208
1212
# vanilla MetadataRequest. If the server did not recognize the first
@@ -1222,6 +1226,7 @@ def check_version(self, timeout=2, strict=False, topics=[]):
1222
1226
1223
1227
for version , request in test_cases :
1224
1228
if not self .connect_blocking (timeout_at - time .time ()):
1229
+ reset_override_configs ()
1225
1230
raise Errors .NodeNotReadyError ()
1226
1231
f = self .send (request )
1227
1232
# HACK: sleeping to wait for socket to send bytes
@@ -1278,10 +1283,10 @@ def check_version(self, timeout=2, strict=False, topics=[]):
1278
1283
log .info ("Broker is not v%s -- it did not recognize %s" ,
1279
1284
version , request .__class__ .__name__ )
1280
1285
else :
1286
+ reset_override_configs ()
1281
1287
raise Errors .UnrecognizedBrokerVersion ()
1282
1288
1283
- for key in stashed :
1284
- self .config [key ] = stashed [key ]
1289
+ reset_override_configs ()
1285
1290
return version
1286
1291
1287
1292
def __str__ (self ):
0 commit comments