@@ -341,13 +341,13 @@ def _jaas_config(self):
341
341
342
342
elif self .sasl_mechanism == 'PLAIN' :
343
343
jaas_config = (
344
- 'org.apache.kafka.common.security.plain.PlainLoginModule required\n '
345
- ' username="{user}" password="{password}" user_{user}="{password}";\n '
344
+ 'org.apache.kafka.common.security.plain.PlainLoginModule required'
345
+ ' username="{user}" password="{password}" user_{user}="{password}";\n '
346
346
)
347
347
elif self .sasl_mechanism in ("SCRAM-SHA-256" , "SCRAM-SHA-512" ):
348
348
jaas_config = (
349
- 'org.apache.kafka.common.security.scram.ScramLoginModule required\n '
350
- ' username="{user}" password="{password}";\n '
349
+ 'org.apache.kafka.common.security.scram.ScramLoginModule required'
350
+ ' username="{user}" password="{password}";\n '
351
351
)
352
352
else :
353
353
raise ValueError ("SASL mechanism {} currently not supported" .format (self .sasl_mechanism ))
@@ -361,7 +361,7 @@ def _add_scram_user(self):
361
361
'--entity-name' , self .broker_user ,
362
362
'--add-config' ,
363
363
'{}=[password={}]' .format (self .sasl_mechanism , self .broker_password ),
364
- )
364
+ * self . _cli_connect_args () )
365
365
env = self .kafka_run_class_env ()
366
366
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
367
367
@@ -602,7 +602,8 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
602
602
if num_partitions is None else num_partitions ,
603
603
'--replication-factor' , self .replicas \
604
604
if replication_factor is None \
605
- else replication_factor )
605
+ else replication_factor ,
606
+ * self ._cli_connect_args ())
606
607
if env_kafka_version () >= (0 , 10 ):
607
608
args .append ('--if-not-exists' )
608
609
env = self .kafka_run_class_env ()
@@ -615,16 +616,23 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
615
616
self .out (stderr )
616
617
raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
617
618
619
+ def _cli_connect_args (self ):
620
+ if env_kafka_version () < (3 , 0 , 0 ):
621
+ return ['--zookeeper' , '%s:%s/%s' % (self .zookeeper .host , self .zookeeper .port , self .zk_chroot )]
622
+ else :
623
+ args = ['--bootstrap-server' , '%s:%s' % (self .host , self .port )]
624
+ if self .sasl_enabled :
625
+ command_conf = self .tmp_dir .join ("sasl_command.conf" )
626
+ self .render_template (self .test_resource ("sasl_command.conf" ), command_conf , vars (self ))
627
+ args .append ('--command-config' )
628
+ args .append (command_conf .strpath )
629
+ return args
630
+
618
631
def get_topic_names (self ):
619
- args = self .run_script ('kafka-topics.sh' ,
620
- '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
621
- self .zookeeper .port ,
622
- self .zk_chroot ),
623
- '--list'
624
- )
632
+ cmd = self .run_script ('kafka-topics.sh' , '--list' , * self ._cli_connect_args ())
625
633
env = self .kafka_run_class_env ()
626
634
env .pop ('KAFKA_LOG4J_OPTS' )
627
- proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
635
+ proc = subprocess .Popen (cmd , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
628
636
stdout , stderr = proc .communicate ()
629
637
if proc .returncode != 0 :
630
638
self .out ("Failed to list topics!" )
0 commit comments