Source code for sdata.data

# -*-coding: utf-8-*-
from __future__ import division

import sdata

'''
basic sdata types 
'''
from sdata import __version__
import sys
import os
import uuid
from collections import OrderedDict
import logging
logger = logging.getLogger("sdata")
import numpy as np
import pandas as pd
import shutil
import copy
from sdata.metadata import Metadata, Attribute, extract_name_unit
from sdata.timestamp import now_utc_str, now_local_str, today_str
import inspect
import json
import hashlib
import base64
import requests
from tabulate import tabulate
from sdata.contrib.sqlitedict import SqliteDict

if sys.version_info < (3, 0):
    from StringIO import StringIO
else:
    from io import BytesIO, StringIO

class Sdata_Name_Exeption(Exception):
    """Exception type for name-related errors of sdata objects."""
class Sdata_Uuid_Exeption(Exception):
    """Exception type for uuid-related errors of sdata objects."""
# hashlib only ships sha3 from Python 3.6 on; importing ``sha3`` on older
# interpreters is what the original code does here.
if sys.version_info < (3, 6):
    import sha3

# openpyxl is optional: without it only the xlsx import is unavailable.
# Catch ImportError only, so that unrelated errors (e.g. KeyboardInterrupt)
# are not silently swallowed as they were by the former bare ``except``.
try:
    import openpyxl
except ImportError:
    logger.warning("openpyxl is not available -> no xlsx import")
def uuid_from_str(name):
    """Derive a reproducible uuid from a string.

    The same ``name`` always yields the same uuid (version 3, DNS namespace).

    :param name: string to derive the uuid from
    :return: uuid.UUID
    """
    namespace = uuid.NAMESPACE_DNS
    return uuid.uuid3(namespace, name)
class Data(object):
    """Base sdata object.

    A ``Data`` object bundles a ``Metadata`` collection, an optional
    ``pandas.DataFrame`` table, a free-text description and an ordered group
    of child objects.

    The class defines ``__eq__`` without ``__hash__``; instances are
    therefore unhashable.
    """

    # default attribute definitions of the class, one tuple per attribute:
    # (name, value, dtype, unit, description, required)
    ATTR_NAMES = []

    SDATA_VERSION = "!sdata_version"
    SDATA_NAME = "!sdata_name"
    SDATA_UUID = "!sdata_uuid"
    SDATA_CTIME = "!sdata_ctime"
    SDATA_MTIME = "!sdata_mtime"
    SDATA_PARENT = "!sdata_parent"
    SDATA_CLASS = "!sdata_class"
    SDATA_PROJECT = "!sdata_project"

    SDATA_ATTRIBUTES = [SDATA_VERSION, SDATA_NAME, SDATA_UUID, SDATA_CLASS,
                        SDATA_PARENT, SDATA_PROJECT, SDATA_CTIME, SDATA_MTIME]

    def __init__(self, **kwargs):
        """create Data object

        .. code-block:: python

            df = pd.DataFrame([1,2,3])
            data = sdata.Data(name='my name',
                              uuid='38b26864e7794f5182d38459bab85842',
                              table=df,
                              description="A remarkable description")

        :param name: name of the data object
        :param table: pandas.DataFrame to store
        :param uuid: uuid of the object; the special value ``"hash"`` derives
            the uuid from the object state (see ``gen_uuid_from_state``)
        :param metadata: sdata.Metadata object whose attributes are copied in
        :param description: a string to describe the object
        :param project: name of the project
        :param prefix: prefix of the object name
        :param auto_correct: if ``None`` or ``True`` (default) an invalid uuid
            string is replaced by a fresh uuid, otherwise the error is raised
        :param default_attributes: attribute definitions used instead of
            ``ATTR_NAMES``
        """
        self._prefix = None

        # ToDo: add getter and setter for metadata
        self.metadata = Metadata()

        # set default sdata attributes
        self.metadata.add(self.SDATA_VERSION, __version__, dtype="str", description="sdata package version")
        self.metadata.add(self.SDATA_NAME, "N.N.", dtype="str", description="name of the data object")
        self.metadata.add(self.SDATA_UUID, "", dtype="str", description="Universally Unique Identifier")
        self.metadata.add(self.SDATA_PARENT, "", dtype="str", description="uuid of the parent sdata object")
        self.metadata.add(self.SDATA_CLASS, self.__class__.__name__, dtype="str", description="sdata class")
        self.metadata.add(self.SDATA_CTIME, now_utc_str(), dtype="str", description="creation date")
        self.metadata.add(self.SDATA_MTIME, now_utc_str(), dtype="str", description="modification date")

        # attributes of a given Metadata object are added on top of the defaults
        metadata = kwargs.get("metadata")
        if metadata and isinstance(metadata, Metadata):
            for attribute in metadata.attributes.values():
                self.metadata.add(attribute)

        # auto correct is on unless something other than None/True is given
        auto_correct = kwargs.get("auto_correct")
        self.auto_correct = auto_correct is None or auto_correct is True

        if kwargs.get("name") is not None:
            self.name = kwargs.get("name")

        self.prefix = kwargs.get("prefix") or ""

        self._gen_default_attributes(kwargs.get("default_attributes") or self.ATTR_NAMES)

        self._group = OrderedDict()
        self._table = None
        self.table = kwargs.get("table", None)
        self._description = ""
        self.description = kwargs.get("description", "")
        self.project = kwargs.get("project", "")

        # uuid selection
        # NOTE(review): if a uuid is given AND the metadata already carries
        # one, neither is used and a random uuid is generated (last branch).
        # Kept as in the original; confirm whether this is intended.
        given_uuid = kwargs.get("uuid")
        stored_uuid = self.metadata.get(self.SDATA_UUID).value
        if given_uuid is not None and not stored_uuid and given_uuid != "hash":
            try:
                self._set_uuid(given_uuid)
            except Sdata_Uuid_Exeption:
                if self.auto_correct is True:
                    logger.warning("got invalid uuid -> generate a new uuid")
                    self._set_uuid(uuid.uuid4())
                else:
                    raise
        elif (given_uuid == "" or given_uuid is None) and stored_uuid != "":
            # keep the uuid that came with the metadata
            pass
        elif given_uuid == "hash":
            sha3_256 = self.gen_uuid_from_state()
            new_uuid = uuid_from_str(sha3_256)
            self._set_uuid(new_uuid.hex)
        else:
            self._set_uuid(uuid.uuid4())

    def _hash_state(self, hashobject, metadata=None):
        """feed metadata, table and description into a hash object

        :param hashobject: object providing ``update(bytes)``
        :param metadata: Metadata to hash (default: ``self.metadata``)
        :return: the given hashobject
        """
        if metadata is None:
            metadata = self.metadata
        hashobject.update(metadata.to_json().encode(errors="replace"))
        if self.table is not None:
            hashobject.update(self.table.to_json().encode(errors="replace"))
        hashobject.update(self.description.encode(errors="replace"))
        return hashobject

    def gen_uuid_from_state(self):
        """generate the same uuid for the same data

        The uuid, creation time and modification time attributes are left
        out of the hash, so that they do not influence the result.

        :return: sha3_256 hexdigest of the object state
        """
        metadata = self.metadata.copy()
        metadata.attributes.pop(self.SDATA_UUID)
        metadata.attributes.pop(self.SDATA_MTIME)
        metadata.attributes.pop(self.SDATA_CTIME)
        return self._hash_state(hashlib.sha3_256(), metadata).hexdigest()

    def __eq__(self, other):
        """compare Data checksum

        :param other: sdata.Data object
        :return: True or False
        """
        if not isinstance(other, self.__class__):
            logger.debug("you should not compare {} with {}!".format(self.__class__.__name__,
                                                                    other.__class__.__name__))
            return False
        return self.sha3_256 == other.sha3_256

    def update_mtime(self):
        """update modification time

        :return:
        """
        self.metadata.add(self.SDATA_MTIME, now_utc_str())

    @property
    def sha3_256_table(self):
        """Return a SHA3 hash of the sData.table object with a hashbit length of 32 bytes.

        Without a table the digest of the empty input is returned.

        :return: hashlib.sha3_256.hexdigest()
        """
        s = hashlib.sha3_256()
        if self.table is not None:
            s.update(self.table.to_json().encode(errors="replace"))
        return s.hexdigest()

    @property
    def sha3_256(self):
        """Return a SHA3 hash of the sData object with a hashbit length of 32 bytes.

        Metadata, table (if any) and description are hashed.

        :return: hashlib.sha3_256.hexdigest()
        """
        return self._hash_state(hashlib.sha3_256()).hexdigest()

    def update_hash(self, hashobject):
        """A hash represents the object used to calculate a checksum of a
        string of information.

        .. code-block:: python

            data = sdata.Data()

            md5 = hashlib.md5()
            data.update_hash(md5)
            md5.hexdigest()

            hashobject = hashlib.sha3_256()
            data.update_hash(hashobject).hexdigest()

        :param hashobject: hash object, e.g. hashlib.sha1()
        :return: the updated hash object
        :raises Exception: if the object lacks ``update`` or ``hexdigest``
        """
        if not (hasattr(hashobject, "update") and hasattr(hashobject, "hexdigest")):
            logger.error("Data.update_hash: given hashfunction is invalid")
            raise Exception("Data.update_hash: given hashfunction is invalid")
        return self._hash_state(hashobject)

    def describe(self):
        """Generate descriptive info of the data

        .. code-block:: python

            df = pd.DataFrame([1,2,3])
            data = sdata.Data(name='my name',
                              uuid='38b26864e7794f5182d38459bab85842',
                              table=df,
                              description="A remarkable description")
            data.describe()

        .. code-block:: none

                            0
            metadata        3
            table_rows      3
            table_columns   1
            description    24

        :return: pd.DataFrame
        """
        df = pd.DataFrame({0: []}, dtype=object)
        df.loc["metadata", 0] = self.metadata.size
        if self.table is None:
            df.loc["table_rows", 0] = 0
            df.loc["table_columns", 0] = 0
        else:
            df.loc["table_rows", 0] = len(self.table)
            df.loc["table_columns", 0] = len(self.table.columns)
        df.loc["description", 0] = len(self.description)
        return df

    def _gen_default_attributes(self, default_attributes):
        """create default Attributes in data.metadata

        :param default_attributes: iterable of
            (name, value, dtype, unit, description, required) tuples
        """
        # NOTE(review): ``unit`` and ``required`` are unpacked but not
        # forwarded to the metadata; kept as in the original.
        for attr_name, value, dtype, unit, description, required in default_attributes:
            self.metadata.set_attr(name=attr_name, value=value, dtype=dtype, description=description)

    def _get_uuid(self):
        return self.metadata.get(self.SDATA_UUID).value

    def _set_uuid(self, value):
        """store a uuid given as str or uuid.UUID (saved as 32 char hex str)

        :raises Sdata_Uuid_Exeption: for a str that is not a valid uuid
        :raises Exception: for any other type
        """
        if isinstance(value, str):
            try:
                self.metadata.set_attr(self.SDATA_UUID, uuid.UUID(value).hex)
            except ValueError as exp:
                logger.warning("data.uuid: %s" % exp)
                raise Sdata_Uuid_Exeption("got invalid uuid str '{}'".format(str(value))) from exp
        elif isinstance(value, uuid.UUID):
            self.metadata.set_attr(self.SDATA_UUID, value.hex)
        else:
            logger.error("Data.uuid: invalid uuid '{}'".format(value))
            raise Exception("Data.uuid: invalid uuid '{}'".format(value))

    uuid = property(fget=_get_uuid, fset=_set_uuid, doc="uuid of the object")

    def _set_text_attribute(self, key, value, label):
        """store ``str(value)`` cut to 256 chars in the metadata attribute ``key``

        As in the original setters, a ValueError is only logged (with
        ``label`` as message prefix) if ``value`` is a str.
        """
        text = str(value)[:256]
        if isinstance(value, str):
            try:
                self.metadata.set_attr(key, text)
            except ValueError as exp:
                logger.warning("%s: %s" % (label, exp))
        else:
            self.metadata.set_attr(key, text)

    def _get_name(self):
        return self.metadata.get(self.SDATA_NAME).value

    def _set_name(self, value):
        self._set_text_attribute(self.SDATA_NAME, value, "data.name")

    name = property(fget=_get_name, fset=_set_name, doc="name of the object")

    def _get_project(self):
        return self.metadata.get(self.SDATA_PROJECT).value

    def _set_project(self, value):
        self._set_text_attribute(self.SDATA_PROJECT, value, "data.project")

    project = property(fget=_get_project, fset=_set_project, doc="name of the project")

    def _get_description(self):
        return self._description

    def _set_description(self, value):
        # any value is stored as its str representation
        self._description = str(value)

    description = property(fget=_get_description, fset=_set_description,
                           doc="description of the object")

    @property
    def filename(self):
        """object name with every char that is not a letter, a digit or one
        of ``-_.() `` replaced by ``_``"""
        validchars = "-_.() "
        name = "{}".format(self.name)
        return "".join(c if (str.isalpha(c) or str.isdigit(c) or (c in validchars)) else "_"
                       for c in name)

    def _get_prefix(self):
        return self._prefix

    def _set_prefix(self, value):
        # stored as str, cut to 256 chars
        self._prefix = str(value)[:256]

    prefix = property(fget=_get_prefix, fset=_set_prefix, doc="prefix of the object name")

    def _get_table(self):
        return self._table

    def _set_table(self, df):
        # anything but a DataFrame (e.g. None) is ignored
        if isinstance(df, pd.DataFrame):
            self._table = df
            if self._table.index.name is None:
                self._table.index.name = "index"

    table = property(fget=_get_table, fset=_set_table, doc="table object(pandas.DataFrame)")
    df = table  # alias of ``table``

    def description_to_df(self):
        """get description as DataFrame

        :return: DataFrame of description lines
        """
        return pd.DataFrame(self.description.splitlines())

    def description_from_df(self, df):
        """set description from DataFrame of lines

        The first column is used; None, non-DataFrame or empty input leaves
        the description unchanged.

        :param df: DataFrame holding one description line per row
        :return:
        """
        if df is not None and isinstance(df, pd.DataFrame) and len(df) > 0:
            lines = df.iloc[:, 0].astype(str)
            self.description = "\n".join(lines.values)

    def to_folder(self, path, dtype="csv"):
        """export data to folder

        An existing folder is cleaned from former sdata subfolders first.
        Child objects are exported recursively into subfolders.

        :param path: export folder
        :param dtype: "csv" or "xlsx"; any other value is treated as "xlsx"
        :return: path
        """
        if dtype not in ["csv", "xlsx"]:
            dtype = "xlsx"

        if not os.path.exists(path):
            try:
                os.makedirs(path)
            except OSError as exp:
                logger.error(exp)
        else:
            self.clear_folder(path)

        self.metadata.set_attr(name="class", value=self.__class__.__name__,
                               description="object class", unit="-", dtype="str")
        self.metadata.set_attr(name="uuid", value=self.uuid,
                               description="object uuid", unit="-", dtype="str")
        self.metadata.set_attr(name="name", value=self.name,
                               description="object name", unit="-", dtype="str")

        if dtype == "csv":
            metadata_filepath = os.path.join(path, "metadata.csv")
            logger.debug("export meta csv '{}'".format(metadata_filepath))
            self.metadata.to_csv(metadata_filepath)
            # table export (the index is not written)
            if isinstance(self._table, pd.DataFrame) and len(self._table) > 0:
                exportpath = os.path.join(path, "{}.csv".format(self.osname))
                self._table.to_csv(exportpath, index=False)

        if dtype == "xlsx":
            if not isinstance(self._table, pd.DataFrame):
                self.table = pd.DataFrame()
            exportpath = os.path.join(path, "{}.xlsx".format(self.osname))
            self.to_xlsx(exportpath)

        # group export
        for data in self.group.values():
            exportpath = os.path.join(path, "{}-{}".format(data.__class__.__name__.lower(), data.osname))
            data.to_folder(exportpath, dtype=dtype)
        return path

    @classmethod
    def from_folder(cls, path):
        """sdata object instance

        Loads ``metadata.csv`` of the folder, takes uuid and name from it and
        recursively loads every subfolder as child object.

        :param path: folder created by ``to_folder``
        :return: instance of the class (a fresh one if ``path`` is missing)
        """
        data = cls()
        if not os.path.exists(path):
            logger.error("from_folder error: path '{}' not exists.".format(path))
            return data

        data.metadata = cls._load_metadata(path)
        try:
            data.uuid = data.metadata.get_attr("uuid").value
            data.name = data.metadata.get_attr("name").value
        except Exception:
            logger.error("Data.from_folder: {}".format(data.metadata.to_dict()))
            raise

        # NOTE: as in the original, a table file is only detected; the table
        # itself is not imported.
        files = [x for x in os.listdir(path)
                 if not os.path.isdir(os.path.join(path, x)) and not x.startswith("metadata")]
        if len(files) == 1:
            logger.debug("found table file '{}' (not imported)".format(os.path.join(path, files[0])))

        # every subfolder is loaded exactly once as a child object
        folders = [x for x in os.listdir(path) if os.path.isdir(os.path.join(path, x))]
        for folder in folders:
            subdata = cls.from_folder(os.path.join(path, folder))
            data.add_data(subdata)
        return data

    @staticmethod
    def clear_folder(path):
        """delete subfolder in export folder

        Only subfolders whose name part before the first ``-`` is a lower
        case sdata class name are removed.

        :param path: path
        :return: None
        """
        prefixes = [x.lower() for x in SDATACLS.keys()]
        for subfolder in os.listdir(path):
            subfolder_path = os.path.join(path, subfolder)
            if not os.path.isdir(subfolder_path):
                continue
            if subfolder.split("-")[0] not in prefixes:
                continue
            logger.debug("clear_folder: rm {}".format(subfolder_path))
            shutil.rmtree(subfolder_path)

    @staticmethod
    def _load_metadata(path):
        """load metadata from csv

        :returns: Metadata instance (empty if ``metadata.csv`` is missing)"""
        metadata_filepath = os.path.join(path, "metadata.csv")
        if os.path.exists(metadata_filepath):
            return Metadata().from_csv(metadata_filepath)
        return Metadata()

    @staticmethod
    def _get_class_from_metadata(metadata):
        """get class object from metadata

        :returns: relevant sdata class object; ``Data`` for an unknown class
            name, ``None`` if there is no "class" attribute"""
        classattr = metadata.get_attr("class")
        if classattr is None:
            logger.warning("cls not defined '{}'".format(metadata))
            return None
        sdataclassname = classattr.value
        if sdataclassname not in SDATACLS:
            logger.warning("unsupported cls '{}'".format(sdataclassname))
            return Data
        return SDATACLS.get(sdataclassname)

    @property
    def osname(self):
        """:returns: os compatible name (lower case ``asciiname``)"""
        return self.asciiname.lower()

    @property
    def asciiname(self):
        """:returns: name with umlauts transliterated, blanks and slashes
        replaced by ``_`` and other non-ascii chars replaced by ``?``"""
        name = self.name
        mapper = [("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("Ä", "Ae"), ("Ö", "Oe"), ("Ü", "Ue"),
                  ("ß", "sz"), (" ", "_"), ("/", "_"), ("\\", "_")]
        for k, v in mapper:
            name = name.replace(k, v)
        return name.encode('ascii', 'replace').decode("ascii")

    def verify_attributes(self):
        """check mandatory attributes

        :return: names of required attributes that are missing or ``None``
        """
        invalid_attrs = []
        # attr_defs = ["name", "value", "dtype", "unit", "description", "required"]
        for attr_defs in self.ATTR_NAMES:
            required = attr_defs[5]
            if required is False:
                continue
            attr = self.metadata.get_attr(attr_defs[0])
            if attr is None:
                invalid_attrs.append(attr_defs[0])
            elif attr.value is None:
                invalid_attrs.append(attr.name)
        return invalid_attrs

    def __str__(self):
        return f"({self.__class__.__name__} '{self.name}':{self.uuid})"

    __repr__ = __str__

    def get_group(self):
        """:return: OrderedDict of the child objects, keyed by uuid"""
        return self._group

    group = property(get_group, doc="get group")

    def keys(self):
        """get all child objects uuids

        :return: list of uuid's
        """
        return list(self.group.keys())

    def values(self):
        """get all child objects

        :return: list of child objects
        """
        return list(self.group.values())

    def items(self):
        """get all child objects

        :return: [(child uuid, child objects), ]
        """
        return list(self.group.items())

    def clear_group(self):
        """clear group dict"""
        self._group = OrderedDict()

    def add_data(self, data):
        """add data, if data.name is unique

        Names are compared case-insensitively. An object without a
        ``metadata`` attribute is ignored with a warning.
        """
        if not hasattr(data, "metadata"):
            logger.warning(f"ignore data {data}, {data.__class__.__name__} (wrong type!)")
            return
        names = [dat.name.lower() for dat in self.group.values()]
        if data.name.lower() in names:
            logger.error("{}: name '{}' aready exists".format(data.__class__.__name__, data.name))
            return
        self.group[data.uuid] = data

    def get_data_by_uuid(self, uid):
        """get data by uuid

        :return: child object or None"""
        return self.group.get(uid)

    def get_data_by_name(self, name):
        """:return obj by name (None if there is no such child)"""
        uid_by_name = {obj.name: uid for uid, obj in self.group.items()}
        return self.get_data_by_uuid(uid_by_name.get(name))

    def tree_folder(self, dir, padding=" ", print_files=True, hidden_files=False, last=True):
        """print tree folder structure

        :param dir: folder to print
        :param padding: indentation prefix of the current level
        :param print_files: list files (names starting with "." are skipped)
            and folders if True, folders only if False
        :param hidden_files: accepted but currently not evaluated
        :param last: True if ``dir`` is the last entry of its parent
        """
        connector = '├─' if last is False else '└─'
        print(padding[:-1] + connector + os.path.basename(os.path.abspath(dir)))
        padding = padding + ' '

        if print_files:
            files = [x for x in sorted(os.listdir(dir)) if not x.startswith(".")]
        else:
            files = [x for x in sorted(os.listdir(dir)) if os.path.isdir(dir + os.sep + x)]

        # metadata first; both parts are already sorted. The list must not be
        # sorted again, otherwise the metadata files lose their lead position.
        metafiles = [f for f in files if f.startswith("metadata")]
        files = metafiles + [x for x in files if x not in metafiles]

        for count, file in enumerate(files):
            path = dir + os.sep + file
            is_last = count == (len(files) - 1)
            if os.path.isdir(path):
                if is_last:
                    self.tree_folder(path, padding + ' ', print_files, hidden_files=hidden_files, last=True)
                else:
                    self.tree_folder(path, padding + '|', print_files, hidden_files=hidden_files, last=False)
            else:
                print(padding + ('└─' if is_last else '├─') + file)

    def dir(self):
        """returns a nested list of all child objects

        :return: list of (name, nested list) tuples
        """
        return [(x.name, x.dir()) for x in self.group.values()]

    @staticmethod
    def _adjust_col_width(sheetname, df, writer, width=40):
        """set the width of the index column and of all columns of ``df``

        NOTE(review): relies on ``worksheet.set_column``, the xlsxwriter
        worksheet API - confirm the writer engine in use provides it.
        """
        worksheet = writer.sheets[sheetname]
        # column 0 holds the index, columns 1..n the data columns
        worksheet.set_column(0, 0, width)
        for idx, _ in enumerate(df):
            worksheet.set_column(idx + 1, idx + 1, width)

    def _table_for_export(self):
        """:return: the table (index named) or an empty DataFrame if unset"""
        if self.table is not None:
            if self._table.index.name is None:
                self._table.index.name = "index"
            return self.table
        df = pd.DataFrame()
        df.index.name = "index"
        return df

    def to_xlsx_byteio(self):
        """get xlsx file content

        Sheets "metadata", "table" and "description" are written to memory.

        :return: bytes of the xlsx file
        """
        output = BytesIO()
        table = self._table_for_export()
        df_description = pd.DataFrame(self.description.splitlines())
        # the context manager saves and closes the writer;
        # ``ExcelWriter.save()`` is not available in pandas >= 2.0
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            self.metadata.df.to_excel(writer, sheet_name='metadata')
            self._adjust_col_width('metadata', self.metadata.df, writer)
            table.to_excel(writer, sheet_name='table')
            self._adjust_col_width('table', table, writer, width=15)
            df_description.to_excel(writer, sheet_name='description', index=False, header=None)
            self._adjust_col_width('description', df_description, writer, width=200)
        return output.getvalue()

    def to_xlsx_base64(self):
        """get xlsx file content base64 encoded

        :return: base64 encoded bytes
        """
        return base64.b64encode(self.to_xlsx_byteio())

    def get_download_link(self):
        """get an html link with the xlsx export embedded as data url

        Used by ``to_html``.

        :return: html ``<a>`` tag as str
        """
        from html import escape

        b64 = self.to_xlsx_base64().decode("ascii")
        return '<a href="data:application/octet-stream;base64,{1}" download="{0}.xlsx">Download xlsx file</a>'.format(
            escape(self.osname, quote=True), b64)

    def to_xlsx(self, filepath=None):
        """export atrributes and data to excel

        Sheets "metadata", "table" and "description" are written.

        :param filepath: path of the xlsx file
        :return:
        """
        with pd.ExcelWriter(filepath) as writer:
            # metadata
            dfm = self.metadata.to_dataframe()
            dfm.index.name = "key"
            dfm.to_excel(writer, sheet_name='metadata', index=False)
            self._adjust_col_width('metadata', dfm, writer)

            # data (an empty table if none is set)
            table = self._table_for_export()
            table.to_excel(writer, sheet_name='table')
            self._adjust_col_width('table', table, writer, width=15)

            # description, one line per row
            df_description = pd.DataFrame(self.description.splitlines())
            df_description.to_excel(writer, sheet_name='description', index=False, header=None)
            self._adjust_col_width('description', df_description, writer, width=200)

    @classmethod
    def from_xlsx(cls, filepath):
        """load Data from xlsx

        :param filepath: path of a xlsx file as written by ``to_xlsx``
        :return: instance of the class, named after ``filepath`` until the
            metadata of the file is applied
        :raises Exception: if the file does not exist
        """
        if not os.path.exists(filepath):
            raise Exception("excel file '{}' not available".format(filepath))

        wb = openpyxl.load_workbook(filename=filepath)
        sheetnames = wb.sheetnames

        tt = cls(name=filepath)

        # read table
        if "table" in sheetnames:
            tt.table = pd.read_excel(filepath, sheet_name="table", index_col=0)
        else:
            logger.info("no table data in '{}'".format(filepath))

        # read metadata
        dfm = pd.read_excel(filepath, sheet_name="metadata")
        dfm = dfm.set_index(dfm.name.values)
        dfm["description"] = dfm["description"].replace(np.nan, '')
        dfm["label"] = dfm["label"].replace(np.nan, '')
        tt.metadata = tt.metadata.from_dataframe(dfm)

        # read description, one line per cell of column A
        if "description" in sheetnames:
            # str() keeps numeric or date cells from breaking the join
            cells = ["" if cell.value is None else str(cell.value)
                     for cell in wb["description"]["A"]]
            tt.description = "\n".join(cells)
        else:
            logger.info("no description in '{}'".format(filepath))
        return tt

    def to_json(self, filepath=None):
        """export Data in json format

        :param filepath: export file path (default:None)
        :return: json str if no filepath is given, else None
        """
        json_table = self.table.to_dict() if self.table is not None else {}
        j = {"metadata": self.metadata.to_dict(),
             "table": json_table,
             "description": self.description}

        if filepath:
            with open(filepath, "w") as fh:
                json.dump(j, fh)
        else:
            return json.dumps(j)

    @classmethod
    def from_json(cls, s=None, filepath=None):
        """create Data from json str or file

        Exactly one of ``s`` and ``filepath`` is expected.

        :param s: json str
        :param filepath: path of a json file
        :return: sdata.Data; None if neither ``s`` nor ``filepath`` is given
        """
        data = cls(name="N.N.")
        if s is None and filepath is not None:
            with open(filepath, "r") as fh:
                d = json.load(fh)
        elif s is None and filepath is None:
            logger.error("data.from_json: no json data available")
            return
        elif s is not None and filepath is None:
            d = json.loads(s)
        else:
            # both given: nothing is loaded
            logger.error("data.from_json: unexpected error")
            d = None

        if d:
            if "metadata" in d.keys():
                data.metadata.update_from_dict(d["metadata"])
            else:
                logger.error("Data.from_json: metadata not available")

            if "table" in d.keys():
                data.table = pd.DataFrame.from_dict(d["table"])
            else:
                logger.error("Data.from_json: table not available")

            if "description" in d.keys():
                data.description = d["description"]
            else:
                logger.error("Data.from_json: description not available")

        return data

    @classmethod
    def from_url(cls, url=None, stype=None):
        """create Data from the content of an url

        :param url: url
        :param stype: source type, only "json" is supported
        :return: sdata.Data
        :raises NotADirectoryError: for an unsupported ``stype``
        """
        supported_stypes = ["json"]
        if stype not in supported_stypes:
            # NOTE(review): NotADirectoryError is an odd type for this error;
            # kept because callers may catch it.
            raise NotADirectoryError("stype '{}' is not supported".format(stype))

        raw = requests.get(url).text
        if stype == "json":
            return cls.from_json(raw)

    def to_csv(self, filepath=None):
        """export sdata.Data to csv

        The metadata is written as header lines prefixed with ``#;``,
        followed by the table with ``;`` as separator.

        :param filepath: export file path (default:None)
        :return: csv str if no filepath is given, else None
        """
        exportlines = [self.metadata.to_csv_header(prefix="#;", sep=";", filepath=None)]
        if self.df is not None:
            exportlines.append(self.df.to_csv(sep=";"))
        exportstr = "".join(exportlines)

        if filepath is None:
            return exportstr
        with open(filepath, "w") as fh:
            fh.write(exportstr)

    @classmethod
    def from_csv(cls, s=None, filepath=None, sep=";"):
        """import sdata.Data from csv

        :param s: csv str
        :param filepath: path of a csv file; takes precedence over ``s``
        :param sep: separator (default=";")
        :return: sdata.Data
        :raises ValueError: if neither ``s`` nor ``filepath`` is given
        """
        data = cls()
        if filepath:
            table_source = filepath
            with open(filepath, "r") as fh:
                lines = fh.readlines()
        elif s is not None:
            table_source = StringIO(s)
            lines = StringIO(s).readlines()
        else:
            logger.error("data.from_csv: no csv data available")
            raise ValueError("data.from_csv: no csv data available")

        # table: all lines that are not metadata header lines
        try:
            df = pd.read_csv(table_source, sep=sep, comment="#", index_col=0)
        except pd.errors.EmptyDataError:
            # metadata only (exported without a table)
            df = None

        # metadata: header lines starting with "#;"
        attritute_list = []
        for line in lines:
            if line.startswith("#;"):
                fields = line.rstrip("\n").split(sep)
                attritute_list.append(fields[1:8])

        data.metadata = Metadata.from_list(attritute_list)
        data.table = df
        return data

    def to_hdf5(self, filepath, **kwargs):
        """export sdata.Data to hdf5

        The file is always (re)created (``mode="w"``).

        :param filepath: path of the hdf5 file
        :param complib: default='zlib' ['zlib', 'lzo', 'bzip2', 'blosc', 'blosc:blosclz', 'blosc:lz4', 'blosc:lz4hc', 'blosc:snappy', 'blosc:zlib', 'blosc:zstd']
        :param complevel: default=9 [0-9]
        :return:
        """
        df = self.df if isinstance(self.df, pd.DataFrame) else pd.DataFrame()

        kwargs["mode"] = "w"
        if kwargs.get("complib") is None:
            kwargs["complib"] = "zlib"
        if kwargs.get("complevel") is None:
            kwargs["complevel"] = 9

        with pd.HDFStore(filepath, **kwargs) as hdf:
            hdf.put('metadata', self.metadata.df, format='fixed', data_columns=True)
            hdf.put('table', df, format='fixed', data_columns=True)
            hdf.put('description', self.description_to_df(), format='fixed', data_columns=True)

    def to_sqlite(self, filepath, **kwargs):
        """export sdata.Data to sqlite

        Metadata, table and description are stored as parquet blobs under
        the keys "metadata", "table" and "description".

        :param filepath: path of the sqlite file
        :param kwargs: not used
        :return:
        """
        # work on a copy: the column labels are converted to str for the
        # export, which must not alter the table of the object
        df = self.df.copy() if isinstance(self.df, pd.DataFrame) else pd.DataFrame()
        df.columns = df.columns.astype(str)

        description = self.description_to_df().copy()
        description.columns = description.columns.astype(str)

        with SqliteDict(filepath, autocommit=True) as sqdict:
            sqdict['metadata'] = self.metadata.df.to_parquet()
            sqdict['table'] = df.to_parquet()
            sqdict['description'] = description.to_parquet()

    @classmethod
    def from_sqlite(cls, filepath, **kwargs):
        """import sdata.Data from sqlite

        :param filepath: path of a sqlite file as written by ``to_sqlite``
        :param kwargs: not used
        :return: sdata.Data
        """
        with SqliteDict(filepath) as sqdict:
            metadata = sdata.Metadata.from_dataframe(pd.read_parquet(BytesIO(sqdict['metadata'])))
            df_table = pd.read_parquet(BytesIO(sqdict['table']))
            df_description = pd.read_parquet(BytesIO(sqdict['description']))

        # NOTE: always creates a ``Data`` instance, also if called on a subclass
        data = Data(metadata=metadata, table=df_table)
        data.description_from_df(df_description)
        return data

    @classmethod
    def metadata_from_hdf5(cls, filepath, **kwargs):
        """import sdata.Data.Metadata from hdf5

        :param filepath: path of a hdf5 file as written by ``to_hdf5``
        :return: sdata.Metadata; None if the file does not exist
        """
        if not os.path.exists(filepath):
            logger.error("hdf5 file '{}' not available".format(filepath))
            return

        with pd.HDFStore(filepath, mode="r+") as hdf:
            df_metadata = hdf.get("/metadata")
        return Metadata.from_dataframe(df_metadata)

    @classmethod
    def from_hdf5(cls, filepath, **kwargs):
        """import sdata.Data from hdf5

        :param filepath: path of a hdf5 file as written by ``to_hdf5``
        :return: sdata.Data; None if the file does not exist
        """
        if not os.path.exists(filepath):
            logger.error("hdf5 file '{}' not available".format(filepath))
            return

        with pd.HDFStore(filepath, mode="r+") as hdf:
            df_metadata = hdf.get("/metadata")
            df_table = hdf.get("/table")
            df_description = hdf.get("/description")

        metadata = Metadata.from_dataframe(df_metadata)
        # NOTE: always creates a ``Data`` instance, also if called on a subclass
        data = Data(metadata=metadata, table=df_table)
        data.description_from_df(df_description)
        return data

    def to_html(self, filepath, xlsx=True, style=None):
        """export Data to html

        The page shows the description, the first rows of metadata and table
        and the statistics of the table. A table has to be set.

        :param filepath: path of the html file
        :param xlsx: embed a download link for the xlsx export if True
        :param style: not used
        :return:
        """
        table_description = self.df.describe()
        metadata_df = self.metadata.df

        xlsx_tag = self.get_download_link() if xlsx is True else ""

        # headers are passed as plain lists instead of pandas Index objects
        param = {"title": "{0} [{1}]".format(self.osname, self.uuid),
                 "description": self.description,
                 "metadata": tabulate(metadata_df.head().values, list(metadata_df.columns), tablefmt="html"),
                 "table": tabulate(self.df.head(), list(self.df.columns), tablefmt="html"),
                 "table_description": tabulate(table_description, list(table_description.columns),
                                               tablefmt="html"),
                 "xlsx_tag": xlsx_tag,
                 "sdata": "created with sdata v{}.".format(__version__),
                 "now": "{}".format(now_utc_str()),
                 }
        tmpl = """<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title}</title>
<style>
h1 {{ background-color: #00FFFF77; color: black; }}
h2 {{ background-color: #00FFFF44; color: black; }}
h3 {{ background-color: #00FFFF11; color: black; }}
p {{ color: black; }}
table, th, td, caption {{ border: 1px solid #a0a0a0; }}
table {{ border-collapse: collapse; border-spacing: 1em; border-width: thin 0 0 thin; margin: 0 0 1em; table-layout: auto; max-width: 100%; text-align: right; }}
th, td {{ font-weight: normal; text-align: left; border-spacing: 1em; padding: .1em .3em; }}
th, caption {{ background-color: #f1f3f4; font-weight: 700; }}
</style>
</head>
<body>
<h1>{title}</h1>
<h2>Download</h2>
<p>{xlsx_tag}</p>
<h2>Description</h2>
<p>{description}</p>
<h2>Metadata</h2>
{metadata}
<h2>Table</h2>
{table}
<h3>Table Description</h3>
{table_description}
<p>{sdata}</p>
<p>{now}</p>
</body>
</html>""".format(**param)

        # the page declares charset utf-8, so it has to be written as utf-8
        with open(filepath, "w", encoding="utf-8") as fh:
            fh.write(tmpl)

    def copy(self, **kwargs):
        """create a copy of the Data object

        The copy gets a new uuid and modification time; the uuid of the
        original is stored as parent of the copy.

        .. code-block:: python

            data = sdata.Data(name="data", uuid="38b26864e7794f5182d38459bab85842", description="this is remarkable")
            datac = data.copy()
            print("data  {0.uuid}".format(data))
            print("datac {0.uuid}".format(datac))
            print("datac.metadata['!sdata_parent'] {0.value}".format(datac.metadata["!sdata_parent"]))

        .. code-block::

            data  38b26864e7794f5182d38459bab85842
            datac 2c4eb15900af435d8cd9c8573ca777e2
            datac.metadata['!sdata_parent'] 38b26864e7794f5182d38459bab85842

        :param uuid: uuid of the copy (default: a new random uuid)
        :param name: name of the copy (default: name of the original)
        :return: Data
        """
        data = copy.deepcopy(self)
        data.metadata.add(self.SDATA_PARENT, self.uuid)
        data.metadata.add(self.SDATA_UUID, self.gen_uuid())
        data.metadata.add(self.SDATA_MTIME, now_utc_str(), dtype="str")
        if "uuid" in kwargs:
            data.uuid = kwargs.get("uuid")
        if "name" in kwargs:
            data.name = kwargs.get("name")
        logger.debug(f"make copy of {self.uuid} -> {data.uuid}")
        return data

    def gen_uuid(self):
        """generate new uuid string

        :return: str, e.g. '5fa04a3738e4431dbc34eccea5e795c4'
        """
        return uuid.uuid4().hex

    def refactor(self, fix_columns=True, add_table_metadata=True):
        """helper function

        * to cleanup dataframe column name
        * to define Attributes for all dataframe columns

        :param fix_columns: rename every column to the name extracted from
            its label by ``extract_name_unit``
        :param add_table_metadata: add an Attribute per column; an existing
            Attribute named like the column is relabeled instead
        """
        if not isinstance(self.table, pd.DataFrame):
            return

        mapper = {}
        for old_colname in self.table.columns:
            name, unit = extract_name_unit(old_colname)
            if fix_columns:
                mapper[old_colname] = name
            if add_table_metadata:
                old_attr = self.metadata.get(old_colname)
                if old_attr:
                    logger.info("skip: {}".format(old_attr))
                    self.metadata.relabel(old_colname, name)
                else:
                    self.metadata.add(name=name, description=old_colname, unit=unit, dtype="float")
        self.table.rename(columns=mapper, inplace=True)
# registry of the sdata classes, keyed by class name; used to recognise
# exported subfolders and to resolve the class named in the metadata
SDATACLS = {Data.__name__: Data}