Source code for sdata.metadata

# -*-coding: utf-8-*-
import logging
logger = logging.getLogger("sdata")
import pandas as pd
import numpy as np
from sdata.timestamp import TimeStamp
from sdata import __version__
import json
import os
import hashlib
import re
import copy
from sdata.contrib.sortedcontainers.sorteddict import SortedDict

def extract_name_unit(value):
    """extract name and unit from a combined string

    The unit is expected in round, square or angle brackets, separated from
    the name by a single space. Anything after the closing bracket is ignored.

    .. code-block:: python

        value: 'Target Strain Rate (1/s) '
        name : 'Target Strain Rate'
        unit : '1/s'

        value: 'Gauge Length [mm] monkey '
        name : 'Gauge Length'
        unit : 'mm'

        value: 'Gauge Length <mm> whatever '
        name : 'Gauge Length'
        unit : 'mm'

        value: 'Torque (N-m)'
        name : 'Torque'
        unit : 'N-m'

    :param value: string, e.g. 'Length <mm> whatever'
    :return: name, unit (``value, ""`` if no bracketed unit is found)
    """
    # The hyphen is escaped on purpose: written as ``.-\/`` it is parsed as the
    # range '.'..'/' and a literal '-' inside a unit would never match.
    unit_chars = r'([\w.\-/]+)'
    name_chars = r'([\w\s\.]+) '
    # bracket styles in priority order: (...) wins over [...] wins over <...>
    patterns = (
        name_chars + r'\(' + unit_chars + r'\)',
        name_chars + r'\[' + unit_chars + r'\]',
        name_chars + r'\<' + unit_chars + r'\>',
    )
    for pattern in patterns:
        match = re.search(pattern, value)
        if match:
            return match.group(1), match.group(2)
    # no unit found: the whole string is the name
    return value, ""
class Attribute(object):
    """Attribute class

    A single metadata entry: a ``value`` together with its ``name``, ``unit``,
    ``dtype`` (a key of :attr:`DTYPES`), ``description``, ``label`` and a
    ``required`` flag.
    """

    # dtype string stored on the Attribute -> callable used to cast the value
    DTYPES = {'float': float, 'int': int, 'str': str, 'timestamp': TimeStamp, "bool": bool}

    def __init__(self, name, value, **kwargs):
        """Attribute

        :param name: attribute name (stripped and truncated to 256 characters)
        :param value: attribute value, cast according to ``dtype``
        :param dtype: one of ['float', 'int', 'str', 'timestamp', 'bool'];
                      guessed from ``value`` if omitted
        :param description: free text, default ""
        :param unit: default "-"
        :param label: default ""
        :param required: default False
        """
        self._name = None
        self._value = None
        self._unit = "-"
        self._description = ""
        self._label = ""
        self._dtype = None
        self.name = name
        self._set_dtype(kwargs.get("dtype", None))
        self._set_description(kwargs.get("description", ""))
        self._set_label(kwargs.get("label", ""))
        self._set_unit(kwargs.get("unit", "-"))
        self._set_required(kwargs.get("required", False))
        # the value is set last because its cast depends on the dtype
        self._set_value(value)

    def _get_name(self):
        return self._name

    def _set_name(self, value):
        """set the name; an empty string is rejected with a warning and the
        previous name is kept, non-strings are converted with ``str``"""
        if isinstance(value, str):
            value = value.strip()[:256]
            if len(value) > 0:
                self._name = value
            else:
                logger.warning("error Attribute.name: %s" % "empty Attribute.name")
        else:
            self._name = str(value).strip()[:256]

    name = property(fget=_get_name, fset=_set_name, doc="Attribute name")

    def _get_value(self):
        return self._value

    def _set_value(self, value):
        """cast ``value`` to the Attribute dtype and store it

        If no dtype is set yet, it is guessed from ``value``. Missing values
        (``None``, ``""``, NaN) become ``np.nan`` for int/float and ``None``
        for the other non-bool dtypes. A ``ValueError`` raised by the cast is
        logged and the previous value is kept.
        """
        try:
            dtype = self.DTYPES.get(self.dtype, self.guess_dtype(value))
            if self.dtype != dtype.__name__:
                self.dtype = dtype.__name__
            # real numbers (including 0 and 0.0) are falsy but not "missing"
            is_number = isinstance(value, (int, float, np.number))
            if value == "" and self.dtype in ["str"]:
                self._value = ""
            elif not value and self.dtype not in ["int", "float", "bool"]:
                self._value = None
            elif not value and self.dtype in ["int", "float"] and not is_number:
                self._value = np.nan
            elif pd.isna(value) and self.dtype in ["int", "float"]:
                self._value = np.nan
            elif dtype.__name__ == "bool" and value not in [1, "1", "true", "True"]:
                self._value = False
            elif dtype.__name__ == "bool" and value in [1, "1", "true", "True"]:
                self._value = True
            else:
                self._value = dtype(value)
        except ValueError as exp:
            logger.error("error Attribute.value: {}".format(exp))

    value = property(fget=_get_value, fset=_set_value, doc="Attribute value")

    @staticmethod
    def guess_dtype(value):
        """returns dtype class

        :param value:
        :return: ``value.__class__`` for int, float and str instances
                 (subclasses included), ``str`` for everything else
        """
        # ``np.int`` / ``np.float`` were plain aliases of the builtins and no
        # longer exist in current NumPy; the builtins keep the same behaviour.
        if isinstance(value, (int, float, str)):
            return value.__class__
        return str

    def _get_dtype(self):
        return self._dtype

    def _set_dtype(self, value):
        """set dtype str

        Any string containing "float" / "int" is normalised to "float" / "int"
        (e.g. "float64"). Names that are not a key of :attr:`DTYPES` are
        ignored. A type object is accepted through its ``__name__``.

        :param value: dtype name, type object or None
        :return: None
        """
        if value is None:
            return None
        if isinstance(value, type):
            value = value.__name__
        elif not isinstance(value, str):
            # e.g. NaN from an empty csv cell: keep the current dtype
            return None
        if "float" in value:
            value = "float"
        elif "int" in value:
            value = "int"
        if value in self.DTYPES.keys():
            self._dtype = value
            # todo: cast self.value to new dtype

    dtype = property(fget=_get_dtype, fset=_set_dtype, doc="Attribute type str")

    def _get_description(self):
        return self._description

    def _set_description(self, value):
        self._description = "" if value is None else str(value)

    description = property(fget=_get_description, fset=_set_description,
                           doc="Attribute description")

    def _get_label(self):
        return self._label

    def _set_label(self, value):
        self._label = "" if value is None else str(value)

    label = property(fget=_get_label, fset=_set_label, doc="Attribute label")

    def _get_unit(self):
        return self._unit

    def _set_unit(self, value):
        self._unit = value

    unit = property(fget=_get_unit, fset=_set_unit, doc="Attribute unit")

    def _get_required(self):
        return self._required

    def _set_required(self, value):
        # only these literals count as "required"; everything else is False
        self._required = value in [True, 1, "true", "True"]

    required = property(fget=_get_required, fset=_set_required, doc="Attribute required")

    def to_dict(self):
        """:returns dict of attribute items"""
        return {'name': self.name,
                'value': self.value,
                'unit': self.unit,
                'dtype': self.dtype,
                'description': self.description,
                'label': self.label,
                'required': self.required,
                }

    def to_list(self):
        """:returns list [name, value, unit, dtype, description, label, required]

        Note that ``unit`` comes before ``dtype`` here.
        """
        return [self.name, self.value, self.unit, self.dtype,
                self.description, self.label, self.required]

    def to_csv(self, prefix="", sep=",", quote=None):
        """export Attribute to csv

        :param prefix: string put in front of the line
        :param sep: field separator
        :param quote: currently unused
        :return: one csv line (without newline); ``None`` fields are written empty
        """
        fields = ["" if x is None else str(x) for x in self.to_list()]
        return "{}{}".format(prefix, sep.join(fields))

    def __str__(self):
        return "(Attr'%s':%s(%s))" % (self.name, self.value, self.dtype)

    __repr__ = __str__
class Metadata(object):
    """Metadata container class

    each Metadata entry has a
        * name (256)
        * value
        * unit
        * description
        * type (int, str, float, bool, timestamp)
    """

    # column order used for dataframes, csv export and from_list
    ATTRIBUTEKEYS = ["name", "value", "dtype", "unit", "description", "label", "required"]

    def __init__(self, **kwargs):
        """Metadata class

        :param kwargs: ``name`` (default "N.N.")
        """
        self._attributes = SortedDict()
        self._name = kwargs.get("name") or "N.N."

    def _get_name(self):
        return self._name

    def _set_name(self, value):
        self._name = str(value)

    name = property(fget=_get_name, fset=_set_name, doc="Name of the Metadata")

    def _get_attributes(self):
        return self._attributes

    def _set_attributes(self, value):
        self._attributes = value

    attributes = property(fget=_get_attributes, fset=_set_attributes, doc="returns Attributes")

    @property
    def user_attributes(self):
        """new SortedDict of all Attributes whose name does not start with '!sdata'"""
        return SortedDict([(a.name, a) for a in self.attributes.values()
                           if not a.name.startswith("!sdata")])

    @property
    def sdata_attributes(self):
        """new SortedDict of all Attributes whose name starts with '!sdata'"""
        return SortedDict([(a.name, a) for a in self.attributes.values()
                           if a.name.startswith("!sdata")])

    @property
    def required_attributes(self):
        """new SortedDict of all Attributes flagged as required"""
        return SortedDict([(a.name, a) for a in self.attributes.values()
                           if a.required is True])

    def set_attr(self, name="N.N.", value=None, **kwargs):
        """set Attribute

        :param name: attribute name or an Attribute instance
        :param value: new value; ``None`` leaves an existing value untouched
        :param kwargs: dtype, unit, description, label, required and
                       ``prefix`` (prepended to the storage key)
        """
        prefix = kwargs.get("prefix", "")
        if isinstance(name, Attribute):
            attr = name  # name is the Attribute!
        else:
            attr = self.get_attr(prefix + name) or Attribute(name, value, **kwargs)
        # dtype comes first so that the value below is cast with the new dtype
        for key in ["dtype", "unit", "description", "label", "required"]:
            if key in kwargs:
                setattr(attr, key, kwargs.get(key))
        if value is not None:
            attr.value = value
        self._attributes[prefix + attr.name] = attr

    def get_attr(self, name):
        """get Attribute by name (None if missing)"""
        return self._attributes.get(name, None)

    def to_dict(self):
        """serialize attributes to dict"""
        return self._to_dict(self.attributes)

    @staticmethod
    def guess_dtype_from_value(value):
        """guess dtype from value, e.g.

        '1.23' -> 'float'
        'otto1.23' -> 'str'
        1 -> 'int'
        False -> 'bool'

        :param value:
        :return: dtype(value), dtype ['int', 'float', 'bool', 'str']
        """
        kind = value.__class__.__name__
        if kind in ["int", "float", "bool"]:
            return value, kind
        # numpy scalars: classify by kind instead of running them through
        # int(), which would silently truncate e.g. np.float64(1.5) to 1
        if isinstance(value, np.bool_):
            return bool(value), "bool"
        if isinstance(value, np.integer):
            return int(value), "int"
        if isinstance(value, np.floating):
            return float(value), "float"
        if value in ["False", "True", "true", "false"]:
            # the string is returned as is; the Attribute does the cast
            return value, 'bool'
        for cast in (int, float):
            try:
                converted = cast(value)
            except (TypeError, ValueError, OverflowError):
                continue
            return converted, converted.__class__.__name__
        return str(value), "str"

    def update_from_dict(self, d):
        """set attributes from dict

        Each item is either ``name: value`` or ``name: {attribute fields}``.

        :param d: dict
        :return:
        """
        for k, v in d.items():
            value, dtype = self.guess_dtype_from_value(v)
            fields = {"name": k, "value": value, "dtype": dtype, "unit": "",
                      "description": "", "label": "", "required": False}
            is_scalar = dtype in ["float", "int", "bool"] or isinstance(v, (str,))
            if not is_scalar and hasattr(v, "keys"):
                # mapping of attribute fields; the dtype is guessed only if absent
                fields["dtype"] = v.get("dtype",
                                        self.guess_dtype_from_value(v.get("value"))[1])
                fields["value"] = v.get("value")
                for key in ["unit", "description", "label"]:
                    fields[key] = v.get(key, "")
                fields["required"] = v.get("required", False)
            self.set_attr(**fields)

    @classmethod
    def from_dict(cls, d):
        """setup metadata from dict"""
        metadata = cls()
        metadata.update_from_dict(d)
        return metadata

    def _to_dict(self, attributes):
        """serialize the given attributes

        :param attributes: mapping of Attributes
        :return: dict ``{attr.name: attr.to_dict()}``
        """
        return {attr.name: attr.to_dict() for attr in attributes.values()}

    def get_sdict(self):
        """get sdata attribute as dict"""
        return {attr.name: attr.value for attr in self.sdata_attributes.values()}

    def get_udict(self):
        """get user attribute as dict"""
        return {attr.name: attr.value for attr in self.user_attributes.values()}

    def _to_dataframe(self, attributes):
        """create dataframe from attributes"""
        d = self._to_dict(attributes)
        if len(d) == 0:
            df = pd.DataFrame(columns=self.ATTRIBUTEKEYS)
        else:
            df = pd.DataFrame.from_dict(d, orient="index")
        df.index.name = "key"
        return df[self.ATTRIBUTEKEYS]

    def to_dataframe(self):
        """create dataframe"""
        return self._to_dataframe(self.attributes)

    @property
    def df(self):
        """create dataframe"""
        return self._to_dataframe(self.attributes)

    @property
    def udf(self):
        """create dataframe for user attributes"""
        return self._to_dataframe(self.user_attributes)

    @property
    def sdf(self):
        """create dataframe for sdata attributes"""
        return self._to_dataframe(self.sdata_attributes)

    @property
    def sdft(self):
        """create transposed dataframe for sdata attributes"""
        mt = self.sdf[["value"]].transpose(copy=True)
        mt.index = [self.get("!sdata_uuid").value]
        return mt

    @classmethod
    def from_dataframe(cls, df):
        """create metadata from dataframe"""
        return cls.from_dict(df.to_dict(orient='index'))

    def update_from_usermetadata(self, metadata):
        """update user metadata from metadata"""
        for attribute in metadata.user_attributes.values():
            self.add(attribute)

    def to_csv(self, filepath=None, sep=",", header=False):
        """serialize to csv

        :return: csv string if filepath is None; None on OSError (logged)
        """
        try:
            df = self.to_dataframe()
            return df.to_csv(filepath, index=None, sep=sep, header=header)
        except OSError as exp:
            logger.error("metadata.to_csv error: %s" % (exp))

    def to_csv_header(self, prefix="#", sep=",", filepath=None):
        """serialize to csv header lines, one prefixed line per Attribute

        :return: the lines as one string; None on OSError (logged)
        """
        try:
            alines = "".join(attr.to_csv(prefix=prefix, sep=sep) + "\n"
                             for attr in self.attributes.values())
            if filepath:
                logger.info("export '{}'".format(filepath))
                with open(filepath, "w") as fh:
                    fh.write(alines)
            return alines
        except OSError as exp:
            logger.error("metadata.to_csv error: %s" % (exp))

    @classmethod
    def from_csv(cls, filepath):
        """create metadata from a headerless csv file with ATTRIBUTEKEYS columns"""
        df = pd.read_csv(filepath, header=None)
        df.columns = cls.ATTRIBUTEKEYS
        df.set_index(df.name.values, inplace=True)
        return cls.from_dataframe(df)

    def to_json(self, filepath=None):
        """create a json

        :param filepath: default None
        :return: json str
        """
        d = self.to_dict()
        if filepath:
            with open(filepath, "w") as fh:
                json.dump(d, fh)
        return json.dumps(d)

    @classmethod
    def from_json(cls, jsonstr=None, filepath=None):
        """create metadata from json file

        An existing ``filepath`` takes precedence over ``jsonstr``.

        :param jsonstr: json str
        :param filepath: filepath to json file
        :return: Metadata
        :raises ValueError: if neither an existing file nor a json str is given
        """
        if filepath is not None and os.path.exists(filepath):
            with open(filepath, "r") as fh:
                j = json.load(fh)
        elif jsonstr is not None:
            j = json.loads(jsonstr)
        else:
            raise ValueError("Metadata.from_json: neither an existing filepath "
                             "nor a jsonstr was given")
        return cls.from_dict(j)

    def to_list(self):
        """create a nested list of Attribute values

        :return: list
        """
        return self.df.values.tolist()

    @classmethod
    def from_list(cls, mlist):
        """create metadata from a list of Attribute values

        [['force_x', 1.2, 'float', 'kN', 'force in x-direction'],
         ['force_y', 3.1, 'float', 'N', 'force in y-direction', 'label', True]]

        Rows with fewer than two entries are skipped (logged). The given lists
        are not modified.
        """
        metadata = cls()
        n_fields = len(cls.ATTRIBUTEKEYS)
        for alist in mlist:
            if len(alist) < 2:
                logger.error("Metadata.from_list skip {}".format(alist))
                continue
            # pad a copy up to all fields so that short rows can be indexed
            row = list(alist) + [""] * (n_fields - len(alist))
            metadata.add(row[0], row[1], dtype=row[2], unit=row[3],
                         description=row[4], label=row[5], required=row[6])
        return metadata

    def __repr__(self):
        return "(Metadata'%s':%d)" % (self.name, len(self.attributes))

    def __str__(self):
        return "(Metadata'%s':%d %s)" % (self.name, len(self.attributes),
                                         list(self.attributes))

    def add(self, name, value=None, **kwargs):
        """add Attribute

        :param name: attribute name or Attribute instance
        :param value:
        :param kwargs: see :meth:`set_attr`
        :return:
        """
        self.set_attr(name, value, **kwargs)

    def relabel(self, name, newname):
        """relabel Attribute

        :param name: old attribute name
        :param newname: new attribute name
        :return: None
        """
        attr = self.get(name)
        if not attr:
            logger.warning("{0}: no Attribute {1} to relabel.".format(self.__class__, name))
            return
        attr.name = newname
        self.attributes.pop(name)
        self.add(attr)

    def get(self, name, default=None):
        """get Attribute by name, ``default`` if missing"""
        attr = self._attributes.get(name)
        return default if attr is None else attr

    def keys(self):
        """
        :return: list of Attribute names
        """
        return list(self._attributes.keys())

    def values(self):
        """
        :return: list of Attributes
        """
        return list(self._attributes.values())

    def items(self):
        """
        :return: list of Attribute items (keys, values)
        """
        return list(self._attributes.items())

    def copy(self):
        """returns a deep copy"""
        return copy.deepcopy(self)

    @property
    def size(self):
        """return number of Attributes"""
        return len(self.attributes)

    def __getitem__(self, name):
        return self.get(name)

    @property
    def sha3_256(self):
        """SHA3-256 hex digest of the json representation

        :return: hashlib.sha3_256.hexdigest()
        """
        s = hashlib.sha3_256()
        s.update(self.to_json().encode(errors="replace"))
        return s.hexdigest()

    def update_hash(self, hashobject):
        """feed the json representation of the metadata into a hash object

        .. code-block:: python

            hashobject = hashlib.sha3_256()
            metadata = Metadata()
            metadata.update_hash(hashobject)
            hashobject.hexdigest()

        :param hashobject: hash object providing ``update`` and ``hexdigest``
        :return: the updated hashobject
        :raises Exception: if hashobject lacks ``update`` or ``hexdigest``
        """
        if not (hasattr(hashobject, "update") and hasattr(hashobject, "hexdigest")):
            logger.error("Metadata.hash: given hashfunction is invalid")
            raise Exception("Metadata.hash: given hashfunction is invalid")
        hashobject.update(self.to_json().encode(errors="replace"))
        return hashobject

    def set_unit_from_name(self, add_description=True, fix_name=True):
        """try to extract unit from attribute name

        :param add_description: store the original name as description if the
                                description is empty and a unit was found
        :param fix_name: relabel the Attribute to the name without the unit
        :return:
        """
        # user_attributes is a fresh mapping, so relabeling while looping is safe
        for attr in self.user_attributes.values():
            new_name, unit = extract_name_unit(attr.name)
            attr.unit = unit
            if add_description is True and len(attr.description) == 0 and len(unit) > 0:
                attr.description = attr.name
            if fix_name is True:
                self.relabel(attr.name, new_name)

    def guess_value_dtype(self):
        """try to cast the Attribute values, e.g. str -> float

        :return:
        """
        for attr in list(self.user_attributes.values()):
            # float first, str as fallback
            for dtype in [float, str]:
                try:
                    self.add(name=attr.name, value=dtype(attr.value), dtype=dtype.__name__)
                    break
                except (ValueError, TypeError):
                    pass

    def is_complete(self):
        """check all required attributes

        :return: False if any required Attribute has the value None or ""
        """
        return not any(attr.value is None or attr.value == ""
                       for attr in self.required_attributes.values())