Basics
This section covers the most fundamental components of BSPump. We will start with the “backbone” of BSPump, which is called a “pipeline”.
Pipeline
The Pipeline class is responsible for constructing the BSPump pipeline itself. Its methods enable us to maintain a working lifecycle of the system.
The Pipeline is responsible for data processing in BSPump.
Individual Pipeline objects work asynchronously and independently of one another (unless a dependence is defined explicitly, for instance on a message source from another pipeline), and any number of them can be triggered.
Each Pipeline is usually in charge of one concrete task.
A Pipeline has three main components:
Source connects different data sources to the Pipeline, so that their data can be processed.
Multiple sources
A Pipeline can have multiple sources.
They are simply passed as a list of sources to the Pipeline build() method.
import bspump
import bspump.common


class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # A list of sources is passed as the first argument
            [
                MySource1(app, self),
                MySource2(app, self),
                MySource3(app, self),
            ],
            bspump.common.NullSink(app, self),
        )
The main component of the BSPump architecture is a so-called Processor.
This object modifies, transforms and enriches events.
Moreover, it is capable of calculating metrics and creating aggregations, detecting anomalies, and reacting to known as well as unknown system behaviour patterns.
Processors differ in their functions, and all of them are arranged in a predefined sequence within pipeline objects.
As regards working with data events, each Pipeline has its own unique task.
Processors are passed to the Pipeline build() method as positional arguments after the source, with the sink as the last argument.
class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            # Processors follow the source in the order they are applied
            MyProcessor1(app, self),
            MyProcessor2(app, self),
            MyProcessor3(app, self),
            bspump.common.NullSink(app, self),
        )
The Sink object serves as the final event destination within a given pipeline. From there, BSPump dispatches or writes the event into the system.
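The division of labour between the three components can be illustrated with a minimal, framework-free sketch. It does not use the BSPump API: ToyProcessor, UppercaseName, AddLength, ListSink and the run_chain() helper are hypothetical, and the rule that a processor returning None stops the event is an assumption of this sketch.

```python
from typing import Any, Dict, Iterable, List, Optional

Event = Dict[str, Any]


class ToyProcessor:
    """Hypothetical base class: receives (context, event), returns the event."""

    def process(self, context: dict, event: Event) -> Optional[Event]:
        return event


class UppercaseName(ToyProcessor):
    # Transforms the event in place
    def process(self, context, event):
        event["name"] = event["name"].upper()
        return event


class AddLength(ToyProcessor):
    # Enriches the event with a derived field
    def process(self, context, event):
        event["length"] = len(event["name"])
        return event


class ListSink(ToyProcessor):
    # Final destination: stores events instead of writing them to an external system
    def __init__(self) -> None:
        self.Events: List[Event] = []

    def process(self, context, event):
        self.Events.append(event)
        return None  # nothing flows past the sink


def run_chain(source: Iterable[Event], processors: List[ToyProcessor]) -> None:
    """Feed every event from the source through the processors, sink last."""
    for event in source:
        context: dict = {}
        for processor in processors:
            event = processor.process(context, event)
            if event is None:  # in this sketch, None ends the event's journey
                break


sink = ListSink()
run_chain(iter([{"name": "alpha"}, {"name": "be"}]), [UppercaseName(), AddLength(), sink])
print(sink.Events)
```

In BSPump itself the sink is simply the last object passed to build(), after the processors.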
- class Pipeline(app, id=None, config=None)
Bases: ABC, Configurable
Description: Pipeline is …
An example of the Pipeline construction:
class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MyProcessor2(app, self),
            bspump.common.NullSink(app, self),
        )
Pipeline construction
The following are the core methods of the pipeline.
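- Pipeline.build(source, *processors)
This method adds sources, Processors, and a sink to create the structure of the Pipeline.
Parameters
- source: a source or a list of sources (see set_source()).
- *processors: the Processors, followed by the sink as the last item.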
- Pipeline.set_source(source)
Sets a specific source or list of sources to the Pipeline.
Parameters
- source: Source or list of Sources
The source object, or objects, to add.
If a list of sources is passed to the method, it adds the entire list of sources to the Pipeline.
- Pipeline.append_processor(processor)
Adds a Processor to the current Pipeline.
Parameters
- processor: Processor
The processor to append.
- Hint
A Generator can be added by using this method. It requires a depth parameter.
- Pipeline.remove_processor(processor_id)
Removes a specific processor from the Pipeline.
Parameters
- processor_id: str
ID of a processor.
- Raises
An error when the processor is not found.
- Pipeline.insert_before(id, processor)
Inserts the Processor into the Pipeline in front of another processor specified by ID.
Parameters
- id: str
ID of the existing processor in front of which the new processor will be inserted.
- processor: Processor
The new processor to be inserted.
- Returns
True on success. False if the ID was not found.
- Pipeline.insert_after(id, processor)
Inserts the Processor into the Pipeline behind another Processor specified by ID.
Parameters
- id: str
ID of the existing processor behind which the new processor will be inserted.
- processor: Processor
The new processor to be inserted.
- Returns
True on success. False if the ID was not found.
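The ID-based insertion and removal described above (True/False result for inserts, an error when removing an unknown ID) can be sketched on a plain Python list. This is a hypothetical stand-in, not the BSPump implementation: ToyProcessor and ProcessorChain are illustrative names, and raising KeyError on an unknown ID is an assumption.

```python
from typing import List


class ToyProcessor:
    def __init__(self, id: str) -> None:
        self.Id = id

    def __repr__(self) -> str:
        return self.Id


class ProcessorChain:
    """Hypothetical ordered list of processors addressed by their Id."""

    def __init__(self, processors: List[ToyProcessor]) -> None:
        self.Processors = list(processors)

    def _index(self, id: str) -> int:
        for idx, processor in enumerate(self.Processors):
            if processor.Id == id:
                return idx
        return -1

    def insert_before(self, id: str, processor: ToyProcessor) -> bool:
        idx = self._index(id)
        if idx < 0:
            return False  # the reference processor does not exist
        self.Processors.insert(idx, processor)
        return True

    def insert_after(self, id: str, processor: ToyProcessor) -> bool:
        idx = self._index(id)
        if idx < 0:
            return False
        self.Processors.insert(idx + 1, processor)
        return True

    def remove_processor(self, processor_id: str) -> None:
        idx = self._index(processor_id)
        if idx < 0:
            raise KeyError("Cannot find processor '{}'".format(processor_id))
        del self.Processors[idx]


chain = ProcessorChain([ToyProcessor("parser"), ToyProcessor("sink")])
chain.insert_before("sink", ToyProcessor("enricher"))
chain.insert_after("parser", ToyProcessor("filter"))
print(chain.Processors)
```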
- Pipeline.iter_processors()
Uses a Python generator that iterates through all Processors in the Pipeline.
- Yields
A Processor from a list in the Pipeline.
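A generator in the spirit of iter_processors() can be sketched as follows, assuming (hypothetically) that processors are stored as one list per depth. Processors are plain strings here for brevity.

```python
from typing import Iterator, List


def iter_processors(processors_by_depth: List[List[str]]) -> Iterator[str]:
    """Yield every processor, depth by depth, in pipeline order."""
    for processors in processors_by_depth:
        for processor in processors:
            yield processor


depths = [["JSONParser", "Enricher"], ["Aggregator", "Sink"]]
for processor in iter_processors(depths):
    print(processor)
```

Because it is a generator, nothing is copied: callers that stop early (for example after finding a processor by ID) never touch the remaining depths.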
Other Pipeline Methods
The methods below add further features to the pipeline. Many of them are nevertheless very important and often necessary.
- Pipeline.time()
Returns the current time, as provided by the application.
- Returns
App.time()
- Hint
More information can be found in the UTC Time section of the ASAB documentation.
- Pipeline.get_throttles()
Returns the components of the Pipeline that are throttled.
- Returns
self._throttles, the list of throttles.
- Pipeline.is_error()
Returns False when there is no error; otherwise it returns True.
- Returns
self._error is not None.
- Pipeline.set_error(context, event, exc)
When called with exc set to None, it resets the error (i.e., recovery).
When called with an exception, it sets the exception for soft errors.
Parameters
- context
Context of the error.
- event: data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- exc: Exception
The Python exception that was raised, or None to reset the error.
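The interplay of set_error() and is_error() can be sketched as a small state holder. ErrorState is a hypothetical class that only mirrors the documented behaviour (exc=None resets the error, is_error() returns self._error is not None); it omits everything else the real Pipeline may do when an error is set.

```python
from typing import Any, Optional, Tuple


class ErrorState:
    """Hypothetical stand-in for the error bookkeeping of a Pipeline."""

    def __init__(self) -> None:
        self._error: Optional[Tuple[Any, Any, BaseException]] = None

    def set_error(self, context, event, exc: Optional[BaseException]) -> None:
        if exc is None:
            self._error = None  # recovery: clear the previous error
            return
        self._error = (context, event, exc)  # remember what failed and why

    def is_error(self) -> bool:
        return self._error is not None


state = ErrorState()
state.set_error({"source": "demo"}, {"value": 1}, ValueError("bad value"))
print(state.is_error())  # True
state.set_error(None, None, None)
print(state.is_error())  # False
```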
- Pipeline.handle_error(exception, context, event)
Used for deciding how exceptions and error conditions are handled. You can override it to evaluate processing errors.
Parameters
- exception: Exception
The exception raised during processing.
- context: information
Additional information can be passed.
- event: data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- Returns
False for hard errors (stop the Pipeline processing). True for soft errors that will be ignored.
Example:
import json

import bspump
import bspump.common


class SampleInternalPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self),
        )

    def handle_error(self, exception, context, event):
        # Malformed JSON is a soft error: the event is ignored
        if isinstance(exception, json.decoder.JSONDecodeError):
            return True
        # Anything else is a hard error that stops the Pipeline
        return False
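How the return value of handle_error() separates soft from hard errors can be illustrated without BSPump. In this sketch the hypothetical process_all() helper plays the role of the Pipeline: a True result skips the event and continues, a False result re-raises and stops processing.

```python
import json
from typing import Any, List, Tuple


def handle_error(exception: Exception, context: dict, event: Any) -> bool:
    # Same rule as the example above: malformed JSON is a soft error
    return isinstance(exception, json.decoder.JSONDecodeError)


def process_all(raw_events: List[Any]) -> Tuple[list, int]:
    """Hypothetical driver: parse events and apply handle_error() to failures."""
    parsed, skipped = [], 0
    for raw in raw_events:
        try:
            parsed.append(json.loads(raw))
        except Exception as exc:
            if handle_error(exc, {}, raw):
                skipped += 1  # soft error: ignore the event and continue
                continue
            raise  # hard error: stop processing
    return parsed, skipped


print(process_all(['{"a": 1}', "not json", '{"b": 2}']))
```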
- Pipeline.link(ancestral_pipeline)
Links this Pipeline with an ancestral Pipeline. This is needed, e.g., to propagate throttling from child Pipelines back to their ancestors. If the child Pipeline uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestral Pipeline, so that its source may block incoming events until the internal queue is empty again.
Parameters
- ancestral_pipeline: str
ID of a Pipeline that will be linked.
- Pipeline.unlink(ancestral_pipeline)
Unlinks an ancestral Pipeline from this Pipeline.
Parameters
- ancestral_pipeline: str
ID of an ancestral Pipeline that will be unlinked.
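- Pipeline.throttle(who, enable=True)
Enables (or, with enable=False, disables) throttling of the chosen Pipeline and, if needed, of its ancestral Pipelines.
Parameters
- who
The component that requests the throttling.
- enable: bool, default True
Whether throttling is enabled or disabled.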
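The propagation of throttling from a child Pipeline to its linked ancestors can be sketched as follows. ToyPipeline and its is_throttled() helper are hypothetical; keeping throttles as a set of requesting components (the who argument) is an assumption of this sketch.

```python
from typing import Hashable, Set


class ToyPipeline:
    """Hypothetical pipeline that only models linking and throttling."""

    def __init__(self, id: str) -> None:
        self.Id = id
        self._throttles: Set[Hashable] = set()
        self._ancestral_pipelines: Set["ToyPipeline"] = set()

    def link(self, ancestral_pipeline: "ToyPipeline") -> None:
        self._ancestral_pipelines.add(ancestral_pipeline)

    def unlink(self, ancestral_pipeline: "ToyPipeline") -> None:
        self._ancestral_pipelines.discard(ancestral_pipeline)

    def throttle(self, who: Hashable, enable: bool = True) -> None:
        if enable:
            self._throttles.add(who)
        else:
            self._throttles.discard(who)
        # Propagate to ancestors so that their sources stop producing too
        for ancestor in self._ancestral_pipelines:
            ancestor.throttle(who, enable)

    def is_throttled(self) -> bool:
        return len(self._throttles) > 0


parent, child = ToyPipeline("parent"), ToyPipeline("child")
child.link(parent)
child.throttle("internal-queue")  # e.g. the child's internal queue is full
print(parent.is_throttled())      # True
child.throttle("internal-queue", enable=False)
print(parent.is_throttled())      # False
```

Using a set means throttling is released only when every component that requested it has released it.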
- async Pipeline.ready()
Waits until the Pipeline is ready. The method can be used in a source: await self.Pipeline.ready().
- Pipeline.is_ready()
Checks, without waiting, whether the internal ready Event of the Pipeline is set.
- Returns
_ready.is_set().
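The pairing of ready() and is_ready() matches the usual asyncio.Event pattern: ready() awaits the event, is_ready() reads it without waiting. A minimal sketch, assuming the readiness flag is an asyncio.Event named _ready (as the Returns entry above suggests); ToyPipeline and the source() coroutine are hypothetical.

```python
import asyncio
from typing import List


class ToyPipeline:
    def __init__(self) -> None:
        self._ready = asyncio.Event()  # cleared: not ready yet

    async def ready(self) -> None:
        await self._ready.wait()  # suspends the caller until the event is set

    def is_ready(self) -> bool:
        return self._ready.is_set()  # non-blocking check


async def demo() -> List[str]:
    pipeline = ToyPipeline()
    log: List[str] = []

    async def source() -> None:
        await pipeline.ready()  # a Source must wait before producing events
        log.append("event produced")

    task = asyncio.create_task(source())
    await asyncio.sleep(0)  # let the source start and block on ready()
    log.append("is_ready={}".format(pipeline.is_ready()))
    pipeline._ready.set()  # mark the pipeline as ready
    await task
    return log


print(asyncio.run(demo()))
```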
- Pipeline.inject(context, event, depth)
Injects events into the Pipeline at the depth defined by the depth argument. Every depth is interconnected with a generator object.
Parameters
- context: string
Information propagated through the Pipeline.
- event: data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- depth: int
Level of depth.
- Note
For normal operations, it is highly recommended to use the process() method instead.
- async Pipeline.process(event, context=None)
Injects the event into depth 0 of the Pipeline and increments the incoming-event metric.
Parameters
- event: data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- context: str, default None
Additional information needed for working with the event stream.
- Hint
This is the recommended way of inserting events into a Pipeline.
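The relationship between process() and inject() can be sketched with a toy pipeline, assuming (hypothetically) that processors are kept as one list of callables per depth and that the event counter is a plain integer rather than a BSPump metric. In this sketch a processor returning None drops the event.

```python
import asyncio
from typing import Any, Callable, List, Optional

ProcessorFn = Callable[[dict, Any], Optional[Any]]


class ToyPipeline:
    def __init__(self, processors_by_depth: List[List[ProcessorFn]]) -> None:
        self.Processors = processors_by_depth
        self.EventIn = 0  # hypothetical stand-in for the incoming-event metric
        self.Output: List[Any] = []

    def inject(self, context: dict, event: Any, depth: int) -> None:
        # Run the event through the processors registered at the given depth
        for processor in self.Processors[depth]:
            event = processor(context, event)
            if event is None:
                return
        self.Output.append(event)

    async def process(self, event: Any, context: Optional[dict] = None) -> None:
        self.EventIn += 1  # count the event, then inject it at depth 0
        self.inject(context if context is not None else {}, event, depth=0)


pipeline = ToyPipeline([[lambda ctx, e: e * 2, lambda ctx, e: e + 1]])
asyncio.run(pipeline.process(3))
print(pipeline.Output, pipeline.EventIn)  # [7] 1
```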
- Pipeline.create_eps_counter()
Creates a dictionary with information about the Pipeline. It contains eps (events per second), warnings and errors.
- Returns
self.MetricsService. Creates the eps counter using MetricsService.
- Note
The EPS counter can be created using this method or directly by using the MetricsService.
- Pipeline.ensure_future(coro)
You can use this method to schedule a future task that will be executed in the context of the Pipeline. The Pipeline also manages the whole lifecycle of the future/task: it collects the future's result, discards it, and, most importantly, captures any exception, which then blocks the Pipeline via set_error().
Parameters
- coro
The coroutine to be scheduled.
- Hint
If the number of futures exceeds the configured limit, the Pipeline is throttled.
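The future lifecycle described above (schedule, discard the result, capture exceptions, throttle above a limit) can be sketched with plain asyncio. ToyPipeline, the MaxFutures value and the Throttled flag are hypothetical; they only model the behaviour, not BSPump's configuration.

```python
import asyncio
from typing import Optional, Set


class ToyPipeline:
    def __init__(self, max_futures: int = 2) -> None:
        self.MaxFutures = max_futures  # hypothetical limit
        self.Throttled = False
        self._futures: Set[asyncio.Future] = set()
        self._error: Optional[BaseException] = None

    def set_error(self, context, event, exc: Optional[BaseException]) -> None:
        self._error = exc  # in this sketch, only recorded

    def ensure_future(self, coro) -> asyncio.Future:
        future = asyncio.ensure_future(coro)
        self._futures.add(future)
        future.add_done_callback(self._future_done)
        if len(self._futures) > self.MaxFutures:
            self.Throttled = True  # too many pending futures
        return future

    def _future_done(self, future: asyncio.Future) -> None:
        self._futures.discard(future)
        if len(self._futures) <= self.MaxFutures:
            self.Throttled = False
        if future.cancelled():
            return
        exc = future.exception()  # the result itself is discarded
        if exc is not None:
            self.set_error(None, None, exc)


async def demo():
    pipeline = ToyPipeline(max_futures=2)

    async def ok() -> None:
        await asyncio.sleep(0)

    async def fail() -> None:
        raise ValueError("boom")

    for coro in (ok(), ok(), fail()):
        pipeline.ensure_future(coro)
    busy = pipeline.Throttled  # three futures exceed the limit of two
    await asyncio.sleep(0.01)  # let all futures finish
    return pipeline, busy


pipeline, busy = asyncio.run(demo())
print(busy, pipeline.Throttled, repr(pipeline._error))
```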
- Pipeline.locate_source(address)
Locates a source based on its ID.
Parameters
- address: str
ID of the source.
- Pipeline.locate_connection(app, connection_id)
Finds a connection by ID.
Parameters
- app: Application
Name of the Application.
- connection_id: str
ID of the connection we want to locate.
- Returns
connection
Source
- class Source(app, pipeline, id=None, config=None)
Bases: Configurable
The Source class is responsible for connecting to a source and propagating events or other data from the source to the processors.
- Source.__init__(app, pipeline, id=None, config=None)
Sets the initial ID, Pipeline and Task.
Parameters
- app: Application
Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline: address of a pipeline
Name of a Pipeline.
- id: str, default None
ID of the Source.
- config: compatible config type, default None
Option for adding a configuration file.
Source Construction
Source is an object designed to obtain data from a predefined input. BSPump contains many universally usable, specific source objects, which are capable of loading data from known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific use cases in a given industry field.
Each source represents a coroutine/Future/Task that is running in the context of the main loop.
The coroutine method main() contains the implementation of each particular source.
A Source MUST await the Pipeline ready state before producing an event.
This is accomplished by the await self.Pipeline.ready() call.
- async Source.process(event, context=None)
This method is used to emit an event into a Pipeline.
Parameters
- event: data with a timestamp, stored in any data type (usually JSON).
Message or information that is passed to the method and emitted into a Pipeline.
- context: default None
Additional information.
If there is an error in the processing of the event, the Pipeline is throttled by setting the error, and the exception is raised.
- Hint
The source should catch this exception and fail gracefully.
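The contract of Source.process() (wait for readiness, forward the event, record the error on the Pipeline and re-raise) can be sketched as below. ToyPipeline, ToySource and the rule that an event with a "bad" key fails are hypothetical; the caller catches the exception, as the hint recommends.

```python
import asyncio
from typing import Any, List, Optional


class ToyPipeline:
    def __init__(self) -> None:
        self._ready = asyncio.Event()
        self._ready.set()  # ready from the start in this sketch
        self._error: Optional[BaseException] = None
        self.Seen: List[Any] = []

    async def ready(self) -> None:
        await self._ready.wait()

    def set_error(self, context, event, exc) -> None:
        self._error = exc

    async def process(self, event, context=None) -> None:
        if event.get("bad"):
            raise ValueError("cannot process {!r}".format(event))
        self.Seen.append(event)


class ToySource:
    def __init__(self, pipeline: ToyPipeline) -> None:
        self.Pipeline = pipeline

    async def process(self, event, context=None) -> None:
        await self.Pipeline.ready()
        try:
            await self.Pipeline.process(event, context=context)
        except Exception as exc:
            self.Pipeline.set_error(context, event, exc)  # record the failure
            raise  # let the caller decide how to fail


async def main() -> ToyPipeline:
    pipeline = ToyPipeline()
    source = ToySource(pipeline)
    for event in ({"id": 1}, {"id": 2, "bad": True}, {"id": 3}):
        try:
            await source.process(event)
        except ValueError:
            break  # fail gracefully: stop emitting instead of crashing
    return pipeline


pipeline = asyncio.run(main())
print(pipeline.Seen, type(pipeline._error).__name__)
```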
- Source.start(loop)
Starts the Source through the _main method; if the main() method is implemented, the coroutine is started using main() instead.
Parameters
- loop
The event loop that runs the coroutines.
- async Source.stop()
Stops the Source using self.Task. If the tasks are not done, it cancels them or raises an error.
- Source.restart(loop)
Restarts the loop of coroutines and returns the result() method.
Parameters
- loop
Contains the coroutines.
- async Source.main()
Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.
- async Source.stopped()
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
async def main(self):
    # … initialize resources here
    await self.stopped()
    # … finalize resources here
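The initialize/await/finalize pattern from the example can be exercised without BSPump. In this sketch stopped() is a hypothetical stand-in that simply waits until the task running main() is cancelled; the real helper may be implemented differently.

```python
import asyncio
from typing import List


class ToySource:
    def __init__(self) -> None:
        self.Log: List[str] = []

    async def stopped(self) -> None:
        # Hypothetical: block until the surrounding task is cancelled
        try:
            await asyncio.Event().wait()  # never set, so only cancellation ends it
        except asyncio.CancelledError:
            pass

    async def main(self) -> None:
        self.Log.append("resources initialized")
        await self.stopped()
        self.Log.append("resources finalized")


async def demo() -> List[str]:
    source = ToySource()
    task = asyncio.create_task(source.main())
    await asyncio.sleep(0)  # let main() reach stopped()
    task.cancel()           # e.g. the application is shutting down
    await task
    return source.Log


print(asyncio.run(demo()))
```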
- Source.locate_address()
Locates the address of a Pipeline.
- Returns
The ID of the Pipeline and the ID of the Source, as a string.
- classmethod Source.construct(app, pipeline, definition: dict)
Can create a source based on a specific definition, for example, a JSON file.
Parameters
- app: Application
Name of the Application.
- pipeline: Pipeline
Specification of a Pipeline.
- definition: dict
Definition that is used to create a source.
- Returns
cls(app, newid, config)
This is an abstract source class intended as a base for the implementation of ‘cyclic’ sources such as file readers, SQL extractors, etc.
You need to provide a trigger class and implement the cycle() method.
A trigger source stops execution when a Pipeline is cancelled (raises concurrent.futures.CancelledError).
This typically happens when a program wants to quit in reaction to a signal.
You may also overload the main() method to provide additional parameters for the cycle() method.
async def main(self):
    # Open one HTTP session and pass it to every cycle() call
    async with aiohttp.ClientSession() as session:
        await super().main(session)

async def cycle(self, session):
    async with session.get(...) as response:
        ...
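The way an overloaded main() hands extra arguments to cycle() can be sketched without BSPump or aiohttp. ToyTriggerSource is hypothetical: instead of a real trigger it simply fires a fixed number of times, and a dictionary stands in for the HTTP session.

```python
import asyncio
from typing import Any, List


class ToyTriggerSource:
    """Hypothetical base: call cycle() once per trigger firing."""

    def __init__(self, fires: int = 3) -> None:
        self.Fires = fires  # stands in for a real trigger

    async def main(self, *args: Any) -> None:
        for _ in range(self.Fires):
            await self.cycle(*args)  # extra arguments come from the subclass
            await asyncio.sleep(0)

    async def cycle(self, *args: Any) -> None:
        raise NotImplementedError()


class SessionSource(ToyTriggerSource):
    def __init__(self) -> None:
        super().__init__(fires=3)
        self.Calls: List[str] = []

    async def main(self) -> None:
        session = {"name": "fake-session"}  # stand-in for aiohttp.ClientSession()
        await super().main(session)

    async def cycle(self, session: dict) -> None:
        self.Calls.append(session["name"])


source = SessionSource()
asyncio.run(source.main())
print(source.Calls)
```

Opening the session once in main() and reusing it in every cycle avoids creating a new connection pool on each trigger.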
- class TriggerSource(app, pipeline, id=None, config=None)
Bases: Source
- __init__(app, pipeline, id=None, config=None)
Sets the initial ID, Pipeline and Task.
Parameters
- app: Application
Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline: address of a pipeline
Name of a Pipeline.
- id: str, default None
ID of the Source.
- config: compatible config type, default None
Option for adding a configuration file.
- TriggerSource.time()
Returns the current time, as provided by the application.
- Returns
App.time()
- Hint
You can find more information about UTC Time in the ASAB documentation.
- TriggerSource.on(trigger)
Sets a Trigger, which is a method that waits for a given condition.
Parameters
- trigger: keyword of a trigger
The given condition.
- Returns
Trigger.add(trigger)
Processor
The main component of the BSPump architecture is a so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics and creating aggregations, detecting anomalies, and reacting to known as well as unknown system behavior patterns.
Processors differ in their functions, and all of them are arranged in a predefined sequence within pipeline objects. As regards working with data events, each pipeline has its own unique task.
- class Processor(app, pipeline, id=None, config=None)
Bases: ProcessorBase
Inherits from ProcessorBase.
- classmethod Processor.construct(app, pipeline, definition)
Can construct a processor based on a specific definition, for example, a JSON file.
Parameters
- app: Application
Name of the Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline: str
Name of the Pipeline.
- definition: dict
Set of instructions based on which the processor can be constructed.
- Returns
cls(app, pipeline, id=newid, config=config)
- Processor.process(context, event)
Can be implemented to return an event based on a given logic.
Parameters
- context
Additional information passed to the method.
- event: data with a timestamp, stored in any data type (usually JSON).
The event that is passed to the method.
- Processor.locate_address()
Returns the ID of a processor and of a Pipeline.
- Returns
ID of the Pipeline and self.Id.
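The construct(), process() and locate_address() trio can be sketched as follows. ToyProcessor and AddTagProcessor are hypothetical; reading "id" and "config" keys from the definition is an assumption suggested by the Returns entry of construct(), and the Pipeline is represented by its ID string for brevity.

```python
from typing import Any, Dict, Optional


class ToyProcessor:
    def __init__(self, app, pipeline: str, id: Optional[str] = None,
                 config: Optional[Dict[str, Any]] = None) -> None:
        self.App = app
        self.Pipeline = pipeline
        self.Id = id if id is not None else self.__class__.__name__
        self.Config = dict(config or {})

    @classmethod
    def construct(cls, app, pipeline: str, definition: Dict[str, Any]) -> "ToyProcessor":
        newid = definition.get("id")       # assumed key
        config = definition.get("config")  # assumed key
        return cls(app, pipeline, id=newid, config=config)

    def process(self, context: dict, event: Any) -> Any:
        return event  # pass-through by default

    def locate_address(self) -> str:
        return "{}.{}".format(self.Pipeline, self.Id)


class AddTagProcessor(ToyProcessor):
    def process(self, context, event):
        event["tag"] = self.Config.get("tag", "untagged")  # enrich the event
        return event


definition = {"id": "tagger", "config": {"tag": "web"}}  # e.g. loaded from JSON
processor = AddTagProcessor.construct(None, "MyPipeline", definition)
print(processor.process({}, {"value": 1}))
print(processor.locate_address())
```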
- Processor.rest_get()
- Processor.__repr__()
Return repr(self).
Sink
The Sink object serves as the final event destination within a given pipeline. From there, BSPump dispatches or writes the event into the system.
Connection
- class Connection(app, id=None, config=None)
Bases: ABC, Configurable
The Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to create connections with the main components of BSPump: source, processor and sink.
- __init__(app, id=None, config=None)
Parameters
- app: Application
Specification of an Application.
- id: default None
- config: JSON or other compatible format, default None
It contains important information and data responsible for creating a connection.