Elastic Search
Elastic Search is an analytics and full-text search engine. It is commonly used for application performance management, mainly for log analysis.
Source
ElasticSearchSource uses Elastic’s standard search API to fetch data.
configs
index - Elastic’s index (default is ‘index-*’).
scroll_timeout - Timeout of a single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
specific parameters
paging - boolean (default is True)
request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
default_request_body = {
    'query': {
        'bool': {
            'must': {
                'match_all': {}
            }
        }
    },
}
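As an illustration of overriding the default, the sketch below builds a request body that restricts the search to a time window. The helper name `build_request_body` and the field name `@timestamp` are assumptions made for this example and are not part of BSPump; the `range` query itself is standard Elastic query DSL.

```python
def build_request_body(field, gte, lte):
    """Return a search request body matching documents whose
    `field` lies between `gte` and `lte` (inclusive)."""
    return {
        'query': {
            'bool': {
                'must': {
                    'range': {
                        field: {'gte': gte, 'lte': lte}
                    }
                }
            }
        }
    }


# Hypothetical field name; adjust it to your index mapping.
request_body = build_request_body('@timestamp', 'now-1h', 'now')
```

A dictionary like this would be passed as the request_body argument when constructing the source.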
- class ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)
Bases: TriggerSource
Description:
- ElasticSearchSource.__init__()
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Name of the Pipeline.
- connection : Connection
Information about the connection.
- request_body : JSON, default = None
Request body needed for the request API call.
- paging : bool, default = True
- id : ID, default = None
ID
- config : JSON/dict, default = None
Configuration file with additional information.
Source Methods
ElasticSearch Aggs Source
ElasticSearchAggsSource is used for Elastic’s search aggregations.
configs
index - Elastic’s index (default is ‘index-*’).
specific parameters
request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
default_request_body = {
    'query': {
        'bool': {
            'must': {
                'match_all': {}
            }
        }
    },
}
- class ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)
Bases: TriggerSource
Description:
- ElasticSearchAggsSource.__init__()
Description:
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Name of the Pipeline.
- connection : Connection
Information about the connection.
- request_body : JSON, default = None
Request body needed for the request API call.
- id : ID, default = None
ID info
- config : JSON/dict, default = None
Configuration file with additional information.
ElasticSearch Aggs Source Methods
- async ElasticSearchAggsSource.process_aggs(path, aggs_name, aggs)
Description:
Parameters
path :
aggs_name :
aggs :
- async ElasticSearchAggsSource.process_buckets(path, parent, buckets)
Recursive function for processing buckets. It iterates through the keys of the dictionary, looking for ‘buckets’ or ‘value’. If it finds ‘buckets’, it calls itself; if it finds ‘value’, it calls process_aggs and sends an event for processing.
Parameters
path :
parent :
buckets :
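The recursion described for process_buckets can be sketched as follows. This is a simplified, synchronous illustration and not BSPump's implementation: the function names `walk_aggs` and `walk_buckets`, the shape of the emitted events, and the sample response are assumptions made for the example. It also assumes that buckets arrive as a list, which is the usual form for a terms aggregation.

```python
def walk_aggs(path, aggs_name, aggs, events):
    """Handle one named aggregation: emit its value or descend into its buckets."""
    if 'value' in aggs:
        # Leaf metric: combine the bucket keys collected so far with the value.
        event = dict(path)
        event[aggs_name] = aggs['value']
        events.append(event)
    if 'buckets' in aggs:
        walk_buckets(path, aggs_name, aggs['buckets'], events)


def walk_buckets(path, parent, buckets, events):
    """Visit every bucket and recurse into any nested aggregation it contains."""
    for bucket in buckets:
        bucket_path = dict(path)
        bucket_path[parent] = bucket['key']
        for name, sub in bucket.items():
            # Nested aggregations are dictionaries; 'key' and 'doc_count' are not.
            if isinstance(sub, dict):
                walk_aggs(bucket_path, name, sub, events)


# Hypothetical aggregation response: average load per host.
sample_aggregations = {
    'by_host': {
        'buckets': [
            {'key': 'host-a', 'doc_count': 3, 'avg_load': {'value': 0.5}},
            {'key': 'host-b', 'doc_count': 1, 'avg_load': {'value': 0.9}},
        ]
    }
}

events = []
for aggs_name, aggs in sample_aggregations.items():
    walk_aggs({}, aggs_name, aggs, events)
```

Each emitted event carries the keys of the enclosing buckets, so a nested aggregation produces one flat event per leaf value.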
ElasticSearch Connection
ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.
Usage:
# Add the connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)

# Pass the connection name ("ESConnection" in our example) to the relevant BSPump object:
self.build(
    bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
    bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
- class ElasticSearchConnection(app, id=None, config=None)
Bases: Connection
Description:
Sample Config
- url : ‘http://{ip/localhost}:{port}’
URL of an ElasticSearch node. Multiple URLs can be given, separated by ‘;’, each pointing to a node in the ElasticSearch cluster.
- username : ‘string’, default = ‘’
Used when authentication is required.
- password : ‘string’, default = ‘’
Used when authentication is required.
- loader_per_url : int, default = 4
Number of parallel loaders per URL.
- output_queue_max_size : int, default = 10
Maximum queue size.
- bulk_out_max_size : int, default = 12 * 1024 * 1024
Maximum size of a single outgoing bulk; the default suggests a size in bytes (12 MiB).
- timeout : int, default = 300
Timeout value.
- fail_log_max_size : int, default = 20
Maximum size of failed log messages.
- precise_error_handling : bool, default = False
If True, all errors are logged. If False, soft errors are omitted from the logs.
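To make the multi-URL setting concrete, here is a minimal sketch of how a ‘;’-separated url value could be split into individual node URLs, and how loader_per_url then determines the total number of loaders. The helper `split_node_urls` and the sample addresses are hypothetical; BSPump's own parsing may differ in detail.

```python
def split_node_urls(url_config):
    """Split a ';'-separated URL setting into a list of node URLs."""
    urls = []
    for part in url_config.split(';'):
        part = part.strip()
        if part:  # ignore empty segments, e.g. after a trailing ';'
            urls.append(part)
    return urls


# Hypothetical two-node cluster.
node_urls = split_node_urls('http://localhost:9200; http://localhost:9201;')

# With the default loader_per_url = 4, each node gets four parallel loaders.
loader_per_url = 4
total_loaders = len(node_urls) * loader_per_url
```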
- ElasticSearchConnection.__init__()
Description:
Parameters
- app : Application
Name of the Application.
- id : ID, default = None
ID
- config : JSON or dict, default = None
Configuration file with additional information for the methods.
ElasticSearch Connection Methods
- ElasticSearchConnection.get_session()
Returns a client session configured with the connection’s authentication and event loop.
- Returns
aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
- ElasticSearchConnection.consume(index, data_feeder_generator, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>)
Checks the content of data_feeder_generator and of the bulk and, if there is data to be sent, calls the enqueue method.
Parameters
index :
data_feeder_generator :
- bulk_class=ElasticSearchBulk :
Creates an instance of the ElasticSearchBulk class.
Elastic Search Bulk
Elastic Search Bulk Methods
- ElasticSearchBulk.consume(data_feeder_generator)
Appends all items in data_feeder_generator to the Items list. The consumer also resets Aging and Capacity.
Parameters
- data_feeder_generator : list
List of data items that will be passed to a generator and later uploaded to ElasticSearch.
- Returns
self.Capacity <= 0
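The return value `self.Capacity <= 0` signals that the bulk is full. A rough sketch of that bookkeeping is given below. The class `SketchBulk` is a hypothetical stand-in, not ElasticSearchBulk itself, and it assumes that Capacity is a byte budget that shrinks by the length of every appended item and that Aging is simply set back to zero on each call.

```python
class SketchBulk:
    """Simplified stand-in for a bulk buffer (not BSPump's ElasticSearchBulk)."""

    def __init__(self, max_size):
        self.Capacity = max_size  # assumed: remaining byte budget
        self.Aging = 0            # assumed: reset whenever new data arrives
        self.Items = []

    def consume(self, data_feeder_generator):
        for item in data_feeder_generator:
            self.Items.append(item)
            self.Capacity -= len(item)
        self.Aging = 0
        # True means the bulk is full and should be uploaded.
        return self.Capacity <= 0


bulk = SketchBulk(max_size=20)
is_full = bulk.consume([b'{"create":{}}\n', b'{"a":1}\n'])
```

In this example the two items total 22 bytes against a budget of 20, so the bulk reports that it is full.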
- async ElasticSearchBulk.upload(url, session, timeout)
Uploads data to ElasticSearch.
Parameters
- url : string
URL from the config, used to connect to the ElasticSearch REST API.
- session : ?
?
- timeout : int
Timeout value from the config: how long the connection to ElasticSearch may last.
- Returns
?
- ElasticSearchBulk.partial_error_callback(response_items)
Description: This callback is called when an upload to ElasticSearch fails for individual items (documents that could not be inserted).
Parameters
- response_items : list
List with dict items: {“index”: {“_id”: …, “error”: …}}
- ElasticSearchBulk.full_error_callback(bulk_items, return_code)
Description: This callback is called when an upload to ElasticSearch fails because of an ElasticSearch error.
Parameters
- bulk_items : list
List with tuple items: (_id, data)
- return_code :
ElasticSearch return code
- Returns
False if the bulk is to be resubmitted
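As an illustration of what a custom partial_error_callback might do with its argument, the standalone sketch below collects the failed document IDs and their errors from response_items. The function name `collect_failed_ids` and the sample items are hypothetical; only the item shape follows the description above.

```python
def collect_failed_ids(response_items):
    """Map each failed document _id to the error reported for it."""
    failed = {}
    for response_item in response_items:
        # Each item has a single key naming the bulk action, e.g. "index".
        for action, result in response_item.items():
            if 'error' in result:
                failed[result.get('_id')] = result['error']
    return failed


# Hypothetical response items; the second one carries no error.
sample_items = [
    {'index': {'_id': '1', 'error': {'type': 'mapper_parsing_exception'}}},
    {'index': {'_id': '2', 'status': 201}},
]
failed = collect_failed_ids(sample_items)
```

A subclass of ElasticSearchBulk could call such a helper from its own partial_error_callback, for example to log or re-route the failed documents.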
Lookup
- class ElasticSearchLookup(app, connection, id=None, config=None, cache=None, lazy=False)
Bases: MappingLookup, AsyncLookupMixin
The lookup that is linked with ES. It provides a mapping (dictionary-like) interface to pipelines. It feeds lookup data from ES using a query. It also has a simple cache to reduce the number of database hits.
configs
index - Elastic’s index
key - field name to match
scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
Example:
The ElasticSearchLookup can then be located and used inside a custom enricher:
import bspump

class AsyncEnricher(bspump.Generator):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.Lookup = svc.locate_lookup("ElasticSearchLookup")

    async def generate(self, context, event, depth):
        if 'user' not in event:
            return None

        info = await self.Lookup.get(event['user'])

        # Inject a new event into a next depth of the pipeline
        self.Pipeline.inject(context, event, depth)
- ElasticSearchLookup.__init__()
Description:
Parameters
- app : Application
Name of the Application.
- connection : Connection
Name of the Connection.
- id : ID, default = None
ID
- config : JSON, default = None
Configuration file with additional information.
- cache : ?, default = None
- lazy : ?, default = False
Lookup methods
- async ElasticSearchLookup.get(key)
Obtain the value from the lookup asynchronously.
Parameters
- key : ?
- Returns
value
- ElasticSearchLookup.build_find_one_query(key) -> dict
Override this method to build your own lookup query.
Parameters
- key : ?
- Returns
Default single-key query
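The default query itself is not reproduced here, so the following is only a guess at what a single-key query could look like: a standalone function that matches one document by exact value using a standard Elastic `term` query. The function signature, the `key_field` argument (standing in for the key config option) and the sample values are assumptions made for this sketch.

```python
def build_find_one_query(key_field, key):
    """Build a request body that returns at most one document
    whose `key_field` equals `key` exactly."""
    return {
        'size': 1,
        'query': {
            'term': {
                key_field: key
            }
        }
    }


# Hypothetical field name and lookup key.
query = build_find_one_query('user', 'alice')
```

An overriding method in a subclass would return a dictionary of this kind, built from the key it receives.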
- classmethod ElasticSearchLookup.construct(app, definition: dict)
Constructs config, id, and connection based on config.
Parameters
- app : Application
Name of the Application.
- definition : dict
Definition containing information about certain variables.
- Returns
cls(app, newid, connection, config)
Sink
- class ElasticSearchSink(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)
Bases: Sink
ElasticSearchSink allows you to insert events into ElasticSearch through POST requests.
The following attributes can be passed to the context and thus override the default behavior of the sink:
es_index (STRING): ElasticSearch index name
data_feeder accepts the event as its only parameter and yields data as a Python generator. The example implementation is:
def data_feeder_create_or_index(event):
    _id = event.pop("_id", None)
    if _id is None:
        yield b'{"create":{}}\n'
    else:
        yield orjson.dumps(
            {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
        )
    yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)
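To see what such a data feeder produces, the sketch below mirrors the example using only the standard library `json` module in place of orjson, so that it can be run without extra dependencies. The names `sketch_feeder_create_or_index` and `_dump_line` and the sample event are hypothetical; unlike the example above, this variant copies the event before removing `_id`.

```python
import json


def _dump_line(obj):
    """Serialize obj as compact JSON followed by a newline, as bytes."""
    return json.dumps(obj, separators=(',', ':')).encode('utf-8') + b'\n'


def sketch_feeder_create_or_index(event):
    event = dict(event)  # work on a copy so the caller's event is untouched
    _id = event.pop('_id', None)
    if _id is None:
        # No explicit ID: let ElasticSearch create the document.
        yield b'{"create":{}}\n'
    else:
        # Explicit ID: index (create or overwrite) the document under it.
        yield _dump_line({'index': {'_id': _id}})
    yield _dump_line(event)


lines = list(sketch_feeder_create_or_index({'_id': 'abc', 'user': 'alice'}))
payload = b''.join(lines)
```

Each event therefore contributes two newline-terminated lines, an action line and a document line, which is the layout the ElasticSearch bulk API expects.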
- ElasticSearchSink.__init__()
Description:
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Name of the Pipeline.
- connection : Connection
Name of the Connection.
- id : ID, default = None
ID
- config : JSON, default = None
Configuration file with additional information.
- bulk_class=ElasticSearchBulk :
- data_feeder=data_feeder_create_or_index :
Sink methods
Data Feeder Methods
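- data_feeder.data_feeder_create_or_index(event)
Yields a bulk “create” action when the event has no “_id”, otherwise an “index” action with that “_id”, followed by the event itself.
Parameters
- event : Data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.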
- data_feeder.data_feeder_create(event)
Data feeder that emits the event as a bulk “create” action.
Parameters
- event : Data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- data_feeder.data_feeder_index(event)
Description:
Parameters
- event : Data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- data_feeder.data_feeder_update(event)
Data feeder that emits the event as a bulk “update” action.
Parameters
- event : Data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- data_feeder.data_feeder_delete(event)
Data feeder that emits the event as a bulk “delete” action.
Parameters
- event : Data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.