Source code for bspump.influxdb.sink

import logging

from ..abc.sink import Sink

#

# Module-level logger, named after this module's import path.
L = logging.getLogger(__name__)

#


class InfluxDBSink(Sink):
    """
    Sink processor that stores events into an InfluxDB database.

    The target database is the one specified by the InfluxDBConnection
    object that this sink locates by name at construction time. Every event
    is turned into a single newline-terminated text line and handed to that
    connection through its ``consume()`` method.

    The sink also listens for the ``InfluxDBConnection.pause!`` and
    ``InfluxDBConnection.unpause!`` PubSub messages and throttles or
    un-throttles its pipeline accordingly.

    .. code:: python

        class SamplePipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
                    bspump.influxdb.InfluxDBSink(app, self, "InfluxConnection1")
                )
    """

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """
        Locate the connection and subscribe to its pause/unpause messages.

        **Parameters**

        app :
            Application object; its ``PubSub`` is used to subscribe to the
            connection's pause/unpause messages.

        pipeline :
            Pipeline this sink belongs to; used to locate the connection.

        connection :
            Identification of the connection, passed to
            ``pipeline.locate_connection()`` (the class-level example passes
            the connection ID as a string).

        id : ID, default = None
            Passed unchanged to the ``Sink`` constructor.

        config : str,JSON, default = None
            Passed unchanged to the ``Sink`` constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self._connection = pipeline.locate_connection(app, connection)

        # Both messages are routed to the same handler, which tells them
        # apart by the event name it receives.
        app.PubSub.subscribe("InfluxDBConnection.pause!", self._connection_throttle)
        app.PubSub.subscribe("InfluxDBConnection.unpause!", self._connection_throttle)

    # TODO: Restructure data: { "measurement": "location", "tag_set": "location=us-midwest", "field_set": "temperature=82", "timestamp": 1465839830100400200 }

    def process(self, context, event):
        """
        Convert the event to a wire line and pass it to the connection.

        **Parameters**

        context :
            Not used by this method.

        event : tuple, bytes or str
            * ``tuple`` - ``(measurement, tag_set, field_set, timestamp)``.
              ``tag_set`` may be empty or ``None``, in which case the line
              is written without a tag section. ``timestamp`` is multiplied
              by 1e9 and truncated to an integer (presumably seconds since
              the epoch converted to nanoseconds - confirm against callers).
            * ``bytes`` - decoded as UTF-8; a trailing newline is appended
              when missing.
            * ``str`` - used as is; a trailing newline is appended when
              missing.

        **Raises**

        RuntimeError
            When the event is of any other type.
        """
        if isinstance(event, tuple):
            measurement, tag_set, field_set, timestamp = event
            # Tags are optional. Emitting "measurement," followed directly
            # by a space (empty tag set) produces a line that is not valid
            # line protocol, so the comma is written only when tags exist.
            if tag_set:
                series_key = "{},{}".format(measurement, tag_set)
            else:
                series_key = measurement
            wire_line = "{} {} {}\n".format(series_key, field_set, int(timestamp * 1e9))

        elif isinstance(event, (bytes, str)):
            wire_line = event.decode('utf-8') if isinstance(event, bytes) else event
            # Every line handed to the connection ends with a newline.
            if not wire_line.endswith('\n'):
                wire_line += '\n'

        else:
            raise RuntimeError("Incorrect format")

        # Passing the processed event to the connection
        self._connection.consume(wire_line)

    def _connection_throttle(self, event_name, connection):
        """
        PubSub handler for the connection's pause/unpause messages.

        Messages that concern a different connection than the one used by
        this sink are ignored. For this sink's connection the pipeline's
        ``throttle()`` is called with ``True`` on pause and ``False`` on
        unpause.

        **Parameters**

        event_name : str
            Name of the received PubSub message.

        connection :
            Connection object the message relates to.

        **Raises**

        RuntimeError
            When ``event_name`` is neither the pause nor the unpause message.
        """
        if connection != self._connection:
            return

        if event_name == "InfluxDBConnection.pause!":
            self.Pipeline.throttle(self, True)
        elif event_name == "InfluxDBConnection.unpause!":
            self.Pipeline.throttle(self, False)
        else:
            raise RuntimeError("Unexpected event name '{}'".format(event_name))