Sink

class KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]

Bases: Sink

Description: KafkaSink is a sink processor that forwards the event to a Apache Kafka specified by a KafkaConnection object.

KafkaSink expects bytes as an input. If the input is string or dictionary, it is automatically transformed to bytes using encoding charset specified in the configuration.

class KafkaPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                        bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                        bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

There are two ways to use KafkaSink:
- Specify a single topic in KafkaSink config - topic, to be used for all the events in pipeline.
- Specify topic separetly for each event in event context - context['kafka_topic'].
        Topic from configuration is than used as a default topic.
        To provide business logic for event distribution, you can create topic selector processor.
Processor example:
class KafkaTopicSelector(bspump.Processor):

        def process(self, context, event):
                if event.get("weight") > 10:
                        context["kafka_topic"] = "heavy"
                else:
                        context["kafka_topic"] = "light"

                return event

Every kafka message can be a key:value pair. Key is read from event context - context['kafka_key'].
If kafka_key is not provided, key defaults to None.
__init__(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]

Initilizes the parameters that are passed to the Sink class.

Parameters

appApplication

Name of the Application.

pipelinePipeline

Name of the Pipeline.

connectionConnection

information needed to create a connection.

key_serializer : , default = None

id : , default = None

config : , default = None

Sink methods

KafkaSink._on_health_check(message_type)[source]

Description:

Returns

KafkaSink._on_application_stop(message_type, counter)[source]

Description:

Returns

async KafkaSink._connection()[source]

Description:

Returns

KafkaSink.process(context, event: Union[dict, str, bytes])[source]

Outputs events to a chosen location.

Parameters

contexttype

Additional information.

event:typing.Union[dict, str, bytes] :