Source code for bspump.kafka.topic_initializer

import json
import yaml
import logging
import re
import typing

import kafka.admin

import asab
import bspump
import bspump.abc.sink
import bspump.abc.source

# Fastkafka module is required only to support including topics from FastKafkaSource/Sink
# If it is not available, topics from FastKafkaSource/Sink objects will be ignored
try:
	import fastkafka
except ImportError:
	fastkafka = None


#

L = logging.getLogger(__name__)

#

# Names of topic-level configuration options.
# When a config object is scanned for topics, only the keys listed here are
# passed on to Kafka as `topic_configs` of the created topic.
# (Kept in alphabetical order; the order has no effect on behavior.)
_TOPIC_CONFIG_OPTIONS = {
	'cleanup.policy',
	'compression.type',
	'delete.retention.ms',
	'file.delete.delay.ms',
	'flush.messages',
	'flush.ms',
	'follower.replication.throttled.replicas',
	'index.interval.bytes',
	'leader.replication.throttled.replicas',
	'max.compaction.lag.ms',
	'max.message.bytes',
	'message.downconversion.enable',
	'message.format.version',
	'message.timestamp.difference.max.ms',
	'message.timestamp.type',
	'min.cleanable.dirty.ratio',
	'min.compaction.lag.ms',
	'min.insync.replicas',
	'preallocate',
	'retention.bytes',
	'retention.ms',
	'segment.bytes',
	'segment.index.bytes',
	'segment.jitter.ms',
	'segment.ms',
	'unclean.leader.election.enable',
}


def _is_kafka_component(component):
	"""
	Tell whether `component` is one of the recognized Kafka sources/sinks.

	Returns True for instances of `bspump.kafka.KafkaSource` or
	`bspump.kafka.KafkaSink`. When the optional `fastkafka` module was imported
	successfully, instances of `fastkafka.FastKafkaSource` and
	`fastkafka.FastKafkaSink` are recognized as well. Returns False otherwise.
	"""
	bspump_kafka_types = (bspump.kafka.KafkaSource, bspump.kafka.KafkaSink)
	if isinstance(component, bspump_kafka_types):
		return True

	# Without the optional module there is nothing else to match against
	if fastkafka is None:
		return False

	fastkafka_types = (fastkafka.FastKafkaSource, fastkafka.FastKafkaSink)
	return isinstance(component, fastkafka_types)


class KafkaTopicInitializer(asab.ConfigObject):
	"""
	KafkaTopicInitializer reads topic configs from file or from Kafka sink/source configs,
	checks if the topics exist and creates them if they don't.

	KafkaAdminClient requires blocking connection, which is why this class doesn't use
	the connection module from BSPump.

	Usage:

		topic_initializer = KafkaTopicInitializer(app, "KafkaConnection")
		topic_initializer.include_topics(pipeline=MyPipeline)
		topic_initializer.initialize_topics()
	"""

	ConfigDefaults = {
		"client_id": "bspump-topic-initializer",
		"topics_file": "",
		"num_partitions_default": 1,
		"replication_factor_default": 1,
	}

	def __init__(self, app, connection, id: typing.Optional[str] = None, config: typing.Optional[dict] = None):
		"""
		Set up the initializer and optionally preload topics from the configured file.

		**Parameters**

		app : Application
			Application object; its "bspump.PumpService" service is used to look up the connection.

		connection : str
			Key into the `Connections` of the PumpService. The `Config` of that connection
			must contain the "bootstrap_servers" option.

		id : typing.Optional[str], default = None
			Id of this config object. When None, the class name is used.

		config : typing.Optional[dict], default = None
			Configuration passed on to `asab.ConfigObject` (see `ConfigDefaults` for the options).
		"""
		_id = id if id is not None else self.__class__.__name__
		super().__init__(_id, config)

		# Keeps the topic objects that need to be checked/initialized (topic name -> NewTopic)
		self.RequiredTopics = {}

		# Caches the names of existing topics; None means "not fetched yet"
		self.ExistingTopics: typing.Optional[set] = None

		self.BootstrapServers = None
		self.ClientId = self.Config.get("client_id")
		self._get_bootstrap_servers(app, connection)

		# The "topics_file" option is empty by default, in which case nothing is preloaded
		topics_file = self.Config.get("topics_file")
		if topics_file:
			self.include_topics_from_file(topics_file)

	def _get_bootstrap_servers(self, app, connection):
		"""
		Read "bootstrap_servers" from the given connection and store them as a list
		in `self.BootstrapServers`.

		The option value is split on any run of whitespace and/or commas.
		"""
		svc = app.get_service("bspump.PumpService")
		servers = svc.Connections[connection].Config["bootstrap_servers"]
		self.BootstrapServers = re.split(r"[\s,]+", servers.strip())

	def _create_admin_client(self):
		"""
		Create a new KafkaAdminClient for the stored bootstrap servers and client id.
		The caller is responsible for closing it.
		"""
		return kafka.admin.KafkaAdminClient(
			bootstrap_servers=self.BootstrapServers,
			client_id=self.ClientId
		)

	def include_topics(self, *, topic_config=None, kafka_component=None, pipeline=None, config_file=None):
		"""
		Include topics from any combination of the supported inputs.
		All parameters are keyword-only and every one that is not None is processed.

		**Parameters**

		topic_config : default = None
			Config object or dict, processed by `include_topics_from_config()`.

		kafka_component : default = None
			Object with `Id` and `Config` attributes (e.g. a Kafka source or sink);
			its `Config` is processed by `include_topics_from_config()`.

		pipeline : default = None
			Pipeline to scan: all of its `Sources` and the last item of its first
			processor list (the sink) are checked, and the Kafka ones are included.

		config_file : default = None
			Path to a JSON or YAML file, processed by `include_topics_from_file()`.
		"""
		# Include topic from config or dict object
		if topic_config is not None:
			L.info("Including topics from dictionary")
			self.include_topics_from_config(topic_config)

		# Include topic from config file
		if config_file is not None:
			L.info("Including topics from '{}'".format(config_file))
			self.include_topics_from_file(config_file)

		# Get topics from Kafka Source or Sink
		if kafka_component is not None:
			L.info("Including topics from {}".format(kafka_component.Id))
			self.include_topics_from_config(kafka_component.Config)

		# Scan the pipeline for KafkaSource(s) or KafkaSink
		if pipeline is not None:
			for source in pipeline.Sources:
				if _is_kafka_component(source):
					L.info("Including topics from {}".format(source.Id))
					self.include_topics_from_config(source.Config)

			# The sink is the last item of the first processor list;
			# a pipeline without any processors has no sink to inspect.
			processors = pipeline.Processors
			if processors and processors[0]:
				sink = processors[0][-1]
				if _is_kafka_component(sink):
					L.info("Including topics from {}".format(sink.Id))
					self.include_topics_from_config(sink.Config)

	def include_topics_from_file(self, topics_file: str):
		"""
		Include topics listed in a JSON (`.json`) or YAML (`.yml`, `.yaml`) file.

		The file is expected to contain a sequence of mappings. Each mapping must have
		a "name" key; "num_partitions" and "replication_factor" are filled in from
		the `*_default` config options when missing. The whole mapping is passed
		as keyword arguments to `kafka.admin.NewTopic`.

		A file with an unsupported extension is skipped with a warning.

		**Parameters**

		topics_file : str
			Path to the topic file.
		"""
		# Support yaml and json input
		ext = topics_file.strip().split(".")[-1].lower()
		if ext == "json":
			with open(topics_file, "r") as f:
				data = json.load(f)
		elif ext in ("yml", "yaml"):
			with open(topics_file, "r") as f:
				data = yaml.safe_load(f)
		else:
			L.warning("Unsupported extension: '{}'".format(ext))
			# Nothing was loaded, so there is nothing to include
			return

		# An empty file is parsed as None (YAML) or an empty container
		if not data:
			return

		num_partitions_default = int(self.Config.get("num_partitions_default"))
		replication_factor_default = int(self.Config.get("replication_factor_default"))

		for topic in data:
			topic.setdefault("num_partitions", num_partitions_default)
			topic.setdefault("replication_factor", replication_factor_default)
			self.RequiredTopics[topic["name"]] = kafka.admin.NewTopic(**topic)

	def include_topics_from_config(self, config_object):
		"""
		Include the topics named in the "topic" option of a config object.

		The "topic" option is a comma-separated list of topic names; whitespace
		around the names is ignored. "num_partitions" and "replication_factor" are
		taken from the config object when present, otherwise from the `*_default`
		options of this initializer. Any key listed in `_TOPIC_CONFIG_OPTIONS` is
		passed to the new topic as its topic-level config.

		NOTE: "num_partitions", "replication_factor" and the topic-level options
		are popped from `config_object`, i.e. the passed object is modified.

		**Parameters**

		config_object :
			Mapping-like config (dict or the `Config` of a Kafka source/sink).
		"""
		topic_option = config_object.get("topic")
		if topic_option is None:
			L.warning("No 'topic' option found in the config, no topics included.")
			return

		# Tolerate whitespace around the commas and skip empty entries
		topic_names = [name.strip() for name in topic_option.split(",")]
		topic_names = [name for name in topic_names if name]

		# Every kafka topic needs to have: name, num_partitions and replication_factor
		if "num_partitions" in config_object:
			num_partitions = int(config_object.pop("num_partitions"))
		else:
			num_partitions = int(self.Config.get("num_partitions_default"))

		if "replication_factor" in config_object:
			replication_factor = int(config_object.pop("replication_factor"))
		else:
			replication_factor = int(self.Config.get("replication_factor_default"))

		# Additional configs are optional
		# (iterate over a snapshot of the keys, because matching keys are popped)
		topic_configs = {}
		for config_option in set(config_object.keys()):
			if config_option in _TOPIC_CONFIG_OPTIONS:
				topic_configs[config_option] = config_object.pop(config_option)

		# Create topic objects; each one gets its own copy of the topic-level config
		for name in topic_names:
			self.RequiredTopics[name] = kafka.admin.NewTopic(
				name,
				num_partitions,
				replication_factor,
				topic_configs=dict(topic_configs)
			)

	def fetch_existing_topics(self):
		"""
		Fetch the names of the topics known to Kafka and cache them
		in `self.ExistingTopics` as a set.

		Any exception from the admin client is propagated to the caller;
		the client is closed in every case.
		"""
		admin_client = self._create_admin_client()
		try:
			self.ExistingTopics = set(admin_client.list_topics())
		finally:
			admin_client.close()

	def check_and_initialize(self):
		"""
		Obsolete alias of `initialize_topics()`; logs a warning and delegates to it.
		"""
		L.warning("`check_and_initialize()` is obsoleted, use `initialize_topics()` instead")
		self.initialize_topics()

	def initialize_topics(self):
		"""
		Create every required topic that is not among the existing ones.

		The existing topics are fetched first unless they are already cached.
		Failures to fetch or to create topics are logged as errors and not raised.
		"""
		if len(self.RequiredTopics) == 0:
			L.info("No Kafka topics were required.")
			return

		# Fetch topics if not cached
		if self.ExistingTopics is None:
			try:
				self.fetch_existing_topics()
			except Exception as e:
				L.error("Failed to fetch Kafka topics: {}".format(e))
				return

		# Filter out the topics that already exist
		missing_topics = [
			topic
			for topic in self.RequiredTopics.values()
			if topic.name not in self.ExistingTopics
		]

		if len(missing_topics) == 0:
			L.info("No missing Kafka topics to be initialized.")
			return

		admin_client = None
		try:
			admin_client = self._create_admin_client()

			# Create topics
			# TODO: update configs of existing topics using `admin_client.alter_configs()`
			admin_client.create_topics(missing_topics)

			# Update existing topic cache
			self.ExistingTopics.update(self.RequiredTopics.keys())

			L.log(
				asab.LOG_NOTICE,
				"Kafka topics created",
				struct_data={"topics": [topic.name for topic in missing_topics]}
			)
		except Exception as e:
			L.error("Kafka topic initialization failed: {}".format(e))
		finally:
			if admin_client is not None:
				admin_client.close()