Bitswan Tutorials
In this series of tutorials we will walk you through basic and more advanced examples and demos to start your adventure with BSPump.
You will learn more about the BSPump architecture and how each component works. However, before you start, you should know the basics of Python and be able to set up your programming environment.
Prerequisites
Here are some quick instructions that will help you install Python and the BSPump module using pip, the package installer for Python.
Installing Python
First, check whether you already have Python installed. Open your command line or terminal and type:
C:/> python --version
> Python 3.8.4
If your Python version is lower than 3.8, get a newer release from Python.org.
If you are a complete beginner to Python or want more information about it, check out the Python tutorial.
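The same version check can be done from inside Python, which is handy at the top of a setup script. This is a minimal sketch that assumes the 3.8 minimum mentioned above; the names MINIMUM_VERSION and is_supported are illustrative only.

```python
import sys

MINIMUM_VERSION = (3, 8)  # minimum version stated in this tutorial


def is_supported(version_info=sys.version_info, minimum=MINIMUM_VERSION):
    """Return True when the given version is at least the minimum version."""
    return tuple(version_info[:2]) >= tuple(minimum)


if __name__ == "__main__":
    print("Python", sys.version.split()[0], "supported:", is_supported())
```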
Installing the BSPump module
To install the BSPump module:
pip install asab bspump
or, alternatively, install it from the repository:
pip install git+https://github.com/LibertyAces/BitSwanPump-BlankApp.git
If you do not have pip installed, download the get-pip.py script and run:
python get-pip.py
To check the pip version, use:
pip --version
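To confirm that the modules are visible to the interpreter you are actually using, you can also ask Python directly. This is a small sketch using only the standard library; the helper name is_installed is illustrative, and it only handles top-level module names such as asab and bspump.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    for name in ("asab", "bspump"):
        print(name, "installed:", is_installed(name))
```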
Have you managed to install everything? Then you are ready to create your first BSPump.
BSPump High-level architecture
BSPump is made of several components, which are explained in this tutorial. As you probably know, Bitswan is a real-time stream processor. To process large amounts of data, BSPump uses so-called Event Stream Processing: data is propagated through a data pipeline as Events. An Event is a single data point with a timestamp. To handle these events, a pipeline uses special components that are compatible with each other. Each pipeline is therefore made of several vital components: source, processor and sink. For the pipeline to work, Bitswan also uses the BSPump Service to handle and register connections, pipelines etc.
First, we will walk you through each of the components and its functionality, so that you can later build your own pipeline. Doesn’t that sound cool?
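To make the source, processor and sink roles concrete before looking at the real classes, here is a conceptual model in plain Python. It is not the BSPump API: the function names and the dictionary layout of an event are assumptions made for illustration only.

```python
import time


def source(values):
    """Source: wrap each raw value in an event that carries a timestamp."""
    for value in values:
        yield {"timestamp": time.time(), "value": value}


def processor(event):
    """Processor: transform one event and pass it on."""
    event["value"] = event["value"].upper()
    return event


def sink(events, output):
    """Sink: the end of the pipeline, here simply a list."""
    for event in events:
        output.append(event)


def run_pipeline(values):
    """Chain source -> processor -> sink and return what the sink collected."""
    output = []
    sink((processor(event) for event in source(values)), output)
    return output


if __name__ == "__main__":
    for event in run_pipeline(["to be", "or not to be"]):
        print(event)
```

Each event passes through the stages one at a time, which mirrors how a pipeline hands events from one component to the next.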
BSPump Service
The Service is the part where pipelines and connections are registered.
We will go through the following code and explain each part:
import asab

from .pipeline import TCPPipeline


class BlankService(asab.Service):

    def __init__(self, app, service_name="blank.BlankService"):
        super().__init__(app, service_name)

    async def initialize(self, app):
        svc = app.get_service("bspump.PumpService")

        # Create and register all connections here

        # Create and register all matrices here

        # Create and register all lookups here

        # Create and register all pipelines here
        self.TCPPipeline = TCPPipeline(app, "TCPPipeline")
        svc.add_pipeline(self.TCPPipeline)

        await svc.initialize(app)

    async def get_data(self, message="be"):
        await self.TCPPipeline.process(message)
        return "Check stdout"
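In this example we define BlankService, obtain bspump.PumpService from the application, create a TCPPipeline and register it with add_pipeline.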
Connection
To be able to connect to a data source, you have to create a connection. The connection is usually made in the Source class and then registered in the Service class.
Pipeline
A pipeline is built from a source, one or more processors and a sink, as in the following example:
import sys

import bspump
import bspump.common
import bspump.socket

from .processor import ShakespeareanEnricher


class TCPPipeline(bspump.Pipeline):
    """
    To test this pipeline, use:
    socat STDIO TCP:127.0.0.1:8888
    or visit http://localhost:8080/blank?message=die
    """

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPSource(app, self, config={"host": "0.0.0.0", "port": 8888}),
            ShakespeareanEnricher(app, self),
            bspump.common.PPrintSink(app, self, stream=sys.stderr)
        )
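If socat is not available, a few lines of standard-library Python can send a test message to the TCP port instead. This is a sketch under assumptions: the helper name send_line is hypothetical, the default port 8888 is taken from the pipeline config above, and the message is sent as one newline-terminated line, as socat would do for typed input.

```python
import socket


def send_line(message, host="127.0.0.1", port=8888, timeout=5.0):
    """Open a TCP connection, send one newline-terminated line, then close.

    Returns the number of bytes sent.
    """
    payload = (message + "\n").encode("utf-8")
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return len(payload)
```

With the pipeline running, calling send_line("be") from a Python shell should make the processed event appear in the application's output.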
Lookup
Source
Description about source. What is it ..
Streaming Source
A Streaming Source lets events enter the pipeline as a so-called stream. Events flow through the source in real time, as they are delivered by the input technology.
The following technologies can be used as a streaming source:
Kafka
Elasticsearch
RabbitMQ
Elasticsearch Source
TODO
Description
Example
Explanation
Kafka Source
TODO
Description
Example
Explanation
Trigger Source
Unlike a streaming source, a Trigger Source is used when we need to pump data from SQL-like databases or files. It has to be triggered by an external event or a repeating timer (for example, requesting JSON data from an API every 10 minutes).
A Trigger Source can be used for:
HTTP client/server
SQL query
TCP
Files: csv, json etc.
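The idea of a repeating timer can be sketched with asyncio from the standard library. This is not the BSPump trigger API: run_periodically and fetch_data are hypothetical names, and fetch_data stands in for whatever work the source would do when triggered.

```python
import asyncio


async def run_periodically(action, interval_seconds, repetitions):
    """Call `action` every `interval_seconds`, `repetitions` times in total."""
    results = []
    for index in range(repetitions):
        results.append(await action())
        if index < repetitions - 1:
            # Wait for the next tick; nothing happens in between.
            await asyncio.sleep(interval_seconds)
    return results


async def fetch_data():
    """Stand-in for the real work, e.g. an SQL query or an API request."""
    return {"status": "ok"}


if __name__ == "__main__":
    # 600 seconds would match the 10-minute example; 0.1 keeps the demo short.
    print(asyncio.run(run_periodically(fetch_data, 0.1, 3)))
```

The important point is that the data is pulled only when the timer fires, in contrast to a streaming source, where events arrive on their own.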
TCP source
Description
TCP Source can be used to obtain data from a peer-to-peer connection using TCP.
Use case
TODO
Example
import bspump
import bspump.ipc


class EchoPipeline(bspump.Pipeline):
    '''
    To test this pipeline, use:
    socat STDIO TCP:127.0.0.1:8083
    '''

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # Listen for TCP connections on all interfaces, port 8083
            bspump.ipc.StreamServerSource(app, self, config={'address': '0.0.0.0 8083'}),
        )
HTTP Client Source
Description
HTTP Client Source gets data from a specified API URL.
Use case
If you need to pump data from a single API URL, you can use this Source.
Example
class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.HTTPClientSource(app, self, config={
                'url': '<<API URL>>'
            }).on(<<Here you will use some type of trigger>>),
        )
The API URL can be any API you wish to get data from.
You will need to specify your Trigger type. You can choose your Trigger here : TODO <<reference>>
Note
A fully functional example using this source can be found in the coindesk example.
MySQL
Description
Example
Explanation
JSON File
Description
Example
Explanation
CSV File
Description
Example
Explanation
Processor
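A processor receives each event in its process method and returns the transformed event, as in the following example: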
import bspump


class ShakespeareanEnricher(bspump.Processor):

    def process(self, context, event):
        if isinstance(event, bytes):
            event = event.decode("utf-8").replace('\r', '').replace('\n', '')
        return 'To {0}, or not to {0}?'.format(event)
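The transformation itself can be tried without a running pipeline. The sketch below copies the body of the process method into a plain function; the name enrich is illustrative and is not part of BSPump.

```python
def enrich(event):
    """Same steps as ShakespeareanEnricher.process, without the BSPump classes."""
    if isinstance(event, bytes):
        # Input from a TCP connection arrives as bytes, usually with a line break.
        event = event.decode("utf-8").replace("\r", "").replace("\n", "")
    return "To {0}, or not to {0}?".format(event)


if __name__ == "__main__":
    print(enrich(b"be\r\n"))  # To be, or not to be?
    print(enrich("die"))      # To die, or not to die?
```

Bytes are decoded and stripped of line breaks first, while strings are formatted as they are.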
Sink
The Sink is the part responsible for the output of the data to a database, to the standard output of your computer, or into another pipeline.
PPrintSink
In this example we are going to use PPrintSink, which prints the data from the pipeline to stdout or to any other stream that you pass to it.
To use the sink in your pipeline:
self.build(
    bspump.common.PPrintSink(app, self, stream=sys.stderr)
)
This adds the PPrintSink class to your pipeline. The sink should be the last part of the pipeline for the pipeline to work correctly.
To explain further: bspump.common is the module path to the class, and PPrintSink is the name of the class. In the parentheses you can specify the output stream. If none is specified, stdout is used.
code
class PPrintSink(Sink):
    """
    Description:

    |

    """

    def __init__(self, app, pipeline, id=None, config=None, stream=None):
        """
        Description:

        |

        """
        super().__init__(app, pipeline, id, config)
        self.Stream = stream if stream is not None else sys.stdout
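The stream fallback can be tried in isolation. The sketch below assumes that the sink writes each event with pprint.pprint from the standard library; print_event is a hypothetical stand-in for the sink, not a BSPump function.

```python
import io
import pprint
import sys


def print_event(event, stream=None):
    """Pretty-print one event to `stream`, falling back to stdout."""
    target = stream if stream is not None else sys.stdout
    pprint.pprint(event, stream=target)


if __name__ == "__main__":
    print_event({"message": "To be, or not to be?"})              # goes to stdout
    print_event({"message": "To be, or not to be?"}, sys.stderr)  # goes to stderr

    buffer = io.StringIO()  # any file-like object can serve as the stream
    print_event({"a": 1}, buffer)
```

Passing a file-like object such as io.StringIO is a convenient way to capture the output in tests.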
The whole code can be found at BitSwan BlankApp