Source code for bspump.kafka.keyfilter

import bspump
import collections.abc


class KafkaKeyFilter(bspump.Processor):
    """
    KafkaKeyFilter reduces the incoming event stream from Kafka based on the
    key carried by each event.

    Every Kafka message has a key. KafkaKeyFilter passes through only those
    events whose key matches one of the provided ``keys``; all other events
    are discarded.

    Set the filtering ``keys`` as a parameter (in bytes) in the KafkaKeyFilter
    constructor.

    KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
    """

    def __init__(self, app, pipeline, keys, id=None, config=None):
        """
        Initializes the filter and normalizes ``keys`` into a frozenset.

        **Parameters**

        app : Application
            Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_.

        pipeline : Pipeline
            Name of the Pipeline.

        keys : bytes or iterable of bytes
            Key(s) an event must carry to pass the filter. A single ``bytes``
            or ``str`` value, or any non-iterable value, is treated as one
            key; any other iterable is treated as a collection of keys.

        id : optional, default = None
            Forwarded unchanged to the base ``bspump.Processor`` constructor.

        config : JSON, default = None
            Configuration, forwarded unchanged to the base
            ``bspump.Processor`` constructor.
        """
        super().__init__(app, pipeline, id, config)

        # bytes and str are themselves iterable, but each represents ONE key.
        # Iterating them would explode the key into integers / characters and
        # the filter would silently match the wrong keys.
        is_single_key = (
            isinstance(keys, (bytes, str))
            or not isinstance(keys, collections.abc.Iterable)
        )
        if is_single_key:
            self.Keys = frozenset((keys,))
        else:
            self.Keys = frozenset(keys)

    def process(self, context, event):
        """
        Passes the event through only if its Kafka key is among the
        configured keys.

        **Parameters**

        context : Context
            Additional information passed to the method. Must contain a
            ``"kafka"`` entry exposing a ``key`` attribute.

        event : any type
            A single unit of information that flows through the Pipeline.

        **Returns**

        ``event`` unchanged when its key is not None and is one of the
        configured keys, otherwise ``None``.
        """
        kafka_ctx = context.get("kafka")
        # Internal invariant: the "kafka" entry must have been placed in the
        # context before this processor runs.
        assert kafka_ctx is not None, "Missing 'kafka' entry in the context"

        key = kafka_ctx.key
        if key is not None and key in self.Keys:
            return event
        return None