Source code for bspump.kafka.connection

import logging
import re

import aiokafka

from ..abc.connection import Connection

#

L = logging.getLogger(__name__)

#


[docs]class KafkaConnection(Connection): """ KafkaConnection serves to connect BSPump application with an instance of Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages. .. code:: python config = {"compression_type": "gzip"} app = bspump.BSPumpApplication() svc = app.get_service("bspump.PumpService") svc.add_connection( bspump.kafka.KafkaConnection(app, "KafkaConnection", config) ) .. ``ConfigDefaults`` options: compression_type (str): Kafka supports several compression types: ``gzip``, ``snappy`` and ``lz4``. This option needs to be specified in Kafka Producer only, Consumer will decompress automatically. security_protocol (str): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN sasl_plain_username (str): username for sasl PLAIN authentication. Default: None sasl_plain_password (str): password for sasl PLAIN authentication. Default: None """ ConfigDefaults = { 'bootstrap_servers': 'localhost:9092', # One or more URLs separated by whitespace or semicolon 'compression_type': '', 'security_protocol': 'PLAINTEXT', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': '', 'sasl_plain_password': '', }
[docs] def __init__(self, app, id=None, config=None): """ initializes variables **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_. id : , default = None ID information. config : JSON or txt, default= None Configuration file of any supported type. """ super().__init__(app, id=id, config=config) self.Loop = app.Loop
[docs] async def create_producer(self, **kwargs): """ Creates a Producer. **Parameters** **kwargs : Additional information can be passed to this method. :returns: producer | """ producer = aiokafka.AIOKafkaProducer( loop=self.Loop, bootstrap_servers=self.get_bootstrap_servers(), compression_type=self.get_compression(), security_protocol=self.Config.get('security_protocol'), sasl_mechanism=self.Config.get('sasl_mechanism'), sasl_plain_username=self.Config.get('sasl_plain_username') or None, sasl_plain_password=self.Config.get('sasl_plain_password') or None, **kwargs ) return producer
[docs] def create_consumer(self, *topics, **kwargs): """ Creates a consumer. **Parameters** *topics : any number of topics can be passed to this method. **kwargs : additional information can be passed to this method. :returns: consumer | """ consumer = aiokafka.AIOKafkaConsumer( *topics, loop=self.Loop, bootstrap_servers=self.get_bootstrap_servers(), enable_auto_commit=False, security_protocol=self.Config.get('security_protocol'), sasl_mechanism=self.Config.get('sasl_mechanism'), sasl_plain_username=self.Config.get('sasl_plain_username') or None, sasl_plain_password=self.Config.get('sasl_plain_password') or None, **kwargs ) return consumer
[docs] def get_bootstrap_servers(self): """ Returns parsed bootstrap servers found in config. :returns: list of url | """ return [ url for url in re.split(r"[\s;]+", self.Config['bootstrap_servers']) if url ]
[docs] def get_compression(self): """ Returns compression type to use in connection :returns: compression_type | """ compression_type = self.Config.get("compression_type") if compression_type in ("", "none", "None"): compression_type = None return compression_type