Advanced
BitSwan Pump provides more advanced processors that can be used in a pipeline.
Generator
- The Generator object is used to generate one or multiple events in an asynchronous way
and pass them to the following processors in the pipeline. When using a Generator, the user overrides the generate method rather than process.
1.) A Generator can iterate through an event to create (generate) derived events and pass them to the following processors.
Example of a custom Generator class with a generate method:

```python
class MyGenerator(bspump.Generator):

    async def generate(self, context, event, depth):
        for item in event.items():
            self.Pipeline.inject(context, item, depth)
```
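The fan-out behaviour of inject can be illustrated with a self-contained sketch. ToyPipeline below is a hypothetical stand-in for the real Pipeline (which does considerably more); only the collection of derived events is modelled:

```python
import asyncio

# ToyPipeline is a hypothetical stand-in for bspump's Pipeline: the real
# inject() passes the event to the processors at the given depth, while
# here we simply collect the derived events for inspection.
class ToyPipeline:
    def __init__(self):
        self.Output = []

    def inject(self, context, event, depth):
        self.Output.append(event)

class MyGenerator:
    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def generate(self, context, event, depth):
        # Fan out one dict event into per-item derived events
        for item in event.items():
            self.Pipeline.inject(context, item, depth)

pipeline = ToyPipeline()
gen = MyGenerator(pipeline)
asyncio.run(gen.generate({}, {"color": "red", "size": "L"}, depth=1))
print(pipeline.Output)  # [('color', 'red'), ('size', 'L')]
```

One incoming event thus produces two derived events, each handed to the next depth of the pipeline.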
2.) A Generator can, in the same way, also generate completely independent events if necessary.
In this way, the generator processes originally synchronous events "out-of-band", i.e. outside the synchronous processing within the pipeline.
A specific implementation of the generator should implement the generate method to process events while performing
long-running (asynchronous) tasks such as HTTP requests or SQL selects.
These long-running tasks may enrich events with relevant information, such as the output of external calculations.
Example of a generate method:

```python
async def generate(self, context, event, depth):
    # Perform a possibly long-running asynchronous operation
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/resolve_color/{}".format(event.get("color_id", "unknown"))) as resp:
            if resp.status != 200:
                return
            new_event = await resp.json()

    # Inject the new event into the next depth of the pipeline
    self.Pipeline.inject(context, new_event, depth)
```
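The same out-of-band enrichment pattern can be sketched without a network dependency. Here the external HTTP service is replaced by a hypothetical async function simulated with asyncio.sleep, and ToyPipeline is a simplified stand-in for the real Pipeline:

```python
import asyncio

# Hypothetical stand-in for the external service above: an async function
# that simulates a slow color lookup (the sleep represents network latency).
async def resolve_color(color_id):
    await asyncio.sleep(0.01)
    return {"color_id": color_id, "color": {"1": "red", "2": "green"}.get(color_id)}

class ToyPipeline:
    def __init__(self):
        self.Output = []

    def inject(self, context, event, depth):
        self.Output.append(event)

async def generate(pipeline, context, event, depth):
    # Long-running asynchronous enrichment, then inject downstream
    new_event = await resolve_color(event.get("color_id", "unknown"))
    pipeline.inject(context, new_event, depth)

pipeline = ToyPipeline()
asyncio.run(generate(pipeline, {}, {"color_id": "2"}, depth=1))
print(pipeline.Output)  # [{'color_id': '2', 'color': 'green'}]
```

The pipeline itself stays synchronous; only the generate coroutine awaits the slow call before injecting the enriched event.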
- Generator.__init__()[source]
Description:
Parameters
- app : Application
The Application instance.
- pipeline : Pipeline
The Pipeline instance.
- id : str, default = None
ID of the Generator.
- config : JSON, default = None
Configuration file containing additional information.
Generator Construction
Analyzer
- This is a general analyzer interface, which can serve as the base for different analyzers.
analyze_on_clock enables analysis by a timer, whose period can be set via analyze_period or Config["analyze_period"].
In general, the Analyzer contains an object in which it accumulates information about events. Events pass through the analyzer unchanged; the information is recorded by the evaluate() function. The internal object sometimes needs to be processed and sent somewhere (e.g. to another pipeline); this is done by the analyze() function, which can be triggered by time, by pub/sub, or externally.
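The evaluate/analyze life cycle described above can be sketched in plain Python. ToyAnalyzer below mirrors the interface but is a hypothetical simplification, not the real BSPump class:

```python
import collections

# Minimal sketch of the Analyzer life cycle: predicate() filters events,
# evaluate() accumulates information into an internal object, and
# analyze() periodically ships the accumulated state somewhere.
class ToyAnalyzer:
    def __init__(self):
        self.Counter = collections.Counter()  # the accumulated "analyzed object"
        self.Emitted = []

    def predicate(self, context, event):
        return "color" in event  # only analyze events carrying a color

    def evaluate(self, context, event):
        # Record information from the event; the event itself passes on unchanged
        self.Counter[event["color"]] += 1

    def analyze(self):
        # Triggered by timer, pub/sub, or externally in the real Analyzer
        self.Emitted.append(dict(self.Counter))
        self.Counter.clear()

a = ToyAnalyzer()
for ev in [{"color": "red"}, {"color": "red"}, {"size": "L"}, {"color": "blue"}]:
    if a.predicate({}, ev):
        a.evaluate({}, ev)
a.analyze()
print(a.Emitted)  # [{'red': 2, 'blue': 1}]
```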
- class Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]
Bases:
Processor
Description:
Analyzer Construction
- Analyzer.analyze()[source]
The main function, which runs through the analyzed object. Specific for each analyzer. If the analyzed object is a Matrix, it is not recommended to iterate through the matrix row by row (or cell by cell); use numpy functions instead. Examples:
1. You have a vector with n rows and need only the indexes of rows whose cell content is greater than 10: use np.where(vector > 10).
2. You have a matrix with n rows and m columns and need to find which rows consist entirely of zeros: use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
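The four numpy idioms above, shown on small example data (the vector and matrix values are made up for illustration):

```python
import numpy as np

# 1. Indexes of cells greater than 10
vector = np.array([3, 12, 7, 25, 10])
rows_over_10 = np.where(vector > 10)[0]

matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [0, 0, 0],
])

# 2. Rows consisting entirely of zeros, and rows with at least one zero
all_zero_rows = np.where(np.all(matrix == 0, axis=1))[0]
rows_with_a_zero = np.where(np.any(matrix == 0, axis=1))[0]

# 3. Per-row means
row_means = np.mean(matrix, axis=1)

print(rows_over_10.tolist())      # [1, 3]
print(all_zero_rows.tolist())     # [0, 2]
print(rows_with_a_zero.tolist())  # [0, 1, 2]
print(row_means.tolist())         # [0.0, 1.0, 0.0]
```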
- Analyzer.evaluate(context, event)[source]
- The function that records the information from the event into the analyzed object.
Specific for each analyzer.
Parameters
context :
- event : any data type
information with a timestamp.
- Analyzer.predicate(context, event)[source]
This function checks whether the event is worth processing; if so, it should return True. Specific for each analyzer, but the default implementation always returns True.
Parameters
context :
- event : any data type
information with a timestamp.
- Returns
True
Analyzing Source
Lookup
Lookups serve for fast data searching in lists of key-value type. They can subsequently be located and used in pipeline objects (processors and the like). Each lookup requires a statically or dynamically created value list.
If the "lazy" parameter in the constructor is set to True, no load method is called and the user is expected to call it when necessary.
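A key-value lookup with a statically created value list, and the lazy variant, can be sketched as follows. ToyDictionaryLookup is a hypothetical simplification; the real DictionaryLookup also handles configuration, serialization, and master/slave synchronization:

```python
# Hypothetical sketch of a dictionary-backed lookup with a "lazy" option,
# mirroring the behaviour described above (not the real BSPump class).
class ToyDictionaryLookup:
    def __init__(self, values=None, lazy=False):
        self.Dictionary = {}
        self._values = values or {}
        if not lazy:
            self.load()

    def load(self):
        # With lazy=True, the user is expected to call load() when necessary
        self.Dictionary.update(self._values)

# Eager lookup, used to enrich an event in a processor
lookup = ToyDictionaryLookup({"CZ": "Czech Republic", "DE": "Germany"})
event = {"country_code": "CZ"}
event["country"] = lookup.Dictionary.get(event["country_code"])
print(event["country"])  # Czech Republic

# Lazy lookup: empty until load() is called explicitly
lazy_lookup = ToyDictionaryLookup({"CZ": "Czech Republic"}, lazy=True)
print(len(lazy_lookup.Dictionary))  # 0
lazy_lookup.load()
print(len(lazy_lookup.Dictionary))  # 1
```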
- class Lookup(app, id=None, config=None, lazy=False)[source]
Bases:
Configurable
Description:
Lookup Construction
MappingLookup
- class MappingLookup(app, id=None, config=None, lazy=False)[source]
Bases:
Lookup, Mapping
Description:
- MappingLookup.__init__()
Description:
Async Lookup Mixin
AsyncLookupMixin makes sure the value from the lookup is obtained asynchronously. AsyncLookupMixin is to be used for every technology that is external to BSPump, i.e. that requires a connection to a resource server such as SQL etc.
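The asynchronous access pattern can be sketched as follows. ToyAsyncLookup and its get method are hypothetical names, and the external resource round-trip is simulated with asyncio.sleep; this is not the real BSPump mixin API:

```python
import asyncio

# Hypothetical sketch of an asynchronous lookup: the value is awaited
# instead of blocking the pipeline while an external server (SQL, HTTP,
# etc.) answers. The sleep simulates the round-trip.
class ToyAsyncLookup:
    def __init__(self):
        self._backend = {"user1": "admin", "user2": "guest"}

    async def get(self, key):
        await asyncio.sleep(0.01)  # stands in for an SQL/HTTP round-trip
        return self._backend.get(key)

async def enrich(lookup, event):
    # Await the lookup value asynchronously
    event["role"] = await lookup.get(event["user"])
    return event

result = asyncio.run(enrich(ToyAsyncLookup(), {"user": "user1"}))
print(result)  # {'user': 'user1', 'role': 'admin'}
```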
Dictionary Lookup
- class DictionaryLookup(app, id=None, config=None, lazy=False)[source]
Bases:
MappingLookup
Description:
Dictionary Lookup Methods
Lookup Provider
Lookup Provider Methods
Lookup BatchProvider ABC
- class LookupBatchProviderABC(lookup, url, id=None, config=None)[source]
Bases:
LookupProviderABC, ABC
Description:
- LookupBatchProviderABC.__init__()
Description:
Anomaly
- class Anomaly[source]
Bases:
dict
Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.
Implement: TYPE, on_tick
- Anomaly.__init__()
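Since Anomaly subclasses dict and requires TYPE and on_tick to be implemented, an override can be sketched as follows. The class name and the closing condition below are assumptions for illustration, not BSPump's actual logic:

```python
# Hypothetical override of the abstract Anomaly (a dict subclass): the docs
# require implementing TYPE and on_tick. Closing the anomaly once its time
# window has passed is an illustrative assumption.
class LateDataAnomaly(dict):
    TYPE = "late_data"

    def on_tick(self, current_time):
        # Close the anomaly once its time window has passed
        if current_time >= self.get("ts_end", float("inf")):
            self["status"] = "closed"

anomaly = LateDataAnomaly(ts_end=100.0, status="open")
anomaly.on_tick(50.0)     # window still open, status unchanged
anomaly.on_tick(150.0)    # window passed, anomaly is closed
print(anomaly["status"])  # closed
```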