BSPump Reference

Elastic Search

ElasticSearch is an analytics and full-text search engine. It is commonly used for application performance management, mainly log analysis.

Source

ElasticSearchSource uses Elastic’s standard search API to fetch data.

configs

index - Elastic’s index (default is ‘index-*’).

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
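The scroll_timeout value is a string made of a number and a time unit. As a rough illustration of what such values mean, the sketch below converts them to seconds. The helper name time_value_to_seconds is hypothetical and not part of BSPump; the unit table follows the Elastic time-units page linked above.

```python
import re

# Seconds per Elastic time unit (d, h, m, s, ms, micros, nanos).
_UNIT_SECONDS = {
    "d": 86400.0,
    "h": 3600.0,
    "m": 60.0,
    "s": 1.0,
    "ms": 1e-3,
    "micros": 1e-6,
    "nanos": 1e-9,
}

# Longer unit names come first so that "ms" is not read as "m".
_PATTERN = re.compile(r"(\d+)(nanos|micros|ms|d|h|m|s)")


def time_value_to_seconds(value):
    """Convert a value such as '1m' or '30s' to seconds (hypothetical helper)."""
    match = _PATTERN.fullmatch(value.strip())
    if match is None:
        raise ValueError("not a valid time value: {!r}".format(value))
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


print(time_value_to_seconds("1m"))
```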

specific parameters

paging - boolean (default is True)

request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
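A custom request_body can be built as a plain dictionary in the same shape as the default. The following is a minimal sketch, assuming you want to narrow the default match_all query with a range filter; the function name build_request_body and the field name @timestamp are illustrative assumptions, not part of BSPump.

```python
import json


def build_request_body(field=None, gte=None, lt=None):
    """Return the default body, optionally narrowed by a range filter.

    Hypothetical helper: `field`, `gte` and `lt` are illustrative names.
    """
    body = {"query": {"bool": {"must": {"match_all": {}}}}}
    if field is not None:
        bounds = {}
        if gte is not None:
            bounds["gte"] = gte
        if lt is not None:
            bounds["lt"] = lt
        # A bool query accepts a "filter" clause next to "must".
        body["query"]["bool"]["filter"] = {"range": {field: bounds}}
    return body


body = build_request_body("@timestamp", gte="now-1h")
print(json.dumps(body, indent=2))
```

The resulting dictionary would then be passed as the request_body argument of the source.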
class ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)[source]

Bases: TriggerSource

Description:

ElasticSearchSource.__init__()[source]

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

connection : Connection

Information of the connection.

request_body : JSON, default = None

Request body needed for the request API call.

paging : bool, default = True

id : ID, default = None

ID

config : JSON/dict, default = None

Configuration file with additional information.

Source Methods

async ElasticSearchSource.cycle()[source]

Gets data from Elastic and injects them into the pipeline.

ElasticSearch Aggs Source

ElasticSearchAggsSource is used for Elastic’s search aggregations.

configs

index - Elastic’s index (default is ‘index-*’).

specific parameters

request_body - dictionary described by Elastic’s doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html

Default is:

default_request_body = {
        'query': {
                'bool': {
                        'must': {
                                'match_all': {}
                        }
                }
        },
}
class ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)[source]

Bases: TriggerSource

Description:

ElasticSearchAggsSource.__init__()[source]

Description:

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

connection : Connection

Information of the connection.

request_body : JSON, default = None

Request body needed for the request API call.

id : ID, default = None

ID info

config : JSON/dict, default = None

Configuration file with additional information.

ElasticSearch Aggs Source Methods

async ElasticSearchAggsSource.cycle()[source]

Sets request body and path to create query call.


async ElasticSearchAggsSource.process_aggs(path, aggs_name, aggs)[source]

Description:

Parameters

path :

aggs_name :

aggs :

async ElasticSearchAggsSource.process_buckets(path, parent, buckets)[source]

Recursive function for bucket processing. It iterates through the keys of the dictionary, looking for ‘buckets’ or ‘value’. If it finds ‘buckets’, it calls itself; if it finds ‘value’, it calls process_aggs and sends an event on for processing.

Parameters

path :

parent :

buckets :
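The recursion described for process_buckets and process_aggs can be illustrated with a simplified, synchronous sketch. It is not the library's implementation: the function name flatten_aggs, the sample response, and the shape of the emitted events are assumptions made for this example.

```python
def flatten_aggs(aggs, path=None):
    """Yield one flat dict per metric value found in an aggregation tree.

    Illustrative only: walks 'buckets' recursively and emits at 'value'.
    """
    path = path or {}
    for name, agg in aggs.items():
        if not isinstance(agg, dict):
            continue
        if "buckets" in agg:
            for bucket in agg["buckets"]:
                # Remember which bucket we are in, then descend into it.
                child_path = {**path, name: bucket.get("key")}
                sub_aggs = {k: v for k, v in bucket.items() if isinstance(v, dict)}
                yield from flatten_aggs(sub_aggs, child_path)
        elif "value" in agg:
            # A metric value ends the recursion and produces an event.
            yield {**path, name: agg["value"]}


# Made-up aggregation response with one bucket level and one metric.
response_aggs = {
    "by_host": {
        "buckets": [
            {"key": "a", "doc_count": 3, "avg_load": {"value": 0.5}},
            {"key": "b", "doc_count": 1, "avg_load": {"value": 1.5}},
        ]
    }
}

events = list(flatten_aggs(response_aggs))
print(events)
```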

ElasticSearch Connection

ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.

usage:

# adding connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
        bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
# pass connection name ("ESConnection" in our example) to relevant BSPump's object:

self.build(
                bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
                bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
class ElasticSearchConnection(app, id=None, config=None)[source]

Bases: Connection

Description:

Sample Config

url : ‘http://{ip/localhost}:{port}’

URL of the ElasticSearch node. Multiple URLs may be given, separated by ‘;’, each pointing to a node in the ElasticSearch cluster.

username : string, default = ‘ ‘

Used when authentication is required.

password : string, default = ‘ ‘

Used when authentication is required.

loader_per_url : int, default = 4

Number of parallel loaders per URL.

output_queue_max_size : int, default = 10

Maximum queue size.

bulk_out_max_size : int, default = 12 * 1024 * 1024

??

timeout : int, default = 300

Timeout value.

fail_log_max_size : int, default = 20

Maximum size of failed log messages.

precise_error_handling : bool, default = False

If True, all errors are logged; if False, soft errors are omitted from the logs.
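To make the multi-URL option concrete, here is a minimal sketch that reads a sample configuration with Python's standard configparser and splits the url value on ‘;’. The section name connection:ESConnection, the host names, and the helper node_urls are assumptions for illustration; check your own configuration for the actual section naming.

```python
import configparser

# Sample configuration; section name and hosts are illustrative assumptions.
SAMPLE = """
[connection:ESConnection]
url = http://es01:9200;http://es02:9200
loader_per_url = 4
timeout = 300
"""


def node_urls(raw):
    """Split a ';'-separated url value into a list of node URLs."""
    return [u.strip() for u in raw.split(";") if u.strip()]


config = configparser.ConfigParser()
config.read_string(SAMPLE)
section = config["connection:ESConnection"]

urls = node_urls(section["url"])
timeout = section.getint("timeout")
print(urls, timeout)
```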

ElasticSearchConnection.__init__()[source]

Description:

Parameters

app : Application

Name of the Application

id : ID, default = None

ID

config : JSON or dict, default = None

Configuration file with additional information for the methods.

ElasticSearch Connection Methods

ElasticSearchConnection.get_url()[source]
Returns

List of URLs of the nodes connected to the cluster.

ElasticSearchConnection.get_session()[source]

Returns current Client Session Authentication and Loop

Returns

aiohttp.ClientSession(auth=self._auth, loop=self.Loop)

ElasticSearchConnection.consume(index, data_feeder_generator, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>)[source]

Checks the content of data_feeder_generator and the bulk and, if there is data to be sent, calls the enqueue method.

Parameters

index :

data_feeder_generator :

bulk_class=ElasticSearchBulk :

Creates an instance of the ElasticSearchBulk class.

ElasticSearchConnection.flush(forced=False)[source]

It goes through the list of bulks and calls enqueue for each of them.

Parameters

forced : bool, default = False

ElasticSearchConnection.enqueue(bulk)[source]

Properly enqueue the bulk.

Parameters

bulk :

Elastic Search Bulk

class ElasticSearchBulk(connection, index, max_size)[source]

Bases: object

Description:

ElasticSearchBulk.__init__()[source]

Initializes the variables

Parameters

connection : Connection

Name of the Connection.

index : str

???

max_size : int

Maximum size of the bulk.

Elastic Search Bulk Methods

ElasticSearchBulk.consume(data_feeder_generator)[source]

Appends all items in data_feeder_generator to the Items list. It also resets Aging and Capacity.

Parameters

data_feeder_generator : list

List of data items that will be passed to a generator and later uploaded to ElasticSearch.

Returns

self.Capacity <= 0
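The return value can be read as "this bulk is full and should be sent". The toy class below sketches that idea under stated assumptions: the name ToyBulk is hypothetical, and the way Capacity is reduced by the byte length of each item is a guess for illustration, not the documented behavior of ElasticSearchBulk.

```python
class ToyBulk:
    """Hypothetical stand-in for a bulk buffer with a size budget."""

    def __init__(self, max_size):
        self.Items = []
        self.Capacity = max_size  # remaining budget in bytes (assumption)
        self.Aging = 0

    def consume(self, data_feeder_generator):
        # Collect every chunk and charge its length against the budget.
        for item in data_feeder_generator:
            self.Items.append(item)
            self.Capacity -= len(item)
        self.Aging = 0
        # True means the bulk is full and ready to be enqueued.
        return self.Capacity <= 0


bulk = ToyBulk(max_size=10)
first_full = bulk.consume([b"12345"])
second_full = bulk.consume([b"123456"])
print(first_full, second_full, len(bulk.Items))
```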

async ElasticSearchBulk.upload(url, session, timeout)[source]

Uploads data to Elastic Search.

Parameters

url : string

Uses the URL from config to connect to the ElasticSearch REST API.

session : ?

?

timeout : int

Uses the timeout value from config: how long to stay connected to ElasticSearch.

Returns

?


ElasticSearchBulk.partial_error_callback(response_items)[source]

Description: When an upload to ElasticSearch fails for some items (a document could not be inserted), this callback is called.

Parameters

response_items : list

List with dict items: {“index”: {“_id”: …, “error”: …}}
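As an illustration of how such response items might be inspected, the sketch below collects the action, _id and error of every failed item. The function name failed_items and the sample data are made up for this example; an overriding callback in a custom bulk class could apply similar logic.

```python
def failed_items(response_items):
    """Return (action, _id, error) for each item that carries an error."""
    failed = []
    for item in response_items:
        # Each item has a single key naming the action, e.g. "index".
        for action, result in item.items():
            if "error" in result:
                failed.append((action, result.get("_id"), result["error"]))
    return failed


# Made-up response items: one success, one failure.
sample_items = [
    {"index": {"_id": "1", "status": 201}},
    {"index": {"_id": "2", "status": 400, "error": {"type": "mapper_parsing_exception"}}},
]

failures = failed_items(sample_items)
print(failures)
```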

ElasticSearchBulk.full_error_callback(bulk_items, return_code)[source]

Description: When an upload to ElasticSearch fails because of an ElasticSearch error, this callback is called.

Parameters

bulk_items : list

List with tuple items: (_id, data)

return_code :

ElasticSearch return code

Returns

False if the bulk is to be resubmitted


Lookup

class ElasticSearchLookup(app, connection, id=None, config=None, cache=None, lazy=False)[source]

Bases: MappingLookup, AsyncLookupMixin

A lookup that is linked with ElasticSearch. It provides a mapping (dictionary-like) interface to pipelines. It feeds lookup data from ElasticSearch using a query. It also has a simple cache to reduce the number of database hits.

configs

index - Elastic’s index

key - field name to match

scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

Example:

The ElasticSearchLookup can then be located and used inside a custom enricher:

        import bspump


        class AsyncEnricher(bspump.Generator):

                def __init__(self, app, pipeline, id=None, config=None):
                        super().__init__(app, pipeline, id, config)
                        svc = app.get_service("bspump.PumpService")
                        # "ElasticSearchLookup" is the assumed ID of the registered lookup
                        self.Lookup = svc.locate_lookup("ElasticSearchLookup")

                async def generate(self, context, event, depth):
                        if 'user' not in event:
                                return None

                        # Fetch the lookup data asynchronously and attach it to the event
                        # (the 'user_info' field name is illustrative)
                        info = await self.Lookup.get(event['user'])
                        event['user_info'] = info

                        # Inject a new event into a next depth of the pipeline
                        self.Pipeline.inject(context, event, depth)
ElasticSearchLookup.__init__()[source]

Description:

Parameters

app : Application

Name of the Application.

connection : Connection

Name of the Connection

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

cache : ?, default = None

lazy : bool, default = False

Lookup methods

async ElasticSearchLookup.get(key)[source]

Obtain the value from lookup asynchronously.

Parameters

key : ?

Returns

value
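The idea of an asynchronous get backed by a simple cache can be sketched with the standard library alone. This is not the ElasticSearchLookup code: the class CachedLookup and the fake_fetch coroutine are hypothetical stand-ins that only show why a cache reduces the number of database hits.

```python
import asyncio


class CachedLookup:
    """Hypothetical async lookup that remembers values it already fetched."""

    def __init__(self, fetch):
        self._fetch = fetch  # coroutine function: key -> value
        self._cache = {}

    async def get(self, key):
        if key in self._cache:
            return self._cache[key]
        value = await self._fetch(key)
        self._cache[key] = value
        return value


calls = []


async def fake_fetch(key):
    # Stands in for a query against ElasticSearch.
    calls.append(key)
    return {"user": key, "team": "ops"}


async def main():
    lookup = CachedLookup(fake_fetch)
    first = await lookup.get("alice")
    second = await lookup.get("alice")  # served from the cache
    return first, second


first, second = asyncio.run(main())
print(first, len(calls))
```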


ElasticSearchLookup.build_find_one_query(key) → dict[source]

Override this method to build your own lookup query

Parameters

key : ?

Returns

Default single-key query
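This reference does not show the default query, so the following is only a sketch of what a single-key query could look like, written as a standalone function. The term query, the size limit, and the key_field parameter are assumptions; in a subclass the same dictionary would be returned from an overridden build_find_one_query(key).

```python
def build_find_one_query(key, key_field="user"):
    """Build a query matching one document by exact key (illustrative).

    `key_field` stands for the field configured as the lookup key.
    """
    return {
        "size": 1,
        "query": {"term": {key_field: key}},
    }


query = build_find_one_query("alice")
print(query)
```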


async ElasticSearchLookup.load()[source]

Sets the length of Cache to Count.

Returns

True


classmethod ElasticSearchLookup.construct(app, definition: dict)[source]

Constructs config, id, and connection based on config.

Parameters

app : Application

Name of the Application.

definition : dict

Definition containing information about certain variables.

Returns

cls(app, newid, connection, config)


Sink

class ElasticSearchSink(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)[source]

Bases: Sink

ElasticSearchSink allows you to insert events into ElasticSearch through POST requests.

The following attributes can be passed to the context and thus override the default behavior of the sink:

es_index (STRING): ElasticSearch index name

data_feeder accepts the event as its only parameter and yields data as a Python generator. The example implementation is:

import orjson

def data_feeder_create_or_index(event):
        _id = event.pop("_id", None)
        if _id is None:
                yield b'{"create":{}}\n'
        else:
                yield orjson.dumps(
                        {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
                )
        yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)
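To see what a data feeder produces, the sketch below mirrors the example implementation using only the standard json module instead of orjson, then joins the yielded chunks into a bulk request body. The function name feeder_with_json is hypothetical, and the exact byte output differs from orjson in whitespace.

```python
import json


def feeder_with_json(event):
    """Stdlib variant of the create-or-index feeder (illustrative)."""
    _id = event.pop("_id", None)
    if _id is None:
        # No _id: let ElasticSearch create the document and assign an ID.
        yield b'{"create":{}}\n'
    else:
        # Known _id: index (insert or overwrite) under that ID.
        yield json.dumps({"index": {"_id": _id}}).encode("utf-8") + b"\n"
    # The document itself follows the action line.
    yield json.dumps(event).encode("utf-8") + b"\n"


with_id = b"".join(feeder_with_json({"_id": "a1", "msg": "hi"}))
without_id = b"".join(feeder_with_json({"msg": "hi"}))
print(with_id.decode("utf-8"), without_id.decode("utf-8"))
```

Each event therefore contributes two newline-terminated lines: an action line and the document.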


ElasticSearchSink.__init__()[source]

Description:

Parameters

app : Application

Name of the Application

pipeline : Pipeline

Name of the Pipeline

connection : Connection

Name of the Connection

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

bulk_class=ElasticSearchBulk :

data_feeder=data_feeder_create_or_index :


Sink methods

ElasticSearchSink.process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information with timestamp.

Data Feeder Methods

data_feeder.data_feeder_create_or_index()

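Yields a ‘create’ bulk action when the event has no _id, otherwise an ‘index’ action with that _id, followed by the serialized event.

Parameters

event : Data with time stamp stored in any data type, usually JSON.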

You can specify an event that is passed to the method.

data_feeder.data_feeder_create()

Creates a data feeder.

Parameters

event : Data with time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

data_feeder.data_feeder_index()

Description:

Parameters

event : Data with time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

data_feeder.data_feeder_update()

Updates data feeder.

Parameters

event : Data with time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

data_feeder.data_feeder_delete()

Deletes data feeder.

Parameters

event : Data with time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.


© Copyright 2021, TeskaLabs. Revision 2ed02dec.
