Source
- class KafkaSource(app, pipeline, connection, id=None, config=None)[source]
Bases: Source
The KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

    import bspump
    import bspump.kafka


    class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                # Read messages from the 'messages' topic ...
                bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                # ... and write them to the 'messages2' topic.
                bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
            )

To ensure that the pump continues receiving messages where it left off after a restart, group_id has to be provided in the configuration. When group_id is set, a consumer group is created and the Kafka server operates in producer-consumer mode. This means that every consumer with the same group_id is assigned a unique set of partitions, so the messages are divided among the consumers and each message is delivered to only one of them.
Long-running synchronous operations should be avoided, or placed inside the OOBGenerator and run either asynchronously or on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder). Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer from the partition and thereby causing a rebalance.
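The configuration keys discussed above can be collected in a plain dictionary. The following is a minimal sketch rather than a definitive setup: the helper name kafka_source_config, the group name "my-pump-group" and the 30000 ms timeout are hypothetical and chosen only for illustration, and the sketch assumes that group_id and session_timeout_ms are passed through config the same way as topic.

```python
def kafka_source_config(topic, group_id, session_timeout_ms=None):
    """Build a config dictionary for a KafkaSource (hypothetical helper).

    Assumes 'group_id' and 'session_timeout_ms' are accepted as config
    keys alongside 'topic', as the text above suggests.
    """
    config = {"topic": topic, "group_id": group_id}
    # Only override the session timeout when a value is given explicitly.
    if session_timeout_ms is not None:
        config["session_timeout_ms"] = session_timeout_ms
    return config


# Illustrative values only: a consumer group name and a raised timeout (ms).
config = kafka_source_config("messages", "my-pump-group", session_timeout_ms=30000)

# The dictionary would then be handed to the source inside the pipeline, e.g.
# bspump.kafka.KafkaSource(app, self, "KafkaConnection", config=config)
```

Keeping the timeout optional leaves the library default in place unless processing is known to be slow.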
- __init__(app, pipeline, connection, id=None, config=None)[source]
Initializes parameters.
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Name of the Pipeline.
- connection : Connection
Information needed to create a connection.
id : , default = None
config : , default = None
Source methods
- async KafkaSource._not_ready_handler(message_type, *args, **kwargs)[source]
Calls _commit when pipeline is throttled.
Parameters
message_type :
*args :
**kwargs :
- async KafkaSource._simulate_event()[source]
Description: The _simulate_event method should be called in the main method after a message has been processed.
It ensures that all other asynchronous events receive enough time to perform their tasks. Otherwise, the application loop is blocked by a file reader and no other activity makes progress.
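The general idea of giving other asynchronous tasks a turn can be illustrated with plain asyncio. This is a sketch of the technique only, not the BSPump implementation of _simulate_event: the names process_messages and heartbeat are hypothetical, and awaiting asyncio.sleep(0) stands in for whatever yielding mechanism the library actually uses.

```python
import asyncio


async def process_messages(messages, log):
    """Handle messages one by one, yielding to the event loop after each."""
    for message in messages:
        log.append(("processed", message))
        # Hand control back to the event loop so other tasks can run.
        await asyncio.sleep(0)


async def heartbeat(log, beats):
    """A stand-in for any other activity sharing the same event loop."""
    for i in range(beats):
        log.append(("heartbeat", i))
        await asyncio.sleep(0)


async def main():
    log = []
    # Run both coroutines concurrently on one event loop.
    await asyncio.gather(
        process_messages(["a", "b", "c"], log),
        heartbeat(log, 3),
    )
    return log


log = asyncio.run(main())
```

Because the message loop yields after every message, the heartbeat entries end up interleaved with the processed messages instead of being delayed until the whole batch is done.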