import json
import logging
import requests
from ..abc.lookup import MappingLookup
from ..abc.lookup import AsyncLookupMixin
from ..cache import CacheDict
# Module-level logger shared by ElasticSearchLookup for request/lookup errors.
L = logging.getLogger(__name__)
class ElasticSearchLookup(MappingLookup, AsyncLookupMixin):
    """
    A lookup backed by an ElasticSearch index.

    It provides a mapping (dictionary-like) interface to pipelines and feeds
    lookup data from ElasticSearch using a query. Values that were found are
    kept in a simple cache to reduce the number of database hits.

    **configs**

    *index* - ElasticSearch index to query

    *key* - field name to match

    *scroll_timeout* - timeout of a single scroll request (default is '1m').
    Allowed time units:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

    Example:

    The ElasticSearchLookup can then be located and used inside a custom enricher:

    .. code:: python

        class AsyncEnricher(bspump.Generator):

            def __init__(self, app, pipeline, id=None, config=None):
                super().__init__(app, pipeline, id, config)
                svc = app.get_service("bspump.PumpService")
                # Use the id under which the lookup was registered.
                self.Lookup = svc.locate_lookup("ElasticSearchLookup")

            async def generate(self, context, event, depth):
                if 'user' not in event:
                    return None
                info = await self.Lookup.get(event['user'])
                event['info'] = info
                # Inject a new event into a next depth of the pipeline
                self.Pipeline.inject(context, event, depth)
    """

    ConfigDefaults = {
        'index': '',  # Specify an index
        'key': '',  # Specify field name to match
        'scroll_timeout': '1m',
    }

    def __init__(self, app, connection, id=None, config=None, cache=None, lazy=False):
        """
        Create the lookup.

        **Parameters**

        app : Application
            The application; its ``asab.MetricsService`` is used to create
            the ``es.lookup.cache`` and ``es.lookup.success`` counters.

        connection : Connection
            ElasticSearch connection object providing ``get_url()`` and
            ``get_session()``. The index path is appended directly to
            ``get_url()``, so that URL is expected to end with '/'.

        id : str, default=None
            ID of the lookup.

        config : mapping, default=None
            Configuration overriding ``ConfigDefaults``.

        cache : mapping, default=None
            Cache for looked-up values; a new ``CacheDict`` is used when None.

        lazy : bool, default=False
            Passed through to ``MappingLookup``.
        """
        super().__init__(app, id=id, config=config, lazy=lazy)
        self.Connection = connection

        self.Index = self.Config['index']
        self.ScrollTimeout = self.Config['scroll_timeout']
        self.Key = self.Config['key']

        # Number of entries; stays -1 until load() is called.
        self.Count = -1
        self.Cache = CacheDict() if cache is None else cache

        metrics_service = app.get_service('asab.MetricsService')
        self.CacheCounter = metrics_service.create_counter("es.lookup.cache", tags={}, init_values={'hit': 0, 'miss': 0})
        self.SuccessCounter = metrics_service.create_counter("es.lookup.success", tags={}, init_values={'hit': 0, 'miss': 0})

    async def _find_one(self, key):
        """
        Fetch the ``_source`` of the first document matching *key*.

        The query body is produced by ``build_find_one_query``.

        :return: the ``_source`` of the first hit, or None when the request
            fails (non-200 status) or nothing matches.
        """
        request = {
            "size": 1,
            "query": self.build_find_one_query(key),
        }
        url = self.Connection.get_url() + '{}/{}'.format(self.Index, '_search')
        async with self.Connection.get_session() as session:
            async with session.post(
                url,
                json=request,
                headers={'Content-Type': 'application/json'},
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error("Failed to fetch data from ElasticSearch: %s from %s\n%s", response.status, url, data)
                    # An error body carries no hits; don't parse it as a result.
                    return None
                msg = await response.json()

        try:
            hit = msg['hits']['hits'][0]
        except (KeyError, IndexError, TypeError):
            # No hits (or an unexpected response shape): treat as "not found".
            return None
        return hit["_source"]

    async def get(self, key):
        """
        Obtain the value for *key* asynchronously.

        The cache is consulted first; on a cache miss the value is fetched
        from ElasticSearch and cached if found. Fetch errors are logged and
        reported as "not found".

        **Parameters**

        key : value matched against the configured ``key`` field

        :return: the matching document's ``_source``, or None
        """
        try:
            value = self.Cache[key]
        except KeyError:
            # Record the miss up front so it is counted even if the fetch fails.
            self.CacheCounter.add('miss', 1)
            value = None
            try:
                value = await self._find_one(key)
            except Exception as e:
                # Best effort: a failing lookup must not break the pipeline.
                L.warning("There was an exception %s", e)
            if value is not None:
                self.Cache[key] = value
        else:
            self.CacheCounter.add('hit', 1)

        if value is None:
            self.SuccessCounter.add('miss', 1)
        else:
            self.SuccessCounter.add('hit', 1)
        return value

    def build_find_one_query(self, key) -> dict:
        """
        Build the query used by the single-key lookup.

        Override this method to build your own lookup query.

        **Parameters**

        key : value matched against the configured ``key`` field

        :return: default single-key ``match`` query
        """
        return {
            'match': {
                self.Key: key
            }
        }

    async def _count(self):
        """
        Return the number of documents in the index (``_count`` API).

        A non-200 response is logged; parsing then continues, so a missing
        ``count`` field surfaces as an exception to the caller.
        """
        request = {
            "query": {
                "match_all": {}
            }
        }
        url = self.Connection.get_url() + '{}/{}'.format(self.Index, '_count')
        async with self.Connection.get_session() as session:
            async with session.post(
                url,
                json=request,
                headers={'Content-Type': 'application/json'},
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error("Failed to fetch data from ElasticSearch: %s from %s\n%s", response.status, url, data)
                msg = await response.json()
        return int(msg["count"])

    async def load(self):
        """
        Set ``Count`` to the number of entries currently in the cache.

        :return: True
        """
        self.Count = len(self.Cache)
        return True

    def __len__(self):
        # Count is -1 before load(); len() rejects negative values, so clamp at 0.
        return max(self.Count, 0)

    def __getitem__(self, key):
        # To avoid synchronous operations completely
        raise NotImplementedError()

    def __iter__(self):
        """
        Download every document of the index using the scroll API.

        This is synchronous (blocking) and uses ``requests``. All hits are
        buffered in memory. Iteration then yields the ``key`` field values
        and fills the cache as a side effect (see ``__next__``).
        """
        request = {
            "size": 10000,
            "query": {
                "match_all": {}
            }
        }
        base_url = self.Connection.get_url()
        all_hits = []
        scroll_id = None

        while True:
            if scroll_id is None:
                path = '{}/_search?scroll={}'.format(self.Index, self.ScrollTimeout)
                request_body = request
            else:
                path = "_search/scroll"
                request_body = {"scroll": self.ScrollTimeout, "scroll_id": scroll_id}

            url = base_url + path
            response = requests.post(url, json=request_body)
            if response.status_code != 200:
                # `text` is a property of requests.Response, not a method.
                L.error("Failed to fetch data from ElasticSearch: %s from %s\n%s", response.status_code, url, response.text)
                break

            data = json.loads(response.text)
            scroll_id = data.get('_scroll_id')
            hits = data.get('hits', {}).get('hits', [])
            # Keep this page's hits even if there is no scroll id to continue with.
            all_hits.extend(hits)
            if scroll_id is None or len(hits) == 0:
                break

        if scroll_id is not None:
            self._clear_scroll(base_url, scroll_id)

        self.Iterator = iter(all_hits)
        return self

    def _clear_scroll(self, base_url, scroll_id):
        """Release the server-side scroll context (best effort; errors are only logged)."""
        try:
            requests.delete(base_url + "_search/scroll", json={"scroll_id": scroll_id})
        except requests.RequestException as e:
            L.warning("Failed to clear ElasticSearch scroll context: %s", e)

    def __next__(self):
        """
        Return the ``key`` field of the next buffered hit.

        The hit's ``_source`` is cached under that key. Hits without the
        field yield None and are not cached.
        """
        element = next(self.Iterator)
        key = element['_source'].get(self.Key)
        if key is not None:
            self.Cache[key] = element['_source']
        return key

    @classmethod
    def construct(cls, app, definition: dict):
        """
        Construct the lookup from a declarative definition.

        **Parameters**

        app : Application
            The application.

        definition : dict
            Must contain ``args.connection``; may contain ``id`` and ``config``.

        :return: a new lookup instance
        """
        newid = definition.get('id')
        config = definition.get('config')
        connection = definition['args']['connection']
        # Keywords match __init__(app, connection, id, config); the previous
        # positional call passed the id as the connection and vice versa.
        return cls(app, connection, id=newid, config=config)