diff --git a/hyppopy/MPIBlackboxFunction.py b/hyppopy/MPIBlackboxFunction.py
index b007c21..aac6eae 100644
--- a/hyppopy/MPIBlackboxFunction.py
+++ b/hyppopy/MPIBlackboxFunction.py
@@ -1,205 +1,189 @@
# Hyppopy - A Hyper-Parameter Optimization Toolbox
#
# Copyright (c) German Cancer Research Center,
# Division of Medical Image Computing.
# All rights reserved.
#
# This software is distributed WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.
#
# See LICENSE

__all__ = ['MPIBlackboxFunction']

import os
import logging
import functools
-from hyppopy.globals import DEBUGLEVEL
+from hyppopy.globals import DEBUGLEVEL, MPI_TAGS

from mpi4py import MPI

LOG = logging.getLogger(os.path.basename(__file__))
LOG.setLevel(DEBUGLEVEL)


def default_kwargs(**defaultKwargs):
    """
    Decorator defining default args in **kwargs arguments
    """
    def actual_decorator(fn):
        @functools.wraps(fn)
        def g(*args, **kwargs):
            defaultKwargs.update(kwargs)
            return fn(*args, **defaultKwargs)
        return g
    return actual_decorator


class MPIBlackboxFunction(object):
    """
    This class is a BlackboxFunction wrapper class encapsulating the loss function. Additional function pointers can
    be set to get access to different pipelining steps:

    - dataloader_func: data loading, the function must return a data object and is called first when the solver is
      executed. The data object returned will be the input of the blackbox function.
    - preprocess_func: data preprocessing is called after dataloader_func, the function's signature must be
      foo(data, params) and must return a data object. The input is the data object set directly or via
      dataloader_func, the params are passed from constructor params.
    - callback_func: this function is called at each iteration step getting passed the trial info content, can be
      used for custom visualization
    - data: add a data object directly

    The constructor accepts several function pointers or a data object which are all None by default (see below).
    Additionally one can define an arbitrary number of arg pairs. These are passed as input to each function pointer
    as arguments.

    :param dataloader_func: data loading function pointer, default=None
    :param preprocess_func: data preprocessing function pointer, default=None
    :param callback_func: callback function pointer, default=None
    :param data: data object, default=None
    :param kwargs: additional arg=value pairs
    """

    @default_kwargs(blackbox_func=None, dataloader_func=None, preprocess_func=None, callback_func=None, data=None)
    def __init__(self, **kwargs):
        self._blackbox_func = None
        self._preprocess_func = None
        self._dataloader_func = None
        self._callback_func = None
        self._raw_data = None
        self._data = None
        self.setup(kwargs)

    def __call__(self, **kwargs):
        """
        Call method calls blackbox_func passing the data object and the args passed

        :param kwargs: [dict] args

        :return: blackbox_func(data, kwargs)
        """
        return self.blackbox_func(self.data, kwargs)

    def call_batch(self, candidates):
        print('batch_call')
        size = MPI.COMM_WORLD.Get_size()
+
+        # RALF: The candidates have to be distributed to the workers here, you are already doing that.
+        # But the results also have to be collected and reported back for all candidates;
+        # that is what the solver expects when it calls call_batch.
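+        # Sketch (an assumption, not the implemented code): run_worker_mode in MPISolverWrapper answers every
+        # candidate with an (id, loss) tuple on the MPI_SEND_RESULTS tag and expects (id, params) pairs as input,
+        # so after the send loop below the results could be collected and returned roughly like this:
+        #     results = {}
+        #     for _ in range(len(candidates)):
+        #         cand_id, loss = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI_TAGS.MPI_SEND_RESULTS.value)
+        #         results[cand_id] = loss
+        #     return results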
        for i, candidate in enumerate(candidates):
            dest = (i % (size-1)) + 1
-            MPI.COMM_WORLD.send(candidate, dest=dest, tag=55)
+            MPI.COMM_WORLD.send(candidate, dest=dest, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value)

        # shared = {'d1': 55, 'd2': 42}
        # MPI.COMM_WORLD.send(shared, dest=1, tag=13)

-    def signal_worker_finished(self):
-        print('signal_worker_finished')
-        size = MPI.COMM_WORLD.Get_size()
-        for i in range(size-1):
-            MPI.COMM_WORLD.send(None, dest=i+1, tag=55)
-
-    def call_worker(self):
-        comm = MPI.COMM_WORLD
-        rank = comm.Get_rank()
-        print("Hello, World! I am process {}. I am waiting for param".format(rank))
-
-        candidates_loss = []
-        while True:
-            param = comm.recv(source=0, tag=55)  # Receive the param
-
-            if param == None:
-                print(print("process {} received finish signal.".format(rank)))
-                return
-            loss = self._blackbox_func(None, param)
-            print('{}: param = {}, loss={}'.format(rank, param, loss))
-            candidates_loss.append(loss)
-
+    # RALF: call_worker is not needed here, that happens in the SolverWrapper.
+    # TODO remove comment
    def setup(self, kwargs):
        """
        Alternative to Constructor, kwargs signature see __init__

        :param kwargs: (see __init__)
        """
        self._blackbox_func = kwargs['blackbox_func']
        # self._preprocess_func = kwargs['preprocess_func']
        # self._dataloader_func = kwargs['dataloader_func']
        # self._callback_func = kwargs['callback_func']
        #self._raw_data = kwargs['data']
        # self._data = self._raw_data
        del kwargs['blackbox_func']
        # del kwargs['preprocess_func']
        # del kwargs['dataloader_func']
        # del kwargs['data']
        # params = kwargs

        # if self.dataloader_func is not None:
        #     self._raw_data = self.dataloader_func(params=params)
        # assert self._raw_data is not None, "Missing data exception!"
        # assert self.blackbox_func is not None, "Missing blackbox function exception!"
        # if self.preprocess_func is not None:
        #     result = self.preprocess_func(data=self._raw_data, params=params)
        #     if result is not None:
        #         self._data = result
        #     else:
        #         self._data = self._raw_data
        # else:
        #     self._data = self._raw_data

    @property
    def blackbox_func(self):
        """
        BlackboxFunction wrapper class encapsulating the loss function or a function accepting a hyperparameter set
        and returning a float.

        :return: [object] pointer to blackbox_func
        """
        return self._blackbox_func

    @property
    def preprocess_func(self):
        """
        Data preprocessing is called after dataloader_func, the function's signature must be foo(data, params) and
        must return a data object. The input is the data object set directly or via dataloader_func, the params are
        passed from constructor params.

        :return: [object] preprocess_func
        """
        return self._preprocess_func

    @property
    def dataloader_func(self):
        """
        Data loading, the function must return a data object and is called first when the solver is executed. The
        data object returned will be the input of the blackbox function.

        :return: [object] dataloader_func
        """
        return self._dataloader_func

    @property
    def callback_func(self):
        """
        This function is called at each iteration step getting passed the trial info content, can be used for custom
        visualization

        :return: [object] callback_func
        """
        return self._callback_func

    @property
    def raw_data(self):
        """
        This data structure is used to store the return from dataloader_func to serve as input for preprocess_func
        if available.

        :return: [object] raw_data
        """
        return self._raw_data

    @property
    def data(self):
        """
        Data structure keeping the input data.
:return: [object] data """ return self._data diff --git a/hyppopy/globals.py b/hyppopy/globals.py index 0e67e87..1f966cf 100644 --- a/hyppopy/globals.py +++ b/hyppopy/globals.py @@ -1,40 +1,40 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import os import sys import logging from enum import Enum ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, ROOT) LIBNAME = "hyppopy" TESTDATA_DIR = os.path.join(ROOT, *(LIBNAME, "tests", "data")) HYPERPARAMETERPATH = "hyperparameter" SETTINGSPATH = "settings" FUNCTIONSIMULATOR_DATAPATH = os.path.join(os.path.join(ROOT, LIBNAME), "virtualparameterspace") SUPPORTED_DOMAINS = ["uniform", "normal", "loguniform", "categorical"] SUPPORTED_DTYPES = ["int", "float", "str"] DEFAULTGRIDFREQUENCY = 10 LOGFILENAME = os.path.join(ROOT, '{}_log.log'.format(LIBNAME)) DEBUGLEVEL = logging.DEBUG logging.basicConfig(filename=LOGFILENAME, filemode='w', format='%(levelname)s: %(name)s - %(message)s') class MPI_TAGS(Enum): - MPI_SEND_DATA = 55 - MPI_SEND_TRIALS = 99 + MPI_SEND_CANDIDATE = 55 + MPI_SEND_RESULTS = 99 diff --git a/hyppopy/solvers/GridsearchSolver.py b/hyppopy/solvers/GridsearchSolver.py index 5eadde7..fd478aa 100644 --- a/hyppopy/solvers/GridsearchSolver.py +++ b/hyppopy/solvers/GridsearchSolver.py @@ -1,252 +1,256 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import os import logging import warnings import numpy as np from pprint import pformat from scipy.stats import norm from itertools import product from hyppopy.globals import DEBUGLEVEL, DEFAULTGRIDFREQUENCY from hyppopy.solvers.HyppopySolver import HyppopySolver LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) def get_uniform_axis_sample(a, b, N, dtype): """ Returns a uniform sample x(n) in the range [a,b] sampled at N pojnts :param a: left value range bound :param b: right value range bound :param N: discretization of intervall [a,b] :param dtype: data type :return: [list] axis range """ assert a < b, "condition a < b violated!" assert isinstance(N, int), "condition N of type int violated!" if dtype is int: return list(np.linspace(a, b, N).astype(int)) elif dtype is float: return list(np.linspace(a, b, N)) else: raise AssertionError("dtype {} not supported for uniform sampling!".format(dtype)) def get_norm_cdf(N): """ Returns a normed gaussian cdf (range [0,1]) with N sampling points :param N: sampling points :return: [ndarray] gaussian cdf function values """ assert isinstance(N, int), "condition N of type int violated!" 
even = True if N % 2 != 0: N -= 1 even = False N = int(N/2) sigma = 1/3 x = np.linspace(0, 1, N) y1 = norm.cdf(x, loc=0, scale=sigma)-0.5 if not even: y1 = np.append(y1, [0.5]) y2 = 1-(norm.cdf(x, loc=0, scale=sigma)-0.5) y2 = np.flip(y2, axis=0) y = np.concatenate((y1, y2), axis=0) return y def get_gaussian_axis_sample(a, b, N, dtype): """ Returns a function value f(n) where f is a gaussian cdf in range [a, b] and N sampling points :param a: left value range bound :param b: right value range bound :param N: discretization of intervall [a,b] :param dtype: data type :return: [list] axis range """ assert a < b, "condition a < b violated!" assert isinstance(N, int), "condition N of type int violated!" data = [] for n in range(N): x = a + get_norm_cdf(N)[n]*(b-a) if dtype is int: data.append(int(x)) elif dtype is float: data.append(x) else: raise AssertionError("dtype {} not supported for uniform sampling!".format(dtype)) return data def get_logarithmic_axis_sample(a, b, N, dtype): """ Returns a function value f(n) where f is logarithmic function e^x sampling the exponent range [log(a), log(b)] linear at N sampling points. The function values returned are in the range [a, b]. :param a: left value range bound :param b: right value range bound :param N: discretization of intervall [a,b] :param dtype: data type :return: [list] axis range """ assert a < b, "condition a < b violated!" assert a > 0, "condition a > 0 violated!" assert isinstance(N, int), "condition N of type int violated!" # convert input range into exponent range lexp = np.log(a) rexp = np.log(b) exp_range = np.linspace(lexp, rexp, N) data = [] for n in range(exp_range.shape[0]): x = np.exp(exp_range[n]) if dtype is int: data.append(int(x)) elif dtype is float: data.append(x) else: raise AssertionError("dtype {} not supported for uniform sampling!".format(dtype)) return data class GridsearchSolver(HyppopySolver): """ The GridsearchSolver class implements a gridsearch optimization. The gridsearch supports categorical, uniform, normal and loguniform sampling. To use the GridsearchSolver, besides a range, one must specifiy the number of samples in the domain, e.g. 'data': [0, 1, 100] """ def __init__(self, project=None): """ The constructor accepts a HyppopyProject. :param project: [HyppopyProject] project instance, default=None """ HyppopySolver.__init__(self, project) def define_interface(self): """ This function is called when HyppopySolver.__init__ function finished. Child classes need to define their individual parameter here by calling the _add_member function for each class member variable need to be defined. Using _add_hyperparameter_signature the structure of a hyperparameter the solver expects must be defined. Both, members and hyperparameter signatures are later get checked, before executing the solver, ensuring settings passed fullfill solver needs. """ self._add_hyperparameter_signature(name="domain", dtype=str, options=["uniform", "normal", "loguniform", "categorical"]) self._add_hyperparameter_signature(name="data", dtype=list) self._add_hyperparameter_signature(name="frequency", dtype=int) self._add_hyperparameter_signature(name="type", dtype=type) def loss_function_call(self, params): """ This function is called within the function loss_function and encapsulates the actual blackbox function call in each iteration. 
        The function loss_function takes care of the iteration driving and reporting, but each solver lib might need
        some special treatment between the parameter set selection and the calling of the actual blackbox function,
        e.g. parameter converting.

        :param params: [dict] hyperparameter space sample e.g. {'p1': 0.123, 'p2': 3.87, ...}

        :return: [float] loss
        """
        loss = self.blackbox(**params)
        if loss is None:
            return np.nan
        return loss

-    @staticmethod
-    def get_candidate_list(searchspace):
+    # RALF: In general I would turn candidate lists into a dict so that every candidate also gets an ID (key) the
+    # solver can assign. Here I would simply make the stringified params the key, or you introduce the
+    # CandidateDescriptor class (see HyppopySolver.py) as a convenience.
+    # TODO: remove comment again
+    def get_candidate_list(self, searchspace):
        """
        This function converts the searchspace to a candidate_list that can then be used to distribute via MPI.

        :param searchspace: converted hyperparameter space
        """
        candidates_list = list()
        candidates = [x for x in product(*searchspace[1])]
        # [print(c) for c in candidates]
        for c in candidates:
            params = {}
            for name, value in zip(searchspace[0], c):
                params[name] = value
            candidates_list.append(params)
        return candidates_list

    def execute_solver(self, searchspace):
        """
        This function is called immediately after convert_searchspace and gets the output of the latter as input. Its
        purpose is to call the solver libs main optimization function.

        :param searchspace: converted hyperparameter space
        """
-        for x in product(*searchspace[1]):
-            params = {}
-            for name, value in zip(searchspace[0], x):
-                params[name] = value
-            try:
-                self.loss_function(**params)
-            except Exception as e:
-                msg = "internal error in randomsearch execute_solver occured. {}".format(e)
-                LOG.error(msg)
-                raise BrokenPipeError(msg)
+        candidates = self.get_candidate_list(searchspace)
+
+        # RALF: get_candidate_list is needed here to call a loss_function_batch; accordingly HyppopySolver also has
+        # to be extended so that it supports loss_function_batch.
+        # TODO: remove comment again
+        try:
+            self.loss_function_batch(candidates)
+        except Exception as e:
+            msg = "internal error in gridsearch execute_solver occurred. {}".format(e)
+            LOG.error(msg)
+            raise BrokenPipeError(msg)
        self.best = self._trials.argmin

    def convert_searchspace(self, hyperparameter):
        """
        The function converts the standard parameter input into a range list depending on the domain. These rangelists
        are later used with itertools product to create a parameter space sample of each combination.
:param hyperparameter: [dict] hyperparameter space :return: [list] name and range for each parameter space axis """ LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter))) searchspace = [[], []] for name, param in hyperparameter.items(): if param["domain"] != "categorical" and "frequency" not in param.keys(): param["frequency"] = DEFAULTGRIDFREQUENCY warnings.warn("No frequency field found, used default gridsearch frequency {}".format(DEFAULTGRIDFREQUENCY)) if param["domain"] == "categorical": searchspace[0].append(name) searchspace[1].append(param["data"]) elif param["domain"] == "uniform": searchspace[0].append(name) searchspace[1].append(get_uniform_axis_sample(param["data"][0], param["data"][1], param["frequency"], param["type"])) elif param["domain"] == "normal": searchspace[0].append(name) searchspace[1].append(get_gaussian_axis_sample(param["data"][0], param["data"][1], param["frequency"], param["type"])) elif param["domain"] == "loguniform": searchspace[0].append(name) searchspace[1].append(get_logarithmic_axis_sample(param["data"][0], param["data"][1], param["frequency"], param["type"])) return searchspace diff --git a/hyppopy/solvers/HyppopySolver.py b/hyppopy/solvers/HyppopySolver.py index cc6b09c..1c9d6c7 100644 --- a/hyppopy/solvers/HyppopySolver.py +++ b/hyppopy/solvers/HyppopySolver.py @@ -1,514 +1,575 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE __all__ = ['HyppopySolver'] import abc import copy import types import datetime import numpy as np import pandas as pd from hyperopt import Trials from hyppopy.globals import * from hyppopy.VisdomViewer import VisdomViewer from hyppopy.HyppopyProject import HyppopyProject from hyppopy.BlackboxFunction import BlackboxFunction from hyppopy.MPIBlackboxFunction import MPIBlackboxFunction from hyppopy.FunctionSimulator import FunctionSimulator from hyppopy.globals import DEBUGLEVEL LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) +class CandidateDescriptor(object): + '''Descriptor that defines an candidate the solver wants to be checked. 
+    It is used to label/identify the candidates and their results in the case of batch processing.'''
+
+    def __init__(self, **definingValues):
+        '''@param definingValues The class assumes that all variables passed to the constructor are parameters of
+        the candidate the instance should represent.'''
+
+        import uuid
+
+        self._definingValues = definingValues
+
+        self._definingStr = str()
+
+        for item in sorted(definingValues.items()):
+            self._definingStr = self._definingStr + "'" + str(item[0]) + "':'" + str(item[1]) + "',"
+
+        self.ID = str(uuid.uuid4())
+
+    def __missing__(self, key):
+        return None
+
+    def __len__(self):
+        return len(self._definingValues)
+
+    def __contains__(self, key):
+        return key in self._definingValues
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self._definingValues == other._definingValues
+        else:
+            return False
+
+    def __hash__(self):
+        return hash(self._definingStr)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __repr__(self):
+        return 'CandidateDescriptor(%s)' % (self._definingValues)
+
+    def __str__(self):
+        return '(%s)' % (self._definingValues)
+
+    def keys(self):
+        return self._definingValues.keys()
+
+    def __getitem__(self, key):
+        if key in self._definingValues:
+            return self._definingValues[key]
+        raise KeyError('Unknown defining value key was requested. Key: {}; self: {}'.format(key, self))
+
+
class HyppopySolver(object):
    """
    The HyppopySolver class is the base class for all solver addons. It defines virtual functions a child class has
    to implement to deal with the front-end communication, orchestrating the optimization process and ensuring a
    proper process information storing. The key idea is that the HyppopySolver class defines an interface to
    configure and run an object instance of itself independently from the concrete solver lib used to optimize in
    the background. To achieve this goal an addon developer needs to implement the abstract methods
    'convert_searchspace', 'execute_solver' and 'loss_function_call'. These methods abstract the peculiarities of
    the solver libs to offer, on the user side, a simple and consistent parameter space configuration and
    optimization procedure. The method 'convert_searchspace' transforms the hyppopy parameter space description into
    the solver lib specific description. The method loss_function_call is used to handle solver lib specifics of
    calling the actual blackbox function and execute_solver is executed when the run method is invoked and takes
    care of calling the solver lib solving routine.

    The class HyppopySolver defines an interface to be implemented when writing a custom solver. Each solver
    derivative needs to implement the abstract methods:

    - convert_searchspace
    - execute_solver
    - loss_function_call
    - define_interface

    The dev-user interface consists of the methods:

    - _add_member
    - _add_hyperparameter_signature
    - _check_project

    The end-user interface consists of the methods:

    - run
    - get_results
    - print_best
    - print_timestats
    - start_viewer
    """

    def __init__(self, project=None):
        """
        The constructor accepts a HyppopyProject.
:param project: [HyppopyProject] project instance, default=None """ self._idx = 0 # current iteration counter self._best = None # best parameter set self._trials = Trials() # trials object, hyppopy uses the Trials object from hyperopt self._blackbox = None # blackbox function, eiter a function or a BlackboxFunction instance self._total_duration = None # keeps track of the solvers running time self._solver_overhead = None # stores the time overhead of the solver, means total time minus time in blackbox self._time_per_iteration = None # mean time per iterration self._accumulated_blackbox_time = None # summed time the solver was in the blackbox function self._visdom_viewer = None # visdom viewer instance self._child_members = {} # this dict keeps track of the settings the child solver defines self._hopt_signatures = {} # this dict keeps track of the hyperparameter signatures the child solver defines self.define_interface() # the child define interface function is called which defines settings and hyperparameter signatures if project is not None: self.project = project @abc.abstractmethod def convert_searchspace(self, hyperparameter): """ This function gets the unified hyppopy-like parameterspace description as input and, if necessary, should convert it into a solver lib specific format. The function is invoked when run is called and what it returns is passed as searchspace argument to the function execute_solver. :param hyperparameter: [dict] nested parameter description dict e.g. {'name': {'domain':'uniform', 'data':[0,1], 'type':'float'}, ...} :return: [object] converted hyperparameter space """ raise NotImplementedError('users must define convert_searchspace to use this class') @abc.abstractmethod def execute_solver(self, searchspace): """ This function is called immediately after convert_searchspace and get the output of the latter as input. It's purpose is to call the solver libs main optimization function. :param searchspace: converted hyperparameter space """ raise NotImplementedError('users must define execute_solver to use this class') @abc.abstractmethod def loss_function_call(self, params): """ This function is called within the function loss_function and encapsulates the actual blackbox function call in each iteration. The function loss_function takes care of the iteration driving and reporting, but each solver lib might need some special treatment between the parameter set selection and the calling of the actual blackbox function, e.g. parameter converting. :param params: [dict] hyperparameter space sample e.g. {'p1': 0.123, 'p2': 3.87, ...} :return: [float] loss """ raise NotImplementedError('users must define loss_function_call to use this class') @abc.abstractmethod def define_interface(self): """ This function is called when HyppopySolver.__init__ function finished. Child classes need to define their individual parameter here by calling the _add_member function for each class member variable need to be defined. Using _add_hyperparameter_signature the structure of a hyperparameter the solver expects must be defined. Both, members and hyperparameter signatures are later get checked, before executing the solver, ensuring settings passed fullfill solver needs. 
""" raise NotImplementedError('users must define define_interface to use this class') def _add_member(self, name, dtype, value=None, default=None): """ When designing your child solver class you need to implement the define_interface abstract method where you can call _add_member to define custom solver options that are automatically converted to class attributes. :param name: [str] option name :param dtype: [type] option data type :param value: [object] option value :param default: [object] option default value """ assert isinstance(name, str), "precondition violation, name needs to be of type str, got {}".format(type(name)) if value is not None: assert isinstance(value, dtype), "precondition violation, value does not match dtype condition!" if default is not None: assert isinstance(default, dtype), "precondition violation, default does not match dtype condition!" setattr(self, name, value) self._child_members[name] = {"type": dtype, "value": value, "default": default} def _add_hyperparameter_signature(self, name, dtype, options=None): """ When designing your child solver class you need to implement the define_interface abstract method where you can call _add_hyperparameter_signature to define a hyperparamter signature which is automatically checked for consistency while solver execution. :param name: [str] hyperparameter name :param dtype: [type] hyperparameter data type :param options: [list] list of possible values the hp can be set, if None no option check is done """ assert isinstance(name, str), "precondition violation, name needs to be of type str, got {}".format(type(name)) self._hopt_signatures[name] = {"type": dtype, "options": options} def _check_project(self): """ The function checks the members and hyperparameter signatures read from the project instance to be consistent with the members and signatures defined in the child class via define_interface. """ assert isinstance(self.project, HyppopyProject), "Invalid project instance, either not set or setting failed!" # check hyperparameter signatures for name, param in self.project.hyperparameter.items(): for sig, settings in self._hopt_signatures.items(): if sig not in param.keys(): msg = "Missing hyperparameter signature {}!".format(sig) LOG.error(msg) raise LookupError(msg) else: if not isinstance(param[sig], settings["type"]): msg = "Hyperparameter signature type mismatch, expected type {} got {}!".format(settings["type"], param[sig]) LOG.error(msg) raise TypeError(msg) if settings["options"] is not None: if param[sig] not in settings["options"]: msg = "Wrong signature value, {} not found in signature options!".format(param[sig]) LOG.error(msg) raise LookupError(msg) # check child members for name in self._child_members.keys(): if name not in self.project.__dict__.keys(): msg = "missing settings field {}!".format(name) LOG.error(msg) raise LookupError(msg) self.__dict__[name] = self.project.settings[name] def __compute_time_statistics(self): """ Evaluates all timestatistic values available """ dts = [] for trial in self._trials.trials: if 'book_time' in trial.keys() and 'refresh_time' in trial.keys(): dt = trial['refresh_time'] - trial['book_time'] dts.append(dt.total_seconds()) self._time_per_iteration = np.mean(dts) * 1e3 self._accumulated_blackbox_time = np.sum(dts) * 1e3 tmp = self.total_duration - self._accumulated_blackbox_time self._solver_overhead = int(np.round(100.0 / (self.total_duration + 1e-12) * tmp)) def loss_function(self, **params): """ This function is called each iteration with a selected parameter set. 
        The parameter set selection is driven by the solver lib itself. The purpose of this function is to take care
        of the iteration reporting and the calling of the callback_func if available. As a developer you might want
        to overwrite this function completely (e.g. HyperoptSolver) but then you need to take care of the iteration
        reporting yourself. The alternative is to only implement loss_function_call (e.g. OptunitySolver).

        :param params: [dict] hyperparameter space sample e.g. {'p1': 0.123, 'p2': 3.87, ...}

        :return: [float] loss
        """
        self._idx += 1
        vals = {}
        idx = {}
        for key, value in params.items():
            vals[key] = [value]
            idx[key] = [self._idx]
        trial = {'tid': self._idx,
                 'result': {'loss': None, 'status': 'ok'},
                 'misc': {
                     'tid': self._idx,
                     'idxs': idx,
                     'vals': vals
                 },
                 'book_time': datetime.datetime.now(),
                 'refresh_time': None
                 }
        try:
            loss = self.loss_function_call(params)
            trial['result']['loss'] = loss
            trial['result']['status'] = 'ok'
            if loss is np.nan:
                trial['result']['status'] = 'failed'
        except Exception as e:
            LOG.error("computing loss failed due to:\n {}".format(e))
            loss = np.nan
            trial['result']['loss'] = np.nan
            trial['result']['status'] = 'failed'
        trial['refresh_time'] = datetime.datetime.now()
        self._trials.trials.append(trial)
        cbd = copy.deepcopy(params)
        cbd['iterations'] = self._idx
        cbd['loss'] = loss
        cbd['status'] = trial['result']['status']
        cbd['book_time'] = trial['book_time']
        cbd['refresh_time'] = trial['refresh_time']
        if (isinstance(self.blackbox, BlackboxFunction) or isinstance(self.blackbox, MPIBlackboxFunction)) and self.blackbox.callback_func is not None:
            self.blackbox.callback_func(**cbd)
-        if self._visdom_viewer is not None:
-            self._visdom_viewer.update(cbd)
        return loss

+    # RALF: This is needed to hand over whole batches, and this is also where the trials are recorded.
+    def loss_function_batch(self, candidates):
+        # RALF: analogous to loss_function, but for all candidates. First check whether the actual function supports
+        # batches. If yes: go for it. If no: simply work through them in a loop.
+        # If implemented cleanly, loss_function() becomes obsolete, i.e. it does nothing but call
+        # loss_function_batch() with a single candidate.
+        candidate_losses = [self.loss_function(**candidate) for candidate in candidates]
+        return candidate_losses
+
    def run(self, print_stats=True):
        """
        This function starts the optimization process.

        :param print_stats: [bool] en- or disable console output
        """
        self._idx = 0
        self.trials = Trials()
        start_time = datetime.datetime.now()
        try:
            search_space = self.convert_searchspace(self.project.hyperparameter)
        except Exception as e:
            msg = "Failed to convert searchspace, error: {}".format(e)
            LOG.error(msg)
            raise AssertionError(msg)
        try:
            self.execute_solver(search_space)
        except Exception as e:
            msg = "Failed to execute solver, error: {}".format(e)
            LOG.error(msg)
            raise AssertionError(msg)
        end_time = datetime.datetime.now()
        dt = end_time - start_time
        days = divmod(dt.total_seconds(), 86400)
        hours = divmod(days[1], 3600)
        minutes = divmod(hours[1], 60)
        seconds = divmod(minutes[1], 1)
        milliseconds = divmod(seconds[1], 0.001)
        self._total_duration = [int(days[0]), int(hours[0]), int(minutes[0]), int(seconds[0]), int(milliseconds[0])]
        if print_stats:
            self.print_best()
            self.print_timestats()

    def get_results(self):
        """
        This function returns a complete optimization history as pandas DataFrame and a dict with the optimal
        parameter set.

        :return: [DataFrame], [dict] history and optimal parameter set
        """
        assert isinstance(self.trials, Trials), "precondition violation, wrong trials type! Maybe solver was not yet executed?"
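        # The optimization history below is assembled column-wise: duration, loss and status for every trial, plus
        # one column per sampled hyperparameter.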
results = {'duration': [], 'losses': [], 'status': []} pset = self.trials.trials[0]['misc']['vals'] for p in pset.keys(): results[p] = [] for n, trial in enumerate(self.trials.trials): t1 = trial['book_time'] t2 = trial['refresh_time'] results['duration'].append((t2 - t1).microseconds / 1000.0) results['losses'].append(trial['result']['loss']) results['status'].append(trial['result']['status'] == 'ok') losses = np.array(results['losses']) results['losses'] = list(losses) pset = trial['misc']['vals'] for p in pset.items(): results[p[0]].append(p[1][0]) return pd.DataFrame.from_dict(results), self.best def print_best(self): """ Optimization result console output printing. """ print("\n") print("#" * 40) print("### Best Parameter Choice ###") print("#" * 40) for name, value in self.best.items(): print(" - {}\t:\t{}".format(name, value)) print("\n - number of iterations\t:\t{}".format(self.trials.trials[-1]['tid']+1)) print(" - total time\t:\t{}d:{}h:{}m:{}s:{}ms".format(self._total_duration[0], self._total_duration[1], self._total_duration[2], self._total_duration[3], self._total_duration[4])) print("#" * 40) def print_timestats(self): """ Time statistic console output printing. """ print("\n") print("#" * 40) print("### Timing Statistics ###") print("#" * 40) print(" - per iteration: {}ms".format(int(self.time_per_iteration*1e4)/10000)) print(" - total time: {}d:{}h:{}m:{}s:{}ms".format(self._total_duration[0], self._total_duration[1], self._total_duration[2], self._total_duration[3], self._total_duration[4])) print("#" * 40) print(" - solver overhead: {}%".format(self.solver_overhead)) def start_viewer(self, port=8097, server="http://localhost"): """ Starts the visdom viewer. :param port: [int] port number, default: 8097 :param server: [str] server name, default: http://localhost """ try: self._visdom_viewer = VisdomViewer(self._project, port, server) except Exception as e: import warnings warnings.warn("Failed starting VisdomViewer. Is the server running? If not start it via $visdom") LOG.error("Failed starting VisdomViewer: {}".format(e)) self._visdom_viewer = None @property def project(self): """ HyppopyProject instance :return: [HyppopyProject] project instance """ return self._project @project.setter def project(self, value): """ Set HyppopyProject instance :param value: [HyppopyProject] project instance """ if isinstance(value, dict): self._project = HyppopyProject(value) elif isinstance(value, HyppopyProject): self._project = value else: msg = "Input error, project_manager of type: {} not allowed!".format(type(value)) LOG.error(msg) raise TypeError(msg) self._check_project() @property def blackbox(self): """ Get the BlackboxFunction object. :return: [object] BlackboxFunction instance or function """ return self._blackbox @blackbox.setter def blackbox(self, value): """ Set the BlackboxFunction wrapper class encapsulating the loss function or a function accepting a hyperparameter set and returning a float. :return: [object] pointer to blackbox_func """ if isinstance(value, types.FunctionType) or isinstance(value, BlackboxFunction) or isinstance(value, FunctionSimulator) or isinstance(value, MPIBlackboxFunction): self._blackbox = value else: self._blackbox = None msg = "Input error, blackbox of type: {} not allowed!".format(type(value)) LOG.error(msg) raise TypeError(msg) @property def best(self): """ Returns best parameter set. :return: [dict] best parameter set """ return self._best @best.setter def best(self, value): """ Set the best parameter set. 
:param value: [dict] best parameter set """ if not isinstance(value, dict): msg = "Input error, best of type: {} not allowed!".format(type(value)) LOG.error(msg) raise TypeError(msg) self._best = value @property def trials(self): """ Get the Trials instance. :return: [object] Trials instance """ return self._trials @trials.setter def trials(self, value): """ Set the Trials object. :param value: [object] Trials instance """ self._trials = value @property def total_duration(self): """ Get total computation duration. :return: [float] total computation time """ return (self._total_duration[0]*86400 + self._total_duration[1] * 3600 + self._total_duration[2] * 60 + self._total_duration[3]) * 1000 + self._total_duration[4] @property def solver_overhead(self): """ Get the solver overhead, this is the total time minus the duration of the blackbox function calls. :return: [float] solver overhead duration """ if self._solver_overhead is None: self.__compute_time_statistics() return self._solver_overhead @property def time_per_iteration(self): """ Get the mean duration per iteration. :return: [float] time per iteration """ if self._time_per_iteration is None: self.__compute_time_statistics() return self._time_per_iteration @property def accumulated_blackbox_time(self): """ Get the summed blackbox function computation time. :return: [float] blackbox function computation time """ if self._accumulated_blackbox_time is None: self.__compute_time_statistics() return self._accumulated_blackbox_time diff --git a/hyppopy/solvers/MPISolverWrapper.py b/hyppopy/solvers/MPISolverWrapper.py index 8a92423..9f62c85 100644 --- a/hyppopy/solvers/MPISolverWrapper.py +++ b/hyppopy/solvers/MPISolverWrapper.py @@ -1,244 +1,190 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import datetime import os import logging from mpi4py import MPI from hyppopy.globals import DEBUGLEVEL, MPI_TAGS LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) class MPISolverWrapper: """ TODO """ def __init__(self, solver=None): """ The constructor accepts a HyppopySolver. :param solver: [HyppopySolver] solver instance, default=None """ self._solver = solver @property def blackbox(self): """ Get the BlackboxFunction object. :return: [object] BlackboxFunction instance or function of member solver """ return self._solver.blackbox @blackbox.setter def blackbox(self, value): """ Set the BlackboxFunction wrapper class encapsulating the loss function or a function accepting a hyperparameter set and returning a float. :return: """ self._solver.blackbox = value def get_results(self): """ Just call get_results of the member solver and return the result. :return: return value of self._solver.get_results() """ comm = MPI.COMM_WORLD size = comm.size # This loop collects the results from the worker processes. # If we start this without MPI, this loop is skipped and the results are already in the member variable. # Genius? Maybe... or maybe it just turned out this way. 
:-P
        for i in range(size - 1):
            rec_trials = comm.recv(source=i+1, tag=MPI_TAGS.MPI_SEND_TRIALS.value)
            for trial in rec_trials.trials:
                self._solver._trials.trials.append(trial)

        self._solver.best = self._solver._trials.argmin

        print('Number of processes: {}'.format(size))
        print('Best result: {}'.format(self._solver.best))
        return self._solver.get_results()

    def define_interface(self):
        """
        This function is called when the HyppopySolver.__init__ function has finished. Child classes need to define
        their individual parameter here by calling the _add_member function for each class member variable that needs
        to be defined. Using _add_hyperparameter_signature the structure of a hyperparameter the solver expects must
        be defined. Both members and hyperparameter signatures are checked later, before executing the solver,
        ensuring the settings passed fulfill the solver's needs.
        """
        self._solver.define_interface()

    def loss_function_call(self, candidates):
        """
        This function is called within the function loss_function and encapsulates the actual blackbox function call
        in each iteration. The function loss_function takes care of the iteration driving and reporting, but each
        solver lib might need some special treatment between the parameter set selection and the calling of the
        actual blackbox function, e.g. parameter converting.

        :param candidates: TODO params [dict] hyperparameter space sample e.g. {'p1': 0.123, 'p2': 3.87, ...} TODO remove

        :return: [float] loss
        """
        try:
            self.call_batch(candidates)
        except:
            for params in candidates:
                self.blackbox(**params)
                # TODO: Why do we need the loss as a return here?
                # loss = self.blackbox(**params)
                # if loss is None:
                #     loss = np.nan
        return

-    @staticmethod
-    def call_batch(candidates):
-        size = MPI.COMM_WORLD.Get_size()
-        for i, candidate in enumerate(candidates):
-            dest = (i % (size-1)) +1
-            MPI.COMM_WORLD.send(candidate, dest=dest, tag=MPI_TAGS.MPI_SEND_DATA.value)
-
-    def call_worker(self):
+    def run_worker_mode(self):
        """
-        This function calls a worker for a specific MPI rank.
+        This function is called if the wrapper should run as a worker for a specific MPI rank.
        It receives messages for the following tags:
-        tag==MPI_SEND_DATA: parameters for the loss calculation. It param==None, the worker finishes.
+        tag==MPI_SEND_CANDIDATE: parameters for the loss calculation. If param==None, the worker finishes.
        It sends messages for the following tags:
-        tag==99: trials of this mpi process.
+        tag==MPI_SEND_RESULTS: result of an evaluated candidate.

-        :return:
+        :return: None, the evaluated losses are sent back to the master process
        """
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        print("Starting worker {}. Waiting for param...".format(rank))

        while True:
-            param = comm.recv(source=0, tag=MPI_TAGS.MPI_SEND_DATA.value)  # Wait here till params are received
-
-            # param == None indicates that the worker should finish.
-            if param is None:
-                MPI.COMM_WORLD.send(self._solver._trials, dest=0, tag=MPI_TAGS.MPI_SEND_TRIALS.value)
-                # trials = comm.gather(self._solver._trials, root=0)  # TODO can we solve this with gather as well?
+            candidate = comm.recv(source=0, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value)  # Wait here till params are received
-                self._solver.best = self._solver._trials.argmin
-                # TODO: Printing does not work for now.
-                # self.print_best()
-                # self.print_timestats()
+            if candidate is None:
+                print("process {} received finish signal.".format(rank))
                return
-            self.loss_function(param)  # No **params here... I overwrote this method.
-
-            # TODO: Do we need this here?
-            # loss = self.loss_function(param)  # No **params here... I overwrote this method.
-            # print('{}: param = {}, loss={}'.format(rank, param, loss))
+            id, params = candidate
+            loss = self._solver.blackbox.blackbox_func(None, **params)
+            # RALF: the results have to be sent back instead of being stored locally in a trial
+            comm.send((id, loss), dest=0, tag=MPI_TAGS.MPI_SEND_RESULTS.value)

    @staticmethod
    def signal_worker_finished():
        """
        This function sends data==None to all workers from the master. This is the signal that tells the workers to
        finish.

        :return:
        """
        print('signal_worker_finished')
        size = MPI.COMM_WORLD.Get_size()
        for i in range(size - 1):
-            MPI.COMM_WORLD.send(None, dest=i + 1, tag=MPI_TAGS.MPI_SEND_DATA.value)
+            MPI.COMM_WORLD.send(None, dest=i + 1, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value)

-    def run(self):
+    def run(self, *args, **kwargs):
        """
        This function starts the optimization process.
-
-        TODO:
-        This is a copy paste of the HyppopySolver method. Maybe not the most elegant solution, but works for now.
-
        :param print_stats: [bool] en- or disable console output
        """
-        self._idx = 0
-
-        start_time = datetime.datetime.now()
-        try:
-            search_space = self.convert_searchspace(self._solver.project.hyperparameter)
-        except Exception as e:
-            msg = "Failed to convert searchspace, error: {}".format(e)
-            LOG.error(msg)
-            raise AssertionError(msg)
-        try:
-            self.execute_solver(search_space)
-        except Exception as e:
-            msg = "Failed to execute solver, error: {}".format(e)
-            LOG.error(msg)
-            raise AssertionError(msg)
-        end_time = datetime.datetime.now()
-        dt = end_time - start_time
-        days = divmod(dt.total_seconds(), 86400)
-        hours = divmod(days[1], 3600)
-        minutes = divmod(hours[1], 60)
-        seconds = divmod(minutes[1], 1)
-        milliseconds = divmod(seconds[1], 0.001)
-        self._total_duration = [int(days[0]), int(hours[0]), int(minutes[0]), int(seconds[0]), int(milliseconds[0])]
-
-        # TODO: Do print later... Workers might not be finished
-        # if print_stats:
-        #     self.print_best()
-        #     self.print_timestats()
-
-    def execute_solver(self, searchspace):
-        """
-        This function is called immediately after convert_searchspace and get the output of the latter as input. It's
-        purpose is to call the solver libs main optimization function.
-
-        :param searchspace: converted hyperparameter space
-        """
-
-        candidates_list = self._solver.get_candidate_list(searchspace)
-
-        try:
-            self.call_batch(candidates_list)
-        except:
-            for params in candidates_list:
-                self.loss_function(params)  # No **params here... I overwrote this method.
-
-        return
-
-        # results are gathered in the get_results() function
+        # RALF: this is the job of run here: making the actual run MPI-aware.
+        # TODO: remove comment again
+        mpi_rank = MPI.COMM_WORLD.Get_rank()
+        if mpi_rank == 0:
+            # This is the master process. From here we run the solver and start all the other processes.
+            self._solver.run(*args, **kwargs)
+            self.signal_worker_finished()  # Tell the workers to finish.
+        else:
+            # This script execution should be in worker mode as it is an MPI worker.
+            self.run_worker_mode()
+
+    # RALF: execute_solver does not belong here. That really is the job of the wrapped solver.
+    # TODO: remove comment again
    def convert_searchspace(self, hyperparameter):
        """
        Just a call of the convert_searchspace function of the member solver.
        The function converts the standard parameter input into a range list depending on the domain. These
        rangelists are later used with itertools product to create a parameter space sample of each combination.
:param hyperparameter: [dict] hyperparameter space :return: [list] name and range for each parameter space axis """ searchspace = self._solver.convert_searchspace(hyperparameter) return searchspace def loss_function(self, params): return self._solver.loss_function(**params) def print_best(self): self._solver.print_best() def print_timestats(self): print('Implement me!') # TODO # self._solver.print_timestats() diff --git a/mpiplayground.py b/mpiplayground.py index c9d30c3..48c4735 100644 --- a/mpiplayground.py +++ b/mpiplayground.py @@ -1,89 +1,79 @@ # DKFZ # # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE # A hyppopy minimal example optimizing a simple demo function f(x,y) = x**2+y**2 # import the HyppopyProject class keeping track of inputs from hyppopy.HyppopyProject import HyppopyProject from hyppopy.solvers.GridsearchSolver import GridsearchSolver from hyppopy.solvers.MPISolverWrapper import MPISolverWrapper from hyppopy.MPIBlackboxFunction import MPIBlackboxFunction from mpi4py import MPI # To configure the Hyppopy solver we use a simple nested dictionary with two obligatory main sections, # hyperparameter and settings. The hyperparameter section defines your searchspace. Each hyperparameter # is again a dictionary with: # # - a domain ['categorical', 'uniform', 'normal', 'loguniform'] # - the domain data [left bound, right bound] and # - a type of your domain ['str', 'int', 'float'] # # The settings section has two subcategories, solver and custom. The first contains settings for the solver, # here 'max_iterations' - is the maximum number of iteration. # # The custom section allows defining custom parameter. An entry here is transformed to a member variable of the # HyppopyProject class. These can be useful when implementing new solver classes or for control your hyppopy script. # Here we use it as a solver switch to control the usage of our solver via the config. This means with the script # below your can try out every solver by changing use_solver to 'optunity', 'randomsearch', 'gridsearch',... # It can be used like so: project.custom_use_plugin (see below) If using the gridsearch solver, max_iterations is # ignored, instead each hyperparameter must specifiy a number of samples additionally to the range like so: # 'data': [0, 1, 100] which means sampling the space from 0 to 1 in 100 intervals. config = { "hyperparameter": { "x": { "domain": "normal", "data": [-10.0, 10.0], "type": float, "frequency": 100 }, "y": { "domain": "uniform", "data": [-10.0, 10.0], "type": float, "frequency": 100 } }, "max_iterations": 500 } project = HyppopyProject(config=config) # The user defined loss function def my_loss_function(data, params): x = params['x'] y = params['y'] return x**2+y**3 solver = MPISolverWrapper(GridsearchSolver(project)) blackbox = MPIBlackboxFunction(blackbox_func=my_loss_function) solver.blackbox = blackbox -# this part should be covered by a MPI solver wrapper. That is just a wrapper around normal solvers, mimicking the HypopySolver interact and just -# treating run() and get_results() in a special way. -mpi_rank = MPI.COMM_WORLD.Get_rank() -if mpi_rank == 0: - # This is the master process. From here we run the solver and start all the other processes. - solver.run() - solver.signal_worker_finished() # Tell the workes to finish. 
- df, best = solver.get_results() # gather the results from the different processes. - - print("\n") - print("*" * 100) - print("Best Parameter Set:\n{}".format(best)) - print("*" * 100) -else: - # Call the worker and make it wait till it receives input. - solver.call_worker() +solver.run() +df, best = solver.get_results() # gather the results from the different processes. +print("\n") +print("*" * 100) +print("Best Parameter Set:\n{}".format(best)) +print("*" * 100)
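# Usage note: the example above is intended to be launched once per MPI process, e.g. (assuming mpi4py and an MPI
# runtime such as OpenMPI or MPICH are installed; the process count is arbitrary):
#
#     mpirun -n 4 python mpiplayground.py
#
# Rank 0 then drives the wrapped GridsearchSolver and distributes the candidates, while all other ranks enter
# run_worker_mode() of the MPISolverWrapper and evaluate candidates until they receive the finish signal
# (a None candidate) from signal_worker_finished().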