def __init__(self):
        """
        Sets up the list of solver names this object knows about.
        """
        # one name per line keeps additions/removals easy to review
        self._solver_list = [
            "hyperopt",
            "optunity",
            "optuna",
            "randomsearch",
            "quasirandomsearch",
            "gridsearch",
        ]

    def get_solver_names(self):
        """
        Returns a list of available solvers

        A copy of the internal list is returned, so modifying the result does not
        change the solver names this instance accepts.

        :return: [list] solver list
        """
        return list(self._solver_list)

    def get(self, solver_name=None, project=None):
        """
        Get the configured solver instance

        If a project is given and it carries a solver attribute, that name replaces solver_name.

        :param solver_name: [str] solver name, if None, the project must have an attribute solver keeping the solver name, default=None
        :param project: [HyppopyProject] HyppopyProject instance, if None the solver is created without a project, default=None

        :return: [HyppopySolver] the configured solver instance

        :raises AssertionError: if an argument has the wrong type or the solver name is not available
        """
        # explicit raises instead of assert statements, so the checks survive python -O
        if solver_name is not None and not isinstance(solver_name, str):
            raise AssertionError("precondition violation, solver_name type str expected, got {} instead!".format(type(solver_name)))
        if project is not None:
            if not isinstance(project, HyppopyProject):
                raise AssertionError("precondition violation, project type HyppopyProject expected, got {} instead!".format(type(project)))
            if "solver" in project.__dict__:
                solver_name = project.solver

        if solver_name not in self._solver_list:
            raise AssertionError("Solver named [{}] not implemented!".format(solver_name))

        # pick the class first, construct once below; the class names are only
        # looked up for the solver that is actually requested
        if solver_name == "hyperopt":
            solver_class = HyperoptSolver
        elif solver_name == "optunity":
            solver_class = OptunitySolver
        elif solver_name == "optuna":
            solver_class = OptunaSolver
        elif solver_name == "gridsearch":
            solver_class = GridsearchSolver
        elif solver_name == "randomsearch":
            solver_class = RandomsearchSolver
        elif solver_name == "quasirandomsearch":
            solver_class = QuasiRandomsearchSolver
        else:
            # listed as available but no class is mapped here
            raise AssertionError("Solver named [{}] not implemented!".format(solver_name))

        if project is not None:
            return solver_class(project)
        return solver_class()

    :param a: left value range bound
    :param b: right value range bound
    :param N: discretization of intervall [a,b]
    :param dtype: data type

    :return: [list] axis range
    """
    assert a < b, "condition a < b violated!"
    assert a > 0, "condition a > 0 violated!"
    assert isinstance(N, int), "condition N of type int violated!"
    # convert input range into exponent range
    lexp = np.log(a)
    rexp = np.log(b)
    exp_range = np.linspace(lexp, rexp, N)
    data = []
    for n in range(exp_range.shape[0]):
        x = np.exp(exp_range[n])
        if dtype is int:
            data.append(int(x))
        elif dtype is float:
            data.append(x)
        else:
            raise AssertionError("dtype {} not supported for uniform sampling!".format(dtype))
    return data


class GridsearchSolver(HyppopySolver):
    """
    The GridsearchSolver class implements a gridsearch optimization. The gridsearch supports categorical,
    uniform, normal and loguniform sampling. Besides a range in 'data', each non-categorical hyperparameter
    takes the number of samples along its axis in the 'frequency' field, e.g.
    'data': [0, 1], 'frequency': 100
    If 'frequency' is missing, DEFAULTGRIDFREQUENCY is used and a warning is issued.
    """

    def __init__(self, project=None):
        """
        :param project: passed on unchanged to HyppopySolver.__init__, default=None
        """
        HyppopySolver.__init__(self, project)

    def define_interface(self):
        """
        Registers the hyperparameter fields domain, data, frequency and type via
        _add_hyperparameter_signature.
        """
        self._add_hyperparameter_signature(name="domain", dtype=str,
                                            options=["uniform", "normal", "loguniform", "categorical"])
        self._add_hyperparameter_signature(name="data", dtype=list)
        self._add_hyperparameter_signature(name="frequency", dtype=int)
        self._add_hyperparameter_signature(name="type", dtype=type)

    def loss_function_call(self, params):
        """
        Calls the blackbox function with one parameter set.

        :param params: [dict] keyword arguments for self.blackbox

        :return: the blackbox result, np.nan if the blackbox returned None
        """
        loss = self.blackbox(**params)
        if loss is None:
            return np.nan
        return loss

    def execute_solver(self, searchspace):
        """
        Evaluates self.loss_function for every combination of the axis ranges and
        stores self._trials.argmin in self.best afterwards.

        :param searchspace: [list] [names, ranges] as returned by convert_searchspace

        :raises BrokenPipeError: if the loss function call raises, the original exception is chained
        """
        names, ranges = searchspace[0], searchspace[1]
        for combination in product(*ranges):
            params = dict(zip(names, combination))
            try:
                self.loss_function(**params)
            except Exception as e:
                msg = "internal error in gridsearch execute_solver occured. {}".format(e)
                LOG.error(msg)
                raise BrokenPipeError(msg) from e
        self.best = self._trials.argmin

    def convert_searchspace(self, hyperparameter):
        """
        The function converts the standard parameter input into a range list depending
        on the domain. These rangelists are later used with itertools product to create a parameter space sample of each combination.

        Non-categorical parameters without a 'frequency' entry get DEFAULTGRIDFREQUENCY written
        into their dict (the input is modified in place) and a warning is issued.

        :param hyperparameter: [dict] hyperparameter space

        :return: [list] name and range for each parameter space axis
        """
        LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter)))
        axis_samplers = {
            "uniform": get_uniform_axis_sample,
            "normal": get_gaussian_axis_sample,
            "loguniform": get_logarithmic_axis_sample,
        }
        names = []
        ranges = []
        for name, param in hyperparameter.items():
            domain = param["domain"]
            if domain != "categorical" and "frequency" not in param:
                param["frequency"] = DEFAULTGRIDFREQUENCY
                warnings.warn("No frequency field found, used default gridsearch frequency {}".format(DEFAULTGRIDFREQUENCY))
            if domain == "categorical":
                axis_range = param["data"]
            elif domain in axis_samplers:
                axis_range = axis_samplers[domain](param["data"][0], param["data"][1], param["frequency"], param["type"])
            else:
                # NOTE(review): a domain outside the four known ones is left out of the
                # searchspace without an error, same as before - confirm this is intended
                continue
            names.append(name)
            ranges.append(axis_range)
        return [names, ranges]
    number of samples and the number of dimensions to generate a quasirandom sequence for each axis. The method
    get_unit_space returns a sequence list with N_samples for each axis representing N_samples vectors on a unit sphere.
    """

    def __init__(self):
        """
        No instance attributes are initialised here.
        """

    def __next_prime(self):
        """
        Generator yielding prime numbers in ascending order, starting with 3. Only odd
        candidates are tested, so 2 is never produced.

        :return: [generator] endless prime sequence 3, 5, 7, 11, ...
        """
        def has_no_divisor(candidate):
            # trial division by every integer from 2 up to int(sqrt(candidate))
            return all(candidate % divisor != 0 for divisor in range(2, int(candidate ** 0.5) + 1))

        candidate = 3
        while True:
            if has_no_divisor(candidate):
                yield candidate
            candidate += 2

    def __vdc(self, n, base):
        """
        Computes the n-th value of the van der Corput sequence for the given base, i.e. the
        digits of n in that base are mirrored at the radix point.

        :param n: [int] sequence index, expected to be non-negative
        :param base: [int] number base

        :return: [float] sequence value, 0 for n=0
        """
        result = 0
        denominator = 1
        while n:
            # peel off the lowest digit and give it the next smaller fractional weight
            n, digit = divmod(n, base)
            denominator *= base
            result += digit / float(denominator)
        return result

    def get_unit_space(self, N_samples, N_dims):
        """
        Returns a unit space in form of a sequence list keeping N_dims sequences with N_samples samplings.
        Each axis uses its own prime base; the first prime delivered by the prime generator (3) is
        discarded, so the bases are 5, 7, 11, ...

        :param N_samples: [int] Number of samples
        :param N_dims: [int] Number of dimensions

        :return: [list] samples list of length N_dims keeping lists each of length N_samples
        """
        primes = self.__next_prime()
        next(primes)  # skip the first prime, as before
        bases = [next(primes) for _ in range(N_dims)]
        return [[self.__vdc(index, base) for index in range(N_samples)] for base in bases]


class QuasiRandomSampleGenerator(object):
    """
    This class takes care of the hyperparameter space creation and next sample delivery.
    """
    def __init__(self, N_samples=None):
        """
        :param N_samples: [int] number of samples to generate, may also be passed to generate_samples later, default=None
        """
        # None until generate_samples has run; next() uses this to generate only once
        self._axis = None
        self._samples = []
        self._numerical = []
        self._categorical = []
        self._N_samples = N_samples

    def set_axis(self, name, data, domain, dtype):
        """
        Add an axis description.

        :param name: [str] axis name
        :param data: [list] axis range [min, max], for categorical axes the list of choices
        :param domain: [str] axis domain
        :param dtype: [type] axis data type
        """
        if domain == "categorical":
            # choices are cast for int, str and float; any other dtype leaves them untouched
            if dtype is int or dtype is str or dtype is float:
                data = [dtype(i) for i in data]
            self._categorical.append({"name": name, "data": data, "type": dtype})
        else:
            self._numerical.append({"name": name, "data": data, "type": dtype, "domain": domain})

    def generate_samples(self, N_samples=None):
        """
        This function is called by next() when the first sample is requested. It generates the halton sequence space
        for the numerical axes, draws a random choice for each categorical axis and appends the resulting
        samples to the internal sample list.

        :param N_samples: [int] number of samples, if None the number given to the constructor is used

        :raises AssertionError: if no integer number of samples is available
        """
        self._axis = []
        if N_samples is None:
            if not isinstance(self._N_samples, int):
                raise AssertionError("Precondition violation, no number of samples specified!")
        else:
            self._N_samples = N_samples

        axis_samples = {}
        if len(self._numerical) > 0:
            generator = HaltonSequenceGenerator()
            unit_space = generator.get_unit_space(self._N_samples, len(self._numerical))
            for n, axis in enumerate(self._numerical):
                # scale from the lower bound, so a range given as [max, min] still
                # yields values inside the range
                low = min(axis["data"][0], axis["data"][1])
                width = abs(axis["data"][1] - axis["data"][0])
                values = [x * width + low for x in unit_space[n]]
                if axis["type"] is int:
                    values = [int(round(x)) for x in values]
                axis_samples[axis["name"]] = values
        else:
            warnings.warn("No numerical axis defined, this warning can be ignored if searchspace is categorical only, otherwise check if axis was set!")

        for n in range(self._N_samples):
            sample = {name: data[n] for name, data in axis_samples.items()}
            for cat in self._categorical:
                choice = np.random.choice(len(cat["data"]), 1)[0]
                sample[cat["name"]] = cat["data"][choice]
            self._samples.append(sample)

    def next(self):
        """
        Returns the next sample. Returns None if all samples are requested.

        The samples are generated on the first call only; once they are used up, None is
        returned instead of generating a fresh set.

        :return: [dict] sample dict {'name':value, ...}
        """
        if self._axis is None:
            self.generate_samples()
        if len(self._samples) == 0:
            return None
        # samples are handed out in random order
        next_index = np.random.choice(len(self._samples), 1)[0]
        return self._samples.pop(next_index)


class QuasiRandomsearchSolver(HyppopySolver):
    """
    The QuasiRandomsearchSolver class implements a quasi randomsearch optimization. The quasi randomsearch supports
    categorical and uniform sampling. The solver defines a Halton Sequence distributed hyperparameter space. This
    means a rather evenly distributed space sampling but no real randomness.
    """

    def __init__(self, project=None):
        """
        :param project: passed on unchanged to HyppopySolver.__init__, default=None
        """
        HyppopySolver.__init__(self, project)
        # created in execute_solver
        self._sampler = None

    def define_interface(self):
        """
        Registers the member max_iterations and the hyperparameter fields domain, data and type.
        """
        self._add_member("max_iterations", int)
        self._add_hyperparameter_signature(name="domain", dtype=str, options=["uniform", "categorical"])
        self._add_hyperparameter_signature(name="data", dtype=list)
        self._add_hyperparameter_signature(name="type", dtype=type)

    def loss_function_call(self, params):
        """
        Calls the blackbox function with one parameter set.

        :param params: [dict] keyword arguments for self.blackbox

        :return: the blackbox result, np.nan if the blackbox returned None
        """
        loss = self.blackbox(**params)
        if loss is None:
            return np.nan
        return loss

    def execute_solver(self, searchspace):
        """
        Creates a QuasiRandomSampleGenerator for max_iterations samples, registers every axis of the
        searchspace and evaluates self.loss_function for each delivered sample. Afterwards
        self._trials.argmin is stored in self.best.

        :param searchspace: [dict] hyperparameter space as returned by convert_searchspace

        :raises BrokenPipeError: if sampling or the loss function call raises, the original exception is chained
        """
        N = self.max_iterations
        self._sampler = QuasiRandomSampleGenerator(N)
        for name, axis in searchspace.items():
            self._sampler.set_axis(name, axis["data"], axis["domain"], axis["type"])
        try:
            for _ in range(N):
                params = self._sampler.next()
                if params is None:
                    # sampler has no samples left
                    break
                self.loss_function(**params)
        except Exception as e:
            msg = "internal error in quasirandomsearch execute_solver occured. {}".format(e)
            LOG.error(msg)
            raise BrokenPipeError(msg) from e
        self.best = self._trials.argmin

    def convert_searchspace(self, hyperparameter):
        """
        Logs the hyperparameter space and returns it unchanged.

        :param hyperparameter: [dict] hyperparameter space

        :return: [dict] hyperparameter space
        """
        LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter)))
        return hyperparameter


__all__ = ['RandomsearchSolver',
           'draw_uniform_sample',
           'draw_normal_sample',
           'draw_loguniform_sample',
           'draw_categorical_sample',
           'draw_sample']

import os
import copy
import random
import logging
import numpy as np
from pprint import pformat

from hyppopy.globals import DEBUGLEVEL
from hyppopy.solvers.HyppopySolver import HyppopySolver

LOG = logging.getLogger(os.path.basename(__file__ The randomsearch supports categorical, uniform, normal and loguniform sampling. The solver draws an independent sample - from the parameter space each iteration.""" + from the parameter space each iteration. + """ def __init__(self, project=None): HyppopySolver.__init__(self, project) def define_interface(self): self._add_member("max_iterations", int) self._add_hyperparameter_signature(name="domain", dtype=str, options=["uniform", "normal", "loguniform", "categorical"]) self._add_hyperparameter_signature(name="data", dtype=list) self._add_hyperparameter_signature(name="type", dtype=type) def loss_function_call(self, params): loss = self.blackbox(**params) if loss is None: return np.nan return loss def execute_solver(self, searchspace): N = self.max_iterations try: for n in range(N): params = {} for name, p in searchspace.items(): params[name] = draw_sample(p) self.loss_function(**params) except Exception as e: msg = "internal error in randomsearch execute_solver occured. {}".format(e) LOG.error(msg) raise BrokenPipeError(msg) self.best = self._trials.argmin def convert_searchspace(self, hyperparameter): - """ - this function simply pipes the input parameter through, the sample - drawing functions are responsible for interpreting the parameter. - :param hyperparameter: [dict] hyperparameter space - :return: [dict] hyperparameter space - """ LOG.debug("convert input parameter\n\n\t{}\n".format(pformat(hyperparameter))) return hyperparameter diff --git a/hyppopy/tests/test_randomsearchsolver.py b/hyppopy/tests/test_randomsearchsolver.py index 18ca58e..0be138c 100644 --- a/hyppopy/tests/test_randomsearchsolver.py +++ b/hyppopy/tests/test_randomsearchsolver.py @@ -1,164 +1,165 @@ # Hyppopy - A Hyper-Parameter Optimization Toolbox # # Copyright (c) German Cancer Research Center, # Division of Medical Image Computing. # All rights reserved. 
# # This software is distributed WITHOUT ANY WARRANTY; without # even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. # # See LICENSE import unittest +import numpy as np import matplotlib.pylab as plt from hyppopy.solvers.RandomsearchSolver import * from hyppopy.FunctionSimulator import FunctionSimulator from hyppopy.HyppopyProject import HyppopyProject class RandomsearchTestSuite(unittest.TestCase): def setUp(self): pass def test_draw_uniform_sample(self): param = {"data": [0, 1, 10], "type": float} values = [] for i in range(10000): values.append(draw_uniform_sample(param)) self.assertTrue(0 <= values[-1] <= 1) self.assertTrue(isinstance(values[-1], float)) hist = plt.hist(values, bins=10, normed=True) std = np.std(hist[0]) mean = np.mean(hist[0]) self.assertTrue(std < 0.05) self.assertTrue(0.9 < mean < 1.1) param = {"data": [0, 10, 11], "type": int} values = [] for i in range(10000): values.append(draw_uniform_sample(param)) self.assertTrue(0 <= values[-1] <= 10) self.assertTrue(isinstance(values[-1], int)) hist = plt.hist(values, bins=11, normed=True) std = np.std(hist[0]) mean = np.mean(hist[0]) self.assertTrue(std < 0.05) self.assertTrue(0.09 < mean < 0.11) def test_draw_normal_sample(self): param = {"data": [0, 10, 11], "type": int} values = [] for i in range(10000): values.append(draw_normal_sample(param)) self.assertTrue(0 <= values[-1] <= 10) self.assertTrue(isinstance(values[-1], int)) hist = plt.hist(values, bins=11, normed=True) for i in range(1, 5): self.assertTrue(hist[0][i-1]-hist[0][i] < 0) for i in range(5, 10): self.assertTrue(hist[0][i] - hist[0][i+1] > 0) def test_draw_loguniform_sample(self): param = {"data": [1, 1000, 11], "type": float} values = [] for i in range(10000): values.append(draw_loguniform_sample(param)) self.assertTrue(1 <= values[-1] <= 1000) self.assertTrue(isinstance(values[-1], float)) hist = plt.hist(values, bins=11, normed=True) for i in range(4): self.assertTrue(hist[0][i] > hist[0][i+1]) 
self.assertTrue((hist[0][i] - hist[0][i+1]) > 0) def test_draw_categorical_sample(self): param = {"data": [1, 2, 3], "type": int} values = [] for i in range(10000): values.append(draw_categorical_sample(param)) self.assertTrue(values[-1] == 1 or values[-1] == 2 or values[-1] == 3) self.assertTrue(isinstance(values[-1], int)) hist = plt.hist(values, bins=3, normed=True) for i in range(3): self.assertTrue(0.45 < hist[0][i] < 0.55) def test_solver_uniform(self): config = { "hyperparameter": { "axis_00": { "domain": "uniform", "data": [0, 800], "type": float }, "axis_01": { "domain": "uniform", "data": [-1, 1], "type": float }, "axis_02": { "domain": "uniform", "data": [0, 10], "type": float } }, "max_iterations": 300 } project = HyppopyProject(config) solver = RandomsearchSolver(project) vfunc = FunctionSimulator() vfunc.load_default() solver.blackbox = vfunc solver.run(print_stats=False) df, best = solver.get_results() self.assertTrue(0 <= best['axis_00'] <= 800) self.assertTrue(-1 <= best['axis_01'] <= 1) self.assertTrue(0 <= best['axis_02'] <= 10) for status in df['status']: self.assertTrue(status) for loss in df['losses']: self.assertTrue(isinstance(loss, float)) def test_solver_normal(self): config = { "hyperparameter": { "axis_00": { "domain": "normal", "data": [500, 650], "type": float }, "axis_01": { "domain": "normal", "data": [0, 1], "type": float }, "axis_02": { "domain": "normal", "data": [4, 5], "type": float } }, "max_iterations": 500, } solver = RandomsearchSolver(config) vfunc = FunctionSimulator() vfunc.load_default() solver.blackbox = vfunc solver.run(print_stats=False) df, best = solver.get_results() self.assertTrue(500 <= best['axis_00'] <= 650) self.assertTrue(0 <= best['axis_01'] <= 1) self.assertTrue(4 <= best['axis_02'] <= 5) for status in df['status']: self.assertTrue(status) for loss in df['losses']: self.assertTrue(isinstance(loss, float)) if __name__ == '__main__': unittest.main()