import asyncio
import logging
import os
import time
from .globscan import _glob_scan
from ..abc.source import TriggerSource
# Module-level logger used by this file; note that it is named after the
# file path (`__file__`), not after the dotted module name.
L = logging.getLogger(__file__)
class FileABCSource(TriggerSource):
    """
    Abstract base class for trigger sources that read files from disk.

    Each call of :meth:`cycle` scans the configured path(s) for one file,
    "locks" it by renaming it to ``<filename>-locked``, opens it (files ending
    with ``.gz``, ``.bz2``, ``.xz`` or ``.lzma`` are opened with the matching
    standard-library module) and hands the open file object to :meth:`read`.
    Afterwards the file is deleted, renamed back, or renamed/moved with a
    ``-processed`` suffix, depending on the ``post`` option. A file whose
    :meth:`read` raised is renamed to ``<filename>-failed`` (or back to its
    original name when ``post`` is ``noop``).

    Subclasses must override :meth:`read`.
    """

    ConfigDefaults = {
        'path': '',  # path(s) to scan; several paths are separated by os.pathsep
        'mode': 'rb',  # mode passed to open() / gzip.open() / bz2.open() / lzma.open()
        'newline': os.linesep,  # passed to open() for plain files; dropped in binary mode
        'post': 'move',  # one of 'delete', 'noop' and 'move'
        'exclude': '',  # glob of filenames that should be excluded (has precedence over 'include')
        'include': '',  # glob of filenames that should be included
        'encoding': '',  # empty string means "no explicit encoding" (None is passed on)
        'move_destination': '',  # destination folder for 'move'. Make sure it's outside of the glob search
        'lines_per_event': 10000,  # the number of lines after which the read method enters the idle state to allow other operations to perform their tasks
        'event_idle_time': 0.01,  # the time for which the read method enters the idle state (see above)
    }

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Read the configuration and prepare metrics and helper services.

        **Parameters**

        app : Application
            The application object. It is used to obtain the
            ``asab.MetricsService`` and ``asab.ProactorService`` services
            and the event loop (``app.Loop``).

        pipeline : Pipeline
            The pipeline this source belongs to; its ``Id`` is used as a tag
            of the ``file_count`` gauge.

        id : default = None
            Passed through to ``TriggerSource.__init__``.

        config : default = None
            Passed through to ``TriggerSource.__init__``; the resulting
            ``self.Config`` is read using the keys of ``ConfigDefaults``.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.path = self.Config['path']
        self.mode = self.Config['mode']
        self.newline = self.Config['newline']

        self.post = self.Config['post']
        if self.post not in ['delete', 'noop', 'move']:
            L.warning("Incorrect/unknown 'post' configuration value '{}' - defaulting to 'move'".format(self.post))
            self.post = 'move'

        self.include = self.Config['include']
        self.exclude = self.Config['exclude']

        # An empty 'encoding' option means that no explicit encoding is used
        self.encoding = self.Config['encoding'] or None

        self.MoveDestination = self.Config['move_destination']
        if self.MoveDestination != '':
            if self.post == 'move':
                # exist_ok avoids the check-then-create race of isdir() + makedirs()
                os.makedirs(self.MoveDestination, exist_ok=True)
        else:
            self.MoveDestination = None

        metrics_service = app.get_service('asab.MetricsService')
        self.Gauge = metrics_service.create_gauge(
            "file_count",
            tags={
                'pipeline': pipeline.Id,
            },
            init_values={
                "processed": 0,
                "failed": 0,
                "locked": 0,
                "unprocessed": 0,
                "all_files": 0,
                "scan_time": 0.0,
            }
        )

        self.Loop = app.Loop
        self.ProactorService = app.get_service("asab.ProactorService")

        # State and settings of simulate_event()
        self.LinesCounter = 0
        self.LinesPerEvent = int(self.Config["lines_per_event"])
        self.EventIdleTime = float(self.Config["event_idle_time"])

    async def cycle(self):
        """
        Find one file, lock it, read it and post-process it.

        Steps:

        1. Each path from the ``path`` option (split by ``os.pathsep``) is
           scanned by ``_glob_scan`` until a filename is returned; the scan
           duration is stored in the ``scan_time`` gauge value. When nothing
           is found, ``bspump.file_source.no_files!`` is published and the
           method returns.
        2. After the pipeline is ready, the file is renamed to
           ``<filename>-locked``.
        3. The locked file is opened and passed to :meth:`read`.
        4. The file is closed and then finalized according to ``post``.

        Errors of the individual steps are logged (and, for unexpected ones,
        reported via ``Pipeline.set_error``); the method then simply returns.
        """
        filename = None

        start_time = time.time()
        for path in self.path.split(os.pathsep):
            # Asynchronously call following:
            # filename = _glob_scan(path, self.Gauge, self.Loop, exclude=self.exclude, include=self.include)
            filename = await self.ProactorService.execute(
                _glob_scan,
                path,
                self.Gauge,
                self.Loop,
                self.exclude,
                self.include
            )
            if filename is not None:
                break
        end_time = time.time()
        self.Gauge.set("scan_time", end_time - start_time)

        if filename is None:
            self.Pipeline.PubSub.publish("bspump.file_source.no_files!")
            return  # No file to read

        await self.Pipeline.ready()

        # Lock the file
        L.debug("Locking file '{}'".format(filename))
        locked_filename = filename + '-locked'
        try:
            os.rename(filename, locked_filename)
        except FileNotFoundError:
            # The file disappeared between the scan and the rename
            return
        except (OSError, PermissionError):  # OSError - UNIX, PermissionError - Windows
            L.exception("Error when locking the file '{}' - will try again".format(filename))
            return
        except BaseException as e:
            L.exception("Error when locking the file '{}'".format(filename))
            self.Pipeline.set_error(None, None, e)
            return

        # Open the locked file; the kind of (de)compression is chosen by the
        # suffix of the original filename.
        # NOTE(review): when opening fails, the file keeps its '-locked' name;
        # whether it is picked up again depends on _glob_scan - confirm.
        try:
            if filename.endswith(".gz"):
                import gzip
                f = gzip.open(locked_filename, self.mode, encoding=self.encoding)

            elif filename.endswith(".bz2"):
                import bz2
                f = bz2.open(locked_filename, self.mode, encoding=self.encoding)

            elif filename.endswith((".xz", ".lzma")):
                import lzma
                f = lzma.open(locked_filename, self.mode, encoding=self.encoding)

            else:
                if 'b' in self.mode:  # Binary mode doesn't take a newline argument
                    # The attribute itself is reset, so it stays None from now on
                    self.newline = None
                f = open(locked_filename, self.mode, newline=self.newline, encoding=self.encoding)

        except (OSError, PermissionError):  # OSError - UNIX, PermissionError - Windows
            L.exception("Error when opening the file '{}' - will try again".format(filename))
            return
        except BaseException as e:
            L.exception("Error when opening the file '{}'".format(filename))
            self.Pipeline.set_error(None, None, e)
            return

        L.debug("Processing file '{}'".format(filename))

        read_failed = False
        try:
            await self.read(filename, f)
        except Exception:
            # Log the cause; otherwise the file would be marked as failed
            # without any trace of what went wrong.
            L.exception("Error when processing the file '{}'".format(filename))
            read_failed = True
        finally:
            # Always close before any rename below: renaming a file that is
            # still open can fail on Windows.
            f.close()

        if read_failed:
            try:
                if self.post == "noop":
                    # When we should stop, rename file back to original
                    os.rename(locked_filename, filename)
                else:
                    # Otherwise rename to ...-failed and continue processing
                    os.rename(locked_filename, filename + '-failed')
            except BaseException:
                L.exception("Error when renaming the file '{}' - will try again".format(filename))
            return

        L.debug("File '{}' processed {}".format(filename, "succefully"))

        # Finalize
        try:
            if self.post == "delete":
                os.unlink(locked_filename)

            elif self.post == "noop":
                os.rename(locked_filename, filename)

            else:
                if self.MoveDestination is not None:
                    # Move into the destination folder, keeping only the base name
                    file_from = os.path.abspath(locked_filename)
                    base = os.path.basename(filename)
                    file_to = os.path.abspath(os.path.join(self.MoveDestination, base + '-processed'))
                else:
                    # Rename in place
                    file_from = locked_filename
                    file_to = filename + "-processed"

                os.rename(file_from, file_to)

        except (OSError, PermissionError):  # OSError - UNIX, PermissionError - Windows
            L.exception("Error when finalizing the file '{}' - will try again".format(filename))
            return
        except BaseException as e:
            L.exception("Error when finalizing the file '{}'".format(filename))
            self.Pipeline.set_error(None, None, e)
            return

    async def simulate_event(self):
        """
        Yield control to the event loop every ``lines_per_event`` calls.

        The simulate_event method should be called in read method after a file line has been processed.
        It ensures that all other asynchronous events receive enough time to perform their tasks.
        Otherwise, the application loop is blocked by a file reader and no other activity makes a progress.

        Every call increments ``LinesCounter``; once it reaches
        ``LinesPerEvent`` the method sleeps for ``EventIdleTime`` seconds and
        resets the counter.
        """
        self.LinesCounter += 1
        if self.LinesCounter >= self.LinesPerEvent:
            await asyncio.sleep(self.EventIdleTime)
            self.LinesCounter = 0

    async def read(self, filename, f):
        """
        Override this method to implement your File Source.

        It is awaited by :meth:`cycle` once per file; the file is closed by
        :meth:`cycle` after this coroutine finishes or raises.

        **Parameters**

        filename : str
            Original name of the file (without the ``-locked`` suffix).

        f : file object
            The opened (locked) file, opened with the configured ``mode``.

        **Raises**

        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()