import logging
from .routing import InternalSource, RouterProcessor
# Module-level logger named after this module; TeeSource.main() uses it for its warnings.
L = logging.getLogger(__name__)
class TeeSource(InternalSource):
    """
    Description: Internal source that attaches itself to one or more
    TeeProcessor instances while it is running.

    When :meth:`main` starts, every registered target is resolved through the
    ``bspump.PumpService`` and this source's address is bound to the resolved
    TeeProcessor. The binding is removed again when :meth:`main` finishes.

    .. code:: python

        class SamplePipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)

                self.build(
                    bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
                    bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
                    bspump.common.PPrintSink(app, self)
                )


        class SampleTeePipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)

                self.build(
                    bspump.common.TeeSource(app, self),
                    bspump.common.PPrintSink(app, self)
                )

    |
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Description: Initialise the source with an empty list of targets and
        look up the ``bspump.PumpService`` used later to resolve them.

        :param app: Application object; must provide ``get_service()``.
        :param pipeline: Pipeline this source belongs to (passed to the parent class).
        :param id: Optional identifier (passed to the parent class).
        :param config: Optional configuration (passed to the parent class).

        |
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Locate strings of the TeeProcessors this source attaches to in main().
        self.Targets = []
        self._svc = app.get_service("bspump.PumpService")

    def bind(self, target):
        """
        Description: Register a target that :meth:`main` will attach to.

        The target is not resolved here; it is only stored. It is later passed
        to ``PumpService.locate()`` and must resolve to a TeeProcessor.

        :param target: Locate string identifying a TeeProcessor.
        :return: ``self``, so that calls can be chained.

        |
        """
        self.Targets.append(target)
        return self

    async def main(self):
        """
        Description: Bind this source to all registered TeeProcessors, run the
        inherited ``main()`` and unbind again on exit.

        If a target cannot be located, or does not resolve to a TeeProcessor,
        a warning is logged and the method returns without running the
        inherited ``main()``. Processors that were already bound before the
        failing target are unbound in that case as well.

        :return: None

        |
        """
        bound_processors = []
        # The binding loop lives inside the try block so that the cleanup in
        # `finally` also runs when one of the early returns below is taken;
        # otherwise processors bound before the failing target would keep
        # a stale reference to this source.
        try:
            for target in self.Targets:
                processor = self._svc.locate(target)

                if processor is None:
                    L.warning("TeeSource '{}' cannot find processor '{}'".format(self.Id, target))
                    return

                if not isinstance(processor, TeeProcessor):
                    L.warning("TeeSource '{}' requires TeeProcessor as target, not '{}'".format(self.Id, target))
                    return

                processor.bind(self.locate_address())
                bound_processors.append(processor)

            await super().main()

        finally:
            # Only processors that were actually bound are unbound.
            for processor in bound_processors:
                processor.unbind(self.locate_address())
#
class TeeProcessor(RouterProcessor):
    """
    Description: Processor that hands every event it processes to each bound
    target via ``route()`` and then returns the event unchanged.
    See TeeSource for a usage example.

    |
    """

    ConfigDefaults = {}

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Description: Initialise the processor with no bound targets.

        :param app: Application object (passed to the parent class).
        :param pipeline: Pipeline this processor belongs to (passed to the parent class).
        :param id: Optional identifier (passed to the parent class).
        :param config: Optional configuration (passed to the parent class).

        |
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Targets that process() routes each event to.
        self.Targets = []

    def bind(self, target: str):
        """
        Description: Add a target to the list of routing destinations.

        :param target: A ``bspump.PumpService.locate()`` string.
        :return: ``self``, so that calls can be chained.

        |
        """
        self.Targets.append(target)
        return self

    def unbind(self, target: str):
        """
        Description: Remove a previously bound target and release it through
        the inherited ``unlocate()``.

        :param target: The same string that was passed to :meth:`bind`.
        :return: ``self``, so that calls can be chained.
        :raises ValueError: If ``target`` is not currently bound.

        |
        """
        self.Targets.remove(target)
        self.unlocate(target)
        return self

    def process(self, context, event):
        """
        Description: Route the event to every bound target, in the order the
        targets were bound.

        :param context: Processing context, passed through to ``route()``.
        :param event: Event to route.
        :return: The same ``event`` object that was passed in.

        |
        """
        for address in self.Targets:
            self.route(context, event, address)
        return event