Source

class KafkaSource(app, pipeline, connection, id=None, config=None)

Bases: Source

The KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object, and passes them to the other processors in the pipeline.

import bspump
import bspump.kafka


class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Consume messages from the 'messages' topic ...
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            # ... and publish them to the 'messages2' topic.
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

To ensure that the pump continues receiving messages from where it left off after a restart, group_id has to
be provided in the configuration.

When group_id is set, a consumer group is created and the Kafka server operates
in producer-consumer mode. Every consumer with the same group_id is assigned
a unique set of partitions, so the messages are divided among the consumers and each message is delivered to only one of them.
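As a rough illustration of how partitions end up divided among consumers that share a group_id, the sketch below models the assignment with a simple round-robin rule. It is a simplified model and not Kafka's actual assignment strategy; the function assign_partitions, the consumer names, and the group_id value "example-group" are hypothetical.

```python
# Hypothetical source configuration: every pump instance that uses the same
# group_id joins the same consumer group.
source_config = {"topic": "messages", "group_id": "example-group"}


def assign_partitions(partitions, consumers):
    """Divide partitions among consumers so that each partition has one owner.

    Simplified round-robin model for illustration only.
    """
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# Three consumers in one group sharing a topic with six partitions.
assignment = assign_partitions(range(6), ["pump-a", "pump-b", "pump-c"])
for consumer, owned in assignment.items():
    print(consumer, owned)
```

In this model no partition appears under two consumers, which is why each message is handled by only one member of the group.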

Long-running synchronous operations should be avoided, or placed inside an OOBGenerator and run asynchronously
or on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder).
Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer
from the partition and thereby causing a rebalance.
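One way to keep a blocking call off the event loop is to run it on a worker thread. The sketch below uses plain asyncio (loop.run_in_executor) as a stand-in for the ASAB Proactor service, whose API is not shown here; slow_lookup, process, and demo are hypothetical names.

```python
import asyncio
import contextlib
import time


def slow_lookup(message):
    # Stands in for a long-running synchronous operation.
    time.sleep(0.05)
    return message.upper()


async def process(message):
    # Run the blocking call on the default thread pool so that the event
    # loop stays free while the call is in progress.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, slow_lookup, message)


async def demo():
    ticks = 0

    async def ticker():
        # Represents other work on the loop that must keep running.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    ticker_task = asyncio.create_task(ticker())
    result = await process("event")
    ticker_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await ticker_task
    return result, ticks


result, ticks = asyncio.run(demo())
print(result, ticks)
```

The ticker keeps advancing while slow_lookup runs, which shows that the loop was not blocked by the synchronous call.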
__init__(app, pipeline, connection, id=None, config=None)

Initializes parameters.

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

connection : Connection

Information needed to create a connection.

id : , default = None

config : , default = None

Source methods

KafkaSource.create_consumer()

Creates a consumer.

async KafkaSource.initialize_consumer()

Creates a consumer after the loop is started.

async KafkaSource._not_ready_handler(message_type, *args, **kwargs)

Calls _commit when the pipeline is throttled.

Parameters

message_type :

*args :

**kwargs :

async KafkaSource.main()

Method that starts the Source.

async KafkaSource._commit(offsets=None)

Description:

Returns

async KafkaSource._simulate_event()

Description: The _simulate_event method should be called in the main method after a message has been processed.

It ensures that all other asynchronous events receive enough time to perform their tasks. Otherwise, the application loop is blocked by a file reader and no other activity makes progress.
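The effect described above can be reproduced with plain asyncio. The sketch assumes that control is yielded with asyncio.sleep(0); consume and heartbeat are hypothetical stand-ins for a source's main loop and for any other task running in the application.

```python
import asyncio


async def consume(messages, log, yield_control):
    # Stand-in for a source's main loop.
    for message in messages:
        log.append(("processed", message))
        if yield_control:
            # Assumed equivalent of calling _simulate_event(): hand control
            # back to the event loop after each message.
            await asyncio.sleep(0)


async def heartbeat(log, beats):
    # Stand-in for any other activity that needs loop time.
    for beat in range(beats):
        log.append(("heartbeat", beat))
        await asyncio.sleep(0)


async def run(yield_control):
    log = []
    await asyncio.gather(
        consume(["a", "b", "c"], log, yield_control),
        heartbeat(log, 3),
    )
    return log


interleaved = asyncio.run(run(yield_control=True))
blocked = asyncio.run(run(yield_control=False))
print(interleaved)
print(blocked)
```

With the yield in place, heartbeat entries appear between processed messages; without it, every message is processed before the heartbeat task gets its first turn.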