diff --git a/hyppopy/MPIBlackboxFunction.py b/hyppopy/MPIBlackboxFunction.py index 80c749d..15bb6a7 100644 --- a/hyppopy/MPIBlackboxFunction.py +++ b/hyppopy/MPIBlackboxFunction.py @@ -1,83 +1,83 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE from hyppopy.BlackboxFunction import BlackboxFunction __all__ = ['MPIBlackboxFunction'] import os import logging import functools from hyppopy.globals import DEBUGLEVEL, MPI_TAGS from mpi4py import MPI LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) def default_kwargs(**defaultKwargs): """ Decorator defining default args in **kwargs arguments """ def actual_decorator(fn): @functools.wraps(fn) def g(*args, **kwargs): defaultKwargs.update(kwargs) return fn(*args, **defaultKwargs) return g return actual_decorator class MPIBlackboxFunction(BlackboxFunction): """ This class is a BlackboxFunction wrapper class encapsulating the loss function. # TODO: complete class documentation The constructor accepts several function pointers or a data object which are all None by default (see below). Additionally one can define an arbitrary number of arg pairs. These are passed as input to each function pointer as arguments. :param dataloader_func: data loading function pointer, default=None :param preprocess_func: data preprocessing function pointer, default=None :param callback_func: callback function pointer, default=None :param data: data object, default=None :param mpi_comm: [MPI communicator] MPI communicator instance. If None, we create a new MPI.COMM_WORLD, default=None :param kwargs: additional arg=value pairs """ @default_kwargs(blackbox_func=None, dataloader_func=None, preprocess_func=None, callback_func=None, data=None, mpi_comm=None) def __init__(self, **kwargs): mpi_comm = kwargs['mpi_comm'] del kwargs['mpi_comm'] self._mpi_comm = None if mpi_comm is None: - print('MPIBlackboxFunction: No mpi_comm given: Using MPI.COMM_WORLD') + LOG.info('MPIBlackboxFunction: No mpi_comm given: Using MPI.COMM_WORLD') self._mpi_comm = MPI.COMM_WORLD else: self._mpi_comm = mpi_comm super().__init__(**kwargs) def call_batch(self, candidates): results = dict() size = self._mpi_comm.Get_size() for i, candidate in enumerate(candidates): dest = (i % (size-1)) + 1 self._mpi_comm.send(candidate, dest=dest, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value) while True: for i in range(size - 1): if len(candidates) == len(results): - print('All results received!') + LOG.info('All results received!') return results cand_id, result_dict = self._mpi_comm.recv(source=i + 1, tag=MPI_TAGS.MPI_SEND_RESULTS.value) results[cand_id] = result_dict \ No newline at end of file diff --git a/hyppopy/solvers/DynamicPSOSolver.py b/hyppopy/solvers/DynamicPSOSolver.py index ecf3d69..8bf4d97 100644 --- a/hyppopy/solvers/DynamicPSOSolver.py +++ b/hyppopy/solvers/DynamicPSOSolver.py @@ -1,250 +1,232 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. 
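Note on the distribution scheme: call_batch above fans the candidate list out to the worker ranks in round-robin fashion (rank 0 is the master, ranks 1..size-1 evaluate candidates) and then blocks until one result per candidate has been received. A minimal sketch of just the destination mapping, with a hypothetical communicator size of 4 and plain integers standing in for CandidateDescriptor objects:

    size = 4                       # hypothetical MPI communicator size (1 master + 3 workers)
    candidates = list(range(7))    # stand-ins for CandidateDescriptor instances

    for i, candidate in enumerate(candidates):
        dest = (i % (size - 1)) + 1    # cycles over worker ranks 1, 2, 3, 1, 2, 3, 1
        print("candidate", candidate, "-> worker rank", dest)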
# # See LICENSE import os import sys import numpy import datetime import logging import optunity from pprint import pformat from hyppopy.CandidateDescriptor import CandidateDescriptor, CandicateDescriptorWrapper from hyppopy.globals import DEBUGLEVEL from hyperopt import Trials LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) from hyppopy.solvers.HyppopySolver import HyppopySolver from .OptunitySolver import OptunitySolver class DynamicPSOSolver(OptunitySolver): """Dynamic PSO HyppoPy Solver Class""" def define_interface(self): """ Function called after instantiation to define individual parameters for child solver class by calling _add_member function for each class member variable to be defined. When designing your own solver class, you need to implement this method to define custom solver options that are automatically converted to class attributes. """ super().define_interface() self._add_method("update_param") # Pass function used to adapt parameters during dynamic PSO as specified by user. self._add_method("combine_obj") # Pass function indicating how to combine obj. func. arguments and parameters to obtain scalar value. self._add_member("num_args_obj", int) # Pass number of arguments/terms contributing to obj. func. self._add_member("num_params_obj", int) # Pass number of parameters of obj. func. self._add_member("phi1", float, default=1.5) # Pass first PSO acceleration coefficient. self._add_member("phi2", float, default=2.0) # Pass second PSO acceleration coefficient. self._add_hyperparameter_signature(name="domain", dtype=str, options=["uniform", "loguniform", "categorical"]) def _add_method(self, name, func=None, default=None): """ When designing your child solver class you need to implement the define_interface abstract method where you can call _add_member_function to define custom solver options, here of Python callable type, which are automatically converted to class methods. :param func: [callable] function object to be passed to solver """ assert isinstance(name, str), "Precondition violation, name needs to be of type str, got {}.".format(type(name)) if func is not None: assert callable(func), "Precondition violation, passed object is not callable!" if default is not None: assert callable(default), "Precondition violation, passed object is not callable!" setattr(self, name, func) self._child_members[name] = {"type": "callable", "function": func, "default": default} def convert_searchspace(self, hyperparameter): """ Get unified hyppopy-like parameter space description as input and, if necessary, convert it into a solver-lib specific format. The function is invoked when run is called and what it returns is passed as searchspace argument to the function execute_solver. :param hyperparameter: [dict] nested parameter description dict e.g. {'name': {'domain':'uniform', 'data':[0,1], 'type':float}, ...} :return: [object] converted hyperparameter space :return: [dict] dict keeping domains for different hyperparameters. """ LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter))) # Split input in categorical and non-categorical data. cat, uni = self.split_categorical(hyperparameter) # Build up dict keeping all non-categorical data. 
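Note on the two callables registered via _add_method above: they are supplied by the user of DynamicPSOSolver. combine_obj reduces the num_args_obj objective-function terms and the num_params_obj parameters to a single scalar loss, and update_param adapts those parameters as the dynamic PSO run progresses. The exact signatures are dictated by the patched optunity.optimize_dyn_PSO API rather than by this diff, so the following is only an illustrative sketch of the idea, using a plain weighted sum as combiner and an invented decay rule as updater:

    import numpy as np

    def combine_obj(args, params):
        # Reduce the vector of objective-function terms to a scalar via a weighted sum.
        return float(np.dot(params, args))

    def update_param(params, iteration):
        # Hypothetical adaptation rule: keep the first weight fixed and slowly
        # decay the remaining ones as the optimization progresses.
        decay = 0.99 ** iteration
        return [params[0]] + [p * decay for p in params[1:]]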
uniforms = {} domains = {} for key, value in uni.items(): for key2, value2 in value.items(): if key2 == "data": if len(value2) == 3: uniforms[key] = value2[0:2] elif len(value2) == 2: uniforms[key] = value2 else: raise AssertionError("precondition violation, optunity searchspace needs list with left and right range bounds!") if key2 == "domain": domains[key] = value2 if len(cat) == 0: return uniforms, domains # Build nested categorical structure. inner_level = uniforms for key, value in cat.items(): tmp = {} optunity_space = {} for key2, value2 in value.items(): if key2 == "data": for elem in value2: tmp[elem] = inner_level if key2 == "domain": domains[key] = value2 optunity_space[key] = tmp inner_level = optunity_space return optunity_space, domains def hyppopy_optunity_solver_pmap(self, f, seq): # Check if seq is empty. I so, return an empty result list. if len(seq) == 0: return [] candidates = [] for elem in seq: can = CandidateDescriptor(**elem) candidates.append(can) cand_list = CandicateDescriptorWrapper(keys=seq[0].keys()) cand_list.set(candidates) f_result = f(cand_list) # If one candidate does not match the constraints, f() returns a single default value. # This is a problem as all the other candidates are not calculated either. # The following is a workaround. We split the candidate_list into 2 lists and call the map function recursively until all valid parameters are processed. if not isinstance(f_result, list): # First half seq_A = seq[:len(seq) // 2] temp_result_a = self.hyppopy_optunity_solver_pmap(f, seq_A) seq_B = seq[len(seq) // 2:] temp_result_b = self.hyppopy_optunity_solver_pmap(f, seq_B) # f_result = [42] f_result = temp_result_a + temp_result_b return f_result def execute_solver(self, searchspace, domains): """ This function is called immediately after convert_searchspace and uses the output of the latter as input. Its purpose is to call the solver lib's main optimization function. :param searchspace: converted hyperparameter space """ LOG.debug("execute_solver using solution space:\n\n\t{}\n".format(pformat(searchspace))) tree = optunity.search_spaces.SearchTree(searchspace) # Set up tree structure to model search space. box = tree.to_box() # Create set of box constraints to define given search space. f = optunity.functions.logged(self.loss_function_batch) # Call log here because function signature used later on is internal logic. f = tree.wrap_decoder(f) # Wrap decoder and constraints for internal search space rep. f = optunity.constraints.wrap_constraints(f, default=sys.float_info.max*numpy.ones(self.num_args_obj), range_oo=box) # 'wrap_constraints' decorates function f with given input domain constraints. default [float] gives a # function value to default to in case of constraint violations. range_oo [dict] gives open range # constraints lb and lu, i.e. lb < x < ub and range = (lb, ub), respectively. try: self.best, _ = optunity.optimize_dyn_PSO(func=f, box=box, domains=domains, maximize=False, max_evals=self.max_iterations, num_args_obj=self.num_args_obj, num_params_obj=self.num_params_obj, pmap=self.hyppopy_optunity_solver_pmap, #map,#optunity.pmap, decoder=tree.decode, update_param=self.update_param, eval_obj=self.combine_obj, phi1=self.phi1, phi2=self.phi2 ) # Workaround: Unpack best result, im max_iterations was reached. try: for key in self.best: self.best[key] = self.best[key].get()[0] except: pass """ optimize_dyn_PSO(func, maximize=False, max_evals=0, pmap=map, decoder=None, update_param=None, eval_obj=None) Optimize func with dynamic PSO solver. 
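To make the conversion above concrete, here is a sketch (values invented) of what convert_searchspace produces for one non-categorical and one categorical hyperparameter: bounds are reduced to [lower, upper] pairs, each categorical value wraps the inner uniform block, and the original domain strings are collected separately so the backend can still distinguish uniform from loguniform sampling:

    # Hypothetical input in the unified hyppopy format ...
    hyperparameter = {
        "C":      {"domain": "loguniform",  "data": [0.01, 100.0],  "type": float},
        "kernel": {"domain": "categorical", "data": ["rbf", "poly"], "type": str},
    }

    # ... and the two return values of convert_searchspace for it:
    searchspace = {
        "kernel": {
            "rbf":  {"C": [0.01, 100.0]},
            "poly": {"C": [0.01, 100.0]},
        },
    }
    domains = {"C": "loguniform", "kernel": "categorical"}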
:param func: [callable] objective function :param maximize: [bool] maximize or minimize :param max_evals: [int] maximum number of permitted function evaluations :param pmap: [function] map() function to use :param update_param: [function] function to update parameters of objective function based on current state of knowledge :param eval_obj: [function] function giving functional form of objective function, i.e. how to combine parameters and terms to obtain scalar fitness/loss. :return: solution, named tuple with further details optimize_dyn_PSO function (api.py) internally uses 'optimize' function from dynamic PSO solver module. """ except Exception as e: LOG.error("Internal error in optunity.optimize_dyn_PSO occured. {}".format(e)) raise BrokenPipeError("Internal error in optunity.optimize_dyn_PSO occured. {}".format(e)) - def print_best(self): - """ - Optimization result console output printing. - """ - print("\n") - print("#" * 40) - print("### Best Parameter Choice ###") - print("#" * 40) - for name, value in self.best.items(): - print(" - {}\t:\t{}".format(name, value)) - #print("\n - number of iterations\t:\t{}".format(self.trials.trials[-1]['tid']+1)) - #print(" - total time\t:\t{}d:{}h:{}m:{}s:{}ms".format(self._total_duration[0], - # self._total_duration[1], - # self._total_duration[2], - # self._total_duration[3], - # self._total_duration[4])) - print("#" * 40) - def run(self, print_stats=True): """ This function starts the optimization process. :param print_stats: [bool] en- or disable console output """ self._idx = 0 self.trials = Trials() start_time = datetime.datetime.now() try: search_space, domains = self.convert_searchspace(self.project.hyperparameter) except Exception as e: msg = "Failed to convert searchspace, error: {}".format(e) LOG.error(msg) raise AssertionError(msg) try: self.execute_solver(search_space, domains) except Exception as e: msg = "Failed to execute solver, error: {}".format(e) LOG.error(msg) raise AssertionError(msg) end_time = datetime.datetime.now() dt = end_time - start_time days = divmod(dt.total_seconds(), 86400) hours = divmod(days[1], 3600) minutes = divmod(hours[1], 60) seconds = divmod(minutes[1], 1) milliseconds = divmod(seconds[1], 0.001) self._total_duration = [int(days[0]), int(hours[0]), int(minutes[0]), int(seconds[0]), int(milliseconds[0])] self.print_best() if print_stats: self.print_timestats() diff --git a/hyppopy/solvers/MPISolverWrapper.py b/hyppopy/solvers/MPISolverWrapper.py index 3759d56..93cecf4 100644 --- a/hyppopy/solvers/MPISolverWrapper.py +++ b/hyppopy/solvers/MPISolverWrapper.py @@ -1,168 +1,167 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import datetime import os import logging import numpy as np from mpi4py import MPI from hyppopy.globals import DEBUGLEVEL, MPI_TAGS from hyppopy.MPIBlackboxFunction import MPIBlackboxFunction LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) class MPISolverWrapper: """ TODO Class description The MPISolverWrapper class wraps the functionality of solvers in Hyppopy to extend them with MPI functionality. It builds upon the interface defined by the HyppopySolver class. """ def __init__(self, solver=None, mpi_comm=None): """ The constructor accepts a HyppopySolver. 
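The duration bookkeeping at the end of run is a chain of divmod calls that splits the elapsed timedelta into days, hours, minutes, seconds and milliseconds; a standalone sketch with an invented time span:

    import datetime

    dt = datetime.timedelta(hours=1, minutes=2, seconds=3, milliseconds=450)  # invented elapsed time
    days = divmod(dt.total_seconds(), 86400)
    hours = divmod(days[1], 3600)
    minutes = divmod(hours[1], 60)
    seconds = divmod(minutes[1], 1)
    milliseconds = divmod(seconds[1], 0.001)
    # roughly [0, 1, 2, 3, 450]; the millisecond digit may be off by one due to float rounding
    print([int(days[0]), int(hours[0]), int(minutes[0]), int(seconds[0]), int(milliseconds[0])])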
:param solver: [HyppopySolver] solver instance, default=None :param mpi_comm: [MPI communicator] MPI communicator instance. If None, we create a new MPI.COMM_WORLD, default=None """ self._solver = solver self._mpi_comm = None if mpi_comm is None: - print('MPISolverWrapper: No mpi_comm given: Using MPI.COMM_WORLD') + LOG.info('MPISolverWrapper: No mpi_comm given: Using MPI.COMM_WORLD') self._mpi_comm = MPI.COMM_WORLD else: self._mpi_comm = mpi_comm @property def blackbox(self): """ Get the BlackboxFunction object. :return: [object] BlackboxFunction instance or function of member solver """ return self._solver.blackbox @blackbox.setter def blackbox(self, value): """ Set the BlackboxFunction wrapper class encapsulating the loss function or a function accepting a hyperparameter set and returning a float. If the passed value is not an instance of MPIBlackboxFunction (or a derived class) it will automatically wrapped by an MPIBackboxFunction. :return: """ if isinstance(value, MPIBlackboxFunction): self._solver.blackbox = value else: self._solver.blackbox = MPIBlackboxFunction(blackbox_func=value, mpi_comm=self._mpi_comm) def get_results(self): """ Just call get_results of the member solver and return the result. :return: return value of self._solver.get_results() """ # Only rank==0 returns results, the workers return None. mpi_rank = self._mpi_comm.Get_rank() if mpi_rank == 0: return self._solver.get_results() return None, None def run_worker_mode(self): """ This function is called if the wrapper should run as a worker for a specific MPI rank. It receives messages for the following tags: tag==MPI_SEND_CANDIDATE: parameters for the loss calculation. It param==None, the worker finishes. It sends messages for the following tags: tag==MPI_SEND_RESULT: result of an evaluated candidate. :return: the evaluated loss of the candidate """ rank = self._mpi_comm.Get_rank() - print("Starting worker {}. Waiting for param...".format(rank)) + LOG.info("Starting worker {}. Waiting for param...".format(rank)) cand_results = dict() while True: try: candidate = self._mpi_comm.recv(source=0, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value) # Wait here till params are received if candidate is None: - print("[RECEIVE] Process {} received finish signal.".format(rank)) + LOG.info("[RECEIVE] Process {} received finish signal.".format(rank)) return # if candidate.ID == 9999: # comm.gather(losses, root=0) # continue - # print("[WORKING] Process {} is actually doing things.".format(rank)) + LOG.debug("[WORKING] Process {} is actually doing things.".format(rank)) cand_id = candidate.ID params = candidate.get_values() - loss = self._solver.blackbox.blackbox(params) + loss = self._solver.blackbox(params) except Exception as e: msg = "Error in Worker(rank={}): {}".format(rank, e) LOG.error(msg) - print(msg) loss = np.nan finally: cand_results['book_time'] = datetime.datetime.now() cand_results['loss'] = loss # Write loss to dictionary. This dictionary will be send back to the master via gather cand_results['refresh_time'] = datetime.datetime.now() cand_results['book_time'] = datetime.datetime.now() cand_results['loss'] = loss # Write loss to dictionary. This dictionary will be send back to the master via gather cand_results['refresh_time'] = datetime.datetime.now() self._mpi_comm.send((cand_id, cand_results), dest=0, tag=MPI_TAGS.MPI_SEND_RESULTS.value) def signal_worker_finished(self): """ This function sends data==None to all workers from the master. This is the signal that tells the workers to finish. 
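For reference, the exchange implemented in run_worker_mode above uses two tags from hyppopy.globals.MPI_TAGS: the master sends a CandidateDescriptor (or None as the finish signal) with MPI_SEND_CANDIDATE, and the worker answers with MPI_SEND_RESULTS carrying a (candidate ID, result dict) tuple. A sketch of that payload, with invented values:

    import datetime

    # Fields written by run_worker_mode for every evaluated candidate; the tuple
    # below is what gets sent back to rank 0 with tag MPI_SEND_RESULTS.
    cand_results = {
        "book_time": datetime.datetime.now(),     # timestamp taken around the evaluation
        "loss": 0.42,                             # blackbox return value, np.nan if the evaluation raised
        "refresh_time": datetime.datetime.now(),  # timestamp taken after the evaluation
    }
    message = ("candidate_0017", cand_results)    # "candidate_0017" is an invented candidate ID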
:return: """ - print('[SEND] signal_worker_finished') + LOG.info('[SEND] signal_worker_finished') size = self._mpi_comm.Get_size() for i in range(size - 1): self._mpi_comm.send(None, dest=i + 1, tag=MPI_TAGS.MPI_SEND_CANDIDATE.value) def run(self, *args, **kwargs): """ This function starts the optimization process of the underlying solver and takes care of the MPI awareness. """ mpi_rank = self._mpi_comm.Get_rank() if mpi_rank == 0: # This is the master process. From here we run the solver and start all the other processes. self._solver.run(*args, **kwargs) self.signal_worker_finished() # Tell the workers to finish. else: # this script execution should be in worker mode as it is an mpi worker. self.run_worker_mode() def is_master(self): mpi_rank = self._mpi_comm.Get_rank() if mpi_rank == 0: return True else: return False def is_worker(self): mpi_rank = self._mpi_comm.Get_rank() if mpi_rank != 0: return True else: return False diff --git a/hyppopy/solvers/OptunaSolver.py b/hyppopy/solvers/OptunaSolver.py index 126f455..e0fe5b5 100644 --- a/hyppopy/solvers/OptunaSolver.py +++ b/hyppopy/solvers/OptunaSolver.py @@ -1,150 +1,149 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. # # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import os import optuna import logging import warnings import numpy as np from pprint import pformat from hyppopy.globals import DEBUGLEVEL from hyppopy.solvers.HyppopySolver import HyppopySolver from hyppopy.CandidateDescriptor import CandidateDescriptor LOG = logging.getLogger(os.path.basename(__file__)) LOG.setLevel(DEBUGLEVEL) class OptunaSolver(HyppopySolver): def __init__(self, project=None): """ The constructor accepts a HyppopyProject. :param project: [HyppopyProject] project instance, default=None """ HyppopySolver.__init__(self, project) self._searchspace = None self.candidates_list = list() def define_interface(self): """ This function is called when HyppopySolver.__init__ function finished. Child classes need to define their individual parameter here by calling the _add_member function for each class member variable need to be defined. Using _add_hyperparameter_signature the structure of a hyperparameter the solver expects must be defined. Both, members and hyperparameter signatures are later get checked, before executing the solver, ensuring settings passed fullfill solver needs. """ self._add_member("max_iterations", int) self._add_hyperparameter_signature(name="domain", dtype=str, options=["uniform", "categorical"]) self._add_hyperparameter_signature(name="data", dtype=list) self._add_hyperparameter_signature(name="type", dtype=type) def get_candidates(self, trial=None): """ This function converts the searchspace to a candidate_list that can then be used to distribute via MPI. :param searchspace: converted hyperparameter space """ candidates_list = list() N = self.max_iterations for n in range(N): - print(n) # Todo: Ugly hack that does not even work... 
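Putting the wrapper together: under mpirun, rank 0 executes the wrapped solver and finally calls signal_worker_finished, while every other rank drops into run_worker_mode and evaluates candidates until it receives None. A hypothetical driver script (import paths and config values are assumptions, not part of this diff), launched e.g. with mpirun -np 4 python driver.py, could look like this:

    from hyppopy.HyppopyProject import HyppopyProject          # assumed import path
    from hyppopy.solvers.OptunaSolver import OptunaSolver
    from hyppopy.solvers.MPISolverWrapper import MPISolverWrapper

    def my_loss(x, y):
        # Toy loss; any callable accepting the hyperparameters works,
        # it is wrapped into an MPIBlackboxFunction by the blackbox setter above.
        return (x - 1.0) ** 2 + (y + 0.5) ** 2

    config = {
        "hyperparameter": {
            "x": {"domain": "uniform", "data": [-5.0, 5.0], "type": float},
            "y": {"domain": "uniform", "data": [-5.0, 5.0], "type": float},
        },
        "max_iterations": 50,
    }

    project = HyppopyProject(config=config)
    solver = MPISolverWrapper(solver=OptunaSolver(project))
    solver.blackbox = my_loss        # plain callable -> MPIBlackboxFunction
    solver.run()

    df, best = solver.get_results()  # worker ranks receive (None, None)
    if solver.is_master():
        print(best)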
from optuna import trial as trial_module # temp_study = optuna.create_study() trial_id = self.study._storage.create_new_trial_id(0) trial = trial_module.Trial(self.study, trial_id) ## trial.report(result) ## self._storage.set_trial_state(trial_id, structs.TrialState.COMPLETE) ## self._log_completed_trial(trial_number, result) params = {} for name, param in self._searchspace.items(): if param["domain"] == "categorical": params[name] = trial.suggest_categorical(name, param["data"]) else: params[name] = trial.suggest_uniform(name, param["data"][0], param["data"][1]) candidates_list.append(CandidateDescriptor(**params)) return candidates_list N = self.max_iterations for n in range(N): params = {} for name, param in self._searchspace.items(): if param["domain"] == "categorical": params[name] = trial.suggest_categorical(name, param["data"]) else: params[name] = trial.suggest_uniform(name, param["data"][0], param["data"][1]) candidates_list.append(CandidateDescriptor(**params)) return candidates_list def trial_cache(self, trial): """ Optuna specific loss function wrapper :param trial: [Trial] instance :return: [function] loss function """ params = {} for name, param in self._searchspace.items(): if param["domain"] == "categorical": params[name] = trial.suggest_categorical(name, param["data"]) else: params[name] = trial.suggest_uniform(name, param["data"][0], param["data"][1]) return self.loss_function(**params) def execute_solver(self, searchspace): """ This function is called immediately after convert_searchspace and get the output of the latter as input. It's purpose is to call the solver libs main optimization function. :param searchspace: converted hyperparameter space """ LOG.debug("execute_solver using solution space:\n\n\t{}\n".format(pformat(searchspace))) self._searchspace = searchspace try: study = optuna.create_study() study.optimize(self.trial_cache, n_trials=self.max_iterations) self.best = study.best_trial.params except Exception as e: LOG.error("internal error in bayes_opt maximize occured. {}".format(e)) raise BrokenPipeError("internal error in bayes_opt maximize occured. {}".format(e)) def convert_searchspace(self, hyperparameter): """ This function gets the unified hyppopy-like parameterspace description as input and, if necessary, should convert it into a solver lib specific format. The function is invoked when run is called and what it returns is passed as searchspace argument to the function execute_solver. :param hyperparameter: [dict] nested parameter description dict e.g. {'name': {'domain':'uniform', 'data':[0,1], 'type':'float'}, ...} :return: [object] converted hyperparameter space """ LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter))) for name, param in hyperparameter.items(): if param["domain"] != "categorical" and param["domain"] != "uniform": msg = "Warning: Optuna cannot handle {} domain. Only uniform and categorical domains are supported!".format(param["domain"]) warnings.warn(msg) LOG.warning(msg) return hyperparameter
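As a standalone illustration of the mapping used in get_candidates and trial_cache above ("categorical" entries go through suggest_categorical, everything else through suggest_uniform), the following self-contained snippet runs the same pattern directly against optuna, with an invented search space and a stand-in loss:

    import optuna

    searchspace = {
        "gamma":  {"domain": "uniform",     "data": [0.0, 1.0],      "type": float},
        "kernel": {"domain": "categorical", "data": ["rbf", "poly"], "type": str},
    }

    def objective(trial):
        params = {}
        for name, param in searchspace.items():
            if param["domain"] == "categorical":
                params[name] = trial.suggest_categorical(name, param["data"])
            else:
                params[name] = trial.suggest_uniform(name, param["data"][0], param["data"][1])
        # Stand-in loss; OptunaSolver forwards params to self.loss_function instead.
        return (params["gamma"] - 0.3) ** 2 + (0.0 if params["kernel"] == "rbf" else 0.1)

    study = optuna.create_study()
    study.optimize(objective, n_trials=20)
    print(study.best_trial.params)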