11# (c) Copyright IBM Corp. 2025
22
33
4+ import contextlib
45import os
56import threading
67import time
@@ -43,7 +44,7 @@ def _resource(self) -> Generator[None, None, None]:
4344 self .kafka_config = {"bootstrap.servers" : testenv ["kafka_bootstrap_servers" ][0 ]}
4445 self .kafka_client = AdminClient (self .kafka_config )
4546
46- try :
47+ with contextlib . suppress ( KafkaException ) :
4748 _ = self .kafka_client .create_topics ( # noqa: F841
4849 [
4950 NewTopic (
@@ -68,8 +69,6 @@ def _resource(self) -> Generator[None, None, None]:
6869 ),
6970 ]
7071 )
71- except KafkaException :
72- pass
7372
7473 # Kafka producer
7574 self .producer = Producer (self .kafka_config )
@@ -83,14 +82,12 @@ def _resource(self) -> Generator[None, None, None]:
8382 clear_context ()
8483
8584 # Close connections
86- self .kafka_client .delete_topics (
87- [
88- testenv ["kafka_topic" ],
89- testenv ["kafka_topic" ] + "_1" ,
90- testenv ["kafka_topic" ] + "_2" ,
91- testenv ["kafka_topic" ] + "_3" ,
92- ]
93- )
85+ self .kafka_client .delete_topics ([
86+ testenv ["kafka_topic" ],
87+ testenv ["kafka_topic" ] + "_1" ,
88+ testenv ["kafka_topic" ] + "_2" ,
89+ testenv ["kafka_topic" ] + "_3" ,
90+ ])
9491 time .sleep (3 )
9592
9693 if "tracing" in config :
@@ -414,11 +411,9 @@ def test_filter_confluent_specific_topic(self) -> None:
414411 )
415412 assert span_to_be_filtered not in filtered_spans
416413
417- self .kafka_client .delete_topics (
418- [
419- testenv ["kafka_topic" ] + "_1" ,
420- ]
421- )
414+ self .kafka_client .delete_topics ([
415+ testenv ["kafka_topic" ] + "_1" ,
416+ ])
422417
423418 def test_filter_confluent_specific_topic_with_config_file (self ) -> None :
424419 agent .options .span_filters = parse_filter_rules_yaml (
@@ -459,12 +454,10 @@ def test_confluent_kafka_consumer_root_exit(self) -> None:
459454 consumer_config ["auto.offset.reset" ] = "earliest"
460455
461456 consumer = Consumer (consumer_config )
462- consumer .subscribe (
463- [
464- testenv ["kafka_topic" ] + "_1" ,
465- testenv ["kafka_topic" ] + "_2" ,
466- ]
467- )
457+ consumer .subscribe ([
458+ testenv ["kafka_topic" ] + "_1" ,
459+ testenv ["kafka_topic" ] + "_2" ,
460+ ])
468461
469462 consumer .consume (num_messages = 2 , timeout = 60 ) # noqa: F841
470463
@@ -515,12 +508,10 @@ def test_confluent_kafka_consumer_root_exit(self) -> None:
515508 assert producer_span_2 .s == consumer_span_2 .p
516509 assert producer_span_2 .s != consumer_span_2 .s
517510
518- self .kafka_client .delete_topics (
519- [
520- testenv ["kafka_topic" ] + "_1" ,
521- testenv ["kafka_topic" ] + "_2" ,
522- ]
523- )
511+ self .kafka_client .delete_topics ([
512+ testenv ["kafka_topic" ] + "_1" ,
513+ testenv ["kafka_topic" ] + "_2" ,
514+ ])
524515
525516 def test_confluent_kafka_poll_root_exit_with_trace_correlation (self ) -> None :
526517 agent .options .allow_exit_as_root = True
@@ -698,12 +689,10 @@ def test_confluent_kafka_downstream_suppression(self) -> None:
698689 consumer_config ["auto.offset.reset" ] = "earliest"
699690
700691 consumer = Consumer (consumer_config )
701- consumer .subscribe (
702- [
703- testenv ["kafka_topic" ] + "_1" ,
704- testenv ["kafka_topic" ] + "_2" ,
705- ]
706- )
692+ consumer .subscribe ([
693+ testenv ["kafka_topic" ] + "_1" ,
694+ testenv ["kafka_topic" ] + "_2" ,
695+ ])
707696
708697 messages = consumer .consume (num_messages = 2 , timeout = 60 ) # noqa: F841
709698
@@ -760,12 +749,10 @@ def test_confluent_kafka_downstream_suppression(self) -> None:
760749 ("x_instana_s" , format_span_id (producer_span_2 .s ).encode ("utf-8" )),
761750 ]
762751
763- self .kafka_client .delete_topics (
764- [
765- testenv ["kafka_topic" ] + "_1" ,
766- testenv ["kafka_topic" ] + "_2" ,
767- ]
768- )
752+ self .kafka_client .delete_topics ([
753+ testenv ["kafka_topic" ] + "_1" ,
754+ testenv ["kafka_topic" ] + "_2" ,
755+ ])
769756
770757 def test_save_consumer_span_into_context (self , span : "InstanaSpan" ) -> None :
771758 """Test save_consumer_span_into_context function."""
@@ -969,12 +956,10 @@ def test_confluent_kafka_poll_multithreaded_context_isolation(self) -> None:
969956 for i in range (num_threads ):
970957 topic = f"{ testenv ['kafka_topic' ]} _thread_{ i } "
971958 # Create topic
972- try :
973- self .kafka_client .create_topics (
974- [NewTopic (topic , num_partitions = 1 , replication_factor = 1 )]
975- )
976- except KafkaException :
977- pass
959+ with contextlib .suppress (KafkaException ):
960+ self .kafka_client .create_topics ([
961+ NewTopic (topic , num_partitions = 1 , replication_factor = 1 )
962+ ])
978963
979964 # Produce messages
980965 for j in range (messages_per_topic ):
@@ -1022,22 +1007,22 @@ def consume_from_topic(thread_id: int) -> None:
10221007 consumer .close ()
10231008
10241009 with lock :
1025- thread_results .append (
1026- {
1027- "thread_id" : thread_id ,
1028- "topic" : topic ,
1029- "messages_consumed" : messages_consumed ,
1030- "none_polls" : none_polls ,
1031- "success" : True ,
1032- }
1033- )
1010+ thread_results .append ({
1011+ "thread_id" : thread_id ,
1012+ "topic" : topic ,
1013+ "messages_consumed" : messages_consumed ,
1014+ "none_polls" : none_polls ,
1015+ "success" : True ,
1016+ })
10341017
10351018 except Exception as e :
10361019 with lock :
10371020 thread_errors .append (e )
1038- thread_results .append (
1039- {"thread_id" : thread_id , "success" : False , "error" : str (e )}
1040- )
1021+ thread_results .append ({
1022+ "thread_id" : thread_id ,
1023+ "success" : False ,
1024+ "error" : str (e ),
1025+ })
10411026
10421027 threads = []
10431028 for i in range (num_threads ):
@@ -1052,19 +1037,19 @@ def consume_from_topic(thread_id: int) -> None:
10521037
10531038 assert len (thread_results ) == num_threads
10541039 for result in thread_results :
1055- assert result [
1056- "success "
1057- ], f"Thread { result [ 'thread_id' ] } failed: { result . get ( 'error' ) } "
1058- assert (
1059- result [" messages_consumed" ] == messages_per_topic
1060- ), f"Thread { result [ 'thread_id' ] } consumed { result [ 'messages_consumed' ] } messages, expected { messages_per_topic } "
1040+ assert result ["success" ], (
1041+ f"Thread { result [ 'thread_id' ] } failed: { result . get ( 'error' ) } "
1042+ )
1043+ assert result [ "messages_consumed" ] == messages_per_topic , (
1044+ f"Thread { result ['thread_id' ] } consumed { result [ ' messages_consumed' ] } messages, expected { messages_per_topic } "
1045+ )
10611046
10621047 spans = self .recorder .queued_spans ()
10631048
10641049 expected_min_spans = num_threads * (1 + messages_per_topic * 2 )
1065- assert (
1066- len (spans ) >= expected_min_spans
1067- ), f"Expected at least { expected_min_spans } spans, got { len ( spans ) } "
1050+ assert len ( spans ) >= expected_min_spans , (
1051+ f"Expected at least { expected_min_spans } spans, got { len (spans )} "
1052+ )
10681053
10691054 for i in range (num_threads ):
10701055 topic = f"{ testenv ['kafka_topic' ]} _thread_{ i } "
@@ -1077,9 +1062,9 @@ def consume_from_topic(thread_id: int) -> None:
10771062 and s .data .get ("kafka" , {}).get ("service" ) == topic
10781063 ]
10791064
1080- assert (
1081- len (poll_spans ) >= 1
1082- ), f"Expected poll spans for topic { topic } , got { len ( poll_spans ) } "
1065+ assert len ( poll_spans ) >= 1 , (
1066+ f"Expected poll spans for topic { topic } , got { len (poll_spans )} "
1067+ )
10831068
10841069 topics_to_delete = [
10851070 f"{ testenv ['kafka_topic' ]} _thread_{ i } " for i in range (num_threads )
@@ -1132,21 +1117,21 @@ def poll_empty_topic(thread_id: int) -> None:
11321117 for thread in threads :
11331118 thread .join (timeout = 10 )
11341119
1135- assert (
1136- len ( thread_errors ) == 0
1137- ), f"Context errors in threads: { [ str ( e ) for e in thread_errors ] } "
1120+ assert len ( thread_errors ) == 0 , (
1121+ f"Context errors in threads: { [ str ( e ) for e in thread_errors ] } "
1122+ )
11381123
11391124 spans = self .recorder .queued_spans ()
11401125
11411126 test_spans = [s for s in spans if s .n == "sdk" ]
1142- assert (
1143- len (test_spans ) == num_threads
1144- ), f"Expected { num_threads } test spans, got { len ( test_spans ) } "
1127+ assert len ( test_spans ) == num_threads , (
1128+ f"Expected { num_threads } test spans, got { len (test_spans )} "
1129+ )
11451130
11461131 kafka_spans = [s for s in spans if s .n == "kafka" ]
1147- assert (
1148- len (kafka_spans ) == 0
1149- ), f"Expected no kafka spans for None polls, got { len ( kafka_spans ) } "
1132+ assert len ( kafka_spans ) == 0 , (
1133+ f"Expected no kafka spans for None polls, got { len (kafka_spans )} "
1134+ )
11501135
11511136 def test_filter_confluent_kafka_by_category (self ) -> None :
11521137 os .environ ["INSTANA_TRACING_FILTER_EXCLUDE_CATEGORY_ATTRIBUTES" ] = (
0 commit comments