import asyncio
import logging
import re
import aiohttp
from ..abc.connection import Connection
#
# Module-level logger used by InfluxDBConnection for warnings and errors.
L = logging.getLogger(__name__)
#
class InfluxDBConnection(Connection):
    """
    Description: InfluxDBConnection serves to connect BSPump application with an InfluxDB database.

    The InfluxDB server is accessed via URL, and the database is specified
    using the `db` parameter in the configuration.

    Data passed to :meth:`consume` is appended to an in-memory bucket. The
    bucket is moved to an internal queue when it grows over
    `output_bucket_max_size` and on every `Application.tick!`; a background
    task (`_loader`) takes buckets from the queue and POSTs them to the
    `write` endpoint of the configured URL.

    .. code:: python

        app = bspump.BSPumpApplication()
        svc = app.get_service("bspump.PumpService")
        svc.add_connection(
            bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1")
        )

    **Config Default**

    url : http://localhost:8086/

    db : mydb

    output_queue_max_size : 10

    output_bucket_max_size : 1000 * 1000

    timeout : 30

    retry_enabled : False

    response_codes_to_retry : 404, 502, 503, 504
    """

    ConfigDefaults = {
        "url": 'http://localhost:8086/',
        'db': 'mydb',
        'output_queue_max_size': 10,
        'output_bucket_max_size': 1000 * 1000,
        'timeout': 30,
        'retry_enabled': False,
        'response_codes_to_retry': '404,502,503,504'
    }

    def __init__(self, app, id=None, config=None):
        """
        Description: Read the configuration, subscribe to application events
        and start the background loader task.

        **Parameters**

        app : Application
            The application object; its `PubSub` is used for tick/exit
            subscriptions and for publishing pause/unpause messages.

        id : ID, default = None

        config : JSON, default = None
            Configuration with additional information.
        """
        super().__init__(app, id=id, config=config)

        # Normalise the base URL so that it always ends with a slash.
        # `endswith` (rather than indexing [-1]) keeps an empty URL from raising IndexError.
        self.url = self.Config["url"].strip()
        if not self.url.endswith('/'):
            self.url += '/'
        self._url_write = self.url + 'write?db=' + self.Config["db"]

        self._output_bucket_max_size = int(self.Config["output_bucket_max_size"])
        self._output_queue_max_size = int(self.Config['output_queue_max_size'])
        self._timeout = aiohttp.ClientTimeout(total=int(self.Config['timeout']))

        self.RetryEnabled = self.Config.getboolean("retry_enabled")
        # Every run of digits in the option is taken as one HTTP status code.
        self.AllowedBulkResponseCodes = frozenset(
            [int(x) for x in re.findall(r"[0-9]+", self.Config['response_codes_to_retry'])]
        )

        # The `loop` argument of asyncio.Queue was removed in Python 3.10;
        # the queue uses the default event loop, the same one the
        # `asyncio.ensure_future()` call below relies on.
        self._output_queue = asyncio.Queue()
        self._started = True
        self._output_bucket = ""

        self.PubSub = app.PubSub
        self.PubSub.subscribe("Application.tick!", self._on_tick)
        self.PubSub.subscribe("Application.exit!", self._on_exit)

        self._future = asyncio.ensure_future(self._loader())

    def consume(self, data):
        """
        Description: Consumes user-defined data to be stored in the InfluxDB database.

        The data is appended to the internal bucket; once the bucket is longer
        than `output_bucket_max_size`, it is flushed to the output queue.

        **Parameters**

        data :
            Text appended to the bucket (presumably InfluxDB line protocol
            terminated by a newline - confirm against callers).
        """
        self._output_bucket += data
        if len(self._output_bucket) > self._output_bucket_max_size:
            self.flush()

    async def _on_exit(self, event_name):
        """
        Handler of `Application.exit!`: flush what is left in the bucket, ask
        the loader task to terminate and wait for it.
        """
        self._started = False
        self.flush()
        await self._output_queue.put(None)  # By sending None via queue, we signalize end of life
        await self._future  # Wait till the _loader() terminates

    async def _on_tick(self, event_name):
        """
        Handler of `Application.tick!`: restart the loader task if it has
        finished while the connection is still running, then flush the bucket.
        """
        if self._started and self._future.done():
            # Ups, _loader() task crashed during runtime, we need to restart it
            try:
                r = self._future.result()
                # This error should never happen
                L.error("Influx error observed, returned: '{}' (should be None)".format(r))
            except Exception as e:
                L.exception(f"Influx error, {e}, observed, restoring the order")

            self._future = asyncio.ensure_future(self._loader())

        self.flush()

    def flush(self, event_name=None):
        """
        Description: Directly flushes the content of the internal bucket with data to InfluxDB database.

        The bucket is put on the output queue and replaced with an empty one.
        When the queue size reaches `output_queue_max_size`,
        `InfluxDBConnection.pause!` is published.

        **Parameters**

        event_name : default = None
            Not used by this method.
        """
        if not self._output_bucket:
            # Nothing to send
            return

        self._output_queue.put_nowait(self._output_bucket)
        self._output_bucket = ""

        if self._output_queue.qsize() == self._output_queue_max_size:
            self.PubSub.publish("InfluxDBConnection.pause!", self)

    async def _loader(self):
        """
        Background task: take buckets from the output queue and POST each of
        them to InfluxDB until `None` is received or the connection is stopped.

        Raises `RuntimeError` when InfluxDB answers with a status other than
        204 that is not handled by the retry logic; `_on_tick` then restarts
        the task.
        """
        # A cycle that regularly sends buckets if there are any
        while self._started:
            _output_bucket = await self._output_queue.get()
            if _output_bucket is None:
                break

            # The queue has just dropped below its limit
            if self._output_queue.qsize() == self._output_queue_max_size - 1:
                self.PubSub.publish("InfluxDBConnection.unpause!", self, asynchronously=True)

            # Sending the data asynchronously
            try:
                async with aiohttp.ClientSession(timeout=self._timeout) as session:
                    async with session.post(self._url_write, data=_output_bucket) as resp:
                        resp_body = await resp.text()

                        if resp.status in self.AllowedBulkResponseCodes and self.RetryEnabled:
                            L.warning(
                                f"Retryable response code recieved, retrying. Queue size {self._output_queue.qsize()}"
                            )
                            # Put the bucket back and hold the producers for a while
                            self._output_queue.put_nowait(_output_bucket)
                            self.PubSub.publish("InfluxDBConnection.pause!", self)
                            await asyncio.sleep(3)
                            self.PubSub.publish("InfluxDBConnection.unpause!", self)

                        elif resp.status != 204:
                            # A successful write is answered with 204 No Content;
                            # `resp.status` is always an integer, never None.
                            L.error(
                                "Failed to insert a line into Influx status:{} body:{}".format(resp.status, resp_body))
                            raise RuntimeError("Failed to insert line into Influx")

            # Here we define errors, that we want to retry
            except OSError:
                if self.RetryEnabled:
                    L.warning(f"Retryable exception raised, retrying. Queue size {self._output_queue.qsize()}")
                    self._output_queue.put_nowait(_output_bucket)
                    self.PubSub.publish("InfluxDBConnection.pause!", self)
                    await asyncio.sleep(3)
                    self.PubSub.publish("InfluxDBConnection.unpause!", self)
                else:
                    # Retry is disabled: the bucket is dropped and the loader
                    # continues, but the loss must be visible in the log.
                    L.exception("Failed to send data to Influx, retry is disabled, the data is dropped")