connection

class KafkaConnection(app, id=None, config=None)

Bases: Connection

KafkaConnection connects a BSPump application to an instance of the Apache Kafka messaging system. Processors can later use it to consume or produce user-defined messages.

import bspump
import bspump.kafka

config = {"compression_type": "gzip"}
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
# Register the connection under the ID "KafkaConnection"
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)

ConfigDefaults options:

compression_type (str): Compression type used by the Kafka producer. Kafka supports gzip, snappy and lz4. This option needs to be specified for the producer only; the consumer decompresses automatically.

security_protocol (str): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.

sasl_mechanism (str): Authentication mechanism used when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN.

sasl_plain_username (str): Username for SASL PLAIN authentication. Default: None.

sasl_plain_password (str): Password for SASL PLAIN authentication. Default: None.
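
The options above can be collected into a config dict before the connection is constructed. The following is a minimal sketch, not part of BSPump: build_kafka_config is a hypothetical helper, and it assumes that SASL_PLAINTEXT and SASL_SSL are accepted for security_protocol because the sasl_mechanism entry refers to them.

```python
# Hypothetical helper, not part of BSPump: builds a config dict from the
# options documented above and rejects values the documentation does not list.
COMPRESSION_TYPES = {"gzip", "snappy", "lz4"}
# PLAINTEXT and SSL are documented; the SASL_* values are an assumption based
# on the sasl_mechanism description.
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def build_kafka_config(compression_type=None, security_protocol="PLAINTEXT",
                       sasl_mechanism="PLAIN", username=None, password=None):
    if compression_type is not None and compression_type not in COMPRESSION_TYPES:
        raise ValueError("unsupported compression_type: {}".format(compression_type))
    if security_protocol not in SECURITY_PROTOCOLS:
        raise ValueError("unsupported security_protocol: {}".format(security_protocol))

    config = {"security_protocol": security_protocol}
    if compression_type is not None:
        config["compression_type"] = compression_type

    # SASL options only matter when a SASL_* protocol is selected.
    if security_protocol.startswith("SASL_"):
        if sasl_mechanism not in SASL_MECHANISMS:
            raise ValueError("unsupported sasl_mechanism: {}".format(sasl_mechanism))
        config["sasl_mechanism"] = sasl_mechanism
        if sasl_mechanism == "PLAIN":
            config["sasl_plain_username"] = username
            config["sasl_plain_password"] = password
    return config


config = build_kafka_config(
    compression_type="gzip",
    security_protocol="SASL_SSL",
    username="user",
    password="secret",
)
```

The resulting dict can then be passed as the config argument of KafkaConnection.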

__init__(app, id=None, config=None)

Initializes the connection.

Parameters

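app : Application

The application object, for example an instance of bspump.BSPumpApplication.

id, default = None

ID of the connection, such as "KafkaConnection" in the example above.

config : JSON or txt, default = None

Configuration of any supported type. The example above passes a Python dict.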

connection methods

async KafkaConnection.create_producer(**kwargs)

Creates a producer.

Parameters

**kwargs :

Additional keyword arguments for the producer.

Returns

producer


KafkaConnection.create_consumer(*topics, **kwargs)

Creates a consumer.

Parameters

*topics :

Any number of topics can be passed to this method.

**kwargs :

Additional keyword arguments for the consumer.

Returns

consumer
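
Because create_producer is declared async, it has to be awaited, while create_consumer is called directly. The sketch below illustrates the two call styles with a stand-in class so that it runs without a Kafka broker. FakeKafkaConnection, the topic names and the group_id keyword are hypothetical and only mirror the signatures documented above.

```python
import asyncio


class FakeKafkaConnection:
    """Stand-in that mirrors the documented signatures only; not BSPump code."""

    async def create_producer(self, **kwargs):
        # The real method returns a producer; here the kwargs are echoed back.
        return ("producer", kwargs)

    def create_consumer(self, *topics, **kwargs):
        # The real method returns a consumer; here the arguments are echoed back.
        return ("consumer", topics, kwargs)


async def main(connection):
    # Coroutine: must be awaited inside a running event loop.
    producer = await connection.create_producer()
    # Regular method: any number of topics, then keyword arguments.
    consumer = connection.create_consumer(
        "topic-a", "topic-b", group_id="example-group"
    )
    return producer, consumer


producer, consumer = asyncio.run(main(FakeKafkaConnection()))
```

With a real KafkaConnection, the same calls would be made on the connection object registered with the application.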


KafkaConnection.get_bootstrap_servers()

Returns parsed bootstrap servers found in config.

Returns

list of URLs
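
What "parsed" means is not spelled out here. As a rough illustration only, the sketch below assumes the servers are configured as one string of host:port entries separated by commas, semicolons or whitespace. parse_bootstrap_servers is a hypothetical function, not the library's implementation.

```python
import re


def parse_bootstrap_servers(value):
    # Split on runs of commas, semicolons or whitespace and drop empty pieces.
    return [item for item in re.split(r"[\s,;]+", value.strip()) if item]


servers = parse_bootstrap_servers("kafka-1:9092, kafka-2:9092;kafka-3:9092")
```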


KafkaConnection.get_compression()

Returns the compression type to use in the connection.

Returns

compression_type