Basics
Basics covers the most fundamental components of a BSPump. We will start with the “backbone” of the BSPump, which is called a “pipeline”.
Pipeline
The pipeline class is responsible for construction of the BSPump pipeline itself. Its methods enable us to maintain a working lifecycle of the system.
Pipeline is responsible for data processing in BSPump.
Individual Pipeline objects work asynchronously and independently of one another (provided dependence is not defined explicitly – for instance on a message source from some other pipeline) and can be triggered in unlimited numbers.
Each Pipeline is usually in charge of one concrete task.
Pipeline has three main components: Source, Processor, and Sink.
Source connects different data sources to the Pipeline so that their data can be processed.
Multiple sources
A Pipeline can have multiple sources.
They are simply passed as a list of sources to the Pipeline build() method:
    class MyPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                [
                    MySource1(app, self),
                    MySource2(app, self),
                    MySource3(app, self),
                ],  # the list of sources is the first argument
                bspump.common.NullSink(app, self),
            )
The main component of the BSPump architecture is a so-called Processor.
This object modifies, transforms and enriches events.
Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies, and reacting to known as well as unknown system behaviour patterns.
Processors differ as to their functions and all of them are aligned according to a predefined sequence in pipeline objects.
As regards working with data events, each Pipeline has its unique task.
Processors are passed as a list of Processors to the Pipeline build() method:
    class MyPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                [
                    MyProcessor1(app, self),
                    MyProcessor2(app, self),
                    MyProcessor3(app, self),
                ],
                bspump.common.NullSink(app, self),
            )
The Sink object serves as the final event destination within a given pipeline. Subsequently, the event is dispatched/written into the system by the BSPump.
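The division of labour between the three components can be sketched without BSPump itself. Every class below (MiniPipeline, DropEmptyProcessor, UppercaseProcessor, ListSink) is a hypothetical stand-in written for this illustration, the source is reduced to a plain list of events, and the rule that a processor returning None drops the event is an assumption of the sketch rather than documented behaviour.

```python
# Stand-in sketch: none of these classes come from BSPump.

class DropEmptyProcessor:
    def process(self, context, event):
        # Assumption of this sketch: returning None drops the event.
        return event if event else None


class UppercaseProcessor:
    def process(self, context, event):
        return event.upper()


class ListSink:
    """Final destination: collects whatever reaches the end of the chain."""

    def __init__(self):
        self.received = []

    def process(self, context, event):
        self.received.append(event)


class MiniPipeline:
    def __init__(self):
        self.source = None
        self.processors = []

    def build(self, source, *processors):
        # Same argument shape as the documented build(source, *processors):
        # the source comes first, then the processors, with the sink last.
        self.source = source
        self.processors = list(processors)

    def run(self):
        for event in self.source:
            context = {}
            for processor in self.processors:
                event = processor.process(context, event)
                if event is None:
                    break


sink = ListSink()
pipeline = MiniPipeline()
pipeline.build(["hello", "", "world"], DropEmptyProcessor(), UppercaseProcessor(), sink)
pipeline.run()
print(sink.received)
```

The empty string is dropped by the first processor, so only the two non-empty events reach the sink.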
- class Pipeline(app, id=None, config=None)
Bases: ABC, Configurable
Description: Pipeline is …
An example of the Pipeline construction:
    class MyPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                [
                    MySource(app, self),
                    MyProcessor(app, self),
                    MyProcessor2(app, self),
                ],
                bspump.common.NullSink(app, self),
            )
Pipeline construction
The following are the core methods of the pipeline.
- Pipeline.build(source, *processors)
This method adds sources, Processors, and a sink to create the structure of the Pipeline.
- Pipeline.set_source(source)
Sets a specific source or list of sources to the Pipeline.
Parameters
- source : str, list, optional
ID of a source.
If a list of sources is passed to the method, it adds the entire list of sources to the Pipeline.
- Pipeline.append_processor(processor)
Adds a Processor to the current Pipeline.
Parameters
- processor : str
ID of a processor.
- Hint
The Generator can be added by using this method. It requires a depth parameter.
- Pipeline.remove_processor(processor_id)
Removes a specific processor from the Pipeline.
Parameters
- processor_id : str
ID of a processor.
- Returns
Error when the processor is not found.
- Pipeline.insert_before(id, processor)
Inserts the Processor into the Pipeline in front of another processor specified by ID.
Parameters
- id : str
ID of the existing processor in front of which the new processor is inserted.
- processor : str
The processor to insert.
- Returns
True on success. False if ID was not found.
- Pipeline.insert_after(id, processor)
Inserts the Processor into the Pipeline behind another Processor specified by ID.
Parameters
- id : str
ID of the existing processor behind which the new processor is inserted.
- processor : str
- Returns
True if successful. False if ID was not found.
- Pipeline.iter_processors()
A Python generator that iterates through all Processors in the Pipeline.
- Yields
A Processor from a list in the Pipeline.
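The editing methods above can be pictured as operations on an ordered list of processors looked up by ID. The following is a minimal sketch under stated assumptions: ProcessorChain and NamedProcessor are hypothetical stand-ins rather than BSPump classes, id is read as the ID of a processor already in the chain (as the summary lines describe), and raising KeyError for a missing processor is this sketch's interpretation of "Error when the processor is not found".

```python
# Stand-in sketch: a flat list of processors addressed by ID.

class NamedProcessor:
    def __init__(self, id):
        self.Id = id


class ProcessorChain:
    def __init__(self):
        self.Processors = []

    def append_processor(self, processor):
        self.Processors.append(processor)

    def insert_before(self, id, processor):
        for idx, existing in enumerate(self.Processors):
            if existing.Id == id:
                self.Processors.insert(idx, processor)
                return True
        return False

    def insert_after(self, id, processor):
        for idx, existing in enumerate(self.Processors):
            if existing.Id == id:
                self.Processors.insert(idx + 1, processor)
                return True
        return False

    def remove_processor(self, processor_id):
        for idx, existing in enumerate(self.Processors):
            if existing.Id == processor_id:
                del self.Processors[idx]
                return
        # Assumption: a missing ID is reported by raising KeyError.
        raise KeyError(processor_id)

    def iter_processors(self):
        # Generator that yields the processors in chain order.
        yield from self.Processors


chain = ProcessorChain()
chain.append_processor(NamedProcessor("parse"))
chain.append_processor(NamedProcessor("sink"))
inserted = chain.insert_after("parse", NamedProcessor("enrich"))
missing = chain.insert_before("unknown", NamedProcessor("never"))
chain.insert_before("parse", NamedProcessor("decode"))
chain.remove_processor("enrich")
order = [processor.Id for processor in chain.iter_processors()]
print(inserted, missing, order)
```

Returning a boolean from the insert methods lets the caller decide how to react to an unknown ID, whereas removal treats it as an error.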
Other Pipeline Methods
The additional methods below bring more features to the pipeline. Many of them are important and practically necessary.
- Pipeline.time()
Returns the correct time.
- Returns
App.time()
- Hint
More information is available in the UTC Time section of the ASAB documentation.
- Pipeline.get_throttles()
Returns components from the Pipeline that are throttled.
- Returns
self._throttles, the list of throttles.
- Pipeline.is_error()
Returns False when there is no error, otherwise it returns True.
- Returns
self._error is not None.
- Pipeline.set_error(context, event, exc)
When called with exc set to None, it resets the error (recovery).
When called with an exception in exc, it sets the exception for soft errors.
Parameters
- context : type?
Context of an error.
- event : Data with time stamp stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- exc : Exception
Python default exceptions.
- Pipeline.handle_error(exception, context, event)
Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.
Parameters
- exception : Exception
Used for setting up a custom Exception.
- context : information
Additional information can be passed.
- event : Data with time stamp stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- Returns
False for hard errors (stop the Pipeline processing). True for soft errors that will be ignored.
Example:
    import json

    import bspump
    import bspump.common

    class SampleInternalPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                bspump.common.InternalSource(app, self),
                bspump.common.JSONParserProcessor(app, self),
                bspump.common.PPrintSink(app, self)
            )

        def handle_error(self, exception, context, event):
            # Malformed JSON is a soft error: skip the event and continue.
            if isinstance(exception, json.decoder.JSONDecodeError):
                return True
            # Anything else is a hard error that stops the Pipeline.
            return False
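The soft/hard distinction can be tried out without a running pipeline. The sketch below is a stand-in, not BSPump code: run() is a hypothetical driver loop that parses each event and consults a handle_error function with the same signature and return convention as described above (True means the error is soft and the event is skipped, False means the error is hard and processing stops).

```python
import json


def handle_error(exception, context, event):
    # Soft error: malformed JSON is skipped.
    if isinstance(exception, json.JSONDecodeError):
        return True
    # Everything else is treated as a hard error.
    return False


def run(events):
    """Hypothetical driver: parse events, skipping those with soft errors."""
    parsed, skipped = [], []
    for event in events:
        try:
            parsed.append(json.loads(event))
        except Exception as exc:
            if handle_error(exc, {}, event):
                skipped.append(event)
                continue
            raise  # hard error: stop processing
    return parsed, skipped


parsed, skipped = run(['{"a": 1}', 'not json', '[2, 3]'])
print(parsed, skipped)
```

Passing a non-string event such as None makes json.loads raise TypeError, which this handler classifies as a hard error, so run() re-raises it.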
- Pipeline.link(ancestral_pipeline)
Links this Pipeline with an ancestral Pipeline. This is needed, e.g., for propagating throttling from child Pipelines back to their ancestors. If the child Pipeline uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestral Pipeline, so that its source may block incoming events until the internal queue is empty again.
Parameters
- ancestral_pipeline : str
ID of a Pipeline that will be linked.
- Pipeline.unlink(ancestral_pipeline)
Unlinks an ancestral Pipeline from this Pipeline.
Parameters
- ancestral_pipeline : str
ID of an ancestral Pipeline that will be unlinked.
- Pipeline.throttle(who, enable=True)
Enables throttling for a chosen pipeline and, if needed, its ancestral pipelines.
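How throttling travels from a child pipeline to a linked ancestor can be illustrated with a small model. ThrottledPipeline below is a hypothetical stand-in, and two points are assumptions of the sketch rather than documented behaviour: a pipeline counts as ready exactly when its set of throttles is empty, and every throttle call is forwarded unchanged to all linked ancestors.

```python
# Stand-in sketch of throttle propagation through linked pipelines.

class ThrottledPipeline:
    def __init__(self, id):
        self.Id = id
        self._throttles = set()
        self._ancestral = []

    def link(self, ancestral_pipeline):
        self._ancestral.append(ancestral_pipeline)

    def unlink(self, ancestral_pipeline):
        self._ancestral.remove(ancestral_pipeline)

    def throttle(self, who, enable=True):
        if enable:
            self._throttles.add(who)
        else:
            self._throttles.discard(who)
        # Assumption: the same request is passed on to every ancestor.
        for ancestor in self._ancestral:
            ancestor.throttle(who, enable)

    def get_throttles(self):
        return self._throttles

    def is_ready(self):
        # Assumption: ready means that nothing is throttling the pipeline.
        return not self._throttles


parent = ThrottledPipeline("parent")
child = ThrottledPipeline("child")
child.link(parent)

child.throttle("internal-queue")            # the child's queue is full
state_when_full = (child.is_ready(), parent.is_ready())

child.throttle("internal-queue", enable=False)  # the queue has drained
state_when_drained = (child.is_ready(), parent.is_ready())

print(state_when_full, state_when_drained)
```

Keeping the throttles in a set keyed by who means that several independent components can throttle the same pipeline, and it only becomes ready again once all of them have released it.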
- async Pipeline.ready()
Checks if the Pipeline is ready. The method can be used in a source: await self.Pipeline.ready().
- Pipeline.is_ready()
Checks whether the Pipeline's internal ready event is set.
- Returns
_ready.is_set().
- Pipeline.inject(context, event, depth)
Injects events into the Pipeline at the depth defined by the depth parameter. Every depth is interconnected with a generator object.
Parameters
- context : string
Information propagated through the Pipeline.
- event : Data with time stamp stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- depth : int
Level of depth.
- Note
For normal operations, it is highly recommended to use the process method instead.
- async Pipeline.process(event, context=None)
Injects events into the Pipeline at depth 0, while incrementing the "event in" metric.
Parameters
- event : Data with time stamp stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- context : str, default None
You can add additional information needed for work with event streaming.
- Hint
This is the recommended way of inserting events into a Pipeline.
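The relationship between process and inject can be sketched as follows. DepthPipeline is a hypothetical stand-in: it keeps one list of processor functions per depth, counts incoming events in a plain integer instead of a metrics service, and does not model the generator objects that connect the depths in BSPump.

```python
import asyncio


def double(context, event):
    return event * 2


def add_one(context, event):
    return event + 1


class DepthPipeline:
    """Stand-in with one list of processor functions per depth."""

    def __init__(self, processors_by_depth):
        self.processors_by_depth = processors_by_depth
        self.event_in = 0   # simplified counter, not a real metric
        self.out = []

    def inject(self, context, event, depth):
        # Run only the processors registered at the given depth.
        for processor in self.processors_by_depth[depth]:
            event = processor(context, event)
            if event is None:
                return
        self.out.append(event)

    async def process(self, event, context=None):
        # Count the event, then enter the chain at depth 0.
        self.event_in += 1
        self.inject(context if context is not None else {}, event, 0)


pipeline = DepthPipeline([[double], [add_one]])
asyncio.run(pipeline.process(5))   # counted, starts at depth 0
pipeline.inject({}, 5, 1)          # not counted, starts at depth 1
print(pipeline.out, pipeline.event_in)
```

Only the event that went through process is counted, which is one reason to prefer it over calling inject directly.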
- Pipeline.create_eps_counter()
Creates a dictionary with information about the Pipeline. It contains eps (events per second), warnings and errors.
- Returns
self.MetricsService Creates eps counter using MetricsService.
- Note
The EPS counter can be created using this method or directly by using the MetricsService method.
- Pipeline.ensure_future(coro)
You can use this method to schedule a future task that will be executed in the context of the Pipeline. The Pipeline also manages the whole lifecycle of the future/task: it collects the future result, discards it, and, most importantly, captures any exception, which then blocks the Pipeline via set_error().
Parameters
- coro : ??
??
- Hint
If the number of futures exceeds the configured limit, the Pipeline is throttled.
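The lifecycle described for ensure_future (schedule, collect the result, capture any exception) can be approximated with plain asyncio. FutureTracker below is a hypothetical stand-in, not the BSPump implementation: it stores the captured exception in an attribute instead of blocking a pipeline, and it does not model the configured limit on the number of futures.

```python
import asyncio


class FutureTracker:
    """Stand-in that tracks scheduled tasks and records their exceptions."""

    def __init__(self):
        self.error = None
        self.pending = set()

    def set_error(self, context, event, exc):
        self.error = exc

    def ensure_future(self, coro):
        task = asyncio.ensure_future(coro)
        self.pending.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        self.pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None:
            self.set_error(None, None, exc)


async def ok():
    return 42


async def boom():
    raise ValueError("boom")


async def demo():
    tracker = FutureTracker()
    tasks = [tracker.ensure_future(ok()), tracker.ensure_future(boom())]
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)  # let any remaining done callbacks run
    return tracker


tracker = asyncio.run(demo())
print(repr(tracker.error), len(tracker.pending))
```

The done callback is the single place where results are discarded and failures are noticed, so callers never need to await the scheduled task themselves.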
- Pipeline.locate_source(address)
Locates a source based on its ID.
Parameters
- address : str
ID of the source.
- Pipeline.locate_connection(app, connection_id)
Finds a connection by ID.
Parameters
- app : Application
Name of the Application.
- connection_id : str
ID of the connection we want to locate.
- Returns
connection
Source
- class Source(app, pipeline, id=None, config=None)
Bases: Configurable
Source class is responsible for connecting to a source, and propagating events or other data from the source to the processors.
- Source.__init__()
Set the initial ID, Pipeline and Task.
Parameters
- app : Application
Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline : address of a pipeline
Name of a Pipeline.
- id : str, default None
Name of the Pipeline.
- config : compatible config type, default None
Option for adding a configuration file.
Source Construction
Source is an object designed to obtain data from a predefined input. The BSPump contains many universally usable, specific source objects, which are capable of loading data from known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific use cases in a given industry.
Each source represents a coroutine/Future/Task that is running in the context of the main loop.
The coroutine method main() contains an implementation of each particular source.
Source MUST await a Pipeline ready state prior to producing the event.
It is accomplished by calling await self.Pipeline.ready().
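The "await ready before producing" rule can be demonstrated with stand-ins. StubPipeline and CountingSource below are hypothetical classes written for this sketch. They borrow the method names ready(), is_ready() and process() and the self.Pipeline attribute from this page, but model readiness with a bare asyncio.Event and collect events in a list.

```python
import asyncio


class StubPipeline:
    """Stand-in pipeline: readiness is a bare asyncio.Event."""

    def __init__(self):
        self._ready = asyncio.Event()
        self.events = []

    async def ready(self):
        await self._ready.wait()
        return True

    def is_ready(self):
        return self._ready.is_set()

    async def process(self, event, context=None):
        self.events.append(event)


class CountingSource:
    """Stand-in source that emits the numbers 0, 1 and 2."""

    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def main(self):
        for number in range(3):
            # Wait for the pipeline before producing each event.
            await self.Pipeline.ready()
            await self.Pipeline.process(number)


async def demo():
    pipeline = StubPipeline()
    task = asyncio.ensure_future(CountingSource(pipeline).main())
    await asyncio.sleep(0)            # the source starts and blocks on ready()
    produced_before = list(pipeline.events)
    pipeline._ready.set()             # the pipeline becomes ready
    await task
    return produced_before, pipeline.events


produced_before, produced_after = asyncio.run(demo())
print(produced_before, produced_after)
```

Nothing is emitted while the pipeline is not ready, and all three events arrive once the ready event is set.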
- async Source.process(event, context=None)
This method is used to emit an event into a Pipeline.
Parameters
- event : Data with time stamp stored in any data type, usually JSON.
Message or information that is passed to the method and emitted into a Pipeline.
- context : default None
Additional information.
If there is an error in the processing of the event, the Pipeline is throttled by setting the error and the exception is raised.
- Hint
The source should catch this exception and fail gracefully.
- Source.start(loop)
Starts the Pipeline through the _main method, but if the main method is implemented, it starts the coroutine using the main method instead.
Parameters
- loop : ?
Contains the coroutines.
- async Source.stop()
Stops the Source using self.Task. If the processes are not done, it cancels them or raises an error.
- Source.restart(loop)
Restarts the loop of coroutines and returns the result() method.
Parameters
- loop : ??
Contains the coroutines.
- async Source.main()
Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.
- async Source.stopped()
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
    async def main(self):
        # … initialize resources here
        await self.stopped()
        # … finalize resources here
- Source.locate_address()
Locates the address of a Pipeline.
- Returns
ID of the Source and ID of its Pipeline as a string.
- classmethod Source.construct(app, pipeline, definition: dict)
Can create a source based on a specific definition. For example, a JSON file.
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Specification of a Pipeline.
- definition : dict
Definition that is used to create a source.
- Returns
cls(app, newid, config)
This is an abstract source class intended as a base for the implementation of ‘cyclic’ sources such as file readers, SQL extractors, etc.
You need to provide a trigger class and implement the cycle() method.
The trigger source stops execution when a Pipeline is cancelled (raises concurrent.futures.CancelledError).
This typically happens when a program wants to quit in reaction to a signal.
You may also overload the main() method to provide additional parameters for the cycle() method:
    async def main(self):
        async with aiohttp.ClientSession(loop=self.Loop) as session:
            await super().main(session)

    async def cycle(self, session):
        await session.get(...)
- class TriggerSource(app, pipeline, id=None, config=None)
Bases: Source
Description:
- Returns
- __init__(app, pipeline, id=None, config=None)
Set the initial ID, Pipeline and Task.
Parameters
- app : Application
Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline : address of a pipeline
Name of a Pipeline.
- id : str, default None
Name of the Pipeline.
- config : compatible config type, default None
Option for adding a configuration file.
- TriggerSource.time()
Method used for measuring accurate time.
- Returns
App.time()
- Hint
You can find more information about UTC Time in the ASAB documentation.
- TriggerSource.on()
Sets a Trigger, which is a method that waits for a given condition.
Parameters
- trigger : keyword of a trigger
Given condition that.
- Returns
Trigger.add(trigger)
Processor
The main component of the BSPump architecture is a so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies, and reacting to known as well as unknown system behavior patterns.
Processors differ in their functions, and all of them are aligned according to a predefined sequence in pipeline objects. As regards working with data events, each pipeline has its own unique task.
- class Processor(app, pipeline, id=None, config=None)
Bases: ProcessorBase
Inherits from ProcessorBase.
- classmethod Processor.construct()
Can construct a processor based on a specific definition. For example, a JSON file.
Parameters
- app : Application
Name of the Application (https://asab.readthedocs.io/en/latest/asab/application.html#).
- pipeline : str
Name of the Pipeline.
- definition : dict
Set of instructions based on which the processor can be constructed.
- Returns
cls(app, pipeline, id=newid, config=config)
- Processor.process()
Can be implemented to return an event based on a given logic.
Parameters
- context
Additional information passed to the method.
- event : Data with time stamp stored in any data type, usually JSON.
You can specify an event that is passed to the method.
- Processor.locate_address()
Returns an ID of a processor and a Pipeline.
- Returns
ID of the Pipeline and self.Id.
- Processor.rest_get()
Description:
- Returns
- Processor.__repr__()
Return repr(self).
Sink
The Sink object serves as the final event destination within a given pipeline. Subsequently, the event is dispatched/written into the system by the BSPump.
Connection
- class Connection(app, id=None, config=None)
Bases: ABC, Configurable
Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to create connections with the main components of BSPump: source, processor and sink.
- __init__(app, id=None, config=None)
Description:
Parameters
- app : Application
Specification of an Application.
- id : default None
- config : JSON or other compatible format, default None
It contains important information and data responsible for creating a connection.