BSPump
Introduction
BitSwan is a product designed for real-time data processing. By means of so-called real-time processors, BitSwan is able to analyze hundreds of data streams from many different sources at the same time, which makes it suitable for detecting anomalies and data patterns as well as other situations where instantaneous action is needed. BitSwan is written in Python.
How to install BitSwan
Use the following command in your command prompt:
pip install bspump
or install directly from the BitSwanPump GitHub repository:
pip install git+https://github.com/LibertyAces/BitSwanPump.git
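To verify that the installation succeeded, you can try importing the module from the command line (a quick sanity check, not an official installation step):
python -c "import bspump"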
How it works
Pipeline
A Pipeline is responsible for data processing in BSPump. Individual Pipeline objects work asynchronously and independently of one another (provided a dependence is not defined explicitly, for instance on a message source from some other pipeline). Each Pipeline is usually in charge of one concrete task.
A Pipeline has three main components: Source, Processor and Sink.
Source connects different data sources with the Pipeline so that their data can be processed.
Multiple sources
A Pipeline can have multiple sources. They are simply passed as a list of sources to the Pipeline build() method.
class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MySource1(app, self),
                MySource2(app, self),
                MySource3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
The main component of the BSPump architecture is a so-called Processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies, and reacting to known as well as unknown system behaviour patterns.
Processors differ in their functions, and all of them are aligned in a predefined sequence within the pipeline objects. As regards working with data events, each Pipeline has its own unique task.
Processors are passed as a list of Processors to the Pipeline build() method:
class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MyProcessor1(app, self),
                MyProcessor2(app, self),
                MyProcessor3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
The Sink object serves as the final event destination within the given pipeline. Subsequently, the event is dispatched/written into the target system by the BSPump.
Source
Source is an object designed to obtain data from a predefined input. BSPump contains a lot of universally usable, specific source objects, which are capable of loading data from known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific use cases in a given industry field.
Each source represents a coroutine/Future/Task that runs in the context of the main loop. The coroutine method main() contains the implementation of each particular source.
A Source MUST await the Pipeline ready state prior to producing an event. This is accomplished by the await self.Pipeline.ready() call.
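A minimal sketch of what a custom streaming source's main() coroutine might look like (MySource and the way the event is obtained are hypothetical, shown only to illustrate the await self.Pipeline.ready() pattern):

import bspump


class MySource(bspump.Source):

    async def main(self):
        while True:
            # wait until the pipeline is ready to accept events
            await self.Pipeline.ready()
            event = b"hello"  # replace with data obtained from your input
            await self.process(event)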
Trigger Source
This is an abstract source class intended as a base for the implementation of 'cyclic' sources such as file readers, SQL extractors etc.
You need to provide a trigger class and implement the cycle() method.
A trigger source will stop execution when the Pipeline is cancelled (it raises concurrent.futures.CancelledError). This typically happens when the program wants to quit in reaction to an OS signal.
You may also overload the main() method to provide additional parameters for the cycle() method, as in the example below.
    async def main(self):
        async with aiohttp.ClientSession(loop=self.Loop) as session:
            await super().main(session)

    async def cycle(self, session):
        session.get(...)
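To actually drive a trigger source, you attach a trigger to it when building the pipeline. The pattern below is taken from the Weather API example later in this guide; MyTriggerSource is a placeholder name for your own trigger source class:

self.build(
    MyTriggerSource(app, self).on(
        bspump.trigger.PeriodicTrigger(app, 5)  # fire the cycle() every 5 seconds
    ),
    bspump.common.PPrintSink(app, self),
)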
Processor
The main component of the BSPump architecture is a so-called Processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies, and reacting to known as well as unknown system behavior patterns.
Processors differ in their functions, and all of them are aligned in a predefined sequence within the pipeline objects. As regards working with data events, each pipeline has its own unique task.
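A minimal custom processor only needs a process() method that receives the context and the event and returns the (possibly modified) event; a sketch (MyProcessor is a placeholder name, the skeleton mirrors the EnrichProcessor example later in this guide):

import bspump


class MyProcessor(bspump.Processor):

    def process(self, context, event):
        # modify, transform or enrich the event here
        return event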
Bitswan Tutorials
In this series of tutorials we will walk you through basic and more advanced examples and demos to start your adventure with BSPump.
You will learn more about the BSPump architecture and how each component works. However, before you can start on your journey, you should know the basics of Python and be able to set up your programming environment.
Prerequisites
Here are some quick tutorials that will help you install Python and the BSPump module using pip, the package installer for Python.
Installing Python
First, check whether you already have Python installed. Open your command line or terminal and type:
C:/> python --version
> Python 3.8.4
If your Python version is lower than 3.8, check Python.org for an update.
If you are a complete beginner to Python, or you want more information about Python, check out the Python tutorial.
Installing BSPump module
To install the BSPump module:
pip install asab bspump
or alternatively using:
pip install git+https://github.com/LibertyAces/BitSwanPump-BlankApp.git
If you don't have pip installed, type:
python get-pip.py
To check the pip version, use:
pip --version
Have you managed to install everything? Then you are ready to create your first BSPump.
BSPump High-level architecture
BSPump is made from several components, which are explained in this tutorial. As you probably know, BitSwan is a real-time stream processor. To be able to process and work with large amounts of data, BSPump uses so-called Event Stream Processing: data is propagated through a data pipeline in events, where an event is a single data point with a timestamp. To handle these events, the pipeline has special components that must be compatible with each other. Therefore, each pipeline is made from several vital components: source, processor and sink. However, for the pipeline to work, BitSwan uses the BSPump Service to handle and register connections, pipelines etc.

First, we will walk you through each of the components and its functionality, so you can later build your own pipeline. Doesn't that sound cool?
BSPump Service
The service is the part where pipelines and connections are registered.
We will go through the following code and explain each part:
import asab

from .pipeline import TCPPipeline


class BlankService(asab.Service):

    def __init__(self, app, service_name="blank.BlankService"):
        super().__init__(app, service_name)

    async def initialize(self, app):
        svc = app.get_service("bspump.PumpService")

        # Create and register all connections here

        # Create and register all matrices here

        # Create and register all lookups here

        # Create and register all pipelines here
        self.TCPPipeline = TCPPipeline(app, "TCPPipeline")
        svc.add_pipeline(self.TCPPipeline)

        await svc.initialize(app)

    async def get_data(self, message="be"):
        await self.TCPPipeline.process(message)
        return "Check stdout"
In this example, the BlankService obtains the central bspump.PumpService, creates the TCPPipeline and registers it with svc.add_pipeline(). The get_data() coroutine then injects a message into that pipeline.
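Such a service is typically instantiated inside the application object. A hedged sketch, assuming a hypothetical BlankApplication class and module path (the exact wiring in the BlankApp repository may differ):

import bspump

from .service import BlankService  # hypothetical module path


class BlankApplication(bspump.BSPumpApplication):

    def __init__(self):
        super().__init__()
        # the service registers itself with the application under its service name
        self.BlankService = BlankService(self)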
Connection
To be able to connect to a data source, you have to create a connection. The connection is usually used inside the Source class and registered in the service class, as shown below.
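For instance, this is how an Elasticsearch connection is created and registered with the pump service (the same pattern used in the How to connect to Elastic Search section of this documentation):

svc = app.get_service("bspump.PumpService")

# Create and register the connection under the id "ESConnection"
es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
svc.add_connection(es_connection)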
Pipeline
The TCPPipeline below connects a TCP source, the ShakespeareanEnricher processor and a pretty-print sink:
import sys

import bspump
import bspump.common
import bspump.socket

from .processor import ShakespeareanEnricher


class TCPPipeline(bspump.Pipeline):
    """
    To test this pipeline, use:
    socat STDIO TCP:127.0.0.1:8888

    or visit http://localhost:8080/blank?message=die
    """

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPSource(app, self, config={"host": "0.0.0.0", "port": 8888}),
            ShakespeareanEnricher(app, self),
            bspump.common.PPrintSink(app, self, stream=sys.stderr)
        )
Lookup
Source
A source is an object that obtains data from a predefined input and supplies it to the pipeline as events. Sources fall into two categories: streaming sources and trigger sources.
Streaming Source
A streaming source lets events enter the pipeline as a so-called stream. Events flow through the source in a real-time manner as they are delivered by the input technology.
The following technologies can be used as a streaming source:
Kafka
Elastic Search
RabbitMQ
Elastic Search Source
TODO
Description
Example
Explanation
Kafka Source
TODO
Description
Example
Explanation
Trigger Source
Unlike a streaming source, a Trigger Source is used when we need to pump data from SQL-like databases or files. It has to be triggered by an external event or a repeating timer (e.g. requesting JSON data from an API every 10 minutes).
A Trigger Source can be used for:
HTTP client/server
SQL query
TCP
Files: csv, json etc.
TCP source
Description
TCP Source can be used to obtain data from a peer-to-peer connection using TCP.
Use case
TODO
Example
class EchoPipeline(bspump.Pipeline):
    '''
    To test this pipeline, use:
    socat STDIO TCP:127.0.0.1:8083
    '''

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.ipc.StreamServerSource(app, self, config={'address': '0.0.0.0 8083'}),
            # every pipeline also needs a sink; NullSink simply discards the events
            bspump.common.NullSink(app, self),
        )
HTTP Client Source
Description
HTTP Client Source gets data from a specified API URL.
Use case
If you need to pump data from a single API URL, you can use this Source.
Example
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': '<<API URL>>'
            }).on(<<Here you will use some type of trigger>>),
        )
The API URL can be any API you wish to get data from.
You will also need to specify your trigger type. You can choose your trigger here: TODO <<reference>>
Note
A fully functional example with this source can be found in the Coindesk API Example.
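As a concrete illustration (the same wiring used in the Coindesk API Example later in this guide), the placeholders above might be filled in like this:

self.build(
    bspump.http.HTTPClientSource(app, self, config={
        'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
    }).on(bspump.trigger.PeriodicTrigger(app, 5)),  # request the URL every 5 seconds
    bspump.common.PPrintSink(app, self),
)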
MySQL
Description
Example
Explanation
JSON File
Description
Example
Explanation
CSV File
Description
Example
Explanation
Processor
A minimal processor, the ShakespeareanEnricher from the BlankApp, looks like this:
import bspump


class ShakespeareanEnricher(bspump.Processor):

    def process(self, context, event):
        if isinstance(event, bytes):
            event = event.decode("utf-8").replace('\r', '').replace('\n', '')
        return 'To {0}, or not to {0}?'.format(event)
Sink
Sink is the part responsible for the output of the data to a database, to the standard output on your computer, or into another pipeline.
PPrintSink
In this example we are going to use PPrintSink, which prints the data from the pipeline to stdout or to any other stream that is connected to the pipeline.
To use the sink in your pipeline:
self.build(
    bspump.common.PPrintSink(app, self, stream=sys.stderr)
)
The PPrintSink class is added to your pipeline. It should be the last component of the pipeline for the pipeline to work correctly.
To explain the call further: bspump.common is the module path, and PPrintSink is the name of the class. In the parentheses you can specify the output stream; if none is specified, stdout is used.
The class itself looks like this:
class PPrintSink(Sink):
    """
    Description:

    |
    """

    def __init__(self, app, pipeline, id=None, config=None, stream=None):
        """
        Description:

        |
        """
        super().__init__(app, pipeline, id, config)
        self.Stream = stream if stream is not None else sys.stdout
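The process() method is not shown above. A sketch of what it presumably looks like (not the verbatim library code), assuming the standard pprint module is used for the pretty-printing:

import pprint


class PPrintSink(Sink):
    # (constructor as shown above)

    def process(self, context, event):
        # pretty-print the incoming event to the configured stream
        pprint.pprint(event, stream=self.Stream)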
The whole code can be found at BitSwan BlankApp
Coindesk API Example
About
In this example we will learn how to extract data from an API. We will be using an HTTP Client Source for the API request.
In this example we will be using the API from Coindesk to get the current price of Bitcoin.
The final pipeline will simply get the data from the API request as JSON, convert it to a Python dictionary, and output the data to the command prompt. Additionally, I will show you how to create your own Processor to enrich the data.
The following code can be found here in our GitHub repo.
A diagram of the final pipeline.

Source and Sink
In the code below, you can see the basic structure of a pipeline. The important part is the self.build() method, whose parameters are the individual components of the pipeline. In this part we will use the two main components each pipeline must contain:
Source and Sink. Do not copy this part of the code yet, because it is not a complete example on its own.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Source of the pipeline
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            # Sink of the pipeline
            bspump.common.PPrintSink(app, self),
        )
Source is the component that supplies the pipeline with data. In our example we will use a specific type of Source. Because we need to pump data from an API, we need to send a request to the API to receive our data. This means that our Source has to be triggered regularly to send the request to the API. For this reason we will be using a so-called Trigger Source (see the Trigger Source section).
The HTTP Client Source can have many configuration options, but in our example we just need to specify our URL address, using
config={'url': '<OUR URL>'}
as a parameter of the HTTP Client Source.
Because we are using a Trigger Source, we need to specify which trigger we will be using. There are many types of
triggers, but in our example we will be using PeriodicTrigger, which fires at the time interval specified in its
parameter: bspump.trigger.PeriodicTrigger(app, <<Time parameter in seconds>>)
Each pipeline has to have a Sink. In our example we want to see the resulting data, so we will be using PPrintSink, which simply prints the data to the command prompt.
You can copy-paste this chunk of code and try it yourself. Make sure you have the BSPump module installed for your Python; if not, you can follow our guide Installing BSPump module.
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.PPrintSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)
    app.run()
The program should output JSON similar to this:
(b'{"time":{"updated":"Jan 31, 2022 15:47:00 UTC","updatedISO":"2022-01-31T15:4'
b'7:00+00:00","updateduk":"Jan 31, 2022 at 15:47 GMT"},"disclaimer":"This data'
b' was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency '
b'data converted using hourly conversion rate from openexchangerates.org","cha'
b'rtName":"Bitcoin","bpi":{"USD":{"code":"USD","symbol":"$","rate":"37,789'
b'.6250","description":"United States Dollar","rate_float":37789.625},"GBP":{"'
b'code":"GBP","symbol":"£","rate":"28,145.2970","description":"British P'
b'ound Sterling","rate_float":28145.297},"EUR":{"code":"EUR","symbol":"€"'
b',"rate":"33,772.9280","description":"Euro","rate_float":33772.928}}}')
As you can see, this is not an ideal format to read our data from. We will need to convert the incoming data.
Your First Processor
After we have a functional pipeline, we can start with the more interesting part: Processors. A Processor is the component which works with the data of an event. In this example we will use a simple Processor, StdJsonToDictParser, which converts the incoming JSON to the Python dict type, which is much easier to work with in Python.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.StdJsonToDictParser(app, self),
            bspump.common.PPrintSink(app, self),
        )
This Processor is added simply by putting it into self.build()
between the Source and the Sink.
You should now be getting a more organized output:
{'bpi': {'EUR': {'code': 'EUR',
'description': 'Euro',
'rate': '33,794.5989',
'rate_float': 33794.5989,
'symbol': '€'},
'GBP': {'code': 'GBP',
'description': 'British Pound Sterling',
'rate': '28,163.3569',
'rate_float': 28163.3569,
'symbol': '£'},
'USD': {'code': 'USD',
'description': 'United States Dollar',
'rate': '37,813.8733',
'rate_float': 37813.8733,
'symbol': '$'}},
'chartName': 'Bitcoin',
'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index '
'(USD). Non-USD currency data converted using hourly conversion '
'rate from openexchangerates.org',
'time': {'updated': 'Jan 31, 2022 15:49:00 UTC',
'updatedISO': '2022-01-31T15:49:00+00:00',
'updateduk': 'Jan 31, 2022 at 15:49 GMT'}}
Creating Custom Processor
Because most of your use cases will be unique, it is likely that no existing Processor will do the work. Consequently, you will have to implement your own Processor.
Creating a new Processor is not a complicated task. You just need to follow the basic structure of a general Processor. You can simply copy-paste the code below:
class EnrichProcessor(bspump.Processor):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    def process(self, context, event):
        return event
This is a sample Processor class. Its most important part is the process() method, which is called whenever an event is passed to the Processor. As you can see, the default implementation simply returns the event (return event). The event must be passed on to the following component, another Processor or a Sink.
If you wish to use your new Processor (in our case EnrichProcessor), you will need to reference it in the self.build() method. You can do that simply by adding it to the self.build() parameters.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.StdJsonToDictParser(app, self),
            EnrichProcessor(app, self),
            bspump.common.PPrintSink(app, self),
        )
The last step is the implementation. For our example, I created a simple script that takes the incoming event (a Python dictionary that contains the price of Bitcoin in USD, EUR and GBP) and adds a new entry for the Japanese yen. The EnrichProcessor class has a new method, convertUSDtoJPY, which calculates the yen price based on the USD conversion rate. (Note: the exchange rate is hard-coded and outdated for the sake of simplicity of this example.)
class EnrichProcessor(bspump.Processor):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    def convertUSDtoJPY(self, usd):
        return usd * 113.70  # outdated USD/JPY rate

    def process(self, context, event):
        jpyPrice = str(self.convertUSDtoJPY(event["bpi"]["USD"]["rate_float"]))
        event["bpi"]["JPY"] = {
            "code": "JPY",
            "symbol": "¥",
            "rate": ''.join((jpyPrice[:3], ',', jpyPrice[3:])),
            "description": "JPY",
            "rate_float": jpyPrice
        }
        return event
When we put all the parts together, we get functional code.
Your output should look something like this:
{'bpi': {'EUR': {'code': 'EUR',
'description': 'Euro',
'rate': '33,796.7930',
'rate_float': 33796.793,
'symbol': '€'},
'GBP': {'code': 'GBP',
'description': 'British Pound Sterling',
'rate': '28,165.1854',
'rate_float': 28165.1854,
'symbol': '£'},
'JPY': {'code': 'JPY',
'description': 'JPY',
'rate': '429,9716.52771',
'rate_float': '4299716.52771',
'symbol': '¥'},
'USD': {'code': 'USD',
'description': 'United States Dollar',
'rate': '37,816.3283',
'rate_float': 37816.3283,
'symbol': '$'}},
'chartName': 'Bitcoin',
'disclaimer': 'This data was produced from the CoinDesk Bitcoin Price Index '
'(USD). Non-USD currency data converted using hourly conversion '
'rate from openexchangerates.org',
'time': {'updated': 'Jan 31, 2022 15:53:00 UTC',
'updatedISO': '2022-01-31T15:53:00+00:00',
'updateduk': 'Jan 31, 2022 at 15:53 GMT'}}
To summarize what we did in this example:
we created a sample pipeline with a Source and a Sink
we added a Processor that converts incoming events to a Python dictionary
we created a custom Processor that adds information about the Japanese currency to the incoming event and passes it on to the Sink.
Next steps
You can change and modify the pipeline in any manner you want. For example, instead of using PPrintSink you can use our Elasticsearch Sink, which loads the data into Elasticsearch, as sketched below. Read more in How to connect to Elastic Search.
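A hedged sketch of that swap, reusing the ElasticSearchSink and the "ESConnection" setup described in the How to connect to Elastic Search section (the index name goes into the configuration file, not into the code):

import bspump.elasticsearch


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.StdJsonToDictParser(app, self),
            # write the parsed events to Elasticsearch instead of printing them
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection"),
        )

The "ESConnection" id must match a connection registered with svc.add_connection(), as shown in the Elastic Search Sink section.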
Weather API Example
About
In this example we will learn how to get data from one or multiple HTTP sources using an API request. In this case we cannot use the basic HTTPClientSource, because it returns data from only one API query, so to get data from different queries we will have to define a new source for this use case.
The final pipeline will get data from multiple API requests at once as JSON, convert it to a Python dictionary, and output the data to the command prompt.
In this example we will be using the API from Open Weather to get current weather data (e.g. temperature, feels-like temperature, barometric pressure etc.).
In this example we will use a .conf file to store the configuration for our pump. More about how to write the configuration is here: Configuration Quickstart.
A diagram of the finished pipeline

Pipeline
In the code below you can see the structure of the SamplePipeline which we need for this use case. The important part is the self.build() method, whose parameters are the individual components of the pipeline. Do not forget that every pipeline requires both a source and a sink to function correctly.
Source is the component that supplies the pipeline with data. In our example we will use a specific type of source. Because we need to pump data from an API, we need to send a request to the API to receive our data. This means that our source has to be triggered to send the request to the API. For this reason we will be using a so-called trigger source. More about Trigger Source.
Because we are using a trigger source, we need to specify which trigger we will be using. There are more types of triggers, but in our example we will be using PeriodicTrigger, which fires at the time interval specified in its parameter:
bspump.trigger.PeriodicTrigger(app, <<Time parameter in seconds>>)
Each pipeline requires a sink. In our example we want to see the resulting data, so we will be using PPrintSink, which simply prints the data to the command prompt.
You can copy-paste this chunk of code and try it yourself. You must have the BSPump module installed; follow our guide Installing BSPump module.
Simply rewrite <<LOCATION>> to the city you want to obtain data for, and put the API key which you will get after registering on https://openweathermap.org/ into the <<YOUR PRIVATE API KEY>> section.
You can find more about how to modify your URL here: https://openweathermap.org/current
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.openweathermap.org/data/2.5/weather?q=<<LOCATION>>&units=metric&appid=<<YOUR PRIVATE API KEY>>'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.PPrintSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)
    app.run()
You should get output like this:
~python3 example.py
BitSwan BSPump version 21.11-17-g6b346fd
27-Jan-2022 18:43:00.177421 NOTICE asab.application is ready.
1 pipeline(s) ready.
(b'{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":802,"main":"Clouds",'
b'"description":"scattered clouds","icon":"03n"}],"base":"stations","main":{"t'
b'emp":8.91,"feels_like":6.86,"temp_min":6.8,"temp_max":10.14,"pressure":1030,'
b'"humidity":71},"visibility":10000,"wind":{"speed":3.6,"deg":290},"clouds":{"'
b'all":35},"dt":1643304840,"sys":{"type":2,"id":2019646,"country":"GB","sunris'
b'e":1643269577,"sunset":1643301595},"timezone":0,"id":2643743,"name":"London"'
b',"cod":200}')
Multiple locations source
In the code above, the pump simply returns data from one location. But in our use case we need to get data from multiple locations, which means we need to get data from multiple API’s URL. Next, we define our specific trigger source.
We use ClientSession from aiohttp library to create session where get data from GET method as response for every city in our list. Then we store the data from response to event variable and process the event to pipeline. More about aiohttp session can be found here
class LoadSource(bspump.TriggerSource):

    def __init__(self, app, pipeline, choice=None, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.cities = ['London', 'New York', 'Berlin']  # list of cities

    async def cycle(self):
        async with aiohttp.ClientSession() as session:
            # go through the list of cities and request the API for each city
            for city in self.cities:
                async with session.get(url=self.Config['url'].format(city=city, api_key=self.Config['api_key'])) as response:
                    event = await response.content.read()
                    await self.process(event)
You can see that in this example we are using self.Config to get the API key and the URL from the configuration file. It is good practice to keep the API key and the URL in the configuration file, because changes can then be made in the configuration file alone.
For example, create a weather-pump.conf file and copy-paste the code below into it:
[pipeline:SamplePipeline:LoadSource]
url = https://api.openweathermap.org/data/2.5/weather?q={city}&units=metric&appid={api_key}
api_key = <<YOUR PRIVATE API KEY>>
When you run your pump with a configuration file, you have to run it with the -c switch. So after you finish your pump and need to test it, type python3 your-pump-name.py -c weather-pump.conf into the terminal.
You can change the list of cities to any locations you wish. The important part of this source is the async def cycle(self) method, where we request the API URL for every location from our list and process the responses in the pipeline.
Just be sure that you import the aiohttp package and replace HTTPClientSource with our new LoadSource.
You can copy-paste the final code here:
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger
import aiohttp


class LoadSource(bspump.TriggerSource):

    def __init__(self, app, pipeline, choice=None, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.cities = ['London', 'New York', 'Berlin']  # list of cities

    async def cycle(self):
        async with aiohttp.ClientSession() as session:
            # go through the list of cities and request the API for each city
            for city in self.cities:
                async with session.get(url=self.Config['url'].format(city=city, api_key=self.Config['api_key'])) as response:
                    event = await response.content.read()
                    await self.process(event)


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            LoadSource(app, self).on(
                bspump.trigger.PeriodicTrigger(app, 5)
            ),
            bspump.common.PPrintSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)
    app.run()
After you execute this code, you should get this output in the terminal:
~ python3 example.py -c example.conf
BitSwan BSPump version 21.11-17-g6b346fd
27-Jan-2022 18:56:14.058308 NOTICE asab.application is ready.
1 pipeline(s) ready.
(b'{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":802,"main":"Clouds",'
b'"description":"scattered clouds","icon":"03n"}],"base":"stations","main":{"t'
b'emp":8.79,"feels_like":6.72,"temp_min":6.8,"temp_max":10.14,"pressure":1030,'
b'"humidity":70},"visibility":10000,"wind":{"speed":3.6,"deg":290},"clouds":{"'
b'all":35},"dt":1643305383,"sys":{"type":2,"id":2019646,"country":"GB","sunris'
b'e":1643269577,"sunset":1643301595},"timezone":0,"id":2643743,"name":"London"'
b',"cod":200}')
(b'{"coord":{"lon":-74.006,"lat":40.7143},"weather":[{"id":801,"main":"Clouds",'
b'"description":"few clouds","icon":"02d"}],"base":"stations","main":{"temp":-'
b'1.13,"feels_like":-1.13,"temp_min":-3.36,"temp_max":0.9,"pressure":1030,"hum'
b'idity":51},"visibility":10000,"wind":{"speed":0.45,"deg":34,"gust":1.34},"cl'
b'ouds":{"all":19},"dt":1643305980,"sys":{"type":2,"id":2039034,"country":"US"'
b',"sunrise":1643285428,"sunset":1643321212},"timezone":-18000,"id":5128581,"n'
b'ame":"New York","cod":200}')
(b'{"coord":{"lon":13.4105,"lat":52.5244},"weather":[{"id":803,"main":"Clouds",'
b'"description":"broken clouds","icon":"04n"}],"base":"stations","main":{"temp'
b'":6.01,"feels_like":1.09,"temp_min":5.01,"temp_max":6.85,"pressure":1003,"hu'
b'midity":91},"visibility":10000,"wind":{"speed":9.39,"deg":251,"gust":15.2},"'
b'clouds":{"all":75},"dt":1643305512,"sys":{"type":2,"id":2011538,"country":"D'
b'E","sunrise":1643266558,"sunset":1643298116},"timezone":3600,"id":2950159,"n'
b'ame":"Berlin","cod":200}')
Connect to ES
You can change and modify the pipeline in any manner you want. For example, instead of using PPrintSink you can use our Elasticsearch Sink, which loads the data into Elasticsearch. If you want to read more, see How to connect to Elastic Search.
Configuration Quickstart
In this tutorial you will learn about configuration in BSPump and how to use it.
What is configuration?
Every BitSwan object inside a BSPump application can be configured using user-defined configuration options.
It is good practice to write the configuration in .conf files, because making changes will be much easier.
Every object has default configuration values set in ConfigDefaults. If you set ConfigDefaults in your specific class, you override the same values in the ConfigDefaults inherited from the parent class.
Configuration via .conf files is built into ASAB, the platform on which BSPump is built. You can find more about it in the ASAB documentation.
There are three ways to configure an object:
1. By defining the ConfigDefaults dictionary inside the specific class:
class MySQLSource(TriggerSource):

    ConfigDefaults = {
        'query': 'SELECT id, name, surname FROM people;'
    }
2. By using the config parameter in the object's constructor:
bspump.mysql.MySQLSource(app, self, "MySQLConnection1", config={'query': 'SELECT id, name, surname FROM people;'})
3. By creating a .conf file:
[pipeline:PipelineID]
query = SELECT id, name, surname FROM people;
Example
This example shows how to create a configuration file to get data from an API via the basic HTTPClientSource.
In the first step, we create a .conf file where we store the API key:
[pipeline:SamplePipeline]
url = https://api.openweathermap.org/data/2.5/weather?q=London&units=metric&appid={api_key}
api_key = <YOUR PRIVATE API KEY>
The line [pipeline:SamplePipeline] specifies which class the configuration applies to. Values below this line override the same values in the ConfigDefaults of the specified class.
Configuration from the .conf file is accessible via self.Config (in this case we use self.Config['api_key'] to get the API key from our .conf file).
In the next step we have a sample pipeline that gets data from the https://openweathermap.org/ API, using the API URL and API key from the .conf file. See more in the Coindesk API Example.
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self,
                config={'url': self.Config['url'].format(api_key=self.Config['api_key'])}).on(bspump.trigger.PeriodicTrigger(app, 2)),
            bspump.common.StdJsonToDictParser(app, self),
            bspump.common.PPrintSink(app, self)
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
Running your pump with configuration files
When you want to run your pump with a configuration file, there are two ways to do it.
In terminal
To run your pump with a configuration file, use the -c switch in the terminal, followed by file_path/file_name.conf.
For example, when you have the configuration file in the same folder:
~python3 mypumptest.py -c mypumpconfiguration.conf
In your IDE
To run your pump in an IDE, you have to set the run parameters. For example, in PyCharm go to Run -> Edit Configurations…
and change the run parameters to -c file_path/nameOfYourConfig.conf

How to connect to Elastic Search
BSPump supports connecting to the Elastic Search platform. It is possible to connect to ES in just a few lines of code.
Elastic Search Source
You can use the Elastic Search Source to take data from an Elastic Search index for further analysis (e.g. in your pump).
Prerequisites
You can access ElasticSearch only if you already have ElasticSearch installed on your server, or you can install it locally with this tutorial: Install ElasticSearch and Kibana via Docker.
The process of taking data from an Elastic Search index is simple; you will need a few things.
What you will need:
URL address of your Elastic Search
Index with data
Configuration file
Register the service of ESConnection
Configuration File
You will need to create a .conf file with this configuration:
# ElasticSearch Source
[pipeline:<<Name of your pipeline class>>:ElasticSearchSource]
index=<<Name of your index>>
# Elasticsearch connection
[connection:ESConnection]
url=<<Your ElasticSearch URL address>>
The configuration file can contain additional information depending on your implementation. It is essential to include:
index : name of the index that will be used to get data from
url : URL of your connection with ES
For more information visit our quickstart to using configs: Configuration Quickstart.
Code example
To create a connection with Elastic Search you will need to do the following:
Add the ElasticSearchSource component to the self.build method of the pipeline class
Add a trigger which takes data from the index at a defined interval
Create a service for your ES Connection
You can implement your own ElasticSearch connection, but the default connection will look like this:
import bspump.elasticsearch


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Adding the ES Source component with a trigger set up to fetch data every 5 seconds
            bspump.elasticsearch.ElasticSearchSource(app, self, "ESConnection").on(bspump.trigger.PeriodicTrigger(app, 5)),
            # Rest of the pipeline with processors and a sink
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Part where you add the ESConnection service
    es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    svc.add_connection(es_connection)

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
It is important to include “ESConnection” as a parameter in ElasticSearch connection and source methods.
Elastic Search Sink
You can use the Elastic Search Sink to store data for further analysis or for visualizations using Kibana.
Prerequisites
The process of creating an ES sink is simple and intuitive, but you will need a few things to start with.
What you will need:
URL address for connection with Elastic Search
Configuration file
Register the service of ESConnection
Configuration File
You will need to create a .conf file using the following syntax:
# Elasticsearch connection
[connection:ESConnection]
url=<<YOUR CONNECTION URL>>
# Elasticsearch sink
[pipeline:<<Name of your pipeline class>>:ElasticSearchSink]
index=<<name of your index>>
doctype=_doc
The configuration file can contain additional information depending on your implementation. It is essential to include:
index : name of the index that will be used to store your data in ES
url : URL of your connection with ES
doctype : type of the document, default is _doc
For more information visit our quickstart to using configs: Configuration Quickstart.
Code example
To create a connection with Elastic Search you will need to do two things:
Add ElasticSearchSink component to self.build method of the pipeline class
create a service of your ES Connection.
You can implement your own ElasticSearch connection but the default connection will look like this:
import bspump.elasticsearch


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Rest of the pipeline with a source and processors
            # Adding the ES Sink component
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection"),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Part where you add the ESConnection service
    es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
    svc.add_connection(es_connection)

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
It is important to include “ESConnection” as a parameter in ElasticSearch connection and sink methods.
Escape From Tarkov Craft Profit Counter
About
The pipeline in this example is inspired by the game Escape from Tarkov, a realistic FPS game. Besides shooting enemies, players can buy and sell items on an in-game market which is driven by the players themselves. The price of each item changes in real time based on supply and demand. Another important game mechanic is that players can craft items themselves in specific stations. The items produced by and required for each craft can be bought on the market, so players can earn in-game money by producing items. Because the price of each item is not stable, some crafts are more profitable than others. My idea was to take data from an API source that provides information about all the crafts players can do, together with the price of each item. I will use this data to sort and analyze the crafts and output them in a form that helps players see which craft is more profitable and suitable.
In this example I will show you the process of creating a pipeline with a slightly more complicated use case. You will learn how to create a source that lets us use a query in our API requests.
Source
First we have to create our source to pump the data into the pipeline. We will be using the aiohttp library for our custom source. We start by creating our source class, as you can see in the code below.
class IOHTTPSource(bspump.TriggerSource):

    def __init__(self, app, pipeline, choice=None, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    async def cycle(self):
        async with aiohttp.ClientSession() as session:
            async with session.post('https://tarkov-tools.com/graphql', json={'query': query}) as response:
                if response.status == 200:
                    event = await response.json()
                else:
                    raise Exception("Query failed to run by returning code of {}. {}".format(response.status, query))
                await self.process(event)
As you can see in the cycle method, we are using asynchronous functions for the API requests. I am creating a Session, which is how aiohttp handles connections; for more information check the AIOHTTP Documentation. I am using the post method with a query parameter, as seen below.
query = """
query {
    crafts {
        source
        duration
        rewardItems {
            quantity
            item {
                shortName
                lastLowPrice
            }
        }
        requiredItems {
            quantity
            item {
                shortName
                lastLowPrice
            }
        }
    }
}
"""
I created this query using the playground interface made by the API authors. Here is the link if you would like to use this API.
Now you can copy-paste the code below and try it for yourself.
#!/usr/bin/env python3
import aiohttp
import bspump
import bspump.common
import bspump.http
import bspump.trigger
import pandas as pd
import bspump.file


query = """
query {
    crafts {
        source
        duration
        rewardItems {
            quantity
            item {
                shortName
                lastLowPrice
            }
        }
        requiredItems {
            quantity
            item {
                shortName
                lastLowPrice
            }
        }
    }
}
"""


class IOHTTPSource(bspump.TriggerSource):

    def __init__(self, app, pipeline, choice=None, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    async def cycle(self):
        async with aiohttp.ClientSession() as session:
            async with session.post('https://tarkov-tools.com/graphql', json={'query': query}) as response:
                if response.status == 200:
                    event = await response.json()
                else:
                    raise Exception("Query failed to run by returning code of {}. {}".format(response.status, query))
                await self.process(event)


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.PPrintProcessor(app, self),
            bspump.common.NullSink(app, self),
        )
If everything works correctly, you should be getting similar output.
'source': 'Workbench level 3'},
{'duration': 60000,
'requiredItems': [{'item': {'lastLowPrice': 39000,
'shortName': 'Eagle'},
'quantity': 2},
{'item': {'lastLowPrice': 15000,
'shortName': 'Kite'},
'quantity': 2}],
'rewardItems': [{'item': {'lastLowPrice': None,
'shortName': 'BP'},
'quantity': 120}],
'source': 'Workbench level 3'},
{'duration': 61270,
'requiredItems': [{'item': {'lastLowPrice': 15000,
'shortName': 'Kite'},
'quantity': 2},
{'item': {'lastLowPrice': 39000,
'shortName': 'Eagle'},
'quantity': 2},
{'item': {'lastLowPrice': 31111,
'shortName': 'Hawk'},
'quantity': 2}],
'rewardItems': [{'item': {'lastLowPrice': None,
'shortName': 'PPBS'},
'quantity': 150}],
.
.
.
There are probably hundreds of JSON lines in your console right now. That is not a nice way to output your data, right? Let's implement our filter processor, then.
Filter Processor
This filter processor serves a very specific use case in this example. The goal, as you may remember, was to filter the incoming data. We want to create a dataframe where each row contains the station in which the craft is performed, the duration of the craft, the price of the items needed to perform the craft, the name and price of the item(s) we obtain from the craft, the profit of the craft, and the profit per hour. As you can see, there are a lot of columns we have to create.
class FilterByStation(bspump.Processor):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    def process(self, context, event):
        my_columns = ['station', 'name', 'output_price_item', 'duration', 'input_price_item', 'profit', 'profit_per_hour']
        df = pd.DataFrame(columns=my_columns)
        for item in event["data"]["crafts"]:
            duration = round((item["duration"]) / 60 / 60, ndigits=3)
            reward = item["rewardItems"][0]
            name_output = reward["item"]["shortName"]
            quantity = reward["quantity"]
            output_item_price = reward["item"]["lastLowPrice"]
            if output_item_price is None:  # checks for NULL values
                output_item_price = 0
            output_price_item = quantity * int(output_item_price)
            station_name = item["source"]
            profit = 0
            profit_p_hour = 0
            input_price_item = 0
            for item2 in range(len(item["requiredItems"])):
                required_item = item["requiredItems"][item2]
                quantity_i = required_item["quantity"]
                input_item = required_item["item"]["lastLowPrice"]
                if input_item is None:
                    input_item = 0
                price_of_input_item = input_item * quantity_i
                input_price_item = input_price_item + price_of_input_item
            profit = output_price_item - input_price_item
            profit_p_hour = round(profit / duration, ndigits=3)
            # note: DataFrame.append was removed in pandas 2.x; with newer pandas use pd.concat instead
            df = df.append(
                pd.Series([station_name,
                           name_output,
                           output_price_item,
                           duration,
                           input_price_item,
                           profit,
                           profit_p_hour],
                          index=my_columns), ignore_index=True)
        event = df
        return event
You can copy-paste the code above and everything should work just fine. Don’t forget to reference the processor in the self.build() method.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
            FilterByStation(app, self),
            bspump.common.PPrintProcessor(app, self),
            bspump.common.NullSink(app, self),
        )
If you want more detail about what it does: it first goes through the whole JSON, then gets the data for each of the columns if possible (otherwise zero is used instead of null), and appends the record as a row to our dataframe. I am using Pandas in this example; if you are not familiar with Pandas, make sure you check their documentation.
Now the output in your console should look like this:
station name output_price_item duration input_price_item profit profit_per_hour
0 Booze generator level 1 Moonshine 286999 3.056 236998 50001 16361.584
1 Intelligence Center level 2 Flash drive 180000 34.222 151498 28502 832.856
2 Intelligence Center level 2 Virtex 88000 37.611 210993 -122993 -3270.134
3 Intelligence Center level 2 SG-C10 130000 38.889 206978 -76978 -1979.429
4 Intelligence Center level 2 RFIDR 215000 53.333 40000 175000 3281.271
.. ... ... ... ... ... ... ...
128 Workbench level 3 PBP 0 11.972 265888 -265888 -22209.155
129 Workbench level 3 M995 0 15.994 211000 -211000 -13192.447
130 Workbench level 3 M61 0 16.644 233331 -233331 -14018.926
131 Workbench level 3 BP 0 16.667 108000 -108000 -6479.870
132 Workbench level 3 PPBS 0 17.019 170222 -170222 -10001.880
[133 rows x 7 columns]
We can agree that this looks much better than raw JSON, but this is not the end; we still need to send the data somewhere for our bot.
Dataframe to csv Processor
To make the data available for our Discord bot, we will save it to a directory as a CSV file. This processor is really simple, as we call only one function from the Pandas library.
You can copy-paste the code of the processor:
class DataFrameToCSV(bspump.Processor):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)

    def process(self, context, event):
        event.to_csv('./Data/TarkovData.csv', index=False)
        return event
Once again, don't forget to include the processor in our self.build() method.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
            FilterByStation(app, self),
            bspump.common.PPrintProcessor(app, self),
            DataFrameToCSV(app, self),
            bspump.common.NullSink(app, self),
        )
This won't change the output in the console, but it should create a CSV file in the ./Data directory.
What next
Now we have a functional pipeline. You can do anything with the output data. For example, I created a simple Discord bot that sends a message with the updated data. You can try to make your own Discord bot using this tutorial: Getting Started with Discord Bots.

Fortnite Current Store Example
About
In this example we will get data from one HTTP source using an API request, use filtering processors on that data,
and export the data to a .csv file, which can then be used, for example, by a Discord bot.
The final pipeline will get data from the API request, filter some values from the dataframe, do some calculations with the values, and then export them to a CSV file.
We will be using the API from Fortnite Tracker to get the current Fortnite store items.
We will work with configuration files in this example. If you don't already know how to work with configuration files, try this quickstart: Configuration Quickstart.
A diagram of the finished pipeline

First sample pipeline
In the code below you can see the structure of the SamplePipeline which we need for this use case. The important part is the self.build() method, whose parameters are the individual components of the pipeline. Do not forget that every pipeline requires both a source and a sink to function correctly.
Source is the component that supplies the pipeline with data. In our example we will use a specific type of source. Because we need to pump data from an API, we need to send a request to the API to receive our data. This means that our source has to be triggered to send the request to the API. For this reason we will be using a so-called trigger source. More about Trigger Source.
Because we are using a trigger source, we need to specify which trigger we will be using. There are more types of triggers, but in our example we will be using PeriodicTrigger, which fires at the time interval specified in its parameter:
bspump.trigger.PeriodicTrigger(app, <<Time parameter in seconds>>)
Each pipeline requires a sink. We will use PPrintSink for now to see the incoming data, but in the next steps we will switch to NullSink, which I describe later.
First we need to create a configuration file. Create a config.conf file in your pump folder, copy-paste this chunk of code into it, and replace the <YOUR PRIVATE API KEY> section with your API key, which you will get by following the steps here:
[pipeline:SamplePipeline]
url = https://api.fortnitetracker.com/v1/store
api_key = <YOUR PRIVATE API KEY>
After you have your configuration file finished, you can copy-paste the code below and try it yourself. Be sure you have the BSPump module installed; if not, follow our guide Installing BSPump module.
import bspump
import bspump.common
import bspump.http
import bspump.trigger


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self,
                config={'url': self.Config['url']},
                headers={'TRN-Api-Key': self.Config['api_key']}).on(bspump.trigger.PeriodicTrigger(app, 2)),
            bspump.common.PPrintSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
You can run this code with the ~ python3 yourpumpname.py -c config.conf command in the terminal.
Well done! Now we are pumping data about the items which are in the Fortnite store right now.
You should get output like this:
~ python3 docs1.py -c config.conf
BitSwan BSPump version 21.11-17-g6b346fd
04-Feb-2022 18:00:30.503021 NOTICE asab.application is ready.
1 pipeline(s) ready.
(b'[\r\n {\r\n "imageUrl": "https://trackercdn.com/legacycdn/fortnite/8BD06'
b'909_large.png",\r\n "manifestId": 6909,\r\n "name": "Marsh Walk",\r'
b'\n "rarity": "Sturdy",\r\n "storeCategory": "BRSpecialFeatured",\r'
b'\n "vBucks": 500\r\n },\r\n {\r\n "imageUrl": "https://trackercdn.c'
b'om/legacycdn/fortnite/275915210_large.png",\r\n "manifestId": 15210,\r\n '
b' "name": "Arcane Vi",\r\n "rarity": "Epic",\r\n "storeCategory": "BR'
b'SpecialFeatured",\r\n "vBucks": 0\r\n },\r\n {\r\n "imageUrl": "http'
b's://trackercdn.com/legacycdn/fortnite/2AC415212_large.png",\r\n "manife'
b'stId": 15212,\r\n "name": "Piltover Warden Hammer",\r\n "rarity": "Epi'
b'c",\r\n "storeCategory": "BRSpecialFeatured",\r\n "vBucks": 800\r\n '
b' },\r\n {\r\n "imageUrl": "https://trackercdn.com/legacycdn/fortnite/6C4'
b'015364_large.png",\r\n "manifestId": 15364,\r\n "name": "Marsha",\r'
b'\n "rarity": "Epic",\r\n "storeCategory": "BRSpecialFeatured",\r\n '
b' "vBucks": 1500\r\n },\r\n {\r\n "imageUrl": "https://trackercdn.co'
b'm/legacycdn/fortnite/46F66923_large.png",\r\n "manifestId": 6923,\r\n '
b'"name": "Marshmello",\r\n "rarity": "Quality",\r\n "storeCategory": "B'
b'RSpecialFeatured",\r\n "vBucks": 1500\r\n },\r\n {\r\n "imageUrl": "'
b'https://trackercdn.com/legacycdn/fortnite/B84F13565_large.png",\r\n "ma'
b'nifestId": 13565,\r\n "name": "Arcane Jinx",\r\n "rarity": "Epic",'
b'\r\n "storeCategory": "BRSpecialFeatured",\r\n "vBucks": 0\r\n },\r\n'
b' {\r\n "imageUrl": "https://trackercdn.com/legacycdn/fortnite/61841528'
b'7_large.png",\r\n "manifestId": 15287,\r\n "name": "Goblin Glider"'
b',\r\n "rarity": "Epic",\r\n "storeCategory": "BRSpecialFeatured",\r'
b'\n "vBucks": 800\r\n },\r\n ...
Export to CSV
Awesome! Now we are pumping data, but we want to store it somewhere. In the end we want to create a Discord bot which will show us the current Fortnite store when we write a command in a Discord chat. The Discord bot can easily work with a CSV file, so we need to export our data to a .csv file.
We have to import the pandas library into our pump, which can export the JSON data to a CSV file, and then we define our exporting processor.
The processor converts the JSON data to a dataframe with the pandas library and then exports it as a CSV file, creating the specified file in the same folder as our pump (you can define any path you want).
This will be our processor:
class JSONtoCSV(bspump.Processor):

    def process(self, context, event):
        df = pd.read_json(event)
        # to_csv writes data.csv and returns None, which the NullSink then discards
        event = df.to_csv('data.csv', index=False)
        return event
Now we add this processor to our pump. We also have to change PPrintSink to NullSink, because we don't want to print the data anywhere; we will have it in our CSV file.
You can copy-paste the code below and look in your pump folder to see whether there is a CSV file with our data.
import bspump
import bspump.common
import bspump.http
import bspump.trigger
import pandas as pd


class JSONtoCSV(bspump.Processor):

    def process(self, context, event):
        df = pd.read_json(event)
        event = df.to_csv('data.csv', index=False)
        return event


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self,
                config={'url': self.Config['url']},
                headers={'TRN-Api-Key': self.Config['api_key']}).on(bspump.trigger.PeriodicTrigger(app, 2)),
            JSONtoCSV(app, self),
            bspump.common.NullSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
The CSV file should look like this:

Processor with pandas script
You can see that there is not that much interesting data in our data set yet. So we want to add a column with a price-over-rarity coefficient, which will be useful in our Discord bot, because a player could then see which items are the most advantageous to purchase.
We create a basic pandas script that goes through the rows, calculates the coefficient from the rarity and vBucks column values, and adds it to a list, which becomes a new column called Coef at the end. More about pandas here.
You have to convert the dataframe back to JSON, because the pipeline can't work with dataframes.
The processor:
class AddRarityPriceCoef(bspump.Processor):

    def process(self, context, event):
        df = pd.read_json(event)
        coefs = []
        for row in df.itertuples():
            if row.vBucks == 0:
                price = 1
            else:
                price = row.vBucks
            if row.rarity.lower() == 'handmade':
                coefs.append((1 / price) * 100)
            elif row.rarity.lower() == 'uncommon':
                coefs.append((2 / price) * 100)
            elif row.rarity.lower() == 'rare':
                coefs.append((3 / price) * 100)
            elif row.rarity.lower() == 'epic':
                coefs.append((4 / price) * 100)
            elif row.rarity.lower() == 'legendary':
                coefs.append((5 / price) * 100)
            elif row.rarity.lower() == 'mythic':
                coefs.append((6 / price) * 100)
            elif row.rarity.lower() == 'exotic':
                coefs.append((7 / price) * 100)
            else:
                coefs.append(1)
        df['Coef'] = coefs
        event = df.to_json()
        return event
Now we add the processor to our pump. After you copy-paste the code and run the pump, you can see that a new column has been added with our calculated values.
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger
import pandas as pd


class JSONtoCSV(bspump.Processor):

    def process(self, context, event):
        df = pd.read_json(event)
        print(df)
        event = df.to_csv('data.csv', index=False)
        return event


class AddRarityPriceCoef(bspump.Processor):

    def process(self, context, event):
        df = pd.read_json(event)
        coefs = []
        for row in df.itertuples():
            if row.vBucks == 0:
                price = 1
            else:
                price = row.vBucks
            if row.rarity.lower() == 'handmade':
                coefs.append((1 / price) * 100)
            elif row.rarity.lower() == 'uncommon':
                coefs.append((2 / price) * 100)
            elif row.rarity.lower() == 'rare':
                coefs.append((3 / price) * 100)
            elif row.rarity.lower() == 'epic':
                coefs.append((4 / price) * 100)
            elif row.rarity.lower() == 'legendary':
                coefs.append((5 / price) * 100)
            elif row.rarity.lower() == 'mythic':
                coefs.append((6 / price) * 100)
            elif row.rarity.lower() == 'exotic':
                coefs.append((7 / price) * 100)
            else:
                coefs.append(1)
        df['Coef'] = coefs
        event = df.to_json()
        return event


class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self,
                config={'url': self.Config['url']},
                headers={'TRN-Api-Key': self.Config['api_key']}).on(bspump.trigger.PeriodicTrigger(app, 2)),
            # Add price over rarity coefficient to dataframe
            AddRarityPriceCoef(app, self),
            # Converts incoming json event to CSV data
            JSONtoCSV(app, self),
            # We can also push data to ES or Kafka
            bspump.common.NullSink(app, self),
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Construct and register Pipeline
    pl = SamplePipeline(app, 'SamplePipeline')
    svc.add_pipeline(pl)

    app.run()
Data in CSV file:

Conclusion
So, in this example we learnt how to get data from a basic API request and export it to a CSV file. Then we created a script with the pandas library to compute a price-over-rarity coefficient and add it as a new column to our dataset. You can also add other processors which filter the data or make other calculations over it.
What next?
Now I will show you how you can use the pump to create a Discord bot for yourself or your friends.
You can find out how to create a Discord bot here
The resulting Discord bot can look like this:

Install ElasticSearch and Kibana via Docker
About
This example is focused on how to install ElasticSearch and Kibana on your localhost and use ElasticSearch via the Kibana GUI. We will be using Docker and Docker Compose to set up the ElasticSearch environment. Be sure you have Docker and Docker Compose set up; if not, follow this guide to install Docker and Docker Compose.
In the end we use the Docker image of our Weather Pump, which can be found in the Weather API Example, to pump data into an index in our local ElasticSearch.
Docker is a platform which provides the ability to package and run an application in a loosely isolated environment called a container. More about Docker. You can also read our quickstart on how to use Docker with the BSPump module here: Docker File Quickstart
Docker compose is a tool for defining and running multi-container Docker applications. More about Docker compose.
Docker compose with ES and Kibana
Now we create a Docker Compose file to run ElasticSearch and Kibana on our localhost. Create a docker-compose.yml file in a folder of your choice.
In Docker Compose you have to define the services you want to use. In our case we define elasticsearch and kibana.
We choose which image of ElasticSearch and Kibana we want to use. The images are automatically downloaded from the official Docker Hub of Elastic.
Then we set the container names and the condition under which a container restarts after an unexpected exit. In the next step we set the environment of each container.
In this case we don't want security enabled, we run just a single ElasticSearch node, and we set up the connection between ElasticSearch and Kibana in ELASTICSEARCH_HOSTS.
Volumes define where the data is stored in the container file system. Finally, we specify on which localhost ports the containers will be running.
You can also make one service wait for another with depends_on.
Just copy-paste this chunk of code into your docker-compose.yml file:
version: '3.9'
services:
# Elastic Search single node cluster
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0
container_name: elasticsearch
restart: always
environment:
- xpack.security.enabled=false
- discovery.type=single-node
volumes:
- elasticsearch-data-volume:/usr/share/elasticsearch/data/
ports:
- 9200:9200
- 9300:9300
# Kibana UI for Elastic Search
kibana:
image: docker.elastic.co/kibana/kibana:8.0.0
container_name: kibana
restart: always
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- 5601:5601
depends_on:
- elasticsearch
volumes:
elasticsearch-data-volume:
driver: local
Now that we have defined our Docker Compose file, we can try to run our first Docker Compose app. Make sure you are in the same folder as your docker-compose file and type ~ docker compose up -d into the terminal.
The -d flag means that your app will be running in detached mode. You can check whether all containers are running with the docker ps command.
You should see this:

You can also open the Kibana GUI. Go to your browser and type localhost:5601 into the address bar. Note that this is the localhost port we defined in the Docker Compose file.
If everything is okay you will see this:

Run Weather pump to pump data to Elastic Search index
Well done! We have installed ElasticSearch and Kibana locally and we are able to access ElasticSearch through the Kibana GUI. Now we can try to run a pump that takes weather data and stores it in an ElasticSearch index. We have already built the Weather pump image, so you basically pull the image from Docker Hub and run it.
To do it simply run this command in your terminal:
~ docker run --network=host -dit lukasvecerka/bspump-weather
You have to set --network=host, which means that your container can access localhost on your host machine.
If you type docker ps, the output in the terminal should be this:

Now go to this URL address. It is the Index Management page, where you can see all of your stored indices.
If your containers are running correctly, you can see that there is an index called weather-pump-test. This is the index where we store the data from our weather pump.

Summarize
That's all for this example! We learnt how to work with Docker and especially with the Docker Compose tool, and how to define services for our application in Docker Compose. In the end we installed ElasticSearch and Kibana locally and pumped data into an ElasticSearch index with our pump.
What next
In the future you can add more services to your Docker Compose application and extend your environment with them. You can also build your own Docker image, push it to Docker Hub, and then use it in your Docker Compose setup.
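If you want to build such an image yourself instead of pulling the prebuilt Weather pump, the pipeline inside it could look roughly like the sketch below. This is only a hedged illustration: the weather API URL, the polling period and the connection ID are made up, and the ElasticSearch connection/sink config keys (url, index) should be checked against the bspump.elasticsearch reference for your version.

#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.elasticsearch
import bspump.http
import bspump.trigger

class WeatherPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Hypothetical weather API endpoint, polled every 10 seconds
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://example.com/weather.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 10)),
            # Parse the JSON response into a dict
            bspump.common.StdJsonToDictParser(app, self),
            # Store every event in the weather-pump-test index of the local ElasticSearch
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection", config={
                'index': 'weather-pump-test'
            }),
        )

if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(
        bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection", config={
            'url': 'http://localhost:9200/'
        })
    )
    svc.add_pipeline(WeatherPipeline(app, 'WeatherPipeline'))
    app.run()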
More about how to create BSPump Docker image is here Docker File Quickstart
Install Kafka and KafDrop via Docker
About
This example is focused on how to install Kafka and KafDrop on your localhost and browse Kafka topics in KafDrop. We will be using Docker and Docker Compose to set up the Kafka environment. Be sure you have Docker and Docker Compose set up; if not, follow this guide to install Docker and Docker Compose.
In the end we will use the Docker image of our Coindesk API pump, which can be found here: coindesk, to pump data into a topic in our local Kafka.
Docker is a platform which provides the ability to package and run an application in a loosely isolated environment called a container. More about Docker.
You can also read our quickstart how to use Docker with BSPump module here: Docker File Quickstart
Docker compose is a tool for defining and running multi-container Docker applications. More about Docker compose.
Docker compose with Kafka and KafDrop
Now we create a Docker Compose file to run Kafka and KafDrop on our localhost. Create a docker-compose.yml file in a folder of your choice.
In Docker Compose you have to define the services you want to use. Each service is one container that will be running.
In our case we define zookeeper, kafka and kafdrop. ZooKeeper is essentially a service for distributed systems offering a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems.
Services consist of these values:
image: the image that will be downloaded from Docker Hub (when we run docker compose it automatically pulls the image)
hostname: name of the service in the multi-container network
ports: the ports on which the container will run
environment: the service configuration (e.g. Kafka broker ID etc.)
depends_on: the service will wait until the service specified in depends_on has started
restart: the service tries to restart after an unexpected exit
Just copy-paste this chunk of code into your docker-compose.yml file:
version: '3.9'
services:
zookeeper:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- /data/zookeeper/data:/data
- /data/zookeeper/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19092"
depends_on:
- kafka1
Now that we have defined our Docker Compose file, we can try to run our first Docker Compose app. Make sure you are in the same folder as your docker-compose file and type ~ docker compose up -d into the terminal.
The -d flag means that your app will be running in detached mode. You have to wait a little while until all the images are pulled. After that you can check whether all containers are running with the docker ps command.
You should see this:

You can also open KafDrop. Go to your browser and type localhost:9000 into the address bar. Note that this is the port we set up in the Docker Compose file.
If everything works correctly you will see this page:

Pump data to Kafka topic
Well done! We've installed Kafka and KafDrop locally and we are able to see topics in KafDrop. Now we can try to run a pump that takes data from the CoinDesk API and stores it in a Kafka topic. We have already built the Coindesk pump image, so you basically use the image and run it.
Simply type this command into your terminal and we will see what happens.
~ docker run --network=host -dit lukasvecerka/bspump-kafkasink-example
You have to set --network=host, which means that your container can access localhost on your host machine.
Now when you look into KafDrop you can see the coindesk-data topic:

You can look at the messages you have pumped into this topic. Just click on the topic name, then on View Messages, and again on View Messages, and you should see something like this:

Summarize
That's all for this example! We learnt how to work with Docker and especially with the Docker Compose tool, and how to define services for our application in Docker Compose. In the end we installed Kafka, Zookeeper and KafDrop locally and ran a pump in a Docker container to pump data into a Kafka topic.
What next
In the future you can add more services to your Docker Compose application and extend your environment with them. You can build your own Docker image, push it to Docker Hub, and then use it in your Docker Compose setup or simply run it as a container.
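If you would rather build the Coindesk pump image yourself, a minimal sketch of the pipeline inside it could look like this. It is a hedged illustration: the 5-second period and the connection ID are assumptions, and the KafkaConnection/KafkaSink config keys follow the reference documentation further below, so check them against your BSPump version.

#!/usr/bin/env python3
import bspump
import bspump.http
import bspump.kafka
import bspump.trigger

class CoindeskKafkaPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Poll the CoinDesk API every 5 seconds
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            # Forward the raw JSON event to the coindesk-data topic
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={
                'topic': 'coindesk-data'
            }),
        )

if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection", config={
            'bootstrap_servers': 'localhost:9092'
        })
    )
    svc.add_pipeline(CoindeskKafkaPipeline(app, 'CoindeskKafkaPipeline'))
    app.run()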
More about how to create BSPump Docker image is here Docker File Quickstart
Docker File Quickstart
About
This tutorial will help you create your own Docker image for your pipeline. First things first, I would recommend going through the Docker Documentation if this is your first time with Docker.
quickstart to docker
Docker can help you with the deployment of your app on other devices. All you need to do is set up Docker on one device and then it works on every other device. First you have to create a Docker image for your application. In our case we will create an image for our BS Pipeline. To do that we first have to create a Dockerfile for our pipeline.
We will be using code from one of our examples, coindesk. You can simply copy-paste the code and everything should work if you have the bspump Python module installed.
docker file
Creating a Dockerfile is a very easy thing to do. You only have to copy-paste the code below:
FROM teskalabs/bspump:nightly
WORKDIR /opt/coindesk
COPY coindesk.py ./coindesk.py
CMD ["python3", "coindesk.py"]
To explain what it does:
1. The keyword FROM specifies which Docker image you are building on. In this case we will be using a "preset" image for bspump. This image runs on Alpine Linux and has all libraries installed.
2. WORKDIR specifies the working directory into which other files will be copied.
3. COPY is used to copy any files you will be using, including the source code of your app.
4. CMD is the command that is run in your container. You have to write the command sequence as a list where each element is one word of the command. In our case we want to execute our program using python3 coindesk.py
Creating docker image
To build your Docker image use this command. Make sure to use the -t switch and match <<your docker nickname>> to your Docker login name. This must match for a successful push of the image to Docker Hub.
docker build -t <<your docker nickname>>/<<name of your image>> .
Now you can try to run your docker image using:
docker run -it <<your docker nickname>>/<<name of your image>>
Now your container should be running in your console. If you want to terminate it, open another console and type
docker ps
This command will show you all your running containers, and with
docker kill <<CONTAINER ID>>
you can terminate the container. The container ID can be found next to the running image after typing docker ps
If you want to see all containers that were ever started, type
docker ps -a
Now, if you want to use this image from other devices, for example with Docker Compose, you can push the image to your repository using:
docker push <<your docker nickname>>/<<name of your image>>
If you haven't tagged your image before, use
docker tag <<name of your image>> <<your docker nickname>>/<<name of your image>>
Now you should have a running Docker container and you know how to push the image to your Docker Hub. If you are still not sure how to use Docker, I would recommend checking the Docker documentation once again. Docker is not complicated, but it takes some time to get used to it.
additional commands
TODO
what next
If you have successfully created your own Docker image, you can try to connect your pipeline with other technologies like ElasticSearch or Kafka. Check our Install ElasticSearch and Kibana via Docker example for working with Docker Compose.
WebSocket Example
This example will show you how you can connect two pipelines using a WebSocket server connection.
what is socket
A socket is a peer-to-peer connection between two computers. You can imagine it as two computers having access to one shared directory through which they can exchange data.
explain server/client consumer/producer
The pipeline you will create can be either a server or a client. The server is a script that listens on a certain IP address and port; the client is the one who "connects" to a certain port of the server. Both client and server can be either consumers, meaning they consume the data, or producers, meaning they produce the data. The specific combination of server/client and consumer/producer mainly depends on what you want to do. In this example we will show both a server/consumer - client/producer type of connection and a server/producer - client/consumer connection.
Server consumer
Server consumer means that this pipeline will be waiting for any client trying to make a connection, and once a client connects, the server will receive the incoming data into its pipeline. This server pipeline uses WebSocketSource as its source.
To create this kind of pipeline we have to use our WebSocketSource and specify the address and port on which it will listen for any possible connections. In this example we will run both pipelines on localhost, so you do not have to waste your time setting up your own network.
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.web
import bspump.http
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
bspump.http.WebSocketSource(app, self),
bspump.common.PPrintSink(app, self)
)
if __name__ == '__main__':
app = bspump.BSPumpApplication(web_listen="0.0.0.0:8080") #set web_listen variable to the address you want
svc = app.get_service("bspump.PumpService")
# Construct and register Pipeline
pl = SamplePipeline(app, 'SamplePipeline')
svc.add_pipeline(pl)
#you have to use add_get method to set up address using the handler.
app.WebContainer.WebApp.router.add_get('/bspump/ws', pl.WebSocketSource.handler)
app.run()
You can copy-paste the code above. The pipeline is really simple; the only thing you have to do is add the WebSocket Source.
Just make sure to set up the web_listen variable in the BSPumpApplication object, and do not forget that you have to call the add_get method to register the WebSocket handler on the /bspump/ws route.
Now you can run the script and your server should be running, listening for any possible connections.
Client producer
We have a running server, so now we have to create a client that can connect to the server and feed it with data.
#!/usr/bin/env python3
import bspump
import bspump.common
import bspump.http
import bspump.trigger
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.Counter = 1
# self.Source = bspump.common.InternalSource(app, self)
self.build(
bspump.http.HTTPClientSource(app, self, config={
'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
# Trigger that triggers the source every second (based on the method parameter)
}).on(bspump.trigger.PeriodicTrigger(app, 5)),
bspump.http.HTTPClientWebSocketSink(app, self, config={
'url': 'http://127.0.0.1:8080/bspump/ws',
})
)
if __name__ == '__main__':
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
# Construct and register Pipeline
pl = SamplePipeline(app, 'SamplePipeline')
svc.add_pipeline(pl)
app.run()
Creating the client is much easier than creating the server. All you have to do is use HTTPClientWebSocketSink with a config where you specify the URL of the server you want to connect to. In this case it is http://127.0.0.1:8080/bspump/ws
what next
This example should have given you an idea of how to use and connect pipelines using a WebSocket connection.
Blank App
In this tutorial you will learn how to divide a pipeline into several file components. This approach is beneficial for creating more advanced pipelines, because you can reuse some of the components without copy-pasting code. It is also much clearer. This is a general guide, so you can apply this structure to your own pipeline. For simplicity we will be using the so-called blank app in this tutorial; you can find the code here.
In this tutorial we will use code from our previous tutorial, coindesk, but don't worry: once you create this structure it is easy to adapt it for your own pipeline.
First you will create a file hierarchy similar to the one in this image.

pipeline
In this file you will have your pipeline with the self.build method. If you want to use your own processors, sources or sinks, you have to import them from another file. In this example I want to use my processor for coindesk, so I have to use
from .processor import EnrichProcessor
and then I can reference it in the self.build method.
import bspump
import bspump.common
import bspump.http
import bspump.trigger
from .processor import EnrichProcessor
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
# Source that GET requests from the API source.
bspump.http.HTTPClientSource(app, self, config={
'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
# Trigger that triggers the source every second (based on the method parameter)
}).on(bspump.trigger.PeriodicTrigger(app, 5)),
# Converts incoming json event to dict data type.
bspump.common.StdJsonToDictParser(app, self),
# Adds a CZK currency to the dict
EnrichProcessor(app, self),
bspump.common.StdDictToJsonParser(app,self),
# prints the event to a console
bspump.common.PPrintSink(app, self),
)
Just remember that the name of your pipeline (the name of the class) will be used in other files.
processor
To create the processor file you can simply copy-paste your processor class.
- note
Do not forget to import the bspump module, so that your processor can function normally.
import bspump
class EnrichProcessor(bspump.Processor):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
def convertUSDtoJPY(self, usd):
return usd * 113.70 # outdated rate usd/jpy
def process(self, context, event):
jpyPrice = str(self.convertUSDtoJPY(event["bpi"]["USD"]["rate_float"]))
event["bpi"]["JPY"] = {
"code": "JPY",
"symbol": "¥",
"rate": ''.join((jpyPrice[:3], ',', jpyPrice[3:])),
"description": "JPY",
"rate_float": jpyPrice
}
return event
service
In the service you have to register your pipeline. You can also register more pipelines.
- note
Remember to import your pipeline class here, so that you can register the pipeline.
import asab
from .pipeline import SamplePipeline
class BlankService(asab.Service):
def __init__(self, app, service_name="blank.BlankService"):
super().__init__(app, service_name)
async def initialize(self, app):
svc = app.get_service("bspump.PumpService")
# Create and register all connections here
# Create and register all matrices here
# Create and register all lookups here
# Create and register all pipelines here
self.SamplePipeline = SamplePipeline(app, "SamplePipeline")
svc.add_pipeline(self.SamplePipeline)
await svc.initialize(app)
self.SamplePipeline = SamplePipeline(app, "SamplePipeline")
svc.add_pipeline(self.SamplePipeline)
These two lines of code register your pipeline.
module
In the module file you create a module from your service. You can also create modules from several services.
import asab
from .service import BlankService
class BlankModule(asab.Module):
def __init__(self, app):
super().__init__(app)
self.BlankService = BlankService(app)
app
In the app file you create the whole application. You only have to include the module you have created. You can include more modules here.
import bspump
class BlankAppApplication(bspump.BSPumpApplication):
def __init__(self):
super().__init__()
from .module import BlankModule
self.add_module(BlankModule)
init
Create this file for the initialization of your package.
from .app import BlankAppApplication
how to start the pipeline
To start your pipeline, create another file, for example bspump-blank-app.py, and copy-paste this code:
from mypipeline.app import BlankAppApplication
if __name__ == '__main__':
app = BlankAppApplication()
app.run()
mypipeline.app is the path to your app Python file, and BlankAppApplication is the name of your application class.
Then you create an object of your class and run it.
Reference Documentation
The BSPump Reference Documentation describes the bspump Python library. It is based on the ASAB library, a platform that enables BSPump to be efficient and easy to configure.
Basics
Basics covers the most fundamental components of a BSPump. We will start with the “backbone” of the BSPump, which is called a “pipeline”.
Pipeline
The pipeline class is responsible for construction of the BSPump pipeline itself. Its methods enable us to maintain a working lifecycle of the system.
Pipeline
is responsible for data processing in BSPump.
Individual Pipeline
objects work asynchronously and independently of one another (provided dependence is not defined explicitly – for instance on a message source from some other pipeline) and can be triggered in unlimited numbers.
Each Pipeline
is usually in charge of one concrete task.
Pipeline has three main components:

Source connects different data sources with the Pipeline
to be processed
Multiple sources
A Pipeline
can have multiple sources.
They are simply passed as a list of sources to a Pipeline
build() method.
class MyPipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
[
MySource1(app, self),
MySource2(app, self),
MySource3(app, self),
],
bspump.common.NullSink(app, self),
)
The main component of the BSPump architecture is a so-called Processor
.
This object modifies, transforms and enriches events.
Moreover, it is capable of calculating metrics and creating aggregations, detecting anomalies or react to known as well as unknown system behaviour patterns.
Processors
differ as to their functions and all of them are aligned according to a predefined sequence in pipeline objects.
As regards working with data events, each Pipeline
has its unique task.
Processors
are passed as a list of Processors
to a Pipeline
build() method
class MyPipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
[
MyProcessor1(app, self),
MyProcessor2(app, self),
MyProcessor3(app, self),
],
bspump.common.NullSink(app, self),
)
Sink object serves as a final event destination within the pipeline given. Subsequently, the event is dispatched/written into the system by the BSPump.
- class Pipeline(app, id=None, config=None)[source]
Bases:
ABC
,Configurable
Description: Pipeline is …
An example of the Pipeline construction:

class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            [
                MySource(app, self),
                MyProcessor(app, self),
                MyProcessor2(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
Pipeline construction
The following are the core methods of the pipeline.
- Pipeline.build(source, *processors)[source]
This method enables adding sources, Processors, and a sink to create the structure of the Pipeline.
Parameters
- Pipeline.set_source(source)[source]
Sets a specific source or list of sources to the
Pipeline
.Parameters
- sourcestr, list optional
ID of a source.
If a list of sources is passed to the method, it adds the entire list of sources to the
Pipeline
.
- Pipeline.append_processor(processor)[source]
Adds a
Processors
to the currentPipeline
.Parameters
- processorstr
ID of a
processor
.
- Hint
The Generator can be added by using this method. It requires a depth parameter.
- Pipeline.remove_processor(processor_id)[source]
Removes a specific
processor
from thePipeline
.Parameters
- processor_idstr
ID of a
processor
.
- Returns
Error when
processor
is not found.
- Pipeline.insert_before(id, processor)[source]
Inserts the
Processor
into thePipeline
in front of anotherprocessor
specified by ID.Parameters
- idstr
ID of a
processor
that we want to insert.- processorstr
Name of the
processor
in front of which will be inserted the newprocessor
.
- Returns
True on success. False if ID was not found.
- Pipeline.insert_after(id, processor)[source]
Inserts the
Processor
into thePipeline
behind anotherProcessors
specified by ID.Parameters
- idstr
ID of a processor that we want to insert.
- processorstr
- Returns
True if successful. False if ID was not found.
- Pipeline.iter_processors()[source]
Uses python generator routine that iterates through all
Processors
in thePipeline
.- Yields
A Processor from a list in the
Pipeline
.
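For illustration, a minimal sketch of iterating over the processors of an already constructed pipeline (pl stands for any Pipeline instance, such as the SamplePipeline from the examples above; the Id attribute is assumed to hold the processor identifier):

for processor in pl.iter_processors():
    print(processor.Id)  # print the identifier of each processor in the pipeline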
Other Pipeline Methods
The additional methods below bring more features to the pipeline. However, many of them are very important and almost necessary.
- Pipeline.time()[source]
Returns correct time.
- Returns
App.time()
- Hint
More information in the ASAB documentation in UTC Time.
- Pipeline.get_throttles()[source]
Returns components from
Pipeline
that are throttled.- Returns
self._throttles Return list of throttles.
- Pipeline.is_error()[source]
Returns False when there is no error, otherwise it returns True.
- Returns
self._error is not None.
- Pipeline.set_error(context, event, exc)[source]
When called with exc is None, it resets error (aka recovery).
When called with exc, it sets exceptions for soft errors.
Parameters
- contexttype?
Context of an error.
- eventData with time stamp stored in any data type usually is in JSON.
You can specify an event that is passed to the method.
- excException.
Python default exceptions.
- Pipeline.handle_error(exception, context, event)[source]
Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.
Parameters
- exceptionException
Used for setting up a custom Exception.
- contextinformation
Additional information can be passed.
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- Returns
False for hard errors (stop the
Pipeline
processing). True for soft errors that will be ignored.
Example:
class SampleInternalPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def handle_error(self, exception, context, event):
        if isinstance(exception, json.decoder.JSONDecodeError):
            return True
        return False
- Pipeline.link(ancestral_pipeline)[source]
Links this
Pipeline
with an ancestralPipeline
. This is needed e. g. for a propagation of the throttling from childPipelines
back to their ancestors. If the childPipeline
uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestralPipeline
, so that its source may block incoming events until the internal queue is empty again.Parameters
- ancestral_pipelinestr
ID of a
Pipeline
that will be linked.
- Pipeline.unlink(ancestral_pipeline)[source]
Unlinks an ancestral
Pipeline
from thisPipeline
.Parameters
- ancestral_pipelinestr
ID of a ancestral
Pipeline
that will be unlinked.
- Pipeline.throttle(who, enable=True)[source]
Enables the throttling method for a chosen pipeline and its ancestral pipelines, if needed.
Parameters
- async Pipeline.ready()[source]
Checks if the
Pipeline
is ready. The method can be used in source: await self.Pipeline.ready().
- Pipeline.is_ready()[source]
This method is a check up of the event in the Event class.
- Returns
_ready.is_set().
- Pipeline.inject(context, event, depth)[source]
Injects method serves to inject events into the
Pipeline
’s depth defined by the depth attribute. Every depth is interconnected with a generator object.Parameters
- contextstring
Information propagated through the
Pipeline
.- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- depthint
Level of depth.
- Note
For normal operations, it is highly recommended to use process method instead.
- async Pipeline.process(event, context=None)[source]
Process method serves to inject events into the
Pipeline
’s depth 0, while incrementing the event in metric.Parameters
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- contextstr, default None
You can add additional information needed for work with event streaming.
- Hint
This is recommended way of inserting events into a
Pipeline
.
- Pipeline.create_eps_counter()[source]
Creates a dictionary with information about the
Pipeline
. It contains eps (events per second), warnings and errors.- Returns
self.MetricsService Creates eps counter using MetricsService.
- Note
The EPS counter can be created using this method or directly by using the MetricsService method.
- Pipeline.ensure_future(coro)[source]
You can use this method to schedule a future task that will be executed in a context of the
Pipeline
. ThePipeline
also manages a whole lifecycle of the future/task, which means, it will collect the future result, trash it, and mainly it will capture any possible exception, which will then block thePipeline
via set_error().Parameters
- coro??
??
- Hint
If the number of futures exceeds the configured limit, the
Pipeline
is throttled.
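A minimal sketch of scheduling a background task from inside a processor or source (the coroutine name and body are hypothetical; the point is only that the Pipeline collects the future and captures its exceptions):

async def enrich_in_background(event):
    ...  # some long-running asynchronous work

self.Pipeline.ensure_future(enrich_in_background(event))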
- Pipeline.locate_source(address)[source]
Locates a source based on its ID.
Parameters
- addressstr
ID of the source.
- Pipeline.locate_connection(app, connection_id)[source]
Finds a connection by ID.
Parameters
- appApplication
Name of the Application.
- connection_idstr
ID of connection we want to locate.
- Returns
connection
Source
- class Source(app, pipeline, id=None, config=None)[source]
Bases:
Configurable
Source class is responsible for connecting to a source, and propagating events or other data from the source to the
processors
.
- Source.__init__()[source]
Set the initial ID,
Pipeline
and Task.Parameters
- appApplication
Name of an Application (see the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/application.html).
- pipelineaddress of a pipeline
Name of a
Pipeline
.- idstr, default None
Name of the
Pipeline
.- configcompatible config type , default None
Option for adding a configuration file.
Source Construction
Source is an object designed to obtain data from a predefined input. The BSPump contains a lot of universally usable, specific source objects, which are capable of loading data from known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific cases of use in industry field given.
Each source represent a coroutine/Future/Task that is running in the context of the main loop.
The coroutine method main()
contains an implementation of each particular source.
Source MUST await a Pipeline
ready state prior producing the event.
It is accomplished by the await self.Pipeline.ready() call.
- class Source(app, pipeline, id=None, config=None)[source]
Bases:
Configurable
Source class is responsible for connecting to a source, and propagating events or other data from the source to the
processors
.- __init__(app, pipeline, id=None, config=None)[source]
Set the initial ID,
Pipeline
and Task.Parameters
- appApplication
Name of an Application (see the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/application.html).
- pipelineaddress of a pipeline
Name of a
Pipeline
.- idstr, default None
Name of the
Pipeline
.- configcompatible config type , default None
Option for adding a configuration file.
- async Source.process(event, context=None)[source]
This method is used to emit event into a
Pipeline
.Parameters
- event: Data with time stamp stored in any data type, usually JSON.
Message or information that is passed to the method and emitted into a
Pipeline
.- contextdefault None
Additional information.
If there is an error in the processing of the event, the Pipeline is throttled by setting the error, and the exception is raised.
Hint: The source should catch this exception and fail gracefully.
- Source.start(loop)[source]
Starts the
Pipeline
through the _main method, but if main method is implemented it starts the coroutine using main method instead.Parameters
- loop?
Contains the coroutines.
- async Source.stop()[source]
Stops the Source using self.Task. If the processes are not done it cancels them or raises an error.
- Source.restart(loop)[source]
Restarts the loop of coroutines and returns result() method.
Parameters
- loop??
Contains the coroutines.
- async Source.main()[source]
Can be implemented for additional features, else will raise NotImplementedError and _main is called instead.
- async Source.stopped()[source]
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
async def main(self):
    # ... initialize resources here
    await self.stopped()
    # ... finalize resources here
- Source.locate_address()[source]
Locates address of a
Pipeline
.- Returns
ID and ID of a
Pipeline
as a string.
- classmethod Source.construct(app, pipeline, definition: dict)[source]
Can create a source based on a specific definition. For example, a JSON file.
Parameters
- appApplication
Name of the Application.
- pipeline
Pipeline
Specification of a
Pipeline
.- definitiondict
Definition that is used to create a source.
- Returns
cls(app, newid, config)
This is an abstract source class intended as a base for implementation of ‘cyclic’ sources such as file readers, SQL extractors etc.
You need to provide a trigger class and implement cycle()
method.
Trigger source will stop execution, when a Pipeline
is cancelled (raises concurrent.futures.CancelledError).
This typically happens when a program wants to quit in reaction to a signal.
You also may overload the main()
method to provide additional parameters for a cycle()
method.
async def main(self):
async with aiohttp.ClientSession(loop=self.Loop) as session:
await super().main(session)
async def cycle(self, session):
session.get(...)
- class TriggerSource(app, pipeline, id=None, config=None)[source]
Bases:
Source
Description:
- Returns
- __init__(app, pipeline, id=None, config=None)[source]
Set the initial ID,
Pipeline
and Task.Parameters
- appApplication
Name of an Application (see the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/application.html).
- pipelineaddress of a pipeline
Name of a
Pipeline
.- idstr, default None
Name of the
Pipeline
.- configcompatible config type , default None
Option for adding a configuration file.
- TriggerSource.time()[source]
Method used for measuring an accurate time.
- Returns
App.time()
- Hint
You can find more information about UTC Time in the ASAB documentation
- TriggerSource.on()[source]
Sets a Trigger which is a method that waits for a given condition.
Parameters
- trigger : Trigger
The trigger (condition) that starts the source's cycle.
- Returns
Trigger.add(trigger)
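For example, attaching a periodic trigger to a trigger-based source, as used throughout the examples above (written as it would appear inside a Pipeline's build()):

bspump.http.HTTPClientSource(app, self, config={
    'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
}).on(bspump.trigger.PeriodicTrigger(app, 5))  # run cycle() every 5 seconds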
Processor
The main component of the BSPump architecture is a so called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics and creating aggregations, detecting anomalies or react to known as well as unknown system behavior patterns.
Processors differ as to their functions and all of them are aligned according to a predefined sequence in pipeline objects. As regards working with data events, each pipeline has its own unique task.
- class Processor(app, pipeline, id=None, config=None)[source]
Bases:
ProcessorBase
Inherits from ProcessorBase.
- classmethod Processor.construct()
Can construct a
processor
based on a specific definition. For example, a JSON file.Parameters
- appApplication
Name of the Application (see the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/application.html).
- pipelinestr
Name of the
Pipeline
.- definitiondict
Set of instructions based on which
processor
can be constructed.
- Returns
cls(app, pipeline, id=newid, config=config)
- Processor.process()
Can be implemented to return event based on a given logic.
Parameters
- context :
Additional information passed to the method.
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
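As an illustration, a minimal custom processor only needs to override process() and return the (possibly modified) event; here the event is assumed to be a dict and the "tag" field is a made-up example:

class TagProcessor(bspump.Processor):
    def process(self, context, event):
        # enrich the event and pass it on to the next processor in the pipeline
        event["tag"] = "processed"
        return event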
- Processor.locate_address()
Returns an ID of a
processor
and aPipeline
.- Returns
ID of the
Pipeline
and self.Id.
- Processor.rest_get()
Description:
- Returns
- Processor.__repr__()
Return repr(self).
Sink
Sink object serves as a final event destination within the pipeline given. Subsequently, the event is dispatched/written into the system by the BSPump.
Connection
- class Connection(app, id=None, config=None)[source]
Bases:
ABC
,Configurable
Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Their main use is to create connection with the main components of BSPump: source,
processor
and sink.- __init__(app, id=None, config=None)[source]
Description:
Parameters
- appApplication
Specification of an Application.
id : default None
- configJSON or other compatible format, default None
It contains important information and data responsible for creating a connection.
Connection construction
Top Level Objects
BSPumpApplication
- class BSPumpApplication(*args, **kwargs)[source]
Bases:
Application
BSPumpApplication is responsible for the main life cycle of the Application. It is based on ASAB Application class
- BSPumpApplication.__init__()[source]
Initiates the Application and looks for config with additional arguments.
Parameters
args : default= None
web_listen : default= None
BSPumpApplication Construction
- BSPumpApplication.create_argument_parser()[source]
Enables to create arguments that can be called within the command prompt when starting the application
- Returns
parser
- BSPumpApplication.parse_arguments(args=None)[source]
Parses argument in the ASAB Application using super() method.
Parameters
args : default= None
- Returns
args
BSPumpService
- class BSPumpService(app, service_name='bspump.PumpService')[source]
Bases:
Service
Service registry based on the Service object. Read more in the ASAB documentation: https://asab.readthedocs.io/en/latest/asab/service.html
- BSPumpService.__init__()[source]
Initializes parameters passed to the Service class.
Parameters
- appApplication
Name of the Application.
- service_name : str
String variable containing "bspump.PumpService".
BSPumpService Methods
- BSPumpService.locate(address)[source]
Locates a pipeline, source or processor based on the address parameter.
Parameters
- addressstr, ID
Address of an pipeline component.
- BSPumpService.add_pipeline(pipeline)[source]
Adds a pipeline to the BSPump.
Parameters
- pipelinePipeline
Name of the Pipeline.
- BSPumpService.add_pipelines(*pipelines)[source]
Adds multiple pipelines to the BSPump.
Parameters
- *pipelineslist
List of pipelines that are added to the BSPump.
- BSPumpService.del_pipeline(pipeline)[source]
Deletes a pipeline from a list of Pipelines.
Parameters
- pipelinestr, ID
ID of a pipeline.
- BSPumpService.add_connection(connection)[source]
Adds a connection to the Connection dictionary.
Parameters
- connectionstr, ID
ID of a connection.
- Returns
connection
- BSPumpService.add_connections(*connections)[source]
Adds multiple connections to the Connection dictionary.
Parameters
- *connections : str, ID
List of connection IDs.
- BSPumpService.locate_connection(connection_id)[source]
Locates connection based on connection ID.
Parameters
- connection_idID
Connection ID.
- BSPumpService.add_lookup(lookup)[source]
Sets a lookup based on Lookup.
Parameters
- lookupLookup
Name of the Lookup.
- Returns
lookup
- BSPumpService.add_lookups(*lookups)[source]
Adds a list of lookups to the Pipeline.
Parameters
- lookupLookup
List of Lookups.
- BSPumpService.locate_lookup(lookup_id, context=None)[source]
Locates lookup based on ID.
Parameters
- lookup_idID
ID of a Lookup.
- context,default = None
Additional information.
- Returns
Lookup from the lookup service or from the internal dictionary.
- BSPumpService.add_lookup_factory(lookup_factory)[source]
Adds a lookup factory
Parameters
- lookup_factory :
Name of lookup factory.
- BSPumpService.add_matrix(matrix)[source]
Adds a matrix to the Pipeline.
Parameters
- matrixMatrix
Name of Matrix.
- Returns
matrix
- BSPumpService.add_matrixes(*matrixes)[source]
Adds a list of Matrices to the Pipeline.
Parameters
- *matrixeslist
List of matrices.
- BSPumpService.locate_matrix(matrix_id)[source]
Locates a matrix based on matrix ID
Parameters
- matrix_idstr, ID
ID of a matrix.
- async BSPumpService.initialize(app)[source]
Initializes an Application based on ASAB Application
Parameters
- appApplication
Name of the Application
- async BSPumpService.finalize(app)[source]
Stops all the pipelines
Parameters
- appApplication
Name of the Application
Common
Aggregator
Aggregation Strategy
- AggregationStrategy.__init__()
List Aggregation Strategy
- class ListAggregationStrategy[source]
Bases:
AggregationStrategy
Description: … test
String Aggregation Strategy
- class StringAggregationStrategy(delimiter='\n')[source]
Bases:
AggregationStrategy
Description:
Aggregator
- class Aggregator(app, pipeline, aggregation_strategy: ~bspump.common.aggregator.AggregationStrategy = <bspump.common.aggregator.ListAggregationStrategy object>, id=None, config=None)[source]
Bases:
Generator
Description:
Bytes
String to Bytes Parser
- class StringToBytesParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Default Config
encoding : utf-8
Bytes To String Parser
- class BytesToStringParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Flatten
Flatten Dict Processor
- class FlattenDictProcessor(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description: ….
Inspired by https://github.com/amirziai/flatten
Example:
{
    "person": {
        "details": {
            "first_name": "John",
            "last_name": "Doe"
        },
        "address": {
            "country": "GB",
            "city": "London",
            "postal_code": "WC2N 5DU"
        }
    }
}
Gets converted to:
{
    "person.details.first_name": "John",
    "person.details.last_name": "Doe",
    "person.address.country": "GB",
    "person.address.city": "London",
    "person.address.postal_code": "WC2N 5DU"
}
Hexlify
Hexlify Processor
Iterator
Iterator Source
- class IteratorSource(app, pipeline, iterator: Iterator, id=None, config=None)[source]
Bases:
TriggerSource
Description:
Iterator Generator
- IteratorGenerator.__init__()
Description:
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- idstr, default = None
ID
- config : JSON, default = None
configuration file containing additional information.
Json
CySimd Json Parser
- class CySimdJsonParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Fast JSON parser. Expects JSON represented as bytes as input. Based on https://github.com/TeskaLabs/cysimdjson
Std Dict To Json Parser
- class StdDictToJsonParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Std Json To Dict Parser
- class StdJsonToDictParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Dict To JsonBytes Parser
- class DictToJsonBytesParser(app, pipeline, id=None, config=None)[source]
Bases:
Processor
DictToJsonBytesParser transforms a dictionary to JSON-string encoded in bytes. The encoding charset can be specified in the configuration in encoding field.
- DictToJsonBytesParser.process(context, event)[source]
Can be implemented to return event based on a given logic.
Parameters
- context :
Additional information passed to the method.
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
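A sketch of where this parser typically sits in a pipeline: dict events are serialized back to JSON bytes before a sink that expects bytes (the source, the trigger period and the NullSink here are just placeholders for illustration):

class JsonBytesPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': 'https://api.coindesk.com/v1/bpi/currentprice.json'
            }).on(bspump.trigger.PeriodicTrigger(app, 5)),
            bspump.common.StdJsonToDictParser(app, self),
            # Serialize the dict back to JSON bytes; the charset comes from the 'encoding' option
            bspump.common.DictToJsonBytesParser(app, self, config={'encoding': 'utf-8'}),
            bspump.common.NullSink(app, self),
        )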
Mapping
Mapping Keys Processor
- class MappingKeysProcessor(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description: Mapping Keys Processor
Mapping Values Processor
- class MappingValuesProcessor(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Mapping Items Processor
- class MappingItemsProcessor(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Mapping Keys Generator
- class MappingKeysGenerator(app, pipeline, id=None, config=None)[source]
Bases:
Generator
Description:
Null
Null Sink
Print
Print Sink
PPrint Sink
- class PPrintSink(app, pipeline, id=None, config=None, stream=None)[source]
Bases:
Sink
Description:
Print Processor
- class PrintProcessor(app, pipeline, id=None, config=None, stream=None)[source]
Bases:
Processor
Description:
PPrint Processor
- class PPrintProcessor(app, pipeline, id=None, config=None, stream=None)[source]
Bases:
Processor
Description:
Print Context Processor
- class PrintContextProcessor(app, pipeline, id=None, config=None, stream=None)[source]
Bases:
Processor
Description:
PPrint Context Processor
- class PPrintContextProcessor(app, pipeline, id=None, config=None, stream=None)[source]
Bases:
Processor
Description:
Routing
Direct Source
- class DirectSource(app, pipeline, id=None, config=None)[source]
Bases:
Source
Description: This source processes inserted event synchronously.
Internal Source
- InternalSource.put(context, event, copy_context=False, copy_event=False)[source]
Description: Context can be an empty dictionary if it is not provided.
If you are getting an asyncio.queues.QueueFull exception, you likely did not implement backpressure handling. The simplest approach is to use RouterSink / RouterProcessor.
- async InternalSource.put_async(context, event, copy_context=False, copy_event=False)[source]
Description: This method allows putting an event into the InternalSource asynchronously. Since processing in the pipeline is synchronous, this method is useful mainly for situations when an event is created outside of the pipeline processing. It is designed to handle the situation when the queue is becoming full.
Context can be an empty dictionary if it is not provided.
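A minimal sketch of feeding an event into an InternalSource from outside the pipeline (internal_source is assumed to be a reference to the InternalSource instance of a running pipeline, and the event is an arbitrary example dict):

await internal_source.put_async({}, {"message": "hello"})  # empty context, example event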
Router Mix In
- RouterMixIn.__init__()
- RouterMixIn.unlocate(source_id)[source]
Description: Undoes a locate() call: it removes the source from the cache and removes the throttling binds.
- Returns
??
Router Sink
- class RouterSink(app, pipeline, id=None, config=None)[source]
Bases:
Sink
,RouterMixIn
Description: Abstract Sink that dispatches events to other internal sources. One should override the process() method and call route() with target source id.
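A hedged sketch of such an override (the route() call assumes the (context, event, source_id) argument order suggested by the description above, and the pipeline/source IDs are made up):

class MyRouterSink(bspump.common.RouterSink):
    def process(self, context, event):
        # dispatch events to different pipelines based on their content
        if event.get("priority") == "high":
            self.route(context, event, "FastPipeline.*InternalSource")
        else:
            self.route(context, event, "SlowPipeline.*InternalSource")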
Router Processor
- class RouterProcessor(app, pipeline, id=None, config=None)[source]
Bases:
Processor
,RouterMixIn
Description: Abstract Processor that dispatches events to other internal sources. One should override the process() method and call route() with target source id.
Tee
Tee Source Processor
- class TeeSource(app, pipeline, id=None, config=None)[source]
Bases:
InternalSource
Description:
class SamplePipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
            bspump.common.PPrintSink(app, self)
        )

class SampleTeePipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.common.TeeSource(app, self),
            bspump.common.PPrintSink(app, self)
        )
Tee Processor
- class TeeProcessor(app, pipeline, id=None, config=None)[source]
Bases:
RouterProcessor
Description: See TeeSource for details.
Time
Time Zone Normalizer
- class TimeZoneNormalizer(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description: Normalizes datetime from the local timezone (set e.g. in the config) to UTC, which is the preferred internal datetime form.
Transfr
Mapping Transformator
- class MappingTransformator(app, pipeline, id=None, config=None)[source]
Bases:
Processor
Description:
Advanced
BitSwan Pump provides more advanced Processors that can be used in a pipeline
Generator
- The Generator object is used to generate one or multiple events in an asynchronous way and pass them to the following processors in the pipeline. In the case of a Generator, the user overrides the generate method, not process.
1.) Generator can iterate through an event to create (generate) derived ones and pass them to following processors.
Example of a custom Generator class with generate method:
class MyGenerator(bspump.Generator):
async def generate(self, context, event, depth):
for item in event.items():
self.Pipeline.inject(context, item, depth)
2.) Generator can in the same way also generate completely independent events, if necessary.
In this way, the generator processes originally synchronous events "out-of-band", i.e. outside of the synchronous processing within the pipeline.
Specific implementation of the generator should implement the generate method to process events while performing
long running (asynchronous) tasks such as HTTP requests or SQL select.
The long running tasks may enrich events with relevant information, such as output of external calculations.
Example of generate method:
async def generate(self, context, event, depth):
# Perform possibly long-running asynchronous operation
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com/resolve_color/{}".format(event.get("color_id", "unknown"))) as resp:
if resp.status != 200:
return
new_event = await resp.json()
# Inject a new event into a next depth of the pipeline
self.Pipeline.inject(context, new_event, depth)
- Generator.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- idstr, default = None
ID
- config : JSON, default = None
configuration file containing additional information.
Generator Construction
Analyzer
- This is a general analyzer interface, which can be the basis of different analyzers.
analyze_on_clock enables analysis by a timer, whose period can be set by analyze_period or Config["analyze_period"].
In general, the Analyzer contains some object in which it accumulates information about events. Events go through the analyzer unchanged; the information is recorded by the evaluate() function. The internal object sometimes has to be processed and sent somewhere (e.g. to another pipeline); this can be done by the analyze() function, which can be triggered by time, pubsub or externally.
- class Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]
Bases:
Processor
Description:
Analyzer Construction
Analyzer
The main function, which runs through the analyzed object. Specific for each analyzer. If the analyzed object is a Matrix, it is not recommended to iterate through the matrix row by row (or cell by cell). Instead use numpy functions. Examples:
1. You have a vector with n rows. You need only those row indices where the cell content is more than 10. Use np.where(vector > 10).
2. You have a matrix with n rows and m columns. You need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
- Analyzer.evaluate(context, event)[source]
- The function which records the information from the event into the analyzed object.
Specific for each analyzer.
Parameters
context :
- eventany data type
information with timestamp.
- Analyzer.predicate(context, event)[source]
This function is meant to check whether the event is worth processing. If it is, it should return True. It is specific for each analyzer, but the default one always returns True.
Parameters
context :
- eventany data type
information with timestamp.
- Returns
True
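A hedged sketch of a custom analyzer built on the interface described above (the event fields and the print-based analyze() are made-up illustrations):

import bspump.analyzer

class ErrorCounterAnalyzer(bspump.analyzer.Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, analyze_on_clock=True, id=id, config=config)
        self.ErrorCount = 0

    def predicate(self, context, event):
        # only events that carry a "level" field are worth processing
        return "level" in event

    def evaluate(self, context, event):
        # record information from the event into the internal object
        if event["level"] == "error":
            self.ErrorCount += 1

    def analyze(self):
        # periodically process the accumulated information and reset it
        print("errors seen:", self.ErrorCount)
        self.ErrorCount = 0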
Analyzing Source
Lookup
Lookups serve for fast data searching in lists of key-value type. They can subsequently be localized and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.
If the “lazy” parameter in the constructor is set to True, no load method is called and the user is expected to call it when necessary.
- class Lookup(app, id=None, config=None, lazy=False)[source]
Bases:
Configurable
Description:
- Returns
Lookup Construction
MappingLookup
- class MappingLookup(app, id=None, config=None, lazy=False)[source]
Bases:
Lookup
,Mapping
Description:
- MappingLookup.__init__()
Description:
Async Lookup Mixin
AsyncLookupMixin makes sure the value from the lookup is obtained asynchronously. AsyncLookupMixin is to be used for every technology that is external to BSPump, i.e. that requires a connection to a resource server, such as SQL databases.
Dictionary Lookup
- class DictionaryLookup(app, id=None, config=None, lazy=False)[source]
Bases:
MappingLookup
Description:
Dictionary Lookup Methods
Lookup Provider
Lookup Provider Methods
Lookup BatchProvider ABC
- class LookupBatchProviderABC(lookup, url, id=None, config=None)[source]
Bases:
LookupProviderABC
,ABC
Description:
- LookupBatchProviderABC.__init__()
Description:
Anomaly
- class Anomaly[source]
Bases:
dict
Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.
- Returns
Implement: TYPE, on_tick
- Anomaly.__init__()
Technologies
Technologies Reference Documentation describes the Technologies section.
Apache Kafka
Connection
- class KafkaConnection(app, id=None, config=None)[source]
Bases:
Connection
KafkaConnection serves to connect BSPump application with an instance of Apache Kafka messaging system. It can later be used by processors to consume or provide user-defined messages.
config = {"compression_type": "gzip"} app = bspump.BSPumpApplication() svc = app.get_service("bspump.PumpService") svc.add_connection( bspump.kafka.KafkaConnection(app, "KafkaConnection", config) )
ConfigDefaults options:
- compression_type (str): Kafka supports several compression types: gzip, snappy and lz4. This option needs to be specified in the Kafka Producer only; the Consumer will decompress automatically.
- security_protocol (str): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
- sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN.
- sasl_plain_username (str): Username for SASL PLAIN authentication. Default: None.
- sasl_plain_password (str): Password for SASL PLAIN authentication. Default: None.
- KafkaConnection.__init__()[source]
initializes variables
Parameters
- appApplication
Name of the Application.
- id, default = None
ID information.
- configJSON or txt, default= None
Configuration file of any supported type.
- async KafkaConnection.create_producer(**kwargs)[source]
Creates a Producer.
Parameters
- **kwargs :
Additional information can be passed to this method.
- Returns
producer
- KafkaConnection.create_consumer(*topics, **kwargs)[source]
Creates a consumer.
Parameters
- *topics :
any number of topics can be passed to this method.
- **kwargs :
additional information can be passed to this method.
- Returns
consumer
Source
- class KafkaSource(app, pipeline, connection, id=None, config=None)[source]
Bases:
Source
KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.
class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

To ensure that after a restart the pump continues receiving messages where it left off, a group_id has to be provided in the configuration. When the group_id is set, a consumer group is created and the Kafka server operates in the producer-consumer mode: every consumer with the same group_id is assigned a unique set of partitions, hence all messages are divided among them and each message is consumed only once. Long-running synchronous operations should be avoided, placed inside the OOBGenerator in an asynchronous way, or run on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder). Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer from the partition, thus causing a rebalance.
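A hedged sketch of the source configuration with a consumer group; topic and group names are placeholders:

source = bspump.kafka.KafkaSource(
    app, self, "KafkaConnection",
    config={
        "topic": "messages",
        "group_id": "my-pump-group",     # offsets are committed per group, so the pump resumes after restart
        "session_timeout_ms": 60000,     # raise this if long-running processing cannot be avoided
    }
)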
- KafkaSource.__init__()[source]
Initializes parameters.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
information needed to create a connection.
id : , default = None
config : , default = None
Sink
- class KafkaSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]
Bases:
Sink
Description: KafkaSink is a sink processor that forwards the event to an Apache Kafka instance specified by a KafkaConnection object.
KafkaSink expects bytes as an input. If the input is string or dictionary, it is automatically transformed to bytes using encoding charset specified in the configuration.
class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

There are two ways to use KafkaSink:
- Specify a single topic in the KafkaSink config - topic - to be used for all events in the pipeline.
- Specify a topic separately for each event in the event context - context['kafka_topic']. The topic from the configuration is then used as the default topic.
To provide business logic for event distribution, you can create a topic selector processor. Processor example:
class KafkaTopicSelector(bspump.Processor):

    def process(self, context, event):
        if event.get("weight") > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"
        return event

Every Kafka message can be a key:value pair. The key is read from the event context - context['kafka_key']. If kafka_key is not provided, the key defaults to None.
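Similarly, a hedged sketch of a processor that sets the message key from the event; the "user" field is an assumption:

class KafkaKeySelector(bspump.Processor):

    def process(self, context, event):
        # Kafka keys are bytes; fall back to None when the field is missing
        user = event.get("user")
        context["kafka_key"] = user.encode("utf-8") if user else None
        return event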
- KafkaSink.__init__()[source]
Initializes the parameters that are passed to the Sink class.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
information needed to create a connection.
key_serializer : , default = None
id : , default = None
config : , default = None
Key Filter Kafka
- class KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]
Bases:
Processor
KafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.
Every Kafka message has a key, KafkaKeyFilter selects only those events where the event key matches one of provided ‘keys’, other events will be discarded.
Set filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.
KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
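A hedged pipeline sketch placing KafkaKeyFilter right after KafkaSource; the topic and key values are placeholders, and passing the keys as a list of bytes is an assumption:

class FilteredKafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={"topic": "messages"}),
            bspump.kafka.KafkaKeyFilter(app, self, keys=[b"alpha", b"beta"]),  # keep only events with these keys
            bspump.common.NullSink(app, self),
        )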
- KafkaKeyFilter.__init__()[source]
Initializes variables
Parameters
- appApplication
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_.
- pipelinePipeline
Name of the Pipeline.
- keysbytes
keys used to filter out events from the event stream.
id : , default = None
- configJSON, default = None
configuration file in JSON
Batch Sink
- class KafkaBatchSink(app, pipeline, connection, key_serializer=None, id=None, config=None)[source]
Bases:
KafkaSink
KafkaBatchSink is a sink processor that forwards events in batches to an Apache Kafka instance specified by a KafkaConnection object.
It is a proof-of-concept sink that allows faster processing of events in the pipeline, but does not guarantee processing of all events in situations when the pump is closed, etc.
Work remains to be done in cooperation with aiokafka, so that the send_and_wait method works properly and is able to send events in batches.
- KafkaBatchSink.__init__()[source]
Initializing parameters passed to the BatchSink class.
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
Information needed to create a connection.
key_serializer : ,default None
id : , default = None
- configJSON, default = None
Configuration file with additional information.
Topic Initializer
- class KafkaTopicInitializer(app, connection, id: Optional[str] = None, config: Optional[dict] = None)[source]
Bases:
Configurable
KafkaTopicInitializer reads topic configs from a file or from Kafka sink/source configs, checks whether the topics exist and creates them if they don't.
KafkaAdminClient requires a blocking connection, which is why this class doesn't use the connection module from BSPump.
Usage:

topic_initializer = KafkaTopicInitializer(app, "KafkaConnection")
topic_initializer.include_topics(pipeline=MyPipeline)
topic_initializer.initialize_topics()
- KafkaTopicInitializer.__init__()[source]
Initializes the parameters passed to the class.
Parameters
- appApplication
Name of the Application.
- connectionConnection
Information needed to create a connection.
id: typing.Optional[str] = None :
- config: dict = NoneJSON
configuration file containing important information.
- KafkaTopicInitializer.include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)[source]
Includes topics from a config file or a dict object. It can also scan a Pipeline and get topics from its Source or Sink.
Parameters
- topic_config, default= None
Topic config file.
kafka_component : , default= None
- pipeline, default= None
Name of the Pipeline.
- config_file, default= None
Configuration file.
- KafkaTopicInitializer.include_topics_from_file(topics_file: str)[source]
Includes topics from a topic file.
Parameters
- topics_filestr
Name of the topic file to include.
Elastic Search
ElasticSearch is an analytics and full-text search engine, commonly used for Application Performance Management, mainly the analysis of logs.
Source
ElasticSearchSource is using standard Elastic’s search API to fetch data.
configs
index - Elastic's index (default is 'index-*').
scroll_timeout - Timeout of a single scroll request (default is '1m'). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
specific parameters
paging - boolean (default is True)
request_body - dictionary described by Elastic's doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
default_request_body = {
'query': {
'bool': {
'must': {
'match_all': {}
}
}
},
}
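A hedged sketch of using the source inside a pipeline build; the PeriodicTrigger hookup via .on() and the query are assumptions, while the request_body and index options come from the list above:

self.build(
    bspump.elasticsearch.ElasticSearchSource(
        app, self, "ESConnection",
        request_body={"query": {"term": {"status": "error"}}},  # assumed query
        config={"index": "logs-*"}
    ).on(bspump.trigger.PeriodicTrigger(app, 60)),               # poll every 60 seconds (assumed trigger)
    bspump.common.NullSink(app, self),
)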
- class ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)[source]
Bases:
TriggerSource
Description:
- ElasticSearchSource.__init__()[source]
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
Information of the connection.
- request_body JSON, default = None
Request body needed for the request API call.
paging : ?, default = True
- idID, default = None
ID
- configJSON/dict, default = None
Configuration file with additional information.
ElasticSearchAggsSource is used for Elastic’s search aggregations.
configs
index - Elastic's index (default is 'index-*').
specific parameters
request_body - dictionary described by Elastic's doc: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
Default is:
default_request_body = {
'query': {
'bool': {
'must': {
'match_all': {}
}
}
},
}
- class ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)[source]
Bases:
TriggerSource
Description:
- ElasticSearchAggsSource.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- connectionConnection
Information of the connection.
- request_body JSON, default = None
Request body needed for the request API call.
- idID, default = None
ID info
- configJSON/dict, default = None
configuration file with additional information.
- async ElasticSearchAggsSource.process_aggs(path, aggs_name, aggs)[source]
Description:
Parameters
path :
aggs_name :
aggs :
- async ElasticSearchAggsSource.process_buckets(path, parent, buckets)[source]
Recursive function for bucket processing. It iterates through the keys of the dictionary, looking for 'buckets' or 'value'. If there are 'buckets', it calls itself; if there is a 'value', it calls process_aggs and sends an event to process.
Parameters
path :
parent :
buckets :
ElasticSearch Connection
ElasticSearchConnection allows your ES source, sink or lookup to connect to an ElasticSearch instance.
usage:
# adding connection to PumpService
svc = app.get_service("bspump.PumpService")
svc.add_connection(
bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
)
# pass connection name ("ESConnection" in our example) to relevant BSPump's object:
self.build(
bspump.kafka.KafkaSource(app, self, "KafkaConnection"),
bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
)
- class ElasticSearchConnection(app, id=None, config=None)[source]
Bases:
Connection
Description:
Sample Config
- url'http://{host}:{port}'
URL of the ElasticSearch node. May contain multiple URLs separated by ';', one per node in the ElasticSearch cluster.
- username‘string’ , default = ‘ ‘
Used when authentication is required
- password‘string’, default = ‘ ‘
Used when authentication is required
- loader_per_urlint, default = 4
Number of parallel loaders per URL.
- output_queue_max_sizeint, default = 10
Maximum queue size.
- bulk_out_max_sizeint, default = 12 * 1024 * 1024
Maximum size of a single outgoing bulk, in bytes.
- timeoutint, default = 300
Timeout value.
- fail_log_max_sizeint, default = 20
Maximum size of failed log messages.
- precise_error_handlingbool, default = False
If True, all errors will be logged; if False, soft errors will be omitted from the logs.
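A hedged configuration sketch matching the options above; hosts and credentials are placeholders:

svc.add_connection(
    bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection", config={
        "url": "http://es-node1:9200/;http://es-node2:9200/",  # multiple nodes separated by ';'
        "username": "pump",
        "password": "secret",
        "loader_per_url": 4,
        "timeout": 300,
    })
)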
- ElasticSearchConnection.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application
- idID, default= None
ID
- configJSON or dict, default= None
configuration file with additional information for the methods.
- ElasticSearchConnection.get_session()[source]
Returns current Client Session Authentication and Loop
- Returns
aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
- ElasticSearchConnection.consume(index, data_feeder_generator, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>)[source]
Checks the content of data_feeder_generator and the bulk; if there is data to be sent, it calls the enqueue method.
Parameters
index :
data_feeder_generator :
- bulk_class=ElasticSearchBulk :
Creates an instance of the ElasticSearchBulk class.
- ElasticSearchBulk.consume(data_feeder_generator)[source]
Appends all items in data_feeder_generator to the Items list. The consumer also resets Aging and Capacity.
Parameters
- data_feeder_generatorlist
List of data that will be passed to a generator and later uploaded to ElasticSearch.
- Returns
self.Capacity <= 0
- async ElasticSearchBulk.upload(url, session, timeout)[source]
Uploads data to ElasticSearch.
Parameters
- urlstring
Uses URL from config to connect to ElasticSearch Rest API.
- session?
?
- timeoutint
Uses the timeout value from the config: how long the connection to ElasticSearch is kept open.
- Returns
?
- ElasticSearchBulk.partial_error_callback(response_items)[source]
Description: When an upload to ElasticSearch fails for individual error items (a document could not be inserted), this callback is called.
Parameters
- response_items
List with dict items: {"index": {"_id": ..., "error": ...}}
- ElasticSearchBulk.full_error_callback(bulk_items, return_code)[source]
Description: When an upload to ElasticSearch fails because of an ElasticSearch error, this callback is called.
Parameters
- bulk_itemslist
list with tuple items: (_id, data)
- return_code :
ElasticSearch return code
- Returns
False if the bulk is to be resubmitted again
Lookup
- class ElasticSearchLookup(app, connection, id=None, config=None, cache=None, lazy=False)[source]
Bases:
MappingLookup
,AsyncLookupMixin
A lookup that is linked with ElasticSearch. It provides a mapping (dictionary-like) interface to pipelines. It feeds lookup data from ElasticSearch using a query. It also has a simple cache to reduce the number of database hits.
configs
index - Elastic’s index
key - field name to match
scroll_timeout - Timeout of single scroll request (default is ‘1m’). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
Example:
The ElasticSearchLookup can then be located and used inside a custom enricher:

class AsyncEnricher(bspump.Generator):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.Lookup = svc.locate_lookup("ElasticSearchLookup")

    async def generate(self, context, event, depth):
        if 'user' not in event:
            return None
        info = await self.Lookup.get(event['user'])
        # Inject a new event into a next depth of the pipeline
        self.Pipeline.inject(context, event, depth)
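For completeness, a hedged sketch of registering the lookup under the id used above; the add_lookup and locate_connection calls on the pump service, as well as the index and key values, are assumptions:

svc = app.get_service("bspump.PumpService")
svc.add_lookup(
    bspump.elasticsearch.ElasticSearchLookup(
        app,
        connection=svc.locate_connection("ESConnection"),  # assumed way of obtaining the connection
        id="ElasticSearchLookup",
        config={"index": "users", "key": "user"}           # placeholder index and key field
    )
)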
- ElasticSearchLookup.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application.
- connectionConnection
Name of the Connection
- idID, default= None
ID
- configJSON, default= None
Configuration file with additional information.
cache : ?, default = None
lazy : bool, default = False
- async ElasticSearchLookup.get(key)[source]
Obtain the value from lookup asynchronously.
Parameters
key : ?
- Returns
value
- ElasticSearchLookup.build_find_one_query(key) dict [source]
Override this method to build your own lookup query
Parameters
key : ?
- Returns
Default single-key query
- classmethod ElasticSearchLookup.construct(app, definition: dict)[source]
Constructs config, id, and connection based on config.
Parameters
- appApplication
Name of the Application.
- definition:dictDefinition
Definition containing information about certain variables.
- Returns
cls(app, newid, connection, config)
Sink
- class ElasticSearchSink(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)[source]
Bases:
Sink
ElasticSearchSink allows you to insert events into ElasticSearch through POST requests
The following attributes can be passed to the context and thus override the default behavior of the sink:
es_index (STRING): ElasticSearch index name
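A hedged sketch of a processor that overrides the target index per event via the context; the daily index naming is an assumption:

import datetime

class DailyIndexSelector(bspump.Processor):

    def process(self, context, event):
        # Route this event to a daily index instead of the sink's default one
        day = datetime.datetime.utcnow().strftime("%Y.%m.%d")
        context["es_index"] = "logs-{}".format(day)
        return event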
data_feeder accepts the event as its only parameter and yields data as a Python generator. The example implementation is:
def data_feeder_create_or_index(event):
    _id = event.pop("_id", None)
    if _id is None:
        yield b'{"create":{}}\n'
    else:
        yield orjson.dumps(
            {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
        )
    yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)
- ElasticSearchSink.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application
- pipelinePipeline
Name of the Pipeline
- connectionConnection
Name of the Connection
- idID, default= None
ID
- configJSON, default= None
Configuration file with additional information.
bulk_class=ElasticBulk :
data_feeder=data_feeder_create_or_index :
- data_feeder.data_feeder_create_or_index()
Feeds the event using the ElasticSearch bulk 'create' or 'index' action, depending on whether the event carries an '_id'.
Parameters
- eventData with a timestamp, stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- data_feeder.data_feeder_create()
Feeds the event using the ElasticSearch bulk 'create' action.
Parameters
- eventData with a timestamp, stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- data_feeder.data_feeder_index()
Description: Feeds the event using the ElasticSearch bulk 'index' action.
Parameters
- eventData with a timestamp, stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- data_feeder.data_feeder_update()
Feeds the event using the ElasticSearch bulk 'update' action.
Parameters
- eventData with a timestamp, stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- data_feeder.data_feeder_delete()
Feeds the event using the ElasticSearch bulk 'delete' action.
Parameters
- eventData with a timestamp, stored in any data type, usually JSON.
You can specify an event that is passed to the method.
Files
File ABC Source
- class FileABCSource(app, pipeline, id=None, config=None)[source]
Bases:
TriggerSource
Description:
- FileABCSource.__init__()[source]
Description:
Parameters
- appApplication
Name of the Application.
- pipelinePipeline
Name of the Pipeline.
- idID, default = None
ID
- configJSON, default = None
Configuration file with additional information.
- async FileABCSource.simulate_event()[source]
The simulate_event method should be called in the read() method after a file line has been processed.
It ensures that all other asynchronous events receive enough time to perform their tasks. Otherwise, the application loop is blocked by the file reader and no other activity makes progress.
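A hedged sketch of a read() implementation honouring this rule; the read(filename, f) signature is an assumption based on the file sources:

class MyFileSource(bspump.file.FileABCSource):

    async def read(self, filename, f):
        for line in f:
            await self.process(line)
            # Give other asynchronous tasks a chance to run after every line
            await self.simulate_event()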
File Block Source
- class FileBlockSource(app, pipeline, id=None, config=None)[source]
Bases:
FileABCSource
Description:
File Block Sink
- class FileBlockSink(app, pipeline, id=None, config=None)[source]
Bases:
Sink
Description:
Config Defaults
path : ‘’
mode : wb
flags : O_CREAT
- FileBlockSink.__init__()[source]
Parameters
- appApplication
Name of the Application
- pipelinePipeline
Name of the Pipeline.
- idID, default = None
ID
- configJSON, default = None
Configuration file with additional information.
File csv Source
- class FileCSVSource(app, pipeline, fieldnames=None, id=None, config=None)[source]
Bases:
FileABCSource
Description:
File csv Sink
- class FileCSVSink(app, pipeline, id=None, config=None)[source]
Bases:
Sink
Description:
Default Config
path : ‘’
dialect : ‘excel’
delimiter : ‘,’
doublequote : True
escapechar : “”
lineterminator : os.linesep
quotechar : ‘”’
quoting : csv.QUOTE_MINIMAL
skipinitialspace : False
strict : False
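A hedged pipeline sketch reading one CSV file and writing another with a different delimiter; the OpportunisticTrigger hookup and the file paths are assumptions:

class CSVConvertPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.file.FileCSVSource(app, self, config={"path": "./in.csv"}).on(
                bspump.trigger.OpportunisticTrigger(app)
            ),
            bspump.file.FileCSVSink(app, self, config={"path": "./out.csv", "delimiter": ";"})
        )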
- FileCSVSink.get_file_name(context, event)[source]
Description: Override this method to gain control over output file name.
Parameters
context :
event :
- Returns
path of context and config
- FileCSVSink.writer(f, fieldnames)[source]
Description:
Parameters
f :
- fieldnameslist
Field names (columns) used by the CSV writer.
- Returns
dialect and fieldnames
File json Source
- class FileJSONSource(app, pipeline, id=None, config=None)[source]
Bases:
FileABCSource
Description: This file source is optimized to load even large JSONs from a file and parse them. The loading & parsing is off-loaded to a worker thread so that it doesn't block the IO loop.
File line Source
- class FileLineSource(app, pipeline, id=None, config=None)[source]
Bases:
FileABCSource
Description:
- FileLineSource.__init__()[source]
Description:
Parameters
- app: Application
Name of the Application
- pipelinePipeline
Name of the Pipeline
id : ID, default = None
- configJSON, default = None
Configuration file with additional information
- class FileMultiLineSource(app, pipeline, separator, id=None, config=None)[source]
Bases:
FileABCSource
Description: Reads a file line by line but tries to join multi-line events using a separator. The separator is a (fixed) pattern that is expected at the beginning of a line when it starts a new event.
Example: <133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event
2nd line of the event 3rd line of the event
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event
The separator is the '<' string in this case.
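A hedged sketch of constructing the source with that separator; the file path, the OpportunisticTrigger hookup and passing the separator as a string are assumptions:

source = bspump.file.FileMultiLineSource(
    app, self, separator="<",            # a line starting with '<' begins a new event
    config={"path": "./syslog.log"}      # placeholder path
).on(bspump.trigger.OpportunisticTrigger(app))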
- FileMultiLineSource.__init__()[source]
Description:
Parameters
- app: Application
Name of the Application
- pipelinePipeline
Name of the Pipeline
separator :
id : ID, default = None
- configJSON, default = None
Configuration file with additional information
Lookup Provider
- class FileBatchLookupProvider(lookup, url, id=None, config=None)[source]
Bases:
LookupBatchProviderABC
Loads lookup data from a file on the local filesystem.
InfluxDB
Connection
- class InfluxDBConnection(app, id=None, config=None)[source]
Bases:
Connection
Description: InfluxDBConnection serves to connect BSPump application with an InfluxDB database. The InfluxDB server is accessed via URL, and the database is specified using the db parameter in the configuration.
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.influxdb.InfluxDBConnection(app, "InfluxConnection1")
)

Config Defaults

url : http://localhost:8086/
db : mydb
output_queue_max_size : 10
output_bucket_max_size : 1000 * 1000
timeout : 30
retry_enabled : False
response_codes_to_retry : 404, 502, 503, 504
Sink
- class InfluxDBSink(app, pipeline, connection, id=None, config=None)[source]
Bases:
Sink
Description: InfluxDBSink is a sink processor, that stores the event into an InfluxDB database specified in the InfluxDBConnection object.
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.influxdb.InfluxDBSink(app, self, "InfluxConnection1")
        )
IPC and Socket
Datagram
Protocol
- class SourceProtocolABC(app, pipeline, config)[source]
Bases:
object
A source protocol is a handler class that gets the socket (in a reader) and extracts the payload from it in a way that conforms to the expected protocol.
That happens in the handle() method. The output is shipped to the source.process() method.
- class LineSourceProtocol(app, pipeline, config)[source]
Bases:
SourceProtocolABC
Description: Basically readline() for reading lines from a socket.
Stream
- class Stream(loop, socket, outbound_queue=None)[source]
Bases:
object
Description: This object represents a client connection. It is an unencrypted STREAM socket.
Stream Server Source
Stream Client Sink
FTP
Connection
Source
RabbitMQ / AMQP
Source
- class AMQPSource(app, pipeline, connection, id=None, config=None)[source]
Bases:
Source
Description:
- AMQPSource.__init__()[source]
Sets the initial ID, Pipeline and Task.
Parameters
- appApplication
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_.
- pipelinePipeline
Name of the Pipeline.
- idstr, default = None
ID
- configJSON or other compatible format, default = None
Option for adding a configuration file.
- class AMQPFullMessageSource(app, pipeline, connection, id=None, config=None)[source]
Bases:
AMQPSource
Description:
Sink
Connection
- class AMQPConnection(app, id=None, config=None)[source]
Bases:
Connection
- AMQPConnection.__init__()[source]
Description:
Parameters
- appApplication
Specification of an Application.
id : default None
- configJSON or other compatible format, default None
It contains important information and data responsible for creating a connection.