import logging
import asyncio
import concurrent.futures
from asab import ConfigObject
L = logging.getLogger(__name__)
class Source(ConfigObject):
    """
    Source is responsible for connecting to a data source and propagating events or other data
    from it to the :meth:`processors <bspump.Processor()>` of a :meth:`Pipeline <bspump.Pipeline()>`.

    A concrete source overrides :meth:`main`. The coroutine is wrapped and scheduled as a task by
    :meth:`start` and cancelled by :meth:`stop`.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Set the initial ID, :meth:`Pipeline <bspump.Pipeline()>` and Task.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.
            It is not used by this base constructor itself.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            The pipeline this source feeds. Its ``Id`` becomes part of the configuration section name.

        id : str, default None
            ID of the source. When ``None``, the class name is used.

        config : compatible config type, default None
            Optional configuration, passed on to ``ConfigObject``.
        """
        source_id = id if id is not None else self.__class__.__name__
        super().__init__("pipeline:{}:{}".format(pipeline.Id, source_id), config=config)

        self.Id = source_id
        self.Pipeline = pipeline
        self.Task = None  # Task wrapping the `main()` coroutine while the source is started

    async def process(self, event, context=None):
        """
        Emit an event into the :meth:`Pipeline <bspump.Pipeline()>`.

        **Parameters**

        event : any
            The event handed over to ``Pipeline.process()``.

        context : default None
            Additional information passed along with the event.

        Any exception raised by ``Pipeline.process()`` propagates to the caller.

        :hint: The source should catch this exception and fail gracefully.
        """
        # TODO: Remove this method completely, each source should call pipeline.process() method directly
        await self.Pipeline.process(event, context=context)

    def start(self, loop):
        """
        Schedule :meth:`main` as a task on the given loop. Does nothing if a task is already set.

        **Parameters**

        loop : event loop
            Object providing ``create_task()``; the resulting task is stored in ``self.Task``.
        """
        if self.Task is not None:
            return

        async def _main():
            """
            Run :meth:`main` and handle the end of its lifecycle.

            Cancellation ends the task quietly. Any other exception is reported to the pipeline
            and logged.
            """
            try:
                await self.main()
            except (asyncio.CancelledError, concurrent.futures.CancelledError):
                # Since Python 3.8 asyncio.CancelledError is a BaseException subclass distinct
                # from concurrent.futures.CancelledError, so both must be listed for stop()
                # to be able to await the cancelled task without an exception.
                pass
            except Exception as e:
                self.Pipeline.set_error(None, None, e)
                L.exception("Exception in the source '{}'".format(self.Id))

        self.Task = loop.create_task(_main())

    async def stop(self):
        """
        Stop the source: cancel its task if it is still running, wait for it to finish
        and forget it.
        """
        task = self.Task
        if task is None:
            return  # Source is not started

        if not task.done():
            task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            # A task cancelled before its first step never enters the try block of the
            # wrapper and therefore ends in the cancelled state; that is a successful stop.
            # Anything else (e.g. this coroutine being cancelled itself) is passed on.
            if not task.cancelled():
                raise

        if not task.done():
            L.error("Source '{}' refused to stop: {}".format(self.Id, task))

        self.Task = None

    def restart(self, loop):
        """
        Start the source again after its task has finished.

        If the previous task is done, its ``result()`` is retrieved (which re-raises whatever
        the task ended with) and the task is dropped. A task that is still running is kept,
        in which case :meth:`start` returns without doing anything.

        **Parameters**

        loop : event loop
            Passed to :meth:`start`.
        """
        if self.Task is not None:
            if self.Task.done():
                self.Task.result()
                self.Task = None

        self.start(loop)

    async def main(self):
        """
        The body of the source. Must be overridden by a subclass; this base implementation
        raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def stopped(self):
        """
        Sleep until the surrounding task is cancelled, then return normally.
        It is a helper that simplifies the implementation of sources.

        Example:

        .. code:: python

            async def main(self):
                #... initialize resources here
                await self.stopped()
                #... finalize resources here
        """
        try:
            while True:
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            pass

    def locate_address(self):
        """
        Build the address of this source.

        :return: String in the form ``<pipeline Id>.*<source Id>``.
        """
        return "{}.*{}".format(self.Pipeline.Id, self.Id)

    def rest_get(self):
        """
        :return: Dictionary with the source ``Id`` and its class name under ``Class``.
        """
        return {
            "Id": self.Id,
            "Class": self.__class__.__name__
        }

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.locate_address())

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        """
        Create a source from a definition, for example one loaded from a JSON file.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            The pipeline the new source belongs to.

        definition : dict
            Definition of the source. The optional keys ``id``, ``config`` and ``args`` are read;
            ``args`` is expanded into extra keyword arguments of the constructor.

        :return: ``cls(app, pipeline, id=..., config=..., **args)``
        """
        newid = definition.get('id')
        config = definition.get('config')
        args = definition.get('args')
        if args is not None:
            return cls(app, pipeline, id=newid, config=config, **args)
        else:
            return cls(app, pipeline, id=newid, config=config)
class TriggerSource(Source):
    """
    Source that works in cycles: :meth:`main` waits until the pipeline is ready and
    ``TriggerEvent`` is set, runs one :meth:`cycle`, clears the event and notifies the
    registered triggers.

    A subclass implements :meth:`cycle`. Triggers are attached with :meth:`on`.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialise the source and its trigger bookkeeping.

        **Parameters**

        app : Application
            The application object; its ``Loop`` attribute and ``time()`` method are used.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            The pipeline this source feeds.

        id : str, default None
            ID of the source. When ``None``, the class name is used.

        config : compatible config type, default None
            Optional configuration.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.App = app
        self.Loop = app.Loop

        # The `loop` argument of asyncio.Event was deprecated in Python 3.8 and removed in 3.10,
        # where passing it raises TypeError. A new Event starts in the unset state.
        self.TriggerEvent = asyncio.Event()
        self.Triggers = set()

    def time(self):
        """
        Return the current time as reported by the application.

        :return: App.time()

        :hint: You can find more information about `UTC Time <https://asab.readthedocs.io/en/latest/asab/application.html#utc-time>`_ in the ASAB documentation
        """
        return self.App.time()

    def on(self, trigger):
        """
        Attach a trigger to this source.

        The source is registered with the trigger via ``trigger.add(self)`` and the trigger is
        remembered so that it can be notified when a cycle finishes.

        **Parameters**

        trigger : trigger object
            Object providing ``add(source)`` and ``done(source)``.

        :return: ``self``, so that calls can be chained.
        """
        trigger.add(self)
        self.Triggers.add(trigger)
        return self

    async def main(self, *args, **kwargs):
        """
        Run cycles until cancelled.

        Each iteration waits for the pipeline to be ready and for ``TriggerEvent`` to be set,
        publishes ``bspump.pipeline.cycle_begin!``, runs :meth:`cycle` and publishes
        ``bspump.pipeline.cycle_end!``. A cancellation during the cycle publishes
        ``bspump.pipeline.cycle_canceled!`` and ends the loop; any other exception is reported
        to the pipeline and the loop continues.

        **Parameters**

        *args, **kwargs
            Passed to :meth:`cycle` unchanged.
        """
        while True:
            # Wait for pipeline is ready
            await self.Pipeline.ready()

            # Wait for a trigger
            await self.TriggerEvent.wait()

            # Send begin on a cycle event
            self.Pipeline.PubSub.publish("bspump.pipeline.cycle_begin!", pipeline=self.Pipeline)

            # Execute one cycle
            try:
                await self.cycle(*args, **kwargs)

            except (asyncio.CancelledError, concurrent.futures.CancelledError):
                # asyncio.CancelledError must be listed explicitly: since Python 3.8 it is a
                # BaseException distinct from concurrent.futures.CancelledError, so it would
                # otherwise land in the generic handler below and the loop would never end.
                L.warning("Pipeline '{}' processing was cancelled".format(self.Pipeline.Id))

                # Send end of a cycle event
                self.Pipeline.PubSub.publish("bspump.pipeline.cycle_canceled!", pipeline=self.Pipeline)
                break

            except BaseException as e:
                self.Pipeline.set_error(None, None, e)

            # Send end of a cycle event
            self.Pipeline.PubSub.publish("bspump.pipeline.cycle_end!", pipeline=self.Pipeline)

            # Re-arm: the next cycle starts only after the event is set again
            self.TriggerEvent.clear()
            for trigger in self.Triggers:
                trigger.done(self)

    async def cycle(self, *args, **kwargs):
        """
        One unit of work of the source. Must be overridden by a subclass; this base
        implementation raises ``NotImplementedError``.

        **Parameters**

        *args, **kwargs
            Whatever was passed to :meth:`main`.
        """
        raise NotImplementedError()

    def rest_get(self):
        """
        :return: The dictionary produced by ``Source.rest_get()`` extended with ``triggered``,
            the current state of ``TriggerEvent``.
        """
        result = super().rest_get()
        result["triggered"] = self.TriggerEvent.is_set()
        return result