Source code for bspump.kafka.source

import asyncio
import concurrent
import concurrent.futures
import logging
import re
import aiokafka

import kafka
import kafka.errors
import time

from ..abc.source import Source

#

# Module-level logger named after this module; used by KafkaSource to report commit and shutdown errors.
L = logging.getLogger(__name__)


#


class KafkaSource(Source):
    """
    KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object.
    It then passes them to other processors in the pipeline.

    .. code:: python

        class KafkaPipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                    bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

    To ensure that after restart, pump will continue receiving messages where it left off,
    group_id has to be provided in the configuration.

    When the group_id is set, the consumer group is created and the Kafka server will then operate
    in the producer-consumer mode. It means that every consumer with the same group_id will be assigned
    unique set of partitions, hence all messages will be divided among them and thus unique.

    Long-running synchronous operations should be avoided or placed inside the OOBGenerator in the asynchronous
    way or on thread using ASAB Proactor service (see bspump-oob-proactor.py example in "examples" folder).
    Otherwise, the session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer
    from the partition, thus causing rebalance.
    """

    ConfigDefaults = {
        "topic": "",  # Multiple values are allowed, separated by , character
        "retry": 20,
        "group_id": "",
        "client_id": "BSPump-KafkaSource",
        "auto_offset_reset": "earliest",
        "max_partition_fetch_bytes": "",
        "api_version": "auto",
        "session_timeout_ms": 10000,  # Maximum time between two heartbeats that will not cause removal of the consumer from consumer group
        "consumer_timeout_ms": "",
        "request_timeout_ms": "",
        "get_timeout_ms": 20000,
        # Maximum number of records returned by a single fetch; empty means no explicit limit
        "max_records": "",

        # The number of lines after which the main method enters the idle
        # state to allow other operations to perform their tasks
        "event_block_size": 1000,
        "event_idle_time": 0.01,  # The time for which the main method enters the idle state (see above)

        # Specify partitions the consumer should use, use with caution when specifying more topics
        "user_defined_partitions": '',
    }

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """
        Read the configuration and prepare the consumer parameters.

        The consumer itself is not created here; that happens in
        :meth:`initialize_consumer`, which is awaited from :meth:`main`.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

        pipeline : Pipeline
            The pipeline this source belongs to.

        connection : str
            Identifier handed to ``pipeline.locate_connection()`` to find the Kafka connection.

        id : str, default = None
            Passed through to the parent ``Source`` constructor.

        config : dict, default = None
            Passed through to the parent ``Source`` constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        # Surrounding whitespace is dropped so that " a, b " yields ['a', 'b']
        self.topics = re.split(r'\s*,\s*', self.Config['topic'].strip())

        consumer_params = {}

        self._group_id = self.Config.get("group_id")
        if len(self._group_id) > 0:
            consumer_params['group_id'] = self._group_id

        # Options forwarded verbatim when they are not empty
        for key in ('client_id', 'auto_offset_reset', 'api_version'):
            value = self.Config.get(key)
            if value != "":
                consumer_params[key] = value

        # Options forwarded as integers when they are not empty
        for key in (
            'max_partition_fetch_bytes',
            'session_timeout_ms',
            'consumer_timeout_ms',
            'request_timeout_ms',
        ):
            value = self.Config.get(key)
            if value != "":
                consumer_params[key] = int(value)

        # getmany() only accepts None or a positive int, so a textual
        # configuration value has to be converted before it is used.
        max_records = self.Config.get('max_records')
        if max_records is None or max_records == "":
            self.MaxRecords = None
        else:
            self.MaxRecords = int(max_records)

        self.GetTimeoutMs = int(self.Config.get("get_timeout_ms"))

        self.Connection = pipeline.locate_connection(app, connection)
        self.App = app

        self.Partitions = None
        self.ConsumerParams = consumer_params
        self.Consumer = None

        self.Retry = int(self.Config['retry'])
        self.Pipeline = pipeline

        self.MetricsService = app.get_service('asab.MetricsService')
        self.ProfilerCounter = self.MetricsService.create_counter(
            'bspump.kafka.profiler',
            tags={
                'source': self.Id,
                'pipeline': self.Pipeline.Id,
            },
            init_values={'duration': 0.0, 'run': 0},
            reset=self.Pipeline.ResetProfiler,
        )

        self.EventCounter = 0
        self.EventBlockSize = int(self.Config["event_block_size"])
        self.EventIdleTime = float(self.Config["event_idle_time"])

        # Last known offset per TopicPartition, used for the preventive commit
        self.Offsets = {}

        # initialize_consumer() runs again on every commit retry; this flag
        # keeps the "not ready" handler from being subscribed more than once.
        self._NotReadySubscribed = False

    def create_consumer(self):
        """
        Create the consumer object and store it in ``self.Consumer``.

        When ``user_defined_partitions`` is configured, the consumer is created
        without topics and the partitions are assigned manually: the i-th topic
        is paired with the i-th configured partition number. Otherwise the
        consumer is created with all configured topics.
        """
        user_partitions = self.Config["user_defined_partitions"]

        if len(user_partitions) != 0:
            self.Consumer = self.Connection.create_consumer(
                **self.ConsumerParams
            )
            self.Partitions = [
                aiokafka.TopicPartition(topic, int(partition))
                for topic, partition in zip(self.topics, user_partitions.split(","))
            ]
            self.Consumer.assign(self.Partitions)
        else:
            self.Partitions = None
            self.Consumer = self.Connection.create_consumer(
                *self.topics,
                **self.ConsumerParams
            )

    async def initialize_consumer(self):
        """
        Create and start the consumer; must be called once the loop is running.

        Also remembers the consumer's current assignment in ``self.Partitions``
        and makes sure ``_not_ready_handler`` is subscribed to the pipeline's
        ``bspump.pipeline.not_ready!`` message exactly once.
        """
        # Create consumer after the loop is running
        self.create_consumer()
        await self.Consumer.start()
        self.Partitions = self.Consumer.assignment()

        if not self._NotReadySubscribed:
            self.Pipeline.PubSub.subscribe("bspump.pipeline.not_ready!", self._not_ready_handler)
            self._NotReadySubscribed = True

    async def _not_ready_handler(self, message_type, *args, **kwargs):
        """
        Commit the stored offsets when the pipeline reports it is not ready.

        **Parameters**

        message_type : the PubSub message type that triggered the call (unused).

        *args, **kwargs : additional PubSub arguments (unused).
        """
        # Preventive commit, when the pipeline is throttled
        if len(self._group_id) > 0:
            # TODO: Consider cases where self.MaxRecords != 1
            await self._commit(self.Offsets)

    async def _fetch(self):
        """
        Fetch one batch of records and account the elapsed time in the profiler.

        :returns: whatever ``self.Consumer.getmany()`` returns; ``main`` treats it
            as a mapping of topic partitions to lists of messages.
        """
        started = time.perf_counter()
        data = await self.Consumer.getmany(timeout_ms=self.GetTimeoutMs, max_records=self.MaxRecords)
        self.ProfilerCounter.add('duration', time.perf_counter() - started)
        self.ProfilerCounter.add('run', 1)
        return data

    async def main(self):
        """
        Run the source: fetch batches from Kafka and feed them to the pipeline.

        The loop ends when the task is cancelled; the consumer is always
        stopped on the way out.
        """
        await self.initialize_consumer()

        try:
            while True:
                await self.Pipeline.ready()

                data = await self._fetch()
                if len(data) == 0:
                    # Nothing arrived within the timeout: move every remembered
                    # partition to its end and try once more.
                    # NOTE(review): self.Partitions is captured when the consumer
                    # starts; confirm it cannot go stale after a rebalance.
                    for partition in self.Partitions:
                        await self.Consumer.seek_to_end(partition)
                    data = await self._fetch()

                for topic_partition, messages in data.items():
                    for message in messages:
                        # Until the message is processed, the stored offset points
                        # at it, so a preventive commit makes it be read again.
                        self.Offsets[topic_partition] = message.offset

                        # Process message
                        context = {"kafka": message}
                        await self.process(message.value, context=context)

                        # Simulate event
                        await self._simulate_event()

                    # Advance to the next unread offset, but only if this batch
                    # actually contained messages for the partition; otherwise an
                    # older entry would be pushed past a message never processed.
                    if messages:
                        self.Offsets[topic_partition] += 1

                if len(self._group_id) > 0:
                    await self._commit()

        except (asyncio.CancelledError, concurrent.futures.CancelledError):
            # Both public classes are listed because they are distinct types
            # on Python 3.8+ and the same type on older versions.
            pass

        finally:
            await self.Consumer.stop()

    async def _close_consumer_parts(self):
        """
        Close the consumer's coordinator, fetcher and client one by one.

        Failures are logged and otherwise ignored so that every part gets its
        chance to be closed.
        """
        # TODO: Think about a more elegant way how to stop the consumer
        # TODO: aiokafka does not handle exceptions of its components and thus it cannot be fully stopped via stop
        # TODO: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/consumer.py#L457
        for part_name, attribute in (
            ("coordinator", "_coordinator"),
            ("fetcher", "_fetcher"),
            ("client", "_client"),
        ):
            try:
                await getattr(self.Consumer, attribute).close()
            except Exception as close_error:
                L.exception(
                    "Error {} during closing consumer's {} after Kafka commit".format(close_error, part_name)
                )

    async def _commit(self, offsets=None):
        """
        Commit offsets to Kafka, retrying up to ``self.Retry`` times.

        On a retry-able error the consumer is torn down, the method sleeps for
        5 seconds, a new consumer is initialized and the commit is attempted
        again. When the attempts are exhausted, or on any other error, the
        error is set on the pipeline and the method returns.

        :param offsets: offsets to commit; when ``None`` or empty, the consumer
            is asked to commit without explicit offsets.
        :returns: None
        :raises asyncio.CancelledError: re-raised untouched so the task can terminate.
        """
        for attempts_left in range(self.Retry, 0, -1):
            try:
                if offsets is not None and len(offsets) > 0:
                    await self.Consumer.commit(offsets)
                else:
                    await self.Consumer.commit()
                return

            except (asyncio.CancelledError, concurrent.futures.CancelledError):
                # Ctrl-C -> terminate and exit
                raise

            except (
                kafka.errors.IllegalStateError,
                kafka.errors.CommitFailedError,
                kafka.errors.UnknownMemberIdError,
                kafka.errors.NodeNotReadyError,
                kafka.errors.RebalanceInProgressError,
            ) as e:
                # Retry-able errors
                if attempts_left == 1:
                    L.exception("Error {} during Kafka commit".format(e))
                    self.Pipeline.set_error(None, None, e)
                    return

                L.exception("Error {} during Kafka commit - will retry in 5 seconds".format(e))

                await self._close_consumer_parts()
                await asyncio.sleep(5)
                await self.initialize_consumer()

            except Exception as e:
                # Hard errors
                L.exception("Error {} during Kafka commit".format(e))
                self.Pipeline.set_error(None, None, e)
                return

    async def _simulate_event(self):
        """
        Yield control to the event loop after every block of processed messages.

        The _simulate_event method should be called in main method after a message has been processed.

        It ensures that all other asynchronous events receive enough time to perform their tasks.
        Otherwise, the application loop is blocked by the source and no other activity makes a progress.

        A non-positive ``event_block_size`` disables the idling.
        """
        self.EventCounter += 1
        if self.EventBlockSize > 0 and self.EventCounter % self.EventBlockSize == 0:
            await asyncio.sleep(self.EventIdleTime)
            await self.Pipeline.ready()
            self.EventCounter = 0