import asyncio
import logging
import pika
import pkg_resources
from ..abc.source import Source
L = logging.getLogger(__name__)
[docs]class AMQPSource(Source):
"""
Description:
"""
ConfigDefaults = {
'queue': 'q',
'error_exchange': 'error',
'prefetch_count': '1000',
}
[docs] def __init__(self, app, pipeline, connection, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
self._connection = pipeline.locate_connection(app, connection)
self._channel = None
self._channel_ready = asyncio.Event(loop=app.Loop)
self._channel_ready.clear()
self._error_exchange = self.Config['error_exchange']
self._queue = asyncio.Queue(loop=app.Loop)
self._connection.PubSub.subscribe("AMQPConnection.open!", self._on_connection_open)
self._connection.PubSub.subscribe("AMQPConnection.close!", self._on_connection_close)
[docs] async def main(self):
"""
Description:
"""
if self._connection.ConnectionEvent.is_set() and self._channel is None:
self._on_connection_open(".local!")
try:
while 1:
await self._channel_ready.wait()
await self.Pipeline.ready()
method, properties, body = await self._queue.get()
await self.process_message(method, properties, body)
self._channel.basic_ack(method.delivery_tag)
except asyncio.CancelledError:
pass
except BaseException as e:
L.exception("Error when processing AMQP message")
self.Pipeline.set_error(None, None, e)
# Requeue rest of the messages
while not self._queue.empty():
method, properties, body = await self._queue.get()
self._channel.basic_nack(method.delivery_tag, requeue=True)
if self._channel is not None:
self._channel.close()
self._channel = None
self._channel_ready.clear()
[docs] async def process_message(self, method, properties, body):
"""
Description:
"""
context = {
'amqp:method': method,
'amqp:properties': properties
}
await self.process(body, context=context)
def _on_connection_open(self, event_name):
assert self._channel is None
self._channel = self._connection.Connection.channel(on_open_callback=self._on_channel_open)
self._channel_ready.set()
def _on_connection_close(self, event_name):
self._channel = None
self._channel_ready.clear()
def _on_qos_applied(self, channel):
if pkg_resources.parse_version(pika.__version__) >= pkg_resources.parse_version('1.0.a'):
self._channel.basic_consume(self.Config['queue'], self._on_consume_message)
else:
self._channel.basic_consume(self._on_consume_message, self.Config['queue'])
def _on_channel_open(self, channel):
channel.basic_qos(callback=self._on_qos_applied, prefetch_count=int(self.Config['prefetch_count']))
def _on_consume_message(self, channel, method, properties, body):
try:
self._queue.put_nowait((method, properties, body))
except Exception:
channel.basic_nack(method.delivery_tag, requeue=True)
[docs] @classmethod
def construct(cls, app, pipeline, definition: dict):
"""
Description:
"""
newid = definition.get('id')
config = definition.get('config')
connection = definition['args']['connection']
return cls(app, pipeline, connection, newid, config)
[docs]class AMQPFullMessageSource(AMQPSource):
"""
Description:
"""
[docs] def process_message(self, method, properties, body):
"""
Description:
"""
self.process((method, properties, body))