Advanced

BitSwan Pump provides more advanced Processors that can be used in a pipeline.

Generator

The Generator object is used to generate one or multiple events in an asynchronous way

and pass them to the following processors in the pipeline. In the case of a Generator, the user overrides the generate method, not process.

1.) A Generator can iterate through an event to create (generate) derived events and pass them to the following processors.

Example of a custom Generator class with a generate method:

        class MyGenerator(bspump.Generator):

                async def generate(self, context, event, depth):
                        # Assuming the event is a dictionary, emit one derived
                        # event per key/value pair into the next pipeline depth
                        for item in event.items():
                                self.Pipeline.inject(context, item, depth)

2.) A Generator can in the same way also generate completely independent events, if necessary.
In this way, the generator processes originally synchronous events "out-of-band", i.e. outside of the synchronous processing within the pipeline.

A specific implementation of the generator should implement the generate method to process events while performing
long-running (asynchronous) tasks such as HTTP requests or SQL selects.
These long-running tasks may enrich events with relevant information, such as the output of external calculations.

Example of a generate method (aiohttp is assumed to be imported for the asynchronous HTTP request):

        async def generate(self, context, event, depth):

                # Perform a possibly long-running asynchronous operation
                async with aiohttp.ClientSession() as session:
                        async with session.get("https://example.com/resolve_color/{}".format(event.get("color_id", "unknown"))) as resp:
                                if resp.status != 200:
                                        return
                                new_event = await resp.json()

                # Inject the new event into the next depth of the pipeline
                self.Pipeline.inject(context, new_event, depth)
class Generator(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Description:


Generator.__init__()[source]

Description:

Parameters

app : Application

Application object.

pipeline : Pipeline

Pipeline object.

id : str, default = None

ID

config : JSON, default = None

Configuration file containing additional information.

Generator Construction
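
A minimal construction sketch, wiring the MyGenerator from above between a source and a sink; bspump.common.InternalSource and bspump.common.NullSink are assumptions, and any source/sink pair available in your BSPump version works as well:

        import bspump
        import bspump.common

        class MyPipeline(bspump.Pipeline):

                def __init__(self, app, pipeline_id=None):
                        super().__init__(app, pipeline_id)
                        self.build(
                                bspump.common.InternalSource(app, self),  # assumed source
                                MyGenerator(app, self),
                                bspump.common.NullSink(app, self),        # assumed sink
                        )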

Generator.set_depth(depth)[source]

Description:

Parameters

depth : int

Generator.process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

async Generator.generate(context, event, depth)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

depth : int

Analyzer

This is the general analyzer interface, which can serve as the base for different analyzers.

analyze_on_clock enables analysis on a timer, whose period can be set by analyze_period or Config["analyze_period"].

In general, the Analyzer contains an object in which it accumulates information about events. Events go through the analyzer unchanged; the information is recorded by the evaluate() function. The internal object sometimes needs to be processed and sent somewhere (e.g. to another pipeline). This is done by the analyze() function, which can be triggered by time, by pubsub, or externally, as the sketch below illustrates.
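
A minimal sketch of a custom analyzer, assuming events are dictionaries with a "color" field and that the Analyzer class is importable from bspump.analyzer (the class and field names are illustrative):

        import bspump.analyzer

        class ColorCountAnalyzer(bspump.analyzer.Analyzer):

                def __init__(self, app, pipeline, id=None, config=None):
                        super().__init__(app, pipeline, analyze_on_clock=True, id=id, config=config)
                        self.Counts = {}  # internal object accumulating information about events

                def predicate(self, context, event):
                        # Only events carrying a "color" field are worth processing
                        return "color" in event

                def evaluate(self, context, event):
                        # Record the information from the event into the analyzed object
                        color = event["color"]
                        self.Counts[color] = self.Counts.get(color, 0) + 1

                def analyze(self):
                        # Process the accumulated object (e.g. send it to another
                        # pipeline); here it is simply printed and reset
                        print(self.Counts)
                        self.Counts = {}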

class Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]

Bases: Processor

Description:

Analyzer.__init__()[source]

Initializes the Analyzer.

Parameters

app : object

Application object.

pipeline : Pipeline

Pipeline object.

id : str, default = None

ID of the class or config.

config : JSON, or other compatible formats, default = None

Configuration file.

Analyzer Construction

Analyzer.start_timer(event_type)[source]

Description:

Analyzer Methods

Analyzer.analyze()[source]

Description:

The main function, which runs through the analyzed object. It is specific for each analyzer.

If the analyzed object is a Matrix, it is not recommended to iterate through the matrix row by row (or cell by cell); use numpy functions instead, as in the sketch after this list. Examples:

1.) You have a vector with n rows and need only those row indices where the cell content is greater than 10. Use np.where(vector > 10).

2.) You have a matrix with n rows and m columns and need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indices. Instead of np.all() you can use np.any() to get all row indices where there is at least one zero.

3.) Use np.mean(matrix, axis=1) to get the means of all rows.

4.) Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
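
A small sketch of those vectorized operations (the matrix values are illustrative only):

        import numpy as np

        matrix = np.array([
                [0, 0, 0],
                [5, 12, 0],
                [7, 8, 9],
        ])

        # Row indices that consist entirely of zeros
        zero_rows = np.where(np.all(matrix == 0, axis=1))[0]      # -> [0]

        # Row indices containing at least one zero
        any_zero_rows = np.where(np.any(matrix == 0, axis=1))[0]  # -> [0, 1]

        # Mean of every row
        row_means = np.mean(matrix, axis=1)                       # -> [0.0, 5.67, 8.0]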

Analyzer.evaluate(context, event)[source]
The function that records the information from the event into the analyzed object. It is specific for each analyzer.

Parameters

context :

event : any data type

Information with a timestamp.

Analyzer.predicate(context, event)[source]

This function checks whether the event is worth processing; if so, it should return True. It is specific for each analyzer, but the default implementation always returns True.

Parameters

context :

event : any data type

Information with a timestamp.

Returns

True

Analyzer.process(context, event)[source]
The event passes through process(context, event) unchanged; meanwhile, it is evaluated.

Parameters

context :

event : any data type

Information with a timestamp.

Returns

event

async Analyzer.on_clock_tick()[source]

Runs the analysis on every clock tick.

Analyzing Source

Lookup

Lookups serve for fast data searching in lists of key-value type. They can subsequently be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created list of values.

If the "lazy" parameter in the constructor is set to True, the load method is not called and the user is expected to call it when necessary. A usage sketch follows below.
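
A minimal sketch of a processor enriching events from a lookup; the service name, lookup id, event fields, and the locate_lookup call are illustrative assumptions:

        import bspump

        class ColorEnricher(bspump.Processor):

                def __init__(self, app, pipeline, id=None, config=None):
                        super().__init__(app, pipeline, id=id, config=config)
                        # Locate a lookup registered with the pump service;
                        # the service and lookup names are assumptions
                        svc = app.get_service("bspump.PumpService")
                        self.ColorLookup = svc.locate_lookup("ColorLookup")

                def process(self, context, event):
                        # Enrich the event with the value stored under its key
                        event["color_name"] = self.ColorLookup.get(event.get("color_id"))
                        return event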

class Lookup(app, id=None, config=None, lazy=False)[source]

Bases: Configurable

Description:


Returns

Lookup.__init__()[source]

Description:

Lookup Construction

Lookup.time()[source]

Description:

Returns

time


Lookup.ensure_future_update(loop)[source]

Description:

Returns


async Lookup.load() → bool[source]

Description:

Lookup.serialize()[source]

Description:

Lookup.deserialize(data)[source]

Description:


Lookup.is_master()[source]

Description:

Returns

??


MappingLookup

class MappingLookup(app, id=None, config=None, lazy=False)[source]

Bases: Lookup, Mapping

Description:


MappingLookup.__init__()

Description:

Async Lookup Mixin

AsyncLookupMixin makes sure the value from the lookup is obtained asynchronously. AsyncLookupMixin is to be used for every technology that is external to BSPump, i.e. one that requires a connection to a resource server, such as an SQL database. A hedged usage sketch follows below.
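
A sketch of consuming such a lookup from a Generator; it assumes the lookup exposes an awaitable get(key) coroutine, which may differ in your BSPump version:

        import bspump

        class EnrichingGenerator(bspump.Generator):

                def __init__(self, app, pipeline, id=None, config=None):
                        super().__init__(app, pipeline, id=id, config=config)
                        svc = app.get_service("bspump.PumpService")
                        # "UserLookup" is an illustrative id of an AsyncLookupMixin-based lookup
                        self.Lookup = svc.locate_lookup("UserLookup")

                async def generate(self, context, event, depth):
                        # Await the value asynchronously; get() is an assumed coroutine
                        event["user"] = await self.Lookup.get(event.get("user_id"))
                        self.Pipeline.inject(context, event, depth)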

class AsyncLookupMixin(app, id=None, config=None, lazy=False)[source]

Bases: Lookup

Description:

Dictionary Lookup

class DictionaryLookup(app, id=None, config=None, lazy=False)[source]

Bases: MappingLookup

Description:

DictionaryLookup.__init__()[source]

Description:


Dictionary Lookup Methods

DictionaryLookup.__getitem__(key)[source]

DictionaryLookup.__len__()[source]

DictionaryLookup.serialize()[source]

Description:

Returns

JSON data


DictionaryLookup.deserialize(data)[source]

Description:


DictionaryLookup.rest_get()[source]

Description:

Returns

rest


DictionaryLookup.set(dictionary: dict)[source]

Description:
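
A minimal usage sketch built from the methods documented above (the app object and the dictionary contents are illustrative):

        lookup = DictionaryLookup(app, id="ColorLookup")
        lookup.set({"1": "red", "2": "green"})

        print(lookup["1"])         # __getitem__ -> "red"
        print(len(lookup))         # __len__ -> 2

        data = lookup.serialize()  # JSON data suitable for storage or transfer
        lookup.deserialize(data)   # restore the dictionary from it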


Lookup Provider

class LookupProviderABC(lookup, url, id=None, config=None)[source]

Bases: ABC, Configurable

Description:


LookupProviderABC.__init__()[source]

Description:

Lookup Provider Methods

async LookupProviderABC.load()[source]

Description:


Lookup Batch Provider ABC

class LookupBatchProviderABC(lookup, url, id=None, config=None)[source]

Bases: LookupProviderABC, ABC

Description:


LookupBatchProviderABC.__init__()

Description:

Anomaly

class Anomaly[source]

Bases: dict

Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.

Implement: TYPE, on_tick


Anomaly.__init__()
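
A hedged sketch of a concrete anomaly; since Anomaly subclasses dict and asks implementers to provide TYPE and on_tick, the sketch fills those in, but the module path, the on_tick signature, and the fields used are assumptions:

        import bspump.anomaly  # assumed module path for the Anomaly class

        class TemperatureAnomaly(bspump.anomaly.Anomaly):

                TYPE = "temperature"  # identifies this anomaly type

                async def on_tick(self, current_time):
                        # Illustrative closing logic: mark the anomaly finished once
                        # no new symptoms have arrived for 60 seconds
                        if current_time - self.get("ts_end", current_time) > 60:
                                self["status"] = "closed"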