import asyncio
import json
import logging
import typing
from ..abc.sink import Sink
#
# Module-level logger, named after this module; used by KafkaSink for error reporting.
L = logging.getLogger(__name__)
#
class KafkaSink(Sink):
    """
    KafkaSink is a sink processor that forwards events to Apache Kafka through
    the connection object located by the name passed to the constructor.

    KafkaSink expects bytes as an input. If the input is a string or a dictionary,
    it is automatically transformed to bytes using the encoding charset specified
    in the configuration (dictionaries are serialized to JSON first).

    .. code:: python

        class KafkaPipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                    bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

    There are two ways to use KafkaSink:

    - Specify a single topic in the KafkaSink config - ``topic``, to be used for all the events in the pipeline.
    - Specify the topic separately for each event in the event context - ``context['kafka_topic']``.
      The topic from the configuration is then used as a default topic.

    To provide business logic for event distribution, you can create a topic selector processor.
    Processor example:

    .. code:: python

        class KafkaTopicSelector(bspump.Processor):

            def process(self, context, event):
                if event.get("weight") > 10:
                    context["kafka_topic"] = "heavy"
                else:
                    context["kafka_topic"] = "light"
                return event

    Every Kafka message can be a key:value pair. The key is read from the event
    context - ``context['kafka_key']``. If ``kafka_key`` is not provided, the key
    defaults to None.
    """

    # An empty string means "not configured": the option is then not passed
    # to the producer at all.
    ConfigDefaults = {
        "topic": "",
        "encoding": "utf-8",
        "output_queue_max_size": 100,
        "client_id": "",  # defaults set in AIOKafka
        "metadata_max_age_ms": "",
        "request_timeout_ms": "",
        "api_version": "",
        "acks": "",
        "key_serializer": "",
        "value_serializer": "",
        "max_batch_size": "",
        "max_request_size": "",
        "linger_ms": "",
        "send_backoff_ms": "",
        "retry_backoff_ms": "",
        "connections_max_idle_ms": "",
        "enable_idempotence": "",
        "enable_idempotency": "",  # legacy misspelling, accepted as an alias of enable_idempotence
        "transactional_id": "",
        "transaction_timeout_ms": "",
    }

    def __init__(self, app, pipeline, connection, key_serializer=None, id=None, config=None):
        """
        Initializes the sink, builds the producer parameters from the
        configuration and starts the background connection task.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

        pipeline : Pipeline
            The pipeline this sink belongs to; it is used to locate the connection
            and to throttle the pipeline when the output queue fills up.

        connection : Connection
            Identification of the connection, passed to ``pipeline.locate_connection()``.

        key_serializer : callable, default = None
            Optional callable applied to ``context['kafka_key']`` (when the key is not None)
            before the message is queued.

        id : , default = None
            Passed to the parent class.

        config : , default = None
            Passed to the parent class.

        **Raises**

        ValueError
            If ``output_queue_max_size`` is lower than 1 or a boolean option
            cannot be parsed.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.Connection = pipeline.locate_connection(app, connection)
        self.Topic = self.Config['topic']
        self._key_serializer = key_serializer
        self.Encoding = self.Config['encoding']

        # The `loop` argument of asyncio.Queue was removed in Python 3.10;
        # the queue binds to the event loop on its own.
        self._output_queue = asyncio.Queue()

        self._output_queue_max_size = int(self.Config['output_queue_max_size'])
        if self._output_queue_max_size < 1:
            # Explicit check instead of `assert`, which is stripped under `python -O`
            raise ValueError(
                "'output_queue_max_size' must be at least 1, got {}".format(self._output_queue_max_size)
            )

        self._conn_future = None
        self._producer_params = self._build_producer_params()

        # Subscription
        self._on_health_check('connection.open!')
        app.PubSub.subscribe("Application.stop!", self._on_application_stop)
        app.PubSub.subscribe("Application.tick!", self._on_health_check)

    @staticmethod
    def _parse_bool(value):
        """
        Converts a configuration value to bool.

        Plain ``bool()`` cannot be used here because any non-empty string,
        including "false" and "0", is truthy.

        :returns: True or False
        :raises ValueError: if the value is not a recognized boolean literal.
        """
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in ("1", "true", "yes", "on"):
            return True
        if text in ("0", "false", "no", "off"):
            return False
        raise ValueError("Cannot interpret {!r} as a boolean".format(value))

    def _build_producer_params(self):
        """
        Collects the producer options that are set in the configuration and
        converts them to the types listed below.

        :returns: dict of keyword arguments for ``Connection.create_producer()``
        """
        converters = {
            "client_id": str,
            "metadata_max_age_ms": int,
            "request_timeout_ms": int,
            "api_version": str,
            "max_batch_size": int,
            "max_request_size": int,
            "linger_ms": int,
            "send_backoff_ms": int,
            "retry_backoff_ms": int,
            "connections_max_idle_ms": int,
            "enable_idempotence": self._parse_bool,
            "transactional_id": str,
            "transaction_timeout_ms": int,
        }
        params = {}

        # Legacy alias goes first so that the correctly spelled option,
        # if also present, overrides it in the loop below.
        legacy_idempotence = self.Config.get("enable_idempotency")
        if legacy_idempotence not in ("", None):
            L.warning("Configuration option 'enable_idempotency' is deprecated, use 'enable_idempotence'")
            params["enable_idempotence"] = self._parse_bool(legacy_idempotence)

        for name, convert in converters.items():
            raw = self.Config.get(name)
            if raw not in ("", None):
                params[name] = convert(raw)

        # 'acks' mixes integers with the string "all", so it needs special care
        acks = self.Config.get("acks")
        if acks in (0, 1, -1, "0", "1", "-1"):
            params["acks"] = int(acks)
        elif acks == "all":
            params["acks"] = acks
        elif acks not in ("", None):
            L.warning("Ignoring unsupported 'acks' value: %r", acks)

        return params

    def _on_health_check(self, message_type):
        """
        Ensures that the background connection task is running.

        Called once from the constructor and then on every ``Application.tick!``.
        If the task is still running, nothing happens. If it has finished, its
        exception (if any) is logged and a new task is started.

        :returns: None
        """
        if self._conn_future is not None:
            # Connection future exists
            if not self._conn_future.done():
                # Connection future didn't result yet
                # No sanitization needed
                return

            try:
                self._conn_future.result()
            except Exception:
                # Connection future threw an error
                L.exception("Unexpected connection future error")

            # Connection future already resulted (with or without exception)
            self._conn_future = None

        assert (self._conn_future is None)

        # NOTE(review): `self.Loop` is not set in this class; presumably it is
        # provided by a parent class - confirm.
        self._conn_future = asyncio.ensure_future(
            self._connection(),
            loop=self.Loop
        )

    def _on_application_stop(self, message_type, counter):
        """
        Puts the ``(None, None, None)`` sentinel into the output queue, which
        makes the connection task leave its sending loop.

        :returns: None
        """
        self._output_queue.put_nowait((None, None, None))

    async def _connection(self):
        """
        Background task: creates and starts a producer, then sends queued
        messages one by one until the stop sentinel is received.

        The producer is stopped in every case once it was created, including
        when starting or sending raises.

        :returns: None
        """
        producer = await self.Connection.create_producer(**self._producer_params)
        try:
            await producer.start()
            while True:
                topic, message, kafka_key = await self._output_queue.get()

                if topic is None and message is None:
                    # Stop sentinel queued by _on_application_stop()
                    break

                # The queue has just dropped below its limit: release the
                # throttle that process() engaged when the limit was reached.
                if self._output_queue.qsize() == self._output_queue_max_size - 1:
                    self.Pipeline.throttle(self, False)

                await producer.send_and_wait(topic, message, key=kafka_key)
        finally:
            await producer.stop()

    def process(self, context, event: typing.Union[dict, str, bytes]):
        """
        Queues the event for sending to Kafka.

        Dictionaries are serialized to JSON and encoded, strings are encoded,
        any other value is queued unchanged. When the output queue reaches
        ``output_queue_max_size``, the pipeline is throttled.

        **Parameters**

        context : dict
            Event context; ``kafka_topic`` overrides the configured topic and
            ``kafka_key`` provides the message key.

        event : typing.Union[dict, str, bytes]
            The event to be sent.
        """
        # isinstance() so that subclasses (e.g. OrderedDict) are encoded as well
        if isinstance(event, dict):
            event = json.dumps(event).encode(self.Encoding)
        elif isinstance(event, str):
            event = event.encode(self.Encoding)

        kafka_topic = context.get("kafka_topic", self.Topic)

        kafka_key = context.get("kafka_key")
        if self._key_serializer is not None and kafka_key is not None:
            kafka_key = self._key_serializer(kafka_key)

        self._output_queue.put_nowait((kafka_topic, event, kafka_key))

        if self._output_queue.qsize() == self._output_queue_max_size:
            self.Pipeline.throttle(self, True)