Source code for bspump.ipc.datagram

import asyncio
import logging
import socket

from ..abc.source import Source
from ..abc.sink import Sink

#

L = logging.getLogger(__name__)


#

[docs]class DatagramSource(Source): """ Description: """ ConfigDefaults = { 'address': '127.0.0.1 8888', # IPv4, IPv6 or unix socket path 'max_packet_size': 64 * 1024, 'receiver_buffer_size': 0, }
[docs] def __init__(self, app, pipeline, id=None, config=None): """ Description: """ super().__init__(app, pipeline, id=id, config=config) self.Loop = app.Loop # Create a UDP socket self.Address = str(self.Config['address']) # Receive Buffer Size self.ReceiveBufferSize = int(self.Config['receiver_buffer_size']) addrline = self.Address.strip() if addrline.count(":") == 1: host, port = self.Address.rsplit(":", maxsplit=1) (family, socktype, proto, canonname, sockaddr) = socket.getaddrinfo(host, port)[0] self.Socket = socket.socket(family, socket.SOCK_DGRAM) self.Socket.setblocking(False) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if self.ReceiveBufferSize > 0: self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.ReceiveBufferSize) self.Socket.bind(sockaddr) else: self.Socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) self.Socket.setblocking(False) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if self.ReceiveBufferSize > 0: self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.ReceiveBufferSize) self.Socket.bind(self.Address) self.MaxPacketSize = int(self.Config['max_packet_size'])
[docs] async def main(self): """ Description: """ task = asyncio.ensure_future(self._receive(), loop=self.Loop) await self.stopped() task.cancel() await task self.Socket.close()
async def _receive(self): while True: try: await self.Pipeline.ready() event = await self.Loop.sock_recv(self.Socket, self.MaxPacketSize) await self.Pipeline.ready() await self.process(event) except asyncio.CancelledError: break except Exception: L.exception("Error in datagram source.") raise
[docs]class DatagramSink(Sink): """ Description: """ ConfigDefaults = { 'address': '127.0.0.1 8888', # IPv4, IPv6 or unix socket path 'max_packet_size': 64 * 1024, 'receiver_buffer_size': 0, }
[docs] def __init__(self, app, pipeline, id=None, config=None): """ Description: """ super().__init__(app, pipeline, id=id, config=config) self.Loop = app.Loop # Create a UDP socket self.Address = str(self.Config['address']) # Receive Buffer Size self.ReceiveBufferSize = int(self.Config['receiver_buffer_size']) addrline = self.Address.strip() if " " in addrline: host, port = self.Address.rsplit(" ", maxsplit=1) elif addrline.count(":") == 1: host, port = self.Address.rsplit(":", maxsplit=1) (family, socktype, proto, canonname, sockaddr) = socket.getaddrinfo(host, port)[0] self.Socket = socket.socket(family, socket.SOCK_DGRAM) self.Socket.setblocking(False) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if self.ReceiveBufferSize > 0: self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.ReceiveBufferSize) self.Socket.connect(sockaddr) else: self.Socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) self.Socket.setblocking(False) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if self.ReceiveBufferSize > 0: self.Socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.ReceiveBufferSize) self.Socket.connect(self.Address) self.MaxPacketSize = int(self.Config['max_packet_size'])
[docs] def process(self, context, event): """ Description: """ self.Socket.send(event)