Apache Kafka
Connection
- class KafkaConnection(app, id=None, config=None)[source]
Bases:
ConnectionKafkaConnection serves to connect BSPump application with an instance of Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.
config = {"compression_type": "gzip"} app = bspump.BSPumpApplication() svc = app.get_service("bspump.PumpService") svc.add_connection( bspump.kafka.KafkaConnection(app, "KafkaConnection", config) )
ConfigDefaultsoptions:- compression_type (str): Kafka supports several compression types:
gzip,snappyandlz4. This option needs to be specified in Kafka Producer only, Consumer will decompress automatically.
- security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
- sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN
- sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
- sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
- compression_type (str): Kafka supports several compression types:
- KafkaConnection.__init__()[source]
initializes variables
Parameters
- appApplication
Name of the Application.
- id, default = None
ID information.
- configJSON or txt, default= None
Configuration file of any supported type.
connection Methods
- async KafkaConnection.create_producer(**kwargs)[source]
Creates a Producer.
Parameters
- **kwargs :
Additional information can be passed to this method.
- Returns
producer
- KafkaConnection.create_consumer(*topics, **kwargs)[source]
Creates a consumer.
Parameters
- *topics :
any number of topics can be passed to this method.
- **kwargs :
additional information can be passed to this method.
- Returns
consumer
Source
- class KafkaSource(app, pipeline, connection, id=None, config=None)[source]
Bases:
SourceKafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.
class KafkaPipeline(bspump.Pipeline): def __init__(self, app, pipeline_id): super().__init__(app, pipeline_id) self.build( bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}), bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}), ) To ensure that after restart, pump will continue receiving messages where it left of, group_id has to be provided in the configuration. When the group_id is set, the consumer group is created and the Kafka server will then operate in the producer-consumer mode. It means that every consumer with the same group_id will be assigned unique set of partitions, hence all messages will be divided among them and thus unique. Long-running synchronous operations should be avoided or places inside the OOBGenerator in the asynchronous way or on thread using ASAB Proactor service (see bspump-oob-proactor.py example in "examples" folder). Otherwise, the session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer from the partition, thus causing rebalance.
- KafkaSource.__init__()[source]
Initializes parameters.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
information needed to create a connection.
id : , default = None
config : , default = None
Source Methods
Sink
- class KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]
Bases:
SinkDescription: KafkaSink is a sink processor that forwards the event to a Apache Kafka specified by a KafkaConnection object.
KafkaSink expects bytes as an input. If the input is string or dictionary, it is automatically transformed to bytes using encoding charset specified in the configuration.
class KafkaPipeline(bspump.Pipeline): def __init__(self, app, pipeline_id): super().__init__(app, pipeline_id) self.build( bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}), bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}), ) There are two ways to use KafkaSink: - Specify a single topic in KafkaSink config - topic, to be used for all the events in pipeline. - Specify topic separetly for each event in event context - context['kafka_topic']. Topic from configuration is than used as a default topic. To provide business logic for event distribution, you can create topic selector processor. Processor example:
class KafkaTopicSelector(bspump.Processor): def process(self, context, event): if event.get("weight") > 10: context["kafka_topic"] = "heavy" else: context["kafka_topic"] = "light" return event Every kafka message can be a key:value pair. Key is read from event context - context['kafka_key']. If kafka_key is not provided, key defaults to None.
- KafkaSink.__init__()[source]
Initilizes the parameters that are passed to the Sink class.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
information needed to create a connection.
key_serializer : , default = None
id : , default = None
config : , default = None
Sink Methods
Key Filter Kafka
- class KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]
Bases:
ProcessorKafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.
Every Kafka message has a key, KafkaKeyFilter selects only those events where the event key matches one of provided ‘keys’, other events will be discarded.
Set filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.
KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
- KafkaKeyFilter.__init__()[source]
Initializes variables
Parameters
- appApplication
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html`_.
- pipelinePipeline
Name of the Pipeline.
- keysbytes
keys used to filter out events from the event stream.
id : , default = None
- configJSON, default = None
configuration file in JSON
Batch Sink
- class KafkaBatchSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]
Bases:
KafkaSinkKafkaBatchSink is a sink processor that forwards the event to an Apache Kafka specified by a KafkaConnection object in batches.
It is a proof of concept sink, that allows faster processing of events in the pipeline, but does not guarantee processing of all events in situations when the pump is closed etc.
There is a work to be done with cooperation with aiokafka, so the send_and_wait method works properly and is able to send events in batches.
- KafkaBatchSink.__init__()[source]
Initializing parameters passed to the BatchSink class.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
Information needed to creates connection.
key_serializer : ,default None
id : , default = None
- configJSON, default = None
Configuration file with additional information.
Batch Sink Methods
Topic Initializer
- class KafkaTopicInitializer(app, connection, id: Optional[str] = None, config: Optional[dict] = None)[source]
Bases:
ConfigurableKafkaTopicInitializer reads topic configs from file or from Kafka sink/source configs, checks if they exists and creates them if they don’t.
KafkaAdminClient requires blocking connection, which is why this class doesn’t use the connection module from BSPump.
Usage: topic_initializer = KafkaTopicInitializer(app, “KafkaConnection”) topic_initializer.include_topics(MyPipeline) topic_initializer.initialize_topics()
- KafkaTopicInitializer.__init__()[source]
Initializes the parameters passed to the class.
Parameters
- appApplication
Name of the Application.
- connectionConnection
Information needed to create a connection.
id: typing.Optional[str] = None :
- config: dict = NoneJSON
configuration file containing important information.
topic initializer methods
- KafkaTopicInitializer.include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)[source]
Includes topic from config file or dict object. It can also scan Pipeline and get topics from Source or Sink.
Parameters
:
- topic_config, default= None
Topic config file.
kafka_component : , default= None
- pipeline, default= None
Name of the Pipeline.
- config_file, default= None
Configuration file.
- KafkaTopicInitializer.include_topics_from_file(topics_file: str)[source]
Includes topics from a topic file.
Parameters
- topics_file:strstr
Name of a topic file we wanted to include.