diff --git a/hyppopy/deepdict/deepdict.py b/hyppopy/deepdict/deepdict.py index 3550c38..6b41db5 100644 --- a/hyppopy/deepdict/deepdict.py +++ b/hyppopy/deepdict/deepdict.py @@ -1,372 +1,382 @@ # -*- coding: utf-8 -*- # # DKFZ # # # Copyright (c) German Cancer Research Center, # Division of Medical and Biological Informatics. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE.txt or http://www.mitk.org for details. # # Author: Sven Wanner (s.wanner@dkfz.de) import os import json import types import pprint import xmltodict from dicttoxml import dicttoxml from collections import OrderedDict import logging LOG = logging.getLogger('hyppopy') def convert_ordered2std_dict(obj): """ Helper function converting an OrderedDict into a standard lib dict. :param obj: [OrderedDict] """ for key, value in obj.items(): if isinstance(value, OrderedDict): obj[key] = dict(obj[key]) convert_ordered2std_dict(obj[key]) def check_dir_existance(dirname): """ Helper function to check if a directory exists, creating it if not. :param dirname: [str] full path of the directory to check """ if not os.path.exists(dirname): os.mkdir(dirname) class DeepDict(object): """ The DeepDict class represents a nested dictionary with additional functionality compared to a standard lib dict. The data can be accessed and changed vie a pathlike access and dumped or read to .json/.xml files. Initializing instances using defaults creates an empty DeepDict. Using in_data enables to initialize the object instance with data, where in_data can be a dict, or a filepath to a json or xml file. Using path sep the appearance of path passing can be changed, a default data access via path would look like my_dd['target/section/path'] with path_sep='.' like so my_dd['target.section.path'] :param in_data: [dict] or [str], input dict or filename :param path_sep: [str] path separator character """ _data = None _sep = "/" def __init__(self, in_data=None, path_sep="/"): self.clear() self._sep = path_sep LOG.debug(f"path separator is: {self._sep}") if in_data is not None: if isinstance(in_data, str): self.from_file(in_data) elif isinstance(in_data, dict): self.data = in_data def __str__(self): """ Enables print output for class instances, printing the instance data dict using pretty print :return: [str] """ return pprint.pformat(self.data) def __eq__(self, other): """ Overloads the == operator comparing the instance data dictionaries for equality :param other: [DeepDict] rhs :return: [bool] """ return self.data == other.data def __getitem__(self, path): """ Overloads the return of the [] operator for data access. This enables access the DeepDict instance like so: my_dd['target/section/path'] or my_dd[['target','section','path']] :param path: [str] or [list(str)], the path to the target data structure level/content :return: [object] """ return DeepDict.get_from_path(self.data, path, self.sep) def __setitem__(self, path, value=None): """ Overloads the setter for the [] operator for data assignment. :param path: [str] or [list(str)], the path to the target data structure level/content :param value: [object] rhs assignment object """ if isinstance(path, str): path = path.split(self.sep) if not isinstance(path, list) or isinstance(path, tuple): raise IOError("Input Error, expect list[str] type for path") if len(path) < 1: raise IOError("Input Error, missing section strings") if not path[0] in self._data.keys(): if value is not None and len(path) == 1: self._data[path[0]] = value else: self._data[path[0]] = {} tmp = self._data[path[0]] path.pop(0) while True: if len(path) == 0: break if path[0] not in tmp.keys(): if value is not None and len(path) == 1: tmp[path[0]] = value else: tmp[path[0]] = {} tmp = tmp[path[0]] else: tmp = tmp[path[0]] path.pop(0) + def __len__(self): + return len(self._data) + def clear(self): """ clears the instance data """ LOG.debug("clear()") self._data = {} def from_file(self, fname): """ Loads data from file. Currently implemented .json and .xml file reader. :param fname: [str] filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") if fname.endswith(".json"): self.read_json(fname) elif fname.endswith(".xml"): self.read_xml(fname) else: LOG.error("Unknown filetype, expect [.json, .xml]") raise NotImplementedError("Unknown filetype, expect [.json, .xml]") def read_json(self, fname): """ Read json file :param fname: [str] input filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") if not os.path.isfile(fname): raise IOError(f"File {fname} not found!") LOG.debug(f"read_json({fname})") try: with open(fname, "r") as read_file: self._data = json.load(read_file) DeepDict.value_traverse(self.data, callback=DeepDict.parse_type) except Exception as e: LOG.error(f"Error while reading json file {fname} or while converting types") raise IOError("Error while reading json file {fname} or while converting types") def read_xml(self, fname): """ Read xml file :param fname: [str] input filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") if not os.path.isfile(fname): raise IOError(f"File {fname} not found!") LOG.debug(f"read_xml({fname})") try: with open(fname, "r") as read_file: xml = "".join(read_file.readlines()) self._data = xmltodict.parse(xml, attr_prefix='') DeepDict.value_traverse(self.data, callback=DeepDict.parse_type) except Exception as e: LOG.error(f"Error while reading xml file {fname} or while converting types") raise IOError("Error while reading json file {fname} or while converting types") # if written with DeepDict, the xml contains a root node called # deepdict which should beremoved for consistency reasons if "deepdict" in self._data.keys(): self._data = self._data["deepdict"] self._data = dict(self.data) # convert the orderes dict structure to a default dict for consistency reasons convert_ordered2std_dict(self.data) def to_file(self, fname): """ Write to file, type is determined by checking the filename ending. Currently implemented is writing to json and to xml. :param fname: [str] filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") if fname.endswith(".json"): self.write_json(fname) elif fname.endswith(".xml"): self.write_xml(fname) else: LOG.error(f"Unknown filetype, expect [.json, .xml]") raise NotImplementedError("Unknown filetype, expect [.json, .xml]") def write_json(self, fname): """ Dump data to json file. :param fname: [str] filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") check_dir_existance(os.path.dirname(fname)) try: LOG.debug(f"write_json({fname})") with open(fname, "w") as write_file: json.dump(self.data, write_file) except Exception as e: LOG.error(f"Failed dumping to json file: {fname}") raise e def write_xml(self, fname): """ Dump data to json file. :param fname: [str] filename """ if not isinstance(fname, str): raise IOError("Input Error, expect str type for fname") check_dir_existance(os.path.dirname(fname)) xml = dicttoxml(self.data, custom_root='deepdict', attr_type=False) LOG.debug(f"write_xml({fname})") try: with open(fname, "w") as write_file: write_file.write(xml.decode("utf-8")) except Exception as e: LOG.error(f"Failed dumping to xml file: {fname}") raise e + def has_section(self, section): + return DeepDict.has_key(self.data, section) + @staticmethod def get_from_path(data, path, sep="/"): """ Implements a nested dict access via a path like string like so path='target/section/path' which is equivalent to my_dict['target']['section']['path']. :param data: [dict] input dictionary :param path: [str] pathlike string :param sep: [str] path separator, default='/' :return: [object] """ if not isinstance(data, dict): LOG.error("Input Error, expect dict type for data") raise IOError("Input Error, expect dict type for data") if isinstance(path, str): path = path.split(sep) if not isinstance(path, list) or isinstance(path, tuple): LOG.error(f"Input Error, expect list[str] type for path: {path}") raise IOError("Input Error, expect list[str] type for path") - if not DeepDict.has_section(data, path[-1]): + if not DeepDict.has_key(data, path[-1]): LOG.error(f"Input Error, section {path[-1]} does not exist in dictionary") raise IOError(f"Input Error, section {path[-1]} does not exist in dictionary") - for k in path: - data = data[k] + try: + for k in path: + data = data[k] + except Exception as e: + LOG.error(f"Failed retrieving data from path {path} due to {e}") + raise LookupError(f"Failed retrieving data from path {path} due to {e}") return data @staticmethod - def has_section(data, section, already_found=False): + def has_key(data, section, already_found=False): """ Checks if input dictionary has a key called section. The already_found parameter is for internal recursion checks. :param data: [dict] input dictionary :param section: [str] key string to search for :param already_found: recursion criteria check :return: [bool] section found """ if not isinstance(data, dict): LOG.error("Input Error, expect dict type for obj") raise IOError("Input Error, expect dict type for obj") if not isinstance(section, str): LOG.error(f"Input Error, expect dict type for obj {section}") raise IOError(f"Input Error, expect dict type for obj {section}") if already_found: return True found = False for key, value in data.items(): if key == section: found = True if isinstance(value, dict): - found = DeepDict.has_section(data[key], section, found) + found = DeepDict.has_key(data[key], section, found) return found @staticmethod def value_traverse(data, callback=None): """ Dictionary filter function, walks through the input dict (obj) calling the callback function for each value. The callback function return is assigned the the corresponding dict value. :param data: [dict] input dictionary :param callback: """ if not isinstance(data, dict): LOG.error("Input Error, expect dict type for obj") raise IOError("Input Error, expect dict type for obj") if not isinstance(callback, types.FunctionType): LOG.error("Input Error, expect function type for callback") raise IOError("Input Error, expect function type for callback") for key, value in data.items(): if isinstance(value, dict): DeepDict.value_traverse(data[key], callback) else: data[key] = callback(value) @staticmethod def parse_type(string): """ Type convert input string to float, int, list, tuple or string :param string: [str] input string :return: [T] converted output """ try: a = float(string) try: b = int(string) except ValueError: return float(string) if a == b: return b return a except ValueError: if string.startswith("[") and string.endswith("]"): elements = string[1:-1].split(",") li = [] for e in elements: li.append(DeepDict.parse_type(e)) return li elif string.startswith("(") and string.endswith(")"): elements = string[1:-1].split(",") li = [] for e in elements: li.append(DeepDict.parse_type(e)) return tuple(li) return string @property def data(self): return self._data @data.setter def data(self, value): if not isinstance(value, dict): LOG.error(f"Input Error, expect dict type for value, but got {type(value)}") raise IOError(f"Input Error, expect dict type for value, but got {type(value)}") self.clear() self._data = value @property def sep(self): return self._sep @sep.setter def sep(self, value): if not isinstance(value, str): LOG.error(f"Input Error, expect str type for value, but got {type(value)}") raise IOError(f"Input Error, expect str type for value, but got {type(value)}") self._sep = value diff --git a/hyppopy/settings.py b/hyppopy/settings.py index 8ae7af6..9da8907 100644 --- a/hyppopy/settings.py +++ b/hyppopy/settings.py @@ -1,34 +1,33 @@ # DKFZ # # Copyright (c) German Cancer Research Center, # Division of Medical and Biological Informatics. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE.txt or http://www.mitk.org for details. # -*- coding: utf-8 -*- import os import sys import logging ROOT = os.path.join(os.path.dirname(__file__), "..") LOGFILENAME = os.path.join(ROOT, 'logfile.log') PLUGIN_DEFAULT_DIR = os.path.join(ROOT, *("hyppopy", "solver_plugins")) sys.path.insert(0, ROOT) -#LOG = logging.getLogger() logging.getLogger('hyppopy').setLevel(logging.DEBUG) logging.basicConfig(filename=LOGFILENAME, filemode='w', format='%(name)s - %(levelname)s - %(message)s') ''' LOG.debug('debug message') LOG.info('info message') LOG.warning('warning message') LOG.error('error message') LOG.critical('critical message') ''' diff --git a/hyppopy/tests/test_deepdict.py b/hyppopy/tests/test_deepdict.py index d8986a7..2cabd51 100644 --- a/hyppopy/tests/test_deepdict.py +++ b/hyppopy/tests/test_deepdict.py @@ -1,147 +1,150 @@ # -*- coding: utf-8 -*- # # DKFZ # # # Copyright (c) German Cancer Research Center, # Division of Medical and Biological Informatics. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE.txt or http://www.mitk.org for details. # # Author: Sven Wanner (s.wanner@dkfz.de) import os import unittest from hyppopy.deepdict.deepdict import DeepDict DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") class DeepDictTestSuite(unittest.TestCase): def setUp(self): self.test_data = { 'widget': { 'debug': 'on', 'image': {'alignment': 'center', 'hOffset': 250, 'name': 'sun1', 'src': 'Images/Sun.png', 'vOffset': 250}, 'text': {'alignment': 'center', 'data': 'Click Here', 'hOffset': 250, 'name': 'text1', 'onMouseUp': 'sun1.opacity = (sun1.opacity / 100) * 90;', 'size': 36, 'style': 'bold', 'vOffset': 100}, 'window': {'height': 500, 'name': 'main_window', 'title': 'Sample Konfabulator Widget', 'width': 500} } } self.test_data2 = {"test": { "section": { "var1": 100, "var2": 200 } }} def test_fileIO(self): dd_json = DeepDict(os.path.join(DATA_PATH, 'test_json.json')) dd_xml = DeepDict(os.path.join(DATA_PATH, 'test_xml.xml')) dd_dict = DeepDict(self.test_data) self.assertTrue(list(self.test_data.keys())[0] == list(dd_json.data.keys())[0]) self.assertTrue(list(self.test_data.keys())[0] == list(dd_xml.data.keys())[0]) self.assertTrue(list(self.test_data.keys())[0] == list(dd_dict.data.keys())[0]) for key in self.test_data['widget'].keys(): self.assertTrue(self.test_data['widget'][key] == dd_json.data['widget'][key]) self.assertTrue(self.test_data['widget'][key] == dd_xml.data['widget'][key]) self.assertTrue(self.test_data['widget'][key] == dd_dict.data['widget'][key]) for key in self.test_data['widget'].keys(): if key == 'debug': self.assertTrue(dd_json.data['widget']["debug"] == "on") self.assertTrue(dd_xml.data['widget']["debug"] == "on") self.assertTrue(dd_dict.data['widget']["debug"] == "on") else: for key2, value2 in self.test_data['widget'][key].items(): self.assertTrue(value2 == dd_json.data['widget'][key][key2]) self.assertTrue(value2 == dd_xml.data['widget'][key][key2]) self.assertTrue(value2 == dd_dict.data['widget'][key][key2]) dd_dict.to_file(os.path.join(DATA_PATH, 'write_to_json_test.json')) dd_dict.to_file(os.path.join(DATA_PATH, 'write_to_xml_test.xml')) self.assertTrue(os.path.isfile(os.path.join(DATA_PATH, 'write_to_json_test.json'))) self.assertTrue(os.path.isfile(os.path.join(DATA_PATH, 'write_to_xml_test.xml'))) dd_json = DeepDict(os.path.join(DATA_PATH, 'write_to_json_test.json')) dd_xml = DeepDict(os.path.join(DATA_PATH, 'write_to_xml_test.xml')) self.assertTrue(dd_json == dd_dict) self.assertTrue(dd_xml == dd_dict) try: os.remove(os.path.join(DATA_PATH, 'write_to_json_test.json')) os.remove(os.path.join(DATA_PATH, 'write_to_xml_test.xml')) except Exception as e: print(e) print("Warning: Failed to delete temporary data during tests!") def test_has_section(self): dd = DeepDict(self.test_data) self.assertTrue(DeepDict.has_section(dd.data, 'hOffset')) self.assertTrue(DeepDict.has_section(dd.data, 'window')) self.assertTrue(DeepDict.has_section(dd.data, 'widget')) self.assertFalse(DeepDict.has_section(dd.data, 'notasection')) def test_data_access(self): dd = DeepDict(self.test_data) self.assertEqual(dd['widget/window/height'], 500) self.assertEqual(dd['widget/image/name'], 'sun1') self.assertTrue(isinstance(dd['widget/window'], dict)) self.assertEqual(len(dd['widget/window']), 4) dd = DeepDict(path_sep=".") dd.data = self.test_data self.assertEqual(dd['widget.window.height'], 500) self.assertEqual(dd['widget.image.name'], 'sun1') self.assertTrue(isinstance(dd['widget.window'], dict)) self.assertEqual(len(dd['widget.window']), 4) def test_data_adding(self): dd = DeepDict() dd["test/section/var1"] = 100 dd["test/section/var2"] = 200 self.assertTrue(dd.data == self.test_data2) dd = DeepDict() dd["test"] = {} dd["test/section"] = {} dd["test/section/var1"] = 100 dd["test/section/var2"] = 200 self.assertTrue(dd.data == self.test_data2) def test_sample_space(self): dd = DeepDict(os.path.join(DATA_PATH, 'test_paramset.json')) self.assertEqual(len(dd[['parameter', 'activation', 'data']]), 4) self.assertEqual(dd['parameter/activation/data'], ['ReLU', 'tanh', 'sigm', 'ELU']) self.assertTrue(isinstance(dd['parameter/activation/data'], list)) self.assertTrue(isinstance(dd['parameter/activation/data'][0], str)) self.assertEqual(dd['parameter/layerdepth/data'], [3, 20]) self.assertTrue(isinstance(dd['parameter/layerdepth/data'], list)) self.assertTrue(isinstance(dd['parameter/layerdepth/data'][0], int)) self.assertTrue(isinstance(dd['parameter/learningrate/data'][0], float)) self.assertEqual(dd['parameter/learningrate/data'][0], 1e-5) self.assertEqual(dd['parameter/learningrate/data'][1], 10.0) + def test_len(self): + dd = DeepDict(os.path.join(DATA_PATH, 'test_paramset.json')) + self.assertEqual(len(dd), 1) if __name__ == '__main__': unittest.main()