Source code for matrixb.source.csv
# Copyright (c) 2019-2020 Kevin Crouse
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @license: http://www.apache.org/licenses/LICENSE-2.0
# @author: Kevin Crouse (krcrouse@gmail.com)
import os
from .base import SourceBase
[docs]class CSV (SourceBase):
def __init__(self, *args, encoding='utf-8-sig', csvdialect='excel', **kwargs):
"""
Extends the SourceBase and includes all arguments provided there. In addition, provides the following csv-only arguments:
Args:
encoding (str): Forwards to the encoding on to the file open() command. Default is 'utf-8-sig', which is useful to strip out Byte-Order-Marker u'feff' which happens for certain generated csv files in windows. 'latin_1' is a common alternative when 'utf-8-sig' leads to issues.
csvdialect (str): Forwards to the dialect parameter for csv.reader. Default is 'excel'.
"""
super().__init__(*args, **kwargs)
self.encoding = encoding
self.csvdialect = csvdialect
self.stream = None
[docs] def open_stream(self):
import csv
self.fh = open(os.path.expanduser(self.filename), 'r', newline='', encoding=self.encoding)
self.stream = csv.reader(self.fh, dialect=self.csvdialect)
[docs] def skip_rows(self, count):
skipped = []
for i in range(count):
skipped.append(next(self.stream))
return(skipped)
[docs] def next_row(self):
""" next() for csv matrix sources, which translates empty strings to None and skips blank lines."""
if not self.stream:
self.open()
hasText = False
while True:
try:
row = next(self.stream)
except StopIteration:
self.fh.close()
raise(StopIteration)
# csvreader should raise a StopException exception at EOF so we don't have to
for i in range(0, len(row)):
if row[i] is not None:
if type(row[i]) is str:
if self.nonemptyre.search(row[i]):
hasText = i+1
# -- don't break because we want to convert empty cells
# later in the row to None
else:
row[i] = None
elif type(row[i]) in (int, float):
hasText = i+1
if hasText:
return(row[:hasText])
[docs] @classmethod
def export_to(self, matrix, filename, topmatter=None, autosize=None):
"""The export_to class method to export a matrixb matrix to a CSV file.
Args:
matrix (matrixb.Matrix): The Matrix object to export.
filename (str): The full path to send the file.
topmatter (list|str, optional): Lines to appear above the exported table.
autosize: This parameter is not used, only maintained for consistency with the export_to interface.
"""
with open(filename, 'w') as fh:
if topmatter:
if type(topmatter) in (list, tuple):
fh.writelines(topmatter)
else:
fh.write('"' + topmatter + '"' + "\n")
fh.write(','.join(self.export_cell(colname) for colname in matrix.columns) + "\n")
for row in matrix:
fh.write(','.join(self.export_cell(cell) for cell in row) + "\n")
[docs] @classmethod
def export_cell(self, cell):
if cell is None:
return('')
if type(cell) is str:
# quotes need to be substitutded
cell = cell.replace('"', '""')
if "\n" in cell or "," in cell or "\r" in cell:
return('"' + str(cell) + '"')
return(str(cell))
def __getstate__(self):
""" To pickle/serialze the csv source, we delete the stream and filehandle - this will allow future restored objects to get things like the source filename, but attempts to access the source object will fail. """
state = super().__getstate__()
del state['stream']
del state['fh']
return(state)