Apache Kafka

Connection

class KafkaConnection(app, id=None, config=None)[source]

Bases: Connection

KafkaConnection serves to connect BSPump application with an instance of Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.

config = {"compression_type": "gzip"}
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)

ConfigDefaults options:

compression_type (str): Kafka supports several compression types: gzip, snappy and lz4.

This option needs to be specified in Kafka Producer only, Consumer will decompress automatically.

security_protocol (str): Protocol used to communicate with brokers.

Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.

sasl_mechanism (str): Authentication mechanism when security_protocol

is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN

sasl_plain_username (str): username for sasl PLAIN authentication.

Default: None

sasl_plain_password (str): password for sasl PLAIN authentication.

Default: None

KafkaConnection.__init__()[source]

initializes variables

Parameters

appApplication

Name of the Application.

id, default = None

ID information.

configJSON or txt, default= None

Configuration file of any supported type.

connection Methods

async KafkaConnection.create_producer(**kwargs)[source]

Creates a Producer.

Parameters

**kwargs :

Additional information can be passed to this method.

Returns

producer


KafkaConnection.create_consumer(*topics, **kwargs)[source]

Creates a consumer.

Parameters

*topics :

any number of topics can be passed to this method.

**kwargs :

additional information can be passed to this method.

Returns

consumer


KafkaConnection.get_bootstrap_servers()[source]

Returns parsed bootstrap servers found in config.

Returns

list of url


KafkaConnection.get_compression()[source]

Returns compression type to use in connection

Returns

compression_type


Source

class KafkaSource(app, pipeline, connection, id=None, config=None)[source]

Bases: Source

KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                        bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                        bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

To ensure that after restart, pump will continue receiving messages where it left of, group_id has to
be provided in the configuration.

When the group_id is set, the consumer group is created and the Kafka server will then operate
in the producer-consumer mode. It means that every consumer with the same group_id will be assigned
unique set of partitions, hence all messages will be divided among them and thus unique.

Long-running synchronous operations should be avoided or places inside the OOBGenerator in the asynchronous
way or on thread using ASAB Proactor service (see bspump-oob-proactor.py example in "examples" folder).
Otherwise, the session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer
from the partition, thus causing rebalance.
KafkaSource.__init__()[source]

Initializes parameters.

Parameters

appApplication

Name of the Application.

pipelinePipeline

Name of the Pipeline.

connectionConnection

information needed to create a connection.

id : , default = None

config : , default = None

Source Methods

KafkaSource.create_consumer()[source]

Creates a consumer.

async KafkaSource.initialize_consumer()[source]

Creates a consumer after the loop is started.

async KafkaSource.main()[source]

Method that starts the Source.

Sink

class KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]

Bases: Sink

Description: KafkaSink is a sink processor that forwards the event to a Apache Kafka specified by a KafkaConnection object.

KafkaSink expects bytes as an input. If the input is string or dictionary, it is automatically transformed to bytes using encoding charset specified in the configuration.

class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                        bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                        bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

There are two ways to use KafkaSink:
- Specify a single topic in KafkaSink config - topic, to be used for all the events in pipeline.
- Specify topic separetly for each event in event context - context['kafka_topic'].
        Topic from configuration is than used as a default topic.
        To provide business logic for event distribution, you can create topic selector processor.
Processor example:
class KafkaTopicSelector(bspump.Processor):

        def process(self, context, event):
                if event.get("weight") > 10:
                        context["kafka_topic"] = "heavy"
                else:
                        context["kafka_topic"] = "light"

                return event

Every kafka message can be a key:value pair. Key is read from event context - context['kafka_key'].
If kafka_key is not provided, key defaults to None.
KafkaSink.__init__()[source]

Initilizes the parameters that are passed to the Sink class.

Parameters

appApplication

Name of the Application.

pipelinePipeline

Name of the Pipeline.

connectionConnection

information needed to create a connection.

key_serializer : , default = None

id : , default = None

config : , default = None

Sink Methods

KafkaSink.process(context, event: Union[dict, str, bytes])[source]

Outputs events to a chosen location.

Parameters

contexttype

Additional information.

event:typing.Union[dict, str, bytes] :

Key Filter Kafka

class KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]

Bases: Processor

KafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.

Every Kafka message has a key, KafkaKeyFilter selects only those events where the event key matches one of provided ‘keys’, other events will be discarded.

Set filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.

KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.

KafkaKeyFilter.__init__()[source]

Initializes variables

Parameters

appApplication

Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html`_.

pipelinePipeline

Name of the Pipeline.

keysbytes

keys used to filter out events from the event stream.

id : , default = None

configJSON, default = None

configuration file in JSON

KafkaKeyFilter.process(context, event)[source]

Does the filtering processed based on passed key variable.

Parameters

contextContext

additional information passed to the method

event : any type,a single unit of information that flows through the Pipeline.

Batch Sink

class KafkaBatchSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]

Bases: KafkaSink

KafkaBatchSink is a sink processor that forwards the event to an Apache Kafka specified by a KafkaConnection object in batches.

It is a proof of concept sink, that allows faster processing of events in the pipeline, but does not guarantee processing of all events in situations when the pump is closed etc.

There is a work to be done with cooperation with aiokafka, so the send_and_wait method works properly and is able to send events in batches.

KafkaBatchSink.__init__()[source]

Initializing parameters passed to the BatchSink class.

Parameters

appApplication

Name of the Application.

pipelinePipeline

Name of the Pipeline.

connectionConnection

Information needed to creates connection.

key_serializer : ,default None

id : , default = None

configJSON, default = None

Configuration file with additional information.

Batch Sink Methods

KafkaBatchSink.process(context, event: Union[dict, str, bytes])[source]

Starts the sink process.

Parameters

contexttype?

Additional information.

event: typing.Union[dict, str, bytes] : type?

Topic Initializer

class KafkaTopicInitializer(app, connection, id: Optional[str] = None, config: Optional[dict] = None)[source]

Bases: Configurable

KafkaTopicInitializer reads topic configs from file or from Kafka sink/source configs, checks if they exists and creates them if they don’t.

KafkaAdminClient requires blocking connection, which is why this class doesn’t use the connection module from BSPump.

Usage: topic_initializer = KafkaTopicInitializer(app, “KafkaConnection”) topic_initializer.include_topics(MyPipeline) topic_initializer.initialize_topics()

KafkaTopicInitializer.__init__()[source]

Initializes the parameters passed to the class.

Parameters

appApplication

Name of the Application.

connectionConnection

Information needed to create a connection.

id: typing.Optional[str] = None :

config: dict = NoneJSON

configuration file containing important information.

topic initializer methods

KafkaTopicInitializer.include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)[source]

Includes topic from config file or dict object. It can also scan Pipeline and get topics from Source or Sink.

Parameters

  • :

topic_config, default= None

Topic config file.

kafka_component : , default= None

pipeline, default= None

Name of the Pipeline.

config_file, default= None

Configuration file.

KafkaTopicInitializer.include_topics_from_file(topics_file: str)[source]

Includes topics from a topic file.

Parameters

topics_file:strstr

Name of a topic file we wanted to include.

KafkaTopicInitializer.include_topics_from_config(config_object)[source]

Includes topics using a config

Parameters

config_objectJSON

config object containing information about what topics we want to include.

KafkaTopicInitializer.fetch_existing_topics()[source]

???

KafkaTopicInitializer.check_and_initialize()[source]

Initializes new topics and logs a warning.

KafkaTopicInitializer.initialize_topics()[source]

Initializes topics ??