import csv
import logging
import os
from ..abc.sink import Sink
#
L = logging.getLogger(__file__)
#
[docs]class FileCSVSink(Sink):
"""
Description:
** Default Config**
path : ''
dialect : 'excel'
delimiter : ','
doublequote : True
escapechar : ""
lineterminator : os.linesep
quotechar : '"'
quoting : csv.QUOTE_MINIMAL
skipinitialspace : False
strict : False
"""
ConfigDefaults = {
'path': '',
'dialect': 'excel',
'delimiter': ',',
'doublequote': True,
'escapechar': "",
'lineterminator': os.linesep,
'quotechar': '"',
'quoting': csv.QUOTE_MINIMAL, # 0 - 3 for [QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE]
'skipinitialspace': False,
'strict': False,
}
[docs] def __init__(self, app, pipeline, id=None, config=None):
"""
Description:
"""
super().__init__(app, pipeline, id=id, config=config)
self.Dialect = csv.get_dialect(self.Config['dialect'])
self._csv_writer = None
[docs] def get_file_name(self, context, event):
"""
Description: Override this method to gain control over output file name.
**Parameters**
context :
event :
:return: path of context and config
|
"""
# Here we are able to modify the sink behavior from outside using context.
# If provided, the filename (and path) is taken from the context instead of the config
return context.get("path", self.Config["path"])
[docs] def writer(self, f, fieldnames):
"""
Description:
**Parameters**
f :
fieldnames : file
Name of the file.
:return: dialect and fieldnames
|
"""
kwargs = dict()
kwargs['delimiter'] = self.Config.get('delimiter')
kwargs['doublequote'] = bool(self.Config.get('doublequote'))
escape_char = self.Config.get('escapechar')
escape_char = None if escape_char == "" else escape_char
kwargs['escapechar'] = escape_char
kwargs['lineterminator'] = self.Config.get('lineterminator')
kwargs['quotechar'] = self.Config.get('quotechar')
kwargs['quoting'] = int(self.Config.get('quoting'))
kwargs['skipinitialspace'] = bool(self.Config.get('skipinitialspace'))
kwargs['strict'] = bool(self.Config.get('strict'))
return csv.DictWriter(
f,
dialect=self.Dialect,
fieldnames=fieldnames,
**kwargs
)
[docs] def process(self, context, event):
"""
Description:
**Parameters**
context :
event : any data type
Information with timestamp.
"""
if self._csv_writer is None:
# Open CSV file if needed
fieldnames = event.keys()
fname = self.get_file_name(context, event)
fo = open(fname, 'w', newline='')
self._csv_writer = self.writer(fo, fieldnames)
self._csv_writer.writeheader()
self._csv_writer.writerow(event)
[docs] def rotate(self):
"""
Description: Call this to close the currently open file.
"""
del self._csv_writer
self._csv_writer = None