Skip to content

algorithms.knowledge.nsga2_kgb

Classes

NSGA2KGB

NSGA2KGB(
    c_size=13,
    perturb_dev=0.1,
    ps={},
    save_ps=False,
    **kwargs,
)

Bases: DNSGA2

Knowledge Guided Bayesian (KGB).

References

Ye, Y., Li, L., Lin, Q., Wong, K.-C., Li, J., and Ming, Z. (2022). Knowledge guided bayesian classification for dynamic multi-objective optimization. Knowledge-Based Systems, 250, 109173. https://doi.org/10.1016/j.knosys.2022.109173

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def __init__(
    self,
    c_size=13,
    perturb_dev=0.1,
    ps=None,
    save_ps=False,
    **kwargs,
):
    """
    Set up the KGB-specific state on top of the parent algorithm.
    :param c_size: Number of clusters the historical PS archive is condensed to
    :param perturb_dev: Standard deviation of the Gaussian noise added to useful solutions
    :param ps: Existing PS archive (dict) to continue from; a fresh dict is created when omitted
    :param save_ps: Flag stored on the instance as ``save_ps``
    :param kwargs: Forwarded unchanged to the parent constructor
    """
    super().__init__(**kwargs)

    self.C_SIZE = c_size
    self.PERTURB_DEV = perturb_dev

    # A literal ``{}`` default would be a single dict shared by every instance
    # created without ``ps``; since the archive is mutated in place, separate
    # runs would leak clusters into each other. Build a new dict per instance.
    self.ps = {} if ps is None else ps
    self.save_ps = save_ps

    # size of the random candidate pool that is classified after a change
    self.nr_rand_solutions = 50 * self.pop_size
    self.t = 0

Functions

_response_mechanism
_response_mechanism()

Response mechanism.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def _response_mechanism(self):
    """
    Response mechanism.

    Condenses the PS archive into clusters, trains a classifier on the
    useful/useless split, classifies a pool of random and perturbed
    solutions, and assembles a new population from the candidates predicted
    useful plus the cluster centroids. Any shortfall is filled first with
    previously useful solutions and then with random ones.
    :return: Re-evaluated population after survival has been applied
    """
    # increase t counter for unique key of PS
    self.t += 1

    # conduct knowledge reconstruction examination
    pop_useful, pop_useless, c = self.knowledge_reconstruction_examination()

    # train a naive bayesian classifier
    model = self.naive_bayesian_classifier(pop_useful, pop_useless)

    # generate a lot of random solutions with the dimensions of problem decision space
    X_test = self.random_strategy(self.nr_rand_solutions)

    # introduce noise to vary previously useful solutions (one noise vector,
    # broadcast over every useful solution), then pull them back into bounds
    noise = self.random_state.normal(0, self.PERTURB_DEV, self.problem.n_var)
    noisy_useful_history = self.check_boundaries(np.asarray(pop_useful) + noise)

    # classify random candidates together with the noisy useful history
    X_test = np.vstack((X_test, noisy_useful_history))
    Y_test = model.predict(X_test)

    # keep only the candidates predicted to be useful
    predicted_pop = self.predicted_population(X_test, Y_test)

    def draw(candidates, k):
        """Pick ``k`` distinct entries of ``candidates`` using the algorithm's own RNG."""
        # Sampling through self.random_state (instead of the global ``random``
        # module) keeps this step under the same seed as the rest of the method.
        if k == 0:
            return []
        chosen = self.random_state.choice(len(candidates), size=k, replace=False)
        return [candidates[i] for i in chosen]

    # ------ POPULATION GENERATION --------
    # leave room for the cluster centroids; sub-sample only when there is a surplus
    n_from_prediction = self.pop_size - self.C_SIZE
    if len(predicted_pop) >= n_from_prediction:
        predicted_pop = draw(predicted_pop, n_from_prediction)

    init_pop = list(predicted_pop)

    # add cluster centroids to init_pop
    init_pop.extend(np.asarray(solution) for solution in c)

    # if there are still not enough solutions, reuse previously useful solutions without noise
    missing = self.pop_size - len(init_pop)
    if missing > 0:
        if len(pop_useful) >= missing:
            init_pop = np.vstack((init_pop, draw(pop_useful, missing)))
        else:
            # not enough useful solutions to sample from: take all of them
            init_pop.extend(pop_useful)

    # if there are still not enough solutions, fill up with random solutions
    if len(init_pop) < self.pop_size:
        init_pop = np.vstack(
            (init_pop, self.random_strategy(self.pop_size - len(init_pop)))
        )

    # recreate the current population without being evaluated
    pop = Population.new(X=init_pop)

    # reevaluate because we know there was a change
    self.evaluator.eval(self.problem, pop)

    # do a survival to recreate rank and crowding of all individuals
    pop = self.survival.do(self.problem, pop, n_survive=len(pop), random_state=self.random_state)

    return pop
add_to_ps
add_to_ps()

Add the current Pareto optimal set (POS) to the Pareto set (PS) with individual keys.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def add_to_ps(self):
    """
    Register every member of the current optimum set in the PS archive.

    Each member becomes its own single-solution cluster, stored under the key
    "<index>-<t>" with its decision vector and the centroid computed for it.
    """
    for position, member in enumerate(self.opt):
        # decision vectors may arrive as plain lists; convert so ``tolist`` exists
        if isinstance(member.X, list):
            member.X = np.asarray(member.X)

        member_centroid = self.calculate_cluster_centroid(member.X)
        entry_key = "-".join((str(position), str(self.t)))

        self.ps[entry_key] = {
            "solutions": [member.X.tolist()],
            "centroid": member_centroid,
        }
calculate_cluster_centroid
calculate_cluster_centroid(solution_cluster)

Calculate the centroid for a given cluster of solutions. Parameter `solution_cluster`: list of solutions in the cluster. Returns: the cluster centroid.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def calculate_cluster_centroid(self, solution_cluster):
    """
    Compute the per-variable mean of a cluster of solutions.
    :param solution_cluster: Sequence of solutions, or one flat solution
    :return: Centroid as a list; a flat solution is handed back as a list unchanged
    """
    try:
        dimension = len(solution_cluster[0])
    except TypeError:
        # the first entry has no length, so the input is a single flat solution
        return np.array(solution_cluster).tolist()

    # gather the members row by row into one 2-D array
    members = np.asarray(
        [solution_cluster[row] for row in range(len(solution_cluster))]
    )
    member_count = members.shape[0]

    # mean of every variable: column sum divided by the number of members
    return [np.sum(members[:, col]) / member_count for col in range(dimension)]
check_boundaries
check_boundaries(pop)

Check and fix the boundaries of the given population. Parameter `pop`: population to check and fix. Returns: the population with out-of-bounds values set to the nearest bound.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def check_boundaries(self, pop):
    """
    Clamp every decision variable of a population into the problem bounds.
    :param pop: Population object or iterable of solution vectors
    :return: The solution vectors with out-of-range entries set to the violated bound
    """
    # a Population is reduced to its decision vectors first
    if isinstance(pop, Population):
        pop = pop.get("X")

    upper = self.problem.xu
    lower = self.problem.xl

    # entries are overwritten in place, one variable at a time
    for solution in pop:
        for var_idx, value in enumerate(solution):
            if value > upper[var_idx]:
                solution[var_idx] = upper[var_idx]
            elif value < lower[var_idx]:
                solution[var_idx] = lower[var_idx]

    return pop
knowledge_reconstruction_examination
knowledge_reconstruction_examination()

Perform the knowledge reconstruction examination. Returns: a tuple containing the useful population, the useless population, and the cluster centroids.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def knowledge_reconstruction_examination(self):
    """
    Perform the knowledge reconstruction examination.

    The PS archive is condensed in place by repeatedly merging the two
    clusters with the closest centroids until at most ``C_SIZE`` clusters
    remain. The centroids are then evaluated and ranked; solutions of
    clusters whose centroid has rank 0 are useful, all others are useless.
    :return: Tuple containing the useful population, useless population, and cluster centroids
    """
    clusters = self.ps  # alias: merging below modifies the archive itself
    target_count = self.C_SIZE  # final number of clusters
    size = len(clusters)

    # while there are still clusters to be condensed
    while size > target_count:

        min_distance = None
        closest_pair = []

        # find the pair of distinct clusters with the smallest centroid distance;
        # the strict "<" keeps the first pair encountered when distances tie
        for key_i in clusters:
            for key_j in clusters:
                # identical solution list means this is the same cluster: skip
                if clusters[key_i]["solutions"] is clusters[key_j]["solutions"]:
                    continue

                dst = euclidean_distance(
                    clusters[key_i]["centroid"],
                    clusters[key_j]["centroid"],
                )

                if min_distance is None or dst < min_distance:
                    min_distance = dst
                    closest_pair = [key_i, key_j]

        kept_key = closest_pair[0]
        absorbed_key = closest_pair[1]

        # merge closest clusters
        clusters[kept_key]["solutions"].extend(clusters[absorbed_key]["solutions"])

        # calculate new centroid for merged cluster
        clusters[kept_key]["centroid"] = self.calculate_cluster_centroid(
            clusters[kept_key]["solutions"]
        )

        # remove cluster that was merged
        del clusters[absorbed_key]

        size -= 1

    pop_useful = []
    pop_useless = []

    # get centroids of clusters
    c = [clusters[key]["centroid"] for key in clusters]

    # create pymoo population object to evaluate centroid solutions
    centroid_pop = Population.new("X", c)

    # evaluate centroids
    self.evaluator.eval(self.problem, centroid_pop)

    # do non-dominated sorting on centroid solutions
    ranking = NonDominatedSorting().do(centroid_pop.get("F"), return_rank=True)[-1]

    # clusters whose centroid is on the first front are useful, the rest is useless
    for idx, rank in enumerate(ranking):
        destination = pop_useful if rank == 0 else pop_useless
        for key in clusters:
            # clusters are matched to the evaluated centroid by value
            if centroid_pop[idx].X == clusters[key]["centroid"]:
                destination.extend(clusters[key]["solutions"])

    # return useful and useless population and the centroid solutions
    return pop_useful, pop_useless, c
naive_bayesian_classifier
naive_bayesian_classifier(pop_useful, pop_useless)

Train a naive Bayesian classifier using the useful and useless populations. Parameter `pop_useful`: useful population. Parameter `pop_useless`: useless population. Returns: the trained GaussianNB classifier.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def naive_bayesian_classifier(self, pop_useful, pop_useless):
    """
    Fit a Gaussian naive Bayes model that separates useful from useless solutions.
    :param pop_useful: Solutions labelled +1
    :param pop_useless: Solutions labelled -1
    :return: Fitted GaussianNB classifier
    """
    positives = [solution for solution in pop_useful]
    negatives = [solution for solution in pop_useless]

    # useful samples come first, followed by the useless ones
    features = positives + negatives
    targets = [+1] * len(positives) + [-1] * len(negatives)

    # fit the naive bayesian classifier with the training data
    classifier = GaussianNB()
    classifier.fit(np.asarray(features), np.asarray(targets))

    return classifier
predicted_population
predicted_population(X_test, Y_test)

Create a predicted population from the test-set entries with positive labels. Parameter `X_test`: test set of features. Parameter `Y_test`: test set of labels. Returns: the predicted population.

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def predicted_population(self, X_test, Y_test):
    """
    Collect the test samples whose predicted label equals 1.
    :param X_test: Test set of features
    :param Y_test: Predicted labels, aligned with X_test by position
    :return: List of the samples labelled 1, in their original order
    """
    return [X_test[pos] for pos, label in enumerate(Y_test) if label == 1]
random_strategy
random_strategy(N_r)

Generate a random population within the problem boundaries. :param N_r: Number of random solutions to generate :return: Randomly generated population

Source code in pydmoo/algorithms/knowledge/nsga2_kgb.py
def random_strategy(self, N_r):
    """
    Generate a random population within the problem boundaries.
    :param N_r: Number of random solutions to generate
    :return: Array of shape (N_r, n_var) with every variable drawn uniformly between its lower and upper bound
    """
    lower = np.asarray(self.problem.xl, dtype=float)
    upper = np.asarray(self.problem.xu, dtype=float)

    # uniform draws in [0, 1) ...
    unit_samples = self.random_state.random((N_r, self.problem.n_var))

    # ... stretched onto [xl, xu]. Clipping raw unit draws alone would never
    # reach values below 0 or above 1 and would collapse variables whose
    # bounds lie outside [0, 1] onto a single bound.
    random_pop = lower + unit_samples * (upper - lower)

    # guard against rounding pushing a value marginally past a bound
    return np.clip(random_pop, lower, upper)