# Source code for bspump.elasticsearch.source

import logging

from ..abc.source import TriggerSource

L = logging.getLogger(__name__)


class ElasticSearchSource(TriggerSource):
    """
    Trigger source that reads documents from ElasticSearch through the
    scroll API and feeds the ``_source`` of every hit into the pipeline.

    **Configuration**

    index : str, default ``'index-*'``
        Index (or index pattern) that is searched.

    scroll_timeout : str, default ``'1m'``
        Value sent as the ``scroll`` parameter of every request.
    """

    ConfigDefaults = {
        'index': 'index-*',
        'scroll_timeout': '1m',
    }

    def __init__(self, app, pipeline, connection, request_body=None, paging=True, id=None, config=None):
        """
        **Parameters**

        app : Application
            `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
            object; forwarded to the parent constructor and used to locate
            the connection.

        pipeline : Pipeline
            Pipeline this source belongs to.

        connection : Connection
            Connection reference resolved with ``pipeline.locate_connection``.

        request_body : JSON/dict, default = None
            Body of the initial search request. When omitted, a
            ``match_all`` query is used.

        paging : bool, default = True
            When false, only the first page of results is processed in
            each cycle.

        id : ID, default = None
            ID of the source, forwarded to the parent constructor.

        config : JSON/dict, default = None
            Additional configuration, forwarded to the parent constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.Connection = pipeline.locate_connection(app, connection)
        self.Index = self.Config['index']
        self.ScrollTimeout = self.Config['scroll_timeout']
        self.Paging = paging

        # Fall back to a query that matches every document.
        self.RequestBody = request_body if request_body is not None else {
            'query': {
                'bool': {
                    'must': {
                        'match_all': {}
                    }
                }
            }
        }

    async def cycle(self):
        """
        Gets data from Elastic and injects them into the pipeline.

        The first request opens a scroll on the configured index; every
        following request continues it with the scroll ID returned by the
        previous response. The cycle ends on a non-200 response (which is
        logged), when the response has no scroll ID, when a page has no
        hits, or after the first page if paging is disabled.
        """
        scroll_id = None

        while True:
            starting = scroll_id is None
            if starting:
                endpoint = '{}/_search?scroll={}'.format(self.Index, self.ScrollTimeout)
                payload = self.RequestBody
            else:
                endpoint = "_search/scroll"
                payload = {"scroll": self.ScrollTimeout, "scroll_id": scroll_id}

            # The base URL is requested again for every page on purpose:
            # the connection decides what it returns on each call.
            url = self.Connection.get_url() + endpoint

            async with self.Connection.get_session() as session:
                async with session.post(
                    url,
                    json=payload,
                    headers={'Content-Type': 'application/json'},
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        L.error("Failed to fetch data from ElasticSearch: {} from {}\n{}".format(response.status, url, error_text))
                        return

                    msg = await response.json()

            scroll_id = msg.get('_scroll_id')
            if scroll_id is None:
                return

            hits = msg['hits']['hits']
            if len(hits) == 0:
                return

            # Hand every document over to the pipeline, one by one.
            for hit in hits:
                await self.process(hit['_source'])

            if not self.Paging:
                return
class ElasticSearchAggsSource(TriggerSource):
    """
    Trigger source that runs a search request against ElasticSearch and
    turns the aggregation part of the response into flat events.

    On every cycle the request body is posted to ``<index>/_search``. The
    first aggregation found in the response is walked recursively; every
    ``value`` met on the way produces one event that contains the bucket
    keys collected so far plus the value itself, each stored under the
    name of its aggregation.

    **Configuration**

    index : str, default ``'index-*'``
        Index (or index pattern) that is searched.
    """

    ConfigDefaults = {
        'index': 'index-*',
    }

    def __init__(self, app, pipeline, connection, request_body=None, id=None, config=None):
        """
        **Parameters**

        app : Application
            `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
            object; forwarded to the parent constructor and used to locate
            the connection.

        pipeline : Pipeline
            Pipeline this source belongs to.

        connection : Connection
            Connection reference resolved with ``pipeline.locate_connection``.

        request_body : JSON/dict, default = None
            Body of the search request. When omitted, a ``match_all``
            query without aggregations is used, so no events are produced.

        id : ID, default = None
            ID of the source, forwarded to the parent constructor.

        config : JSON/dict, default = None
            Additional configuration, forwarded to the parent constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.Connection = pipeline.locate_connection(app, connection)
        self.Index = self.Config['index']

        if request_body is not None:
            self.RequestBody = request_body
        else:
            self.RequestBody = {
                'query': {
                    'bool': {
                        'must': {
                            'match_all': {}
                        }
                    }
                }
            }

    async def cycle(self):
        """
        Posts the request body to ElasticSearch and processes the first
        aggregation of the response.

        A non-200 response is logged and ends the cycle. A response with
        no (or empty) ``aggregations`` section ends the cycle silently.
        """
        search_path = '{}/_search?'.format(self.Index)
        url = self.Connection.get_url() + search_path

        async with self.Connection.get_session() as session:
            async with session.post(
                url,
                json=self.RequestBody,
                headers={'Content-Type': 'application/json'},
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error("Failed to fetch data from ElasticSearch: {} from {}\n{}".format(response.status, url, data))
                    return

                msg = await response.json()

        # The 'aggregations' section is missing when the request did not ask
        # for any aggregation (this is the case for the default request
        # body), so it must not be indexed unconditionally.
        aggs = msg.get('aggregations')
        if not aggs:
            return

        # Only the first aggregation of the response is processed.
        start_name = next(iter(aggs))
        await self.process_aggs({}, start_name, aggs[start_name])

    async def process_aggs(self, path, aggs_name, aggs):
        """
        Processes one aggregation result.

        If the aggregation has ``buckets``, they are walked with
        :meth:`process_buckets`. If it has a ``value``, one event is sent
        to the pipeline: a copy of ``path`` extended by
        ``{aggs_name: value}``.

        **Parameters**

        path : dict
            Bucket keys collected on the way to this aggregation, keyed by
            aggregation name. Modified only temporarily; it is left as it
            was when the method returns.

        aggs_name : str
            Name of the aggregation being processed.

        aggs : dict
            Aggregation result as returned by ElasticSearch.
        """
        if 'buckets' in aggs:
            await self.process_buckets(path, aggs_name, aggs['buckets'])

        if 'value' in aggs:
            path[aggs_name] = aggs['value']
            # Send a copy so later changes of `path` cannot alter the event.
            await self.process(dict(path))
            path.pop(aggs_name)

    async def process_buckets(self, path, parent, buckets):
        """
        Walks the buckets of one aggregation.

        For every bucket its ``key`` is stored in ``path`` under the name
        of the parent aggregation and every dict-valued entry of the
        bucket is treated as a sub-aggregation and passed to
        :meth:`process_aggs` (which may call this method again).

        **Parameters**

        path : dict
            Bucket keys collected so far, keyed by aggregation name.

        parent : str
            Name of the aggregation the buckets belong to.

        buckets : list of dict
            Buckets of the parent aggregation.
        """
        for bucket in buckets:
            # Register the bucket key before descending, so the result does
            # not depend on where 'key' sits among the bucket's entries.
            if 'key' in bucket:
                path[parent] = bucket['key']

            for name, value in bucket.items():
                if name != 'key' and isinstance(value, dict):
                    await self.process_aggs(path, name, value)

        # Remove the key of the last bucket; otherwise it would leak into
        # events of sibling aggregations processed after this one.
        path.pop(parent, None)