Basics

Basics covers the most fundamental components of BSPump. We will start with the “backbone” of BSPump, which is called a “pipeline”.

Pipeline

The pipeline class is responsible for construction of the BSPump pipeline itself. Its methods enable us to maintain a working lifecycle of the system.

A Pipeline is responsible for data processing in BSPump. Individual Pipeline objects work asynchronously and independently of one another, unless a dependency is defined explicitly (for instance, on a message source fed by another pipeline), and any number of them can run at once. Each Pipeline is usually in charge of one concrete task.

Pipeline has three main components:

Pipeline diagram

A Source connects a data source to the Pipeline so that its data can be processed.

Multiple sources

A Pipeline can have multiple sources. They are passed as a list of sources to the Pipeline build() method.

class MyPipeline(bspump.Pipeline):

   def __init__(self, app, pipeline_id):
      super().__init__(app, pipeline_id)
      self.build(
         [
            MySource1(app, self),
            MySource2(app, self),
            MySource3(app, self),
         ],
         bspump.common.NullSink(app, self),
      )

The main component of the BSPump architecture is a so-called Processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies and reacting to known as well as unknown system behaviour patterns.

Processors differ in function, and within a Pipeline object they are arranged in a predefined sequence. Each Pipeline has its own task when it comes to working with data events.

Processors are passed as a list of Processors to the Pipeline build() method.

class MyPipeline(bspump.Pipeline):

   def __init__(self, app, pipeline_id):
      super().__init__(app, pipeline_id)
      self.build(
         [
            MyProcessor1(app, self),
            MyProcessor2(app, self),
            MyProcessor3(app, self),
         ],
         bspump.common.NullSink(app, self),
      )

A Sink object serves as the final event destination within a given pipeline. From there, BSPump dispatches or writes the event into the target system.
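
The flow through the three components can be pictured with a small, library-free sketch. It does not use the BSPump API: the names run_pipeline, add_length and uppercase are hypothetical and only illustrate that events travel from a source, through an ordered sequence of processors, into a sink.

```python
from typing import Callable, Iterable, List

Event = dict


def run_pipeline(source: Iterable[Event],
                 processors: List[Callable[[Event], Event]],
                 sink: Callable[[Event], None]) -> None:
    """Push every event from the source through the processors into the sink."""
    for event in source:
        for processor in processors:
            event = processor(event)
        sink(event)


def add_length(event: Event) -> Event:
    # Enrich the event with a derived field
    return {**event, "length": len(event["message"])}


def uppercase(event: Event) -> Event:
    # Transform an existing field
    return {**event, "message": event["message"].upper()}


collected = []
run_pipeline(
    source=[{"message": "hello"}, {"message": "bspump"}],
    processors=[add_length, uppercase],
    sink=collected.append,
)
print(collected)
```

The order of the processors matters: each one receives the output of the previous one.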

class Pipeline(app, id=None, config=None)[source]

Bases: ABC, Configurable

Description: Pipeline is …

An example of Pipeline construction:

class MyPipeline(bspump.Pipeline):

   def __init__(self, app, pipeline_id):
      super().__init__(app, pipeline_id)
      self.build(
         [
            MySource(app, self),
            MyProcessor(app, self),
            MyProcessor2(app, self),
         ],
         bspump.common.NullSink(app, self),
      )

Pipeline construction

The following are the core methods of the pipeline.

Pipeline.build(source, *processors)[source]

This method adds the sources, Processors, and sink that make up the structure of the Pipeline.

Parameters

source : str

ID of a source.

*processors : str or list, optional

ID of a Processor or a list of IDs.

Pipeline.set_source(source)[source]

Sets a specific source or a list of sources for the Pipeline.

Parameters

source : str or list, optional

ID of a source.

If a list of sources is passed to the method, the entire list of sources is added to the Pipeline.

Pipeline.append_processor(processor)[source]

Adds a Processor to the current Pipeline.

Parameters

processor : str

ID of a processor.

Hint

A Generator can also be added by using this method. It requires a depth parameter.

Pipeline.remove_processor(processor_id)[source]

Removes a specific processor from the Pipeline.

Parameters

processor_id : str

ID of a processor.

Returns

Error when processor is not found.

Pipeline.insert_before(id, processor)[source]

Inserts the Processor into the Pipeline in front of another processor specified by ID.

Parameters

id : str

ID of the processor in front of which the new processor is inserted.

processor : str

The processor to insert.

Returns

True on success. False if the ID was not found.

Pipeline.insert_after(id, processor)[source]

Inserts the Processor into the Pipeline behind another Processor specified by ID.

Parameters

id : str

ID of the processor after which the new processor is inserted.

processor : str

The processor to insert.

Returns

True if successful. False if the ID was not found.
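
The lookup-and-insert behaviour described for insert_before() and insert_after() can be sketched on a plain list. This is an illustration only, not the BSPump implementation: the Proc class and the flat list of processors are assumptions made for the example.

```python
class Proc:
    """Hypothetical stand-in for a processor; it only carries an ID."""

    def __init__(self, id):
        self.Id = id


def insert_before(processors, id, processor):
    """Insert `processor` in front of the entry whose Id equals `id`."""
    for index, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(index, processor)
            return True
    return False  # ID was not found


def insert_after(processors, id, processor):
    """Insert `processor` right behind the entry whose Id equals `id`."""
    for index, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(index + 1, processor)
            return True
    return False  # ID was not found


chain = [Proc("parse"), Proc("sink")]
ok_before = insert_before(chain, "sink", Proc("enrich"))
ok_after = insert_after(chain, "parse", Proc("validate"))
missing = insert_before(chain, "unknown", Proc("ignored"))
order = [p.Id for p in chain]
print(order, ok_before, ok_after, missing)
```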

Pipeline.iter_processors()[source]

A Python generator that iterates through all Processors in the Pipeline.

Yields

A Processor from a list in the Pipeline.
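
As a rough illustration of such a generator, the sketch below walks a nested list and yields one item at a time. The layout (one inner list per depth) is an assumption made for this example and is not taken from the BSPump source.

```python
def iter_processors(processors_by_depth):
    """Yield every processor, depth by depth, in pipeline order."""
    for processors in processors_by_depth:
        for processor in processors:
            yield processor


# Hypothetical layout: one inner list per depth
layout = [["parser", "enricher"], ["generator-child", "sink"]]
names = list(iter_processors(layout))
print(names)
```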

Other Pipeline Methods

The methods below extend the pipeline with additional features. Although they are listed separately from the core methods, many of them are important in practice and close to indispensable.

Pipeline.time()[source]

Returns the current time as provided by the application.

Returns

App.time()

Hint

More information can be found in the UTC Time section of the ASAB documentation.

Pipeline.get_throttles()[source]

Returns the components of the Pipeline that are throttled.

Returns

self._throttles : list of throttles.

Pipeline.is_error()[source]

Returns False when there is no error, otherwise it returns True.

Returns

self._error is not None.

Pipeline.set_error(context, event, exc)[source]

When called with exc set to None, it resets the error (recovery).

When called with exc, it sets the exception for a soft error.

Parameters

context : type?

Context of an error.

event : data with a timestamp, stored in any data type (usually JSON)

You can specify an event that is passed to the method.

exc : Exception

Python default exceptions.

Pipeline.handle_error(exception, context, event)[source]

Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.

Parameters

exception : Exception

Used for setting up a custom Exception.

context : information

Additional information can be passed.

event : data with a timestamp, stored in any data type (usually JSON)

You can specify an event that is passed to the method.

Returns

False for hard errors (these stop the Pipeline processing). True for soft errors, which are ignored.

Example:

import json

import bspump
import bspump.common


class SampleInternalPipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)

                self.build(
                        bspump.common.InternalSource(app, self),
                        bspump.common.JSONParserProcessor(app, self),
                        bspump.common.PPrintSink(app, self)
                )

        def handle_error(self, exception, context, event):
                # Malformed JSON is a soft error: ignore it and continue
                if isinstance(exception, json.decoder.JSONDecodeError):
                        return True
                # Anything else is a hard error and stops the Pipeline
                return False
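
The soft versus hard decision can also be tried without BSPump. The sketch below reuses the same handle_error() logic as a plain function. The process_all() helper is hypothetical and only imitates a caller that consults handle_error() when processing raises.

```python
import json


def handle_error(exception, context, event):
    # Soft error: malformed JSON is skipped and processing continues
    if isinstance(exception, json.decoder.JSONDecodeError):
        return True
    # Hard error: anything else stops processing
    return False


def process_all(raw_events):
    """Parse events and consult handle_error() whenever parsing raises."""
    parsed, skipped = [], []
    for raw in raw_events:
        try:
            parsed.append(json.loads(raw))
        except Exception as exc:
            if handle_error(exc, context={}, event=raw):
                skipped.append(raw)
                continue
            raise  # hard error: propagate and stop
    return parsed, skipped


parsed, skipped = process_all(['{"a": 1}', 'not json', '{"b": 2}'])
print(parsed, skipped)
```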

Pipeline.link(ancestral_pipeline)[source]

Links this Pipeline with an ancestral Pipeline. This is needed, e.g., for propagating throttling from child Pipelines back to their ancestors. If the child Pipeline uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestral Pipeline, so that its source may block incoming events until the internal queue is empty again.

Parameters

ancestral_pipeline : str

ID of a Pipeline that will be linked.

Pipeline.unlink(ancestral_pipeline)[source]

Unlinks an ancestral Pipeline from this Pipeline.

Parameters

ancestral_pipeline : str

ID of an ancestral Pipeline that will be unlinked.

Pipeline.throttle(who, enable=True)[source]

Enables throttling for the chosen pipeline and, if needed, for its ancestral pipelines.

Parameters

who : ID of a processor

Name of a processor that we want to throttle.

enable : bool, default True

When True, the content of the ‘who’ argument is added to the _throttles list.
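
How throttling could travel from a child pipeline to a linked ancestor is sketched below. ToyPipeline is a hypothetical model, not the BSPump class: the set-based bookkeeping, the is_throttled() helper and removal on enable=False are assumptions of this example.

```python
class ToyPipeline:
    """Hypothetical model of throttling; not the BSPump implementation."""

    def __init__(self, name):
        self.name = name
        self._throttles = set()
        self._ancestral_pipelines = set()

    def link(self, ancestral_pipeline):
        self._ancestral_pipelines.add(ancestral_pipeline)

    def unlink(self, ancestral_pipeline):
        self._ancestral_pipelines.discard(ancestral_pipeline)

    def throttle(self, who, enable=True):
        if enable:
            self._throttles.add(who)
        else:
            self._throttles.discard(who)
        # Propagate the same request to every linked ancestor
        for ancestor in self._ancestral_pipelines:
            ancestor.throttle(who, enable)

    def is_throttled(self):
        return len(self._throttles) > 0


parent = ToyPipeline("parent")
child = ToyPipeline("child")
child.link(parent)

child.throttle("internal-queue", enable=True)    # the queue is full
throttled_while_full = parent.is_throttled()
child.throttle("internal-queue", enable=False)   # the queue has drained
throttled_after_drain = parent.is_throttled()
print(throttled_while_full, throttled_after_drain)
```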

async Pipeline.ready()[source]

Checks whether the Pipeline is ready and waits until it is. The method can be used in a source: await self.Pipeline.ready().

Pipeline.is_ready()[source]

Checks whether the internal ready event (an Event object) is set.

Returns

_ready.is_set().
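
The difference between awaiting ready() and polling is_ready() can be shown with a plain asyncio.Event. This is a sketch under the assumption that readiness is backed by such an event. ToyPipeline and demo() are hypothetical names.

```python
import asyncio


class ToyPipeline:
    """Hypothetical pipeline whose readiness is an asyncio.Event."""

    def __init__(self):
        self._ready = asyncio.Event()

    def is_ready(self):
        # Non-blocking check
        return self._ready.is_set()

    async def ready(self):
        # Suspends the caller until the event is set
        await self._ready.wait()
        return True


async def demo():
    pipeline = ToyPipeline()
    before = pipeline.is_ready()
    # Mark the pipeline ready shortly after the caller starts waiting
    asyncio.get_running_loop().call_later(0.01, pipeline._ready.set)
    await pipeline.ready()
    return before, pipeline.is_ready()


result = asyncio.run(demo())
print(result)
```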

Pipeline.inject(context, event, depth)[source]

The inject method injects events into the Pipeline at the depth given by the depth argument. Every depth is associated with a generator object.

Parameters

context : string

Information propagated through the Pipeline.

event : data with a timestamp, stored in any data type (usually JSON)

You can specify an event that is passed to the method.

depth : int

Level of depth.

Note

For normal operations, it is highly recommended to use the process method instead.

async Pipeline.process(event, context=None)[source]

The process method injects events into the Pipeline at depth 0 and increments the event counter in the metrics.

Parameters

event : data with a timestamp, stored in any data type (usually JSON)

You can specify an event that is passed to the method.

context : str, default None

You can add additional information needed for work with event streaming.

Hint

This is the recommended way of inserting events into a Pipeline.

Pipeline.create_eps_counter()[source]

Creates a dictionary with information about the Pipeline. It contains EPS (events per second), warnings and errors.

Returns

self.MetricsService : creates the EPS counter using MetricsService.

Note

The EPS counter can be created using this method or directly by using a MetricsService method.

Pipeline.ensure_future(coro)[source]

You can use this method to schedule a future task that will be executed in the context of the Pipeline. The Pipeline also manages the whole lifecycle of the future/task: it collects the result, discards it and, most importantly, captures any exception, which then blocks the Pipeline via set_error().

Parameters

coro : coroutine

The coroutine to schedule.

Hint

If the number of futures exceeds the configured limit, the Pipeline is throttled.
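
The lifecycle management described above can be approximated with standard asyncio calls. The sketch below is an assumption-laden model, not the BSPump code: ToyPipeline, _future_done() and the simplified set_error() are hypothetical, and the futures limit is left out.

```python
import asyncio


class ToyPipeline:
    """Hypothetical model: tracks futures and records their exceptions."""

    def __init__(self):
        self._error = None
        self._futures = set()

    def set_error(self, context, event, exc):
        self._error = exc

    def is_error(self):
        return self._error is not None

    def ensure_future(self, coro):
        future = asyncio.ensure_future(coro)
        self._futures.add(future)
        future.add_done_callback(self._future_done)
        return future

    def _future_done(self, future):
        self._futures.discard(future)
        if future.cancelled():
            return
        exc = future.exception()  # the result itself is discarded
        if exc is not None:
            self.set_error(None, None, exc)


async def failing():
    raise ValueError("boom")


async def demo():
    pipeline = ToyPipeline()
    pipeline.ensure_future(asyncio.sleep(0))
    pipeline.ensure_future(failing())
    await asyncio.sleep(0.01)  # let the tasks and their callbacks run
    return pipeline


pipeline = asyncio.run(demo())
print(pipeline.is_error(), repr(pipeline._error))
```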


Pipeline.locate_source(address)[source]

Locates a source based on its ID.

Parameters

address : str

ID of the source.

Pipeline.locate_connection(app, connection_id)[source]

Finds a connection by ID.

Parameters

app : Application

Name of the Application.

connection_id : str

ID of the connection we want to locate.

Returns

connection

Pipeline.locate_processor(processor_id)[source]

Finds a Processor by ID.

Parameters

processor_id : str

ID of a Processor.

Returns

processor


Pipeline.start()[source]

Starts the lifecycle of the Pipeline.

async Pipeline.stop()[source]

Gracefully stops the lifecycle of the Pipeline.

Source

class Source(app, pipeline, id=None, config=None)[source]

Bases: Configurable

The Source class is responsible for connecting to a data source and propagating events or other data from it to the processors.

Source.__init__(app, pipeline, id=None, config=None)[source]

Sets the initial ID, Pipeline and Task.

Parameters

app : Application

Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : address of a pipeline

Name of a Pipeline.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

Source Construction

A Source is an object designed to obtain data from a predefined input. BSPump contains many universally usable source objects that are capable of loading data from known data interfaces. The BitSwan product further expands these objects by adding source objects directly usable for specific use cases in a given industry.

Each source represents a coroutine/Future/Task that runs in the context of the main loop. The coroutine method main() contains the implementation of each particular source.

A Source MUST await the Pipeline ready state prior to producing an event. This is accomplished by the await self.Pipeline.ready() call.

async Source.process(event, context=None)[source]

This method is used to emit an event into a Pipeline.

Parameters

event : data with a timestamp, stored in any data type (usually JSON)

Message or information that is passed to the method and emitted into a Pipeline.

context : default None

Additional information.

If an error occurs while processing the event, the Pipeline is throttled by setting the error, and the exception is raised.

Hint

The source should catch this exception and fail gracefully.

Source.start(loop)[source]

Starts the Pipeline through the _main method; if the main method is implemented, the coroutine is started using main instead.

Parameters

loop : ?

Contains the coroutines.

async Source.stop()[source]

Stops the Source using self.Task. If the processes are not done it cancels them or raises an error.

Source.restart(loop)[source]

Restarts the loop of coroutines and returns result() method.

Parameters

loop : ?

Contains the coroutines.

async Source.main()[source]

Can be implemented to provide additional features; otherwise it raises NotImplementedError and _main is called instead.

async Source.stopped()[source]

Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.

Example:

async def main(self):
   # ... initialize resources here
   await self.stopped()
   # ... finalize resources here
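
A runnable imitation of this pattern is sketched below using only asyncio. ToySource is hypothetical, and modelling the stop request with an asyncio.Event is an assumption of this example and not necessarily how BSPump implements stopped().

```python
import asyncio


class ToySource:
    """Hypothetical source that stays alive until a stop is requested."""

    def __init__(self):
        self._stop_requested = asyncio.Event()
        self.log = []

    async def stopped(self):
        await self._stop_requested.wait()

    async def main(self):
        self.log.append("initialized")   # ... initialize resources here
        await self.stopped()
        self.log.append("finalized")     # ... finalize resources here


async def demo():
    source = ToySource()
    task = asyncio.ensure_future(source.main())
    await asyncio.sleep(0)               # let main() reach stopped()
    mid = list(source.log)
    source._stop_requested.set()         # request a stop
    await task
    return mid, source.log


mid_log, final_log = asyncio.run(demo())
print(mid_log, final_log)
```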

Source.locate_address()[source]

Locates the address of the Source within its Pipeline.

Returns

ID of the Source and ID of its Pipeline, as a string.

classmethod Source.construct(app, pipeline, definition: dict)[source]

Creates a source based on a specific definition, for example one loaded from a JSON file.

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Specification of a Pipeline.

definition : dict

Definition that is used to create a source.

Returns

cls(app, newid, config)

class TriggerSource(app, pipeline, id=None, config=None)[source]

Bases: Source

This is an abstract source class intended as a base for the implementation of ‘cyclic’ sources such as file readers, SQL extractors, etc. You need to provide a trigger class and implement the cycle() method.

A trigger source stops execution when its Pipeline is cancelled (raises concurrent.futures.CancelledError). This typically happens when a program wants to quit in reaction to a signal.

You may also override the main() method to provide additional parameters for the cycle() method.

async def main(self):
        async with aiohttp.ClientSession(loop=self.Loop) as session:
                await super().main(session)


async def cycle(self, session):
        session.get(...)

__init__(app, pipeline, id=None, config=None)[source]

Sets the initial ID, Pipeline and Task.

Parameters

app : Application

Name of an Application (https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : address of a pipeline

Name of a Pipeline.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

TriggerSource.time()[source]

Method used for obtaining an accurate time.

Returns

App.time()

Hint

You can find more information about UTC Time in the ASAB documentation.

TriggerSource.on(trigger)[source]

Sets a Trigger, which is an object that waits for a given condition.

Parameters

trigger : keyword of a trigger

The condition to wait for.

Returns

Trigger.add(trigger)

async TriggerSource.main()[source]

Waits for the Pipeline to be ready and for the triggers, and handles exceptions raised once the source has been started.

Parameters

*args : ?

**kwargs : ?


async TriggerSource.cycle()[source]

Not implemented in the base class; it must be implemented by the subclass.

Parameters

*args : ?

**kwargs : ?
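
The interplay of a trigger, main() and cycle() can be pictured with the asyncio-only sketch below. ToyTriggerSource, fire() and the fixed number of rounds are hypothetical and exist only for this illustration. The real TriggerSource works with trigger objects registered through on().

```python
import asyncio


class ToyTriggerSource:
    """Hypothetical cyclic source: runs cycle() each time the trigger fires."""

    def __init__(self, rounds):
        self._trigger = asyncio.Event()
        self._rounds = rounds
        self.produced = []

    def fire(self):
        self._trigger.set()

    async def main(self, *args, **kwargs):
        for _ in range(self._rounds):
            await self._trigger.wait()    # wait for the trigger condition
            self._trigger.clear()
            await self.cycle(*args, **kwargs)

    async def cycle(self, prefix):
        # One unit of work, e.g. reading a file or running a query
        self.produced.append(f"{prefix}-{len(self.produced)}")


async def demo():
    source = ToyTriggerSource(rounds=2)
    task = asyncio.ensure_future(source.main("batch"))
    for _ in range(2):
        source.fire()
        await asyncio.sleep(0.01)         # give cycle() time to run
    await task
    return source.produced


produced = asyncio.run(demo())
print(produced)
```

Arguments given to main() are forwarded to cycle(), which mirrors the way the aiohttp session is passed on in the example above.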

TriggerSource.rest_get()[source]

Description:

Returns

Processor

The main component of the BSPump architecture is a so-called processor. This object modifies, transforms and enriches events. Moreover, it is capable of calculating metrics, creating aggregations, detecting anomalies and reacting to known as well as unknown system behavior patterns.

Processors differ in function, and within a pipeline object they are arranged in a predefined sequence. Each pipeline has its own task when it comes to working with data events.

class Processor(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Inherits from ProcessorBase.


__init__(app, pipeline, id=None, config=None)

Initializes the parameters.

Parameters

app : object

Application object.

pipeline : Pipeline

Name of the Pipeline.

id : str, default None

ID of the class of config.

config : JSON or other compatible format, default None

Configuration file.

Processor.time()

Accurate representation of a time in the Pipeline.

Returns

App.time()

classmethod Processor.construct()

Constructs a processor based on a specific definition, for example one loaded from a JSON file.

Parameters

app : Application

Name of the Application (https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : str

Name of the Pipeline.

definition : dict

Set of instructions from which the processor can be constructed.

Returns

cls(app, pipeline, id=newid, config=config)
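
A definition-driven constructor of this shape can be sketched as follows. ToyProcessor is a hypothetical stand-in, and the definition keys "id" and "config" are assumptions of this example. The source only states that a dict is passed and that cls(app, pipeline, id=newid, config=config) is returned.

```python
class ToyProcessor:
    """Hypothetical stand-in for a processor class."""

    def __init__(self, app, pipeline, id=None, config=None):
        self.App = app
        self.Pipeline = pipeline
        # Fall back to the class name when no ID is given
        self.Id = id if id is not None else self.__class__.__name__
        self.Config = config or {}

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        # Assumed keys: "id" and "config"
        newid = definition.get("id")
        config = definition.get("config")
        return cls(app, pipeline, id=newid, config=config)


definition = {"id": "MyProcessor", "config": {"threshold": 10}}
processor = ToyProcessor.construct(app=None, pipeline=None, definition=definition)
print(processor.Id, processor.Config)
```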


Processor.process()

Can be implemented to return an event based on a given logic.

Parameters

context :

Additional information passed to the method.

event : data with a timestamp, stored in any data type (usually JSON)

You can specify an event that is passed to the method.
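
What an implementation of process() might look like is sketched below without importing BSPump. ToyProcessor and AddSeverityProcessor are hypothetical, as are the "status" and "severity" fields. Only the process(context, event) shape follows the parameters listed above.

```python
class ToyProcessor:
    """Hypothetical base class mirroring the process(context, event) shape."""

    def process(self, context, event):
        raise NotImplementedError()


class AddSeverityProcessor(ToyProcessor):
    """Enriches each event with a severity derived from its status code."""

    def process(self, context, event):
        status = event.get("status", 200)
        event["severity"] = "high" if status >= 500 else "low"
        return event


processor = AddSeverityProcessor()
enriched = processor.process({}, {"status": 503})
print(enriched)
```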

Processor.locate_address()

Returns an ID of a processor and a Pipeline.

Returns

ID of the Pipeline and self.Id.


Processor.rest_get()

Description:

Returns

Processor.__repr__()

Return repr(self).

Sink

A Sink object serves as the final event destination within a given pipeline. From there, BSPump dispatches or writes the event into the target system.

class Sink(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

A Sink is basically a processor. It takes an event and sends it to a database, where it is stored.


__init__(app, pipeline, id=None, config=None)

Initializes the parameters.

Parameters

app : object

Application object.

pipeline : Pipeline

Name of the Pipeline.

id : str, default None

ID of the class of config.

config : JSON or other compatible format, default None

Configuration file.

Connection

class Connection(app, id=None, config=None)[source]

Bases: ABC, Configurable

The Connection class is responsible for creating a connection between items or services within the BSPump infrastructure. Its main use is to create connections with the main components of BSPump: source, processor and sink.


__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

It contains important information and data responsible for creating a connection.

Connection construction

Connection.time()[source]

Returns accurate time of the asynchronous process.

Hint

More information can be found in the UTC Time section of the ASAB documentation.