connection
- class KafkaConnection(app, id=None, config=None)
Bases: Connection
KafkaConnection connects a BSPump application to an instance of the Apache Kafka messaging system. Processors can later use it to consume or produce user-defined messages.
    import bspump
    import bspump.kafka

    config = {"compression_type": "gzip"}
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
    )
ConfigDefaults options:
- compression_type (str): Kafka supports several compression types:
gzip, snappy and lz4. This option needs to be specified in the Kafka Producer only; the Consumer decompresses automatically.
- security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
- sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN.
- sasl_plain_username (str): Username for SASL PLAIN authentication.
Default: None.
- sasl_plain_password (str): Password for SASL PLAIN authentication.
Default: None.
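The option values listed above can be checked before the config is handed to KafkaConnection. The following is a minimal sketch in plain Python: `check_kafka_config` is a hypothetical helper, not part of BSPump, and the rule that username and password must be set together is an assumption made for illustration. `security_protocol` is passed through unchecked, and the credentials are placeholders.

```python
# Hypothetical helper, NOT part of BSPump: checks a config dict against
# the option values listed in the ConfigDefaults section.
COMPRESSION_TYPES = {"gzip", "snappy", "lz4"}
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def check_kafka_config(config):
    """Return a list of problems found in config (empty if none)."""
    problems = []

    # compression_type is optional; if present it must be a listed type.
    compression = config.get("compression_type")
    if compression is not None and compression not in COMPRESSION_TYPES:
        problems.append("unsupported compression_type: {!r}".format(compression))

    # sasl_mechanism defaults to PLAIN, as documented.
    mechanism = config.get("sasl_mechanism", "PLAIN")
    if mechanism not in SASL_MECHANISMS:
        problems.append("unsupported sasl_mechanism: {!r}".format(mechanism))

    # Assumption for this sketch: the two PLAIN credentials come as a pair.
    has_user = config.get("sasl_plain_username") is not None
    has_password = config.get("sasl_plain_password") is not None
    if has_user != has_password:
        problems.append(
            "sasl_plain_username and sasl_plain_password must be set together"
        )

    return problems


config = {
    "compression_type": "gzip",
    "security_protocol": "SASL_SSL",  # not validated in this sketch
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "pump-user",      # placeholder credentials
    "sasl_plain_password": "pump-password",  # placeholder credentials
}
print(check_kafka_config(config))
```

A dict that passes the check can then be given to KafkaConnection as its config argument, in the same way as the gzip example earlier in this section.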
- __init__(app, id=None, config=None)
Initializes variables.
Parameters
- app : Application
Name of the Application.
- id, default = None
ID information.
- config : JSON or txt, default = None
Configuration file of any supported type.
connection methods
- async KafkaConnection.create_producer(**kwargs)
Creates a Producer.
Parameters
- **kwargs :
Additional information can be passed to this method.
- Returns
producer
- KafkaConnection.create_consumer(*topics, **kwargs)
Creates a consumer.
Parameters
- *topics :
Any number of topics can be passed to this method.
- **kwargs :
Additional information can be passed to this method.
- Returns
consumer
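The two signatures above differ in how they are called: create_producer is a coroutine and must be awaited, while create_consumer is a regular method that takes topics as positional arguments. The sketch below illustrates only that call pattern. `StandInConnection` is a stand-in written for this example, not the real KafkaConnection, and the keyword names `client_id` and `group_id` are hypothetical placeholders for whatever options the underlying client accepts.

```python
import asyncio


class StandInConnection:
    """Stand-in that only mimics the two documented signatures.

    It is NOT bspump.kafka.KafkaConnection; it returns plain dicts so the
    call pattern can run without a Kafka broker.
    """

    async def create_producer(self, **kwargs):
        # Coroutine, like the documented method: callers must await it.
        return {"kind": "producer", "options": kwargs}

    def create_consumer(self, *topics, **kwargs):
        # Regular method: topics are positional, options are keywords.
        return {"kind": "consumer", "topics": topics, "options": kwargs}


async def main():
    connection = StandInConnection()
    # Keyword names below are hypothetical examples of **kwargs.
    producer = await connection.create_producer(client_id="example")
    consumer = connection.create_consumer(
        "topic-a", "topic-b", group_id="example-group"
    )
    return producer, consumer


producer, consumer = asyncio.run(main())
print(producer["kind"], consumer["topics"])
```

With a real connection, the same pattern would apply inside a coroutine that already runs on the application's event loop.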