topic initializer

class KafkaTopicInitializer(app, connection, id: Optional[str] = None, config: Optional[dict] = None)

Bases: Configurable

KafkaTopicInitializer reads topic configs from a file or from Kafka sink/source configs, checks whether the topics exist, and creates them if they don’t.

KafkaAdminClient requires a blocking connection, which is why this class doesn’t use the connection module from BSPump.

Usage:

    topic_initializer = KafkaTopicInitializer(app, "KafkaConnection")
    topic_initializer.include_topics(pipeline=MyPipeline)
    topic_initializer.initialize_topics()
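
The check-then-create behaviour described above can be illustrated with a minimal, self-contained sketch. It is not BSPump's implementation: `topics_to_create`, `ensure_topics`, `FakeAdminClient`, its `create_topic` method and the settings keys are all hypothetical names chosen for illustration. A real blocking admin client would talk to the broker instead of an in-memory set.

```python
from typing import Dict, Iterable, List


def topics_to_create(required: Dict[str, dict], existing: Iterable[str]) -> List[str]:
    """Return the names of required topics that do not exist yet."""
    existing_names = set(existing)
    # Sort for a deterministic creation order.
    return sorted(name for name in required if name not in existing_names)


class FakeAdminClient:
    """Hypothetical in-memory stand-in for a blocking Kafka admin client."""

    def __init__(self, topics):
        self._topics = set(topics)

    def list_topics(self):
        return sorted(self._topics)

    def create_topic(self, name, settings):
        # A real client would send a create request to the broker here.
        self._topics.add(name)


def ensure_topics(admin, required):
    """Create every required topic that is missing; return the created names."""
    missing = topics_to_create(required, admin.list_topics())
    for name in missing:
        admin.create_topic(name, required[name])
    return missing


# Assumed example data: topic name -> settings.
required = {
    "events": {"num_partitions": 3, "replication_factor": 1},
    "alerts": {"num_partitions": 1, "replication_factor": 1},
}
admin = FakeAdminClient(["events"])
created = ensure_topics(admin, required)
print(created)  # ['alerts']
```

Running `ensure_topics` a second time creates nothing, because every required topic already exists at that point.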

__init__(app, connection, id: Optional[str] = None, config: Optional[dict] = None)

Initializes the parameters passed to the class.

Parameters

app : Application

The application object.

connection : Connection

Information needed to create a connection.

id : typing.Optional[str], default = None

config : dict, default = None

Configuration dictionary (JSON).

topic initializer methods

KafkaTopicInitializer._get_bootstrap_servers(app, connection)

KafkaTopicInitializer.include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)

Includes topics from a config file or a dict object. It can also scan a Pipeline and collect the topics of its Source or Sink. All arguments are keyword-only.

Parameters

topic_config, default = None

Topic config file.

kafka_component, default = None

pipeline, default = None

Pipeline to scan for Source and Sink topics.

config_file, default = None

Configuration file.

KafkaTopicInitializer.include_topics_from_file(topics_file: str)

Includes topics from a topic file.

Parameters

topics_file : str

Name of the topic file to include.
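
The layout of a topic file is not specified here. The sketch below shows one way such a file could be read. It assumes, purely for illustration, a JSON file holding a list of objects that each carry a `name` key plus arbitrary settings. The helper `load_topics_file` and the keys `num_partitions` and `replication_factor` are hypothetical and are not taken from BSPump.

```python
import json
import os
import tempfile


def load_topics_file(path):
    """Read a JSON topics file and return {topic_name: settings}."""
    with open(path, "r", encoding="utf-8") as f:
        entries = json.load(f)
    topics = {}
    for entry in entries:
        settings = dict(entry)       # copy so the parsed input is not mutated
        name = settings.pop("name")  # assumed: every entry carries a topic name
        topics[name] = settings
    return topics


# Write a small example file (assumed schema) and load it back.
example = [
    {"name": "events", "num_partitions": 3, "replication_factor": 1},
    {"name": "alerts", "num_partitions": 1, "replication_factor": 1},
]
with tempfile.TemporaryDirectory() as tmp:
    topics_path = os.path.join(tmp, "topics.json")
    with open(topics_path, "w", encoding="utf-8") as f:
        json.dump(example, f)
    loaded = load_topics_file(topics_path)

print(sorted(loaded))  # ['alerts', 'events']
```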

KafkaTopicInitializer.include_topics_from_config(config_object)

Includes topics using a config object.

Parameters

config_object : JSON

Config object describing which topics to include.

KafkaTopicInitializer.fetch_existing_topics()

Fetches the topics that already exist in Kafka.

KafkaTopicInitializer.check_and_initialize()

Initializes new topics and logs a warning.

KafkaTopicInitializer.initialize_topics()

Initializes topics.