Source code for bspump.application

import signal
import sys

import asab
import asab.web

from .service import BSPumpService
from .__version__ import __version__, __build__


[docs]class BSPumpApplication(asab.Application): """ BSPumpApplication is responsible for the main life cycle of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_. It is based on ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ class | """
[docs] def __init__(self, args=None, web_listen=None): ''' Initiates the Application and looks for config with additional arguments. **Parameters** args : default= None web_listen : default= None ''' super().__init__(args=args) # Banner print("BitSwan BSPump version {}".format(__version__)) from asab.proactor import Module self.add_module(Module) from asab.metrics import Module self.add_module(Module) self.PumpService = BSPumpService(self) self.WebContainer = None try: # Signals are not available on Windows self.Loop.add_signal_handler(signal.SIGUSR1, self._on_signal_usr1) except (NotImplementedError, AttributeError): pass # Register bspump API endpoints, if requested (the web service is present) if asab.Config["web"].get("listen"): # Initialize API service self.add_module(asab.web.Module) self.WebService = self.get_service("asab.WebService") from .web import initialize_web self.WebContainer = initialize_web(self.WebService.WebContainer)
[docs] def create_argument_parser(self): """ Enables to create arguments that can be called within the command prompt when starting the application :return: parser """ prog = sys.argv[0] if prog[-11:] == '__main__.py': prog = sys.executable + " -m bspump" description = ''' BSPump is a stream processor. It is a part of BitSwan. For more information, visit: https://github.com/LibertyAces/BitSwanPump version: {} build: {} [{}] '''.format(__version__, __build__, __build__[:7]) parser = super().create_argument_parser( prog=prog, description=description ) return parser
[docs] def parse_arguments(self, args=None): """ Parses argument in the ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ using super() method. **Parameters** args : default= None :return: args | """ args = super().parse_arguments(args=args) self._web_listen = args.web_api return args
[docs] async def main(self): """ Prints a message about how many pipelines are ready. | """ print("{} pipeline(s) ready.".format(len(self.PumpService.Pipelines)))
def _on_signal_usr1(self): """ Description: :return: :hint: To clear reset from all pipelines, run $ kill -SIGUSR1 xxxx Equivalently, you can use `docker kill -s SIGUSR1 ....` to reset containerized BSPump. """ # Reset errors from all pipelines for pipeline in self.PumpService.Pipelines.values(): if not pipeline.is_error(): continue # Focus only on pipelines that has errors pipeline.set_error(None, None, None)