How to connect to Elasticsearch
BSPump supports connecting to the Elasticsearch (ES) platform. You can connect to ES in just a few lines of code.
Elasticsearch Source
You can use the Elasticsearch source to read data from an Elasticsearch index for further analysis (e.g. in your pump).
Prerequisites
You can access Elasticsearch only if it is already installed on your server. Alternatively, you can install it locally by following the tutorial Install ElasticSearch and Kibana via Docker.
Taking data from an Elasticsearch index is simple, but you will need a few things.
What you will need:
URL address of your Elastic Search
Index with data
Configuration file
Register the service of ESConnection
Configuration File
You will need to create a .conf file with this configuration:
# ElasticSearch Source
[pipeline:<<Name of your pipeline class>>:ElasticSearchSource]
index=<<Name of your index>>
# Elasticsearch connection
[connection:ESConnection]
url=<<Your ElasticSearch URL address>>
The configuration file can contain additional information depending on your implementation. It is essential to include:
index : name of the index that data will be read from
url : URL of your connection with ES
For more information visit our quickstart to using configs: Configuration Quickstart.
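As a rough illustration of how the sections and keys fit together, the sketch below parses a configuration of the same shape with Python's standard configparser module. It is not how BSPump loads its configuration internally. The helper read_es_settings is hypothetical, and the pipeline name SamplePipeline, the index name my-index, and the URL http://localhost:9200 are placeholder values assumed for the example.

```python
import configparser

# Placeholder values: replace them with your own pipeline class name, index, and URL.
SAMPLE_CONF = """
# ElasticSearch Source
[pipeline:SamplePipeline:ElasticSearchSource]
index=my-index

# Elasticsearch connection
[connection:ESConnection]
url=http://localhost:9200
"""


def read_es_settings(text, pipeline_id="SamplePipeline", connection_id="ESConnection"):
    """Return (index, url) from the config text.

    Raises KeyError if a required section or key is missing.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    source_section = "pipeline:{}:ElasticSearchSource".format(pipeline_id)
    connection_section = "connection:{}".format(connection_id)
    index = parser[source_section]["index"]
    url = parser[connection_section]["url"]
    return index, url


index, url = read_es_settings(SAMPLE_CONF)
print(index, url)
```

Running a check like this before starting the pump is one way to catch a misspelled section name or a missing key early.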
Code example
To create a connection with Elasticsearch you will need to do three things:
Add the ElasticSearchSource component to the self.build method of the pipeline class
Add a trigger that reads data from the index at a defined interval
Create a service for your ES connection
You can implement your own ElasticSearch connection, but the default connection will look like this:
import bspump
import bspump.elasticsearch
import bspump.trigger


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Add the ES source with a trigger that fires every 5 seconds
            bspump.elasticsearch.ElasticSearchSource(app, self, "ESConnection").on(
                bspump.trigger.PeriodicTrigger(app, 5)
            ),
            # Rest of the pipeline with processors and a sink
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Part where you add the ESConnection service
    es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    svc.add_connection(es_connection)

    # Construct and register the pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
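It is important to pass the same connection ID, “ESConnection”, to both ElasticSearchConnection and ElasticSearchSource. It should also match the [connection:ESConnection] section name in the configuration file.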
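Why the identifier matters can be pictured with a toy model: a service keeps connections in a lookup keyed by ID, and a component that is given an ID asks the service for the matching connection. The classes ToyConnection and ToyService below are hypothetical stand-ins written only for this illustration. They are not BSPump classes and make no claim about BSPump's internals.

```python
class ToyConnection:
    """Hypothetical stand-in for a connection object identified by an ID."""

    def __init__(self, connection_id, url):
        self.id = connection_id
        self.url = url


class ToyService:
    """Hypothetical stand-in for a service that stores connections by ID."""

    def __init__(self):
        self._connections = {}

    def add_connection(self, connection):
        self._connections[connection.id] = connection

    def locate_connection(self, connection_id):
        try:
            return self._connections[connection_id]
        except KeyError:
            raise KeyError(
                "No connection registered under ID '{}'".format(connection_id)
            ) from None


service = ToyService()
service.add_connection(ToyConnection("ESConnection", "http://localhost:9200"))

# Same ID as the registered connection: the lookup succeeds.
print(service.locate_connection("ESConnection").url)

# A different ID: the lookup fails, much like a mistyped ID would.
try:
    service.locate_connection("OtherConnection")
except KeyError as error:
    print("lookup failed:", error)
```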
Elasticsearch Sink
You can use the Elasticsearch sink to store data in Elasticsearch for further analysis or for visualization in Kibana.
Prerequisites
Creating an ES sink is simple and intuitive, but you will need a few things to start with.
What you will need:
URL address for connection with Elastic Search
Configuration file
Register the service of ESConnection
Configuration File
You will need to create a .conf file using the following syntax:
# Elasticsearch connection
[connection:ESConnection]
url=<<YOUR CONNECTION URL>>
# Elasticsearch sink
[pipeline:<<Name of your pipeline class>>:ElasticSearchSink]
index=<<name of your index>>
doctype=_doc
The configuration file can contain additional information depending on your implementation. It is essential to include:
index : name of the index that will be used to store your data in ES
url : URL of your connection with ES
doctype : type of the document, default is _doc
For more information visit our quickstart to using configs: Configuration Quickstart.
Code example
To create a connection with Elasticsearch you will need to do two things:
Add the ElasticSearchSink component to the self.build method of the pipeline class
Create a service for your ES connection
You can implement your own ElasticSearch connection, but the default connection will look like this:
import bspump
import bspump.elasticsearch
import bspump.kafka


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Rest of the pipeline with source and processors
            # Add the ES sink component
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection"),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Part where you add the ESConnection service
    es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    svc.add_connection(es_connection)

    # Kafka connection, needed only if the pipeline's source reads from Kafka
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )

    # Construct and register the pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
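It is important to pass the same connection ID, “ESConnection”, to both ElasticSearchConnection and ElasticSearchSink. It should also match the [connection:ESConnection] section name in the configuration file.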
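For orientation, Elasticsearch's bulk API accepts newline-delimited JSON in which each document is preceded by an action line. The sketch below serializes a few events into that format for a given index. It only illustrates the wire format: whether ElasticSearchSink builds its requests in exactly this way is an assumption, and the helper to_bulk_body, the index name my-index, and the sample events are made up for the example.

```python
import json


def to_bulk_body(index, events):
    """Serialize events into a bulk request body (NDJSON) for one index."""
    lines = []
    for event in events:
        # Action line: tells Elasticsearch to index the next line into `index`.
        lines.append(json.dumps({"index": {"_index": index}}))
        # Source line: the document itself.
        lines.append(json.dumps(event))
    if not lines:
        return ""
    # A bulk body must end with a newline character.
    return "\n".join(lines) + "\n"


# Made-up sample events standing in for what a pipeline would deliver to the sink.
sample_events = [
    {"user": "alice", "status": 200},
    {"user": "bob", "status": 404},
]

body = to_bulk_body("my-index", sample_events)
print(body, end="")
```

Each event becomes two lines in the body, so a sink that batches events can send many documents in one request.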