Commit 8eb26b6

Be explicit with tuples for %s formatting

Fix #1633

1 parent 7bd6b5d commit 8eb26b6

26 files changed: +45 -45 lines changed
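Background on the change (not part of the commit itself): with old-style % formatting, a bare right-hand value is unpacked into multiple format arguments if it happens to be a tuple, so error messages like the ones below can themselves raise TypeError instead of formatting. Wrapping the value in a one-element tuple makes the formatting explicit and safe for any value type. A minimal sketch, using an illustrative value (the real hosts structure in kafka/client.py may differ):

    # If the value being formatted is itself a tuple, the implicit form breaks:
    hosts = ('localhost', 9092)
    'All servers failed to process request: %s' % hosts
    # TypeError: not all arguments converted during string formatting

    # An explicit one-element tuple always formats the value as a single argument:
    'All servers failed to process request: %s' % (hosts,)
    # -> "All servers failed to process request: ('localhost', 9092)"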

kafka/admin/kafka.py

Lines changed: 1 addition & 1 deletion

@@ -166,7 +166,7 @@ def __init__(self, **configs):
         log.debug("Starting Kafka administration interface")
         extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
         if extra_configs:
-            raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+            raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

         self.config = copy.copy(self.DEFAULT_CONFIG)
         self.config.update(configs)

kafka/client.py

Lines changed: 1 addition & 1 deletion

@@ -174,7 +174,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):

                return decoder_fn(future.value)

-        raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
+        raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,))

    def _payloads_by_broker(self, payloads):
        payloads_by_broker = collections.defaultdict(list)

kafka/client_async.py

Lines changed: 1 addition & 1 deletion

@@ -355,7 +355,7 @@ def _maybe_connect(self, node_id):
        conn = self._conns.get(node_id)

        if conn is None:
-            assert broker, 'Broker id %s not in current metadata' % node_id
+            assert broker, 'Broker id %s not in current metadata' % (node_id,)

            log.debug("Initiating connection to node %s at %s:%s",
                      node_id, broker.host, broker.port)

kafka/consumer/fetcher.py

Lines changed: 7 additions & 7 deletions

@@ -298,7 +298,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
            remaining_ms = timeout_ms - elapsed_ms

        raise Errors.KafkaTimeoutError(
-            "Failed to get offsets by timestamps in %s ms" % timeout_ms)
+            "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))

    def fetched_records(self, max_records=None):
        """Returns previously fetched records and updates consumed offsets.

@@ -911,7 +911,7 @@ def record(self, partition, num_bytes, num_records):
class FetchManagerMetrics(object):
    def __init__(self, metrics, prefix):
        self.metrics = metrics
-        self.group_name = '%s-fetch-manager-metrics' % prefix
+        self.group_name = '%s-fetch-manager-metrics' % (prefix,)

        self.bytes_fetched = metrics.sensor('bytes-fetched')
        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,

@@ -955,15 +955,15 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
            bytes_fetched = self.metrics.sensor(name)
            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
                    self.group_name,
-                    'The average number of bytes fetched per request for topic %s' % topic,
+                    'The average number of bytes fetched per request for topic %s' % (topic,),
                    metric_tags), Avg())
            bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
                    self.group_name,
-                    'The maximum number of bytes fetched per request for topic %s' % topic,
+                    'The maximum number of bytes fetched per request for topic %s' % (topic,),
                    metric_tags), Max())
            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
                    self.group_name,
-                    'The average number of bytes consumed per second for topic %s' % topic,
+                    'The average number of bytes consumed per second for topic %s' % (topic,),
                    metric_tags), Rate())
            bytes_fetched.record(num_bytes)

@@ -976,10 +976,10 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
            records_fetched = self.metrics.sensor(name)
            records_fetched.add(self.metrics.metric_name('records-per-request-avg',
                    self.group_name,
-                    'The average number of records in each request for topic %s' % topic,
+                    'The average number of records in each request for topic %s' % (topic,),
                    metric_tags), Avg())
            records_fetched.add(self.metrics.metric_name('records-consumed-rate',
                    self.group_name,
-                    'The average number of records consumed per second for topic %s' % topic,
+                    'The average number of records consumed per second for topic %s' % (topic,),
                    metric_tags), Rate())
            records_fetched.record(num_records)

kafka/consumer/group.py

Lines changed: 1 addition & 1 deletion

@@ -309,7 +309,7 @@ def __init__(self, *topics, **configs):
        # Only check for extra config keys in top-level class
        extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
        if extra_configs:
-            raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+            raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

        self.config = copy.copy(self.DEFAULT_CONFIG)
        self.config.update(configs)

kafka/consumer/simple.py

Lines changed: 1 addition & 1 deletion

@@ -247,7 +247,7 @@ def seek(self, offset, whence=None, partition=None):
                self.offsets[resp.partition] = \
                    resp.offsets[0] + deltas[resp.partition]
        else:
-            raise ValueError('Unexpected value for `whence`, %d' % whence)
+            raise ValueError('Unexpected value for `whence`, %d' % (whence,))

        # Reset queue and fetch offsets since they are invalid
        self.fetch_offsets = self.offsets.copy()

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 1 deletion

@@ -247,7 +247,7 @@ def assign_from_subscribed(self, assignments):

        for tp in assignments:
            if tp.topic not in self.subscription:
-                raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+                raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))

        # after rebalancing, we always reinitialize the assignment state
        self.assignment.clear()

kafka/coordinator/consumer.py

Lines changed: 3 additions & 3 deletions

@@ -216,7 +216,7 @@ def _on_join_complete(self, generation, member_id, protocol,
        self._assignment_snapshot = None

        assignor = self._lookup_assignor(protocol)
-        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
+        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

        assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

@@ -297,7 +297,7 @@ def time_to_next_poll(self):

    def _perform_assignment(self, leader_id, assignment_strategy, members):
        assignor = self._lookup_assignor(assignment_strategy)
-        assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
+        assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
        member_metadata = {}
        all_subscribed_topics = set()
        for member_id, metadata_bytes in members:

@@ -804,7 +804,7 @@ def _maybe_auto_commit_offsets_async(self):
class ConsumerCoordinatorMetrics(object):
    def __init__(self, metrics, metric_group_prefix, subscription):
        self.metrics = metrics
-        self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+        self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)

        self.commit_latency = metrics.sensor('commit-latency')
        self.commit_latency.add(metrics.metric_name(

kafka/metrics/metrics.py

Lines changed: 1 addition & 1 deletion

@@ -225,7 +225,7 @@ def register_metric(self, metric):
        with self._lock:
            if metric.metric_name in self.metrics:
                raise ValueError('A metric named "%s" already exists, cannot'
-                                 ' register another one.' % metric.metric_name)
+                                 ' register another one.' % (metric.metric_name,))
            self.metrics[metric.metric_name] = metric
            for reporter in self._reporters:
                reporter.metric_change(metric)

kafka/metrics/stats/percentiles.py

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
                             ' to be 0.0.')
            self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
        else:
-            ValueError('Unknown bucket type: %s' % bucketing)
+            ValueError('Unknown bucket type: %s' % (bucketing,))

    def stats(self):
        measurables = []

kafka/metrics/stats/rate.py

Lines changed: 1 addition & 1 deletion

@@ -101,7 +101,7 @@ def convert(self, time_ms):
        elif self._unit == TimeUnit.DAYS:
            return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
        else:
-            raise ValueError('Unknown unit: %s' % self._unit)
+            raise ValueError('Unknown unit: %s' % (self._unit,))


class SampledTotal(AbstractSampledStat):

kafka/metrics/stats/sensor.py

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ def _check_forest(self, sensors):
        """Validate that this sensor doesn't end up referencing itself."""
        if self in sensors:
            raise ValueError('Circular dependency in sensors: %s is its own'
-                             'parent.' % self.name)
+                             'parent.' % (self.name,))
        sensors.add(self)
        for parent in self._parents:
            parent._check_forest(sensors)

kafka/producer/base.py

Lines changed: 2 additions & 2 deletions

@@ -316,7 +316,7 @@ def __init__(self, client,
        if codec is None:
            codec = CODEC_NONE
        elif codec not in ALL_CODECS:
-            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+            raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))

        self.codec = codec
        self.codec_compresslevel = codec_compresslevel

@@ -419,7 +419,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
                    raise AsyncProducerQueueFull(
                        msg[idx:],
                        'Producer async queue overfilled. '
-                        'Current queue size %d.' % self.queue.qsize())
+                        'Current queue size %d.' % (self.queue.qsize(),))
            resp = []
        else:
            messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)

kafka/producer/future.py

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ def _produce_success(self, offset_and_timestamp):
    def get(self, timeout=None):
        if not self.is_done and not self._produce_future.wait(timeout):
            raise Errors.KafkaTimeoutError(
-                "Timeout after waiting for %s secs." % timeout)
+                "Timeout after waiting for %s secs." % (timeout,))
        assert self.is_done
        if self.failed():
            raise self.exception # pylint: disable-msg=raising-bad-type

kafka/producer/kafka.py

Lines changed: 5 additions & 5 deletions

@@ -340,11 +340,11 @@ def __init__(self, **configs):
                self.config[key] = configs.pop(key)

        # Only check for extra config keys in top-level class
-        assert not configs, 'Unrecognized configs: %s' % configs
+        assert not configs, 'Unrecognized configs: %s' % (configs,)

        if self.config['client_id'] is None:
            self.config['client_id'] = 'kafka-python-producer-%s' % \
-                PRODUCER_CLIENT_ID_SEQUENCE.increment()
+                (PRODUCER_CLIENT_ID_SEQUENCE.increment(),)

        if self.config['acks'] == 'all':
            self.config['acks'] = -1

@@ -633,12 +633,12 @@ def _ensure_valid_record_size(self, size):
            raise Errors.MessageSizeTooLargeError(
                "The message is %d bytes when serialized which is larger than"
                " the maximum request size you have configured with the"
-                " max_request_size configuration" % size)
+                " max_request_size configuration" % (size,))
        if size > self.config['buffer_memory']:
            raise Errors.MessageSizeTooLargeError(
                "The message is %d bytes when serialized which is larger than"
                " the total memory buffer you have configured with the"
-                " buffer_memory configuration." % size)
+                " buffer_memory configuration." % (size,))

    def _wait_on_metadata(self, topic, max_wait):
        """

@@ -679,7 +679,7 @@ def _wait_on_metadata(self, topic, max_wait):
            elapsed = time.time() - begin
            if not metadata_event.is_set():
                raise Errors.KafkaTimeoutError(
-                    "Failed to update metadata after %.1f secs." % max_wait)
+                    "Failed to update metadata after %.1f secs." % (max_wait,))
            elif topic in self._metadata.unauthorized_topics:
                raise Errors.TopicAuthorizationFailedError(topic)
            else:

kafka/producer/keyed.py

Lines changed: 1 addition & 1 deletion

@@ -46,4 +46,4 @@ def send(self, topic, key, msg):
        return self.send_messages(topic, key, msg)

    def __repr__(self):
-        return '<KeyedProducer batch=%s>' % self.async_send
+        return '<KeyedProducer batch=%s>' % (self.async_send,)

kafka/producer/record_accumulator.py

Lines changed: 3 additions & 3 deletions

@@ -102,11 +102,11 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)

        error = None
        if not self.in_retry() and is_full and timeout < since_append:
-            error = "%d seconds have passed since last append" % since_append
+            error = "%d seconds have passed since last append" % (since_append,)
        elif not self.in_retry() and timeout < since_ready:
-            error = "%d seconds have passed since batch creation plus linger time" % since_ready
+            error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
        elif self.in_retry() and timeout < since_backoff:
-            error = "%d seconds have passed since last attempt plus backoff time" % since_backoff
+            error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)

        if error:
            self.records.close()

kafka/producer/simple.py

Lines changed: 1 addition & 1 deletion

@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
        )

    def __repr__(self):
-        return '<SimpleProducer batch=%s>' % self.async_send
+        return '<SimpleProducer batch=%s>' % (self.async_send,)

kafka/protocol/legacy.py

Lines changed: 1 addition & 1 deletion

@@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None)
    elif codec == CODEC_SNAPPY:
        return [create_snappy_message(messages, key)]
    else:
-        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+        raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))

kafka/protocol/message.py

Lines changed: 2 additions & 2 deletions

@@ -77,7 +77,7 @@ def _encode_self(self, recalc_crc=True):
        elif version == 0:
            fields = (self.crc, self.magic, self.attributes, self.key, self.value)
        else:
-            raise ValueError('Unrecognized message version: %s' % version)
+            raise ValueError('Unrecognized message version: %s' % (version,))
        message = Message.SCHEMAS[version].encode(fields)
        if not recalc_crc:
            return message

@@ -143,7 +143,7 @@ def __hash__(self):

class PartialMessage(bytes):
    def __repr__(self):
-        return 'PartialMessage(%s)' % self
+        return 'PartialMessage(%s)' % (self,)


class MessageSet(AbstractType):

kafka/protocol/parser.py

Lines changed: 1 addition & 1 deletion

@@ -136,7 +136,7 @@ def _process_response(self, read_buffer):
            raise Errors.CorrelationIdError(
                'No in-flight-request found for server response'
                ' with correlation ID %d'
-                % recv_correlation_id)
+                % (recv_correlation_id,))

        (correlation_id, request) = self.in_flight_requests.popleft()

kafka/record/legacy_records.py

Lines changed: 1 addition & 1 deletion

@@ -254,7 +254,7 @@ def __iter__(self):
                # There should only ever be a single layer of compression
                assert not attrs & self.CODEC_MASK, (
                    'MessageSet at offset %d appears double-compressed. This '
-                    'should not happen -- check your producers!' % offset)
+                    'should not happen -- check your producers!' % (offset,))

                # When magic value is greater than 0, the timestamp
                # of a compressed message depends on the

test/fixtures.py

Lines changed: 4 additions & 4 deletions

@@ -102,15 +102,15 @@ def kafka_run_class_args(cls, *args):
    def kafka_run_class_env(self):
        env = os.environ.copy()
        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
-            self.test_resource("log4j.properties")
+            (self.test_resource("log4j.properties"),)
        return env

    @classmethod
    def render_template(cls, source_file, target_file, binding):
        log.info('Rendering %s from template %s', target_file.strpath, source_file)
        with open(source_file, "r") as handle:
            template = handle.read()
-            assert len(template) > 0, 'Empty template %s' % source_file
+            assert len(template) > 0, 'Empty template %s' % (source_file,)
        with open(target_file.strpath, "w") as handle:
            handle.write(template.format(**binding))
            handle.flush()

@@ -257,7 +257,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
        # TODO: checking for port connection would be better than scanning logs
        # until then, we need the pattern to work across all supported broker versions
        # The logging format changed slightly in 1.0.0
-        self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
+        self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)

        self.zookeeper = zookeeper
        self.zk_chroot = zk_chroot

@@ -291,7 +291,7 @@ def _create_zk_chroot(self):
                                         "%s:%d" % (self.zookeeper.host,
                                                    self.zookeeper.port),
                                         "create",
-                                         "/%s" % self.zk_chroot,
+                                         "/%s" % (self.zk_chroot,),
                                         "kafka-python")
        env = self.kafka_run_class_env()
        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

test/test_metrics.py

Lines changed: 1 addition & 1 deletion

@@ -469,7 +469,7 @@ def test_reporter(metrics):

    for key in list(expected.keys()):
        metrics = expected.pop(key)
-        expected['foo.%s' % key] = metrics
+        expected['foo.%s' % (key,)] = metrics
    assert expected == foo_reporter.snapshot()

test/test_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def test_end_to_end(kafka_broker, compression):
6565
except StopIteration:
6666
break
6767

68-
assert msgs == set(['msg %d' % i for i in range(messages)])
68+
assert msgs == set(['msg %d' % (i,) for i in range(messages)])
6969
consumer.close()
7070

7171

test/testutil.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def construct_lambda(s):
3232
op_str = s[0:2] # >= <=
3333
v_str = s[2:]
3434
else:
35-
raise ValueError('Unrecognized kafka version / operator: %s' % s)
35+
raise ValueError('Unrecognized kafka version / operator: %s' % (s,))
3636

3737
op_map = {
3838
'=': operator.eq,
