# Source code for bspump.amqp.sink

import pika

import asab

from ..abc.sink import Sink
import logging


L = logging.getLogger(__name__)


class AMQPSink(Sink):
    """Pipeline sink that publishes every processed event to an AMQP exchange.

    The sink throttles its pipeline while it has no open channel and lifts
    the throttle once a channel has been opened on the connection.

    Configuration options (see ``ConfigDefaults``):

    * ``exchange`` - name of the exchange to publish to. The special value
      ``'!context'`` makes the sink take the exchange from the
      ``'amqp.exchange'`` key of the event context instead.
    * ``content_type`` - content type placed in the message properties.
    * ``routing_key`` - routing key used for every published message.
    """

    ConfigDefaults = {
        'exchange': 'amq.direct',
        'content_type': 'text/plain',
        'routing_key': '',
    }

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """Set up the sink and start listening for connection events.

        :param app: application object; its ``PubSub`` is used for the
            periodic tick subscription.
        :param pipeline: pipeline this sink belongs to; also used to locate
            the connection.
        :param connection: connection reference passed to
            ``pipeline.locate_connection``.
        :param id: optional sink identifier, forwarded to the base class.
        :param config: optional configuration, forwarded to the base class.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self._connection = pipeline.locate_connection(app, connection)
        self._channel = None

        # No channel exists yet, so hold the pipeline back until one opens.
        self.Pipeline.throttle(self, True)

        self._exchange = self.Config['exchange']
        self._content_type = self.Config['content_type']
        self._routing_key = self.Config['routing_key']

        # NOTE(review): presumably this registers the @asab.subscribe
        # decorated handlers of this object (here: _on_tick) - confirm.
        app.PubSub.subscribe_all(self)

        self._connection.PubSub.subscribe(
            "AMQPConnection.open!", self._on_connection_open
        )
        self._connection.PubSub.subscribe(
            "AMQPConnection.close!", self._on_connection_close
        )

        # The connection may already be up, in which case no "open!" message
        # will arrive for it; request the channel right away.
        if self._connection.ConnectionEvent.is_set():
            self._on_connection_open("simulated")

    @asab.subscribe("Application.tick/10!")
    def _on_tick(self, event_name):
        """Periodically try to re-open the channel if it has been lost."""
        # Heal the connection
        if self._channel is None and self._connection.ConnectionEvent.is_set():
            self._on_connection_open("simulated")

    def _on_connection_open(self, event_name):
        """Request a new channel; ``_on_channel_open`` receives the result."""
        self._connection.Connection.channel(
            on_open_callback=self._on_channel_open
        )

    def _on_connection_close(self, event_name):
        """Forget the channel and throttle the pipeline."""
        self._channel = None
        self.Pipeline.throttle(self, True)

    def _on_channel_open(self, channel):
        """Store the opened channel and release the pipeline throttle."""
        self._channel = channel
        self.Pipeline.throttle(self, False)

    def process(self, context, event):
        """Publish ``event`` to the configured (or context-provided) exchange.

        When no exchange can be determined, a warning is logged and the
        event is not published.

        :param context: event context; queried for ``'amqp.exchange'`` when
            the sink is configured with the ``'!context'`` exchange.
        :param event: message body handed to ``basic_publish``.
        :raises RuntimeError: if no channel is currently open.
        :raises pika.exceptions.ChannelClosed: re-raised after the sink has
            dropped the channel and throttled the pipeline.
        """
        if self._channel is None:
            raise RuntimeError("AMQP channel is not open")

        if self._exchange == '!context':
            exchange = context.get('amqp.exchange')
        else:
            exchange = self._exchange

        if exchange is None:
            # Without an exchange there is nothing valid to publish to;
            # previously None was still handed to basic_publish().
            L.warning("Exchange is not specified")
            return

        try:
            self._channel.basic_publish(
                exchange,
                self._routing_key,
                event,
                pika.BasicProperties(
                    content_type=self._content_type,
                    delivery_mode=1,
                ),
            )
        except pika.exceptions.ChannelClosed:
            # The channel is unusable: drop it and throttle the pipeline
            # until a new channel is opened, then let the caller see the error.
            self._channel = None
            self.Pipeline.throttle(self, True)
            raise