Source code for bspump.kafka.batchsink

import json
import typing

import asyncio

from .sink import KafkaSink


[docs]class KafkaBatchSink(KafkaSink): """ KafkaBatchSink is a sink processor that forwards the event to an Apache Kafka specified by a KafkaConnection object in batches. It is a proof of concept sink, that allows faster processing of events in the pipeline, but does not guarantee processing of all events in situations when the pump is closed etc. There is a work to be done with cooperation with aiokafka, so the send_and_wait method works properly and is able to send events in batches. """ ConfigDefaults = { "batch_size": 10000, }
[docs] def __init__(self, app, pipeline, connection, key_serializer=None, id=None, config=None): """ Initializing parameters passed to the BatchSink class. **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_. pipeline : Pipeline Name of the Pipeline. connection : Connection Information needed to creates connection. key_serializer : ,default None id : , default = None config : JSON, default = None Configuration file with additional information. """ super().__init__(app, pipeline, connection, key_serializer, id, config) self._batch_size = int(self.Config["batch_size"]) self._throttled = False
[docs] def process(self, context, event: typing.Union[dict, str, bytes]): """ Starts the sink process. **Parameters** context : type? Additional information. event: typing.Union[dict, str, bytes] : type? """ if type(event) == dict: event = json.dumps(event) event = event.encode(self.Encoding) elif type(event) == str: event = event.encode(self.Encoding) kafka_topic = context.get("kafka_topic", self.Topic) kafka_key = context.get("kafka_key") if self._key_serializer is not None and kafka_key is not None: kafka_key = self._key_serializer(kafka_key) self._output_queue.put_nowait((kafka_topic, event, kafka_key)) if not self._throttled and self._output_queue.qsize() >= self._output_queue_max_size: self.Pipeline.throttle(self, True) self._throttled = True
[docs] async def _connection(self): producer = await self.Connection.create_producer(**self._producer_params) try: await producer.start() while True: futures = [] iterations = 0 while iterations < self._batch_size: iterations = iterations + 1 topic, message, kafka_key = await self._output_queue.get() if topic is None and message is None: continue future = await producer.send(topic, message, key=kafka_key) futures.append(future) if self._throttled and self._output_queue.qsize() < self._output_queue_max_size: self.Pipeline.throttle(self, False) self._throttled = False if futures: await asyncio.gather(*futures) finally: await producer.stop()