#!/usr/bin/env python
# -*- coding: utf-8 -*-
# License: BSD-3 (https://tldrlegal.com/license/bsd-3-clause-license-(revised))
# Copyright (c) 2016-2021, Cabral, Juan; Luczywo, Nadia
# Copyright (c) 2022, 2023, 2024 QuatroPe
# All rights reserved.
# =============================================================================
# DOCS
# =============================================================================
"""Test Criterion #1 for evaluating the effectiveness MCDA method.
According to this criterion, the best alternative identified by the method
should remain unchanged when a non-optimal alternative is replaced by a
worse alternative, provided that the relative importance of each decision
criterion remains the same.
"""
# =============================================================================
# IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from .. import RanksComparator
from ...agg import RankResult
from ...core import SKCMethodABC
from ...utils import Bunch, unique_names
# =============================================================================
# CONSTANT
# =============================================================================
_LAST_DIFF_STRATEGIES = {
"median": np.median,
"mean": np.mean,
}
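# Any reducer that maps a column of per-alternative noise limits to a single
# scalar can act as a strategy. A hedged sketch of passing a custom callable
# (``np.nanmax`` here is an illustrative choice, not a library default):
#
#     checker = RankInvariantChecker(dmaker, last_diff_strategy=np.nanmax)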
# =============================================================================
# CLASS
# =============================================================================
class RankInvariantChecker(SKCMethodABC):
r"""Test Criterion #1 for evaluating the effectiveness MCDA method.
According to this criterion, the best alternative identified by the method
should remain unchanged when a non-optimal alternative is replaced by a
worse alternative, provided that the relative importance of each decision
criterion remains the same.
To illustrate, suppose that the MCDA method has ranked a set of
alternatives, and one of the alternatives, :math:`A_k`, is replaced by
another alternative, :math:`A_k'`, which is less desirable than
:math:`A_k`. The MCDA method should still identify the same best
alternative when the
alternatives are re-ranked using the same method. Furthermore, the relative
rankings of the remaining alternatives that were not changed should also
remain the same.
The current implementation worsens each non-optimal alternative ``repeat``
times and stores each resulting ranking in a collection for comparison
with the original one. In essence, the test is run ``repeat`` times for
each suboptimal alternative.
This class assumes that there is another suboptimal alternative
:math:`A_j` that is just the next-worst alternative after :math:`A_k`, so
that :math:`A_k \succ A_j`. It then generates a mutation :math:`A_k'` such
that :math:`A_k'` is worse than :math:`A_k` but still better than
:math:`A_j` (:math:`A_k \succ A_k' \succ A_j`). When the worst alternative
is reached, its degradation is limited by default by the median of the
noise limits of all the previous alternatives' mutations, in order not to
break the distribution of each criterion.
Parameters
----------
dmaker: Decision maker - must implement the ``evaluate()`` method
The MCDA method, or pipeline to evaluate.
repeat: int, default = 1
How many times to mutate each suboptimal alternative.
The total number of mutated rankings produced is therefore the number
of alternatives in the decision matrix minus one, multiplied by
``repeat``.
allow_missing_alternatives: bool, default = False
``dmaker`` can somehow return rankings with fewer alternatives than the
original ones (using a pipeline that implements a filter, for example).
By setting this parameter to ``True``, the invariance test adds any
missing alternative to the ranking with a rank equal to the maximum
rank obtained + 1.
On the other hand, if the value is ``False``, when a ranking is missing
an alternative, the test will fail with a ``ValueError``.
If more than one alternative is missing, all of them are added with the
same value.
last_diff_strategy: str or callable (default: "median")
Since the least preferred alternative has no lower bound (there is
nothing immediately below it), this strategy computes its noise limit
from the limits of all the other suboptimal alternatives.
random_state: int, numpy.random.Generator or None (default: None)
Controls the random state used to generate variations in the
suboptimal alternatives.
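
Examples
--------
A minimal usage sketch, assuming ``WeightedSumModel`` is available from
``skcriteria.agg.simple`` and that this class is exported as
``skcriteria.cmp.RankInvariantChecker`` (any decision maker exposing an
``evaluate()`` method works):

>>> import skcriteria as skc
>>> from skcriteria.agg.simple import WeightedSumModel
>>> from skcriteria.cmp import RankInvariantChecker
>>> dm = skc.mkdm(
...     matrix=[[7, 5, 35], [5, 4, 26], [5, 6, 28]],
...     objectives=[max, max, min],
... )
>>> checker = RankInvariantChecker(WeightedSumModel(), repeat=10)
>>> rcmp = checker.evaluate(dm)  # 1 original + (3 - 1) * 10 mutated ranks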
"""
_skcriteria_dm_type = "rank_reversal"
_skcriteria_parameters = [
"dmaker",
"repeat",
"allow_missing_alternatives",
"last_diff_strategy",
"random_state",
]
def __init__(
self,
dmaker,
*,
repeat=1,
allow_missing_alternatives=False,
last_diff_strategy="median",
random_state=None,
):
if not (hasattr(dmaker, "evaluate") and callable(dmaker.evaluate)):
raise TypeError("'dmaker' must implement 'evaluate()' method")
self._dmaker = dmaker
# REPEAT AND ALLOW MISSING ALTERNATIVES
self._repeat = int(repeat)
self._allow_missing_alternatives = bool(allow_missing_alternatives)
# LAST DIFF
self._last_diff_strategy = (
_LAST_DIFF_STRATEGIES.get(last_diff_strategy)
if isinstance(last_diff_strategy, str)
else last_diff_strategy
)
if not callable(self._last_diff_strategy):
diff_alternatives = ", ".join(_LAST_DIFF_STRATEGIES)
msg = (
"'last_diff_strategy' must be a "
f"'{diff_alternatives}' or a callable"
)
raise TypeError(msg)
# RANDOM
self._random_state = np.random.default_rng(random_state)
def __repr__(self):
"""x.__repr__() <==> repr(x)."""
name = self.get_method_name()
dm = repr(self.dmaker)
repeat = self.repeat
ama = self._allow_missing_alternatives
lds = self.last_diff_strategy
return (
f"<{name} {dm} repeat={repeat}, "
f"allow_missing_alternatives={ama}, last_diff_strategy={lds!r}>"
)
# PROPERTIES ==============================================================
@property
def dmaker(self):
"""The MCDA method, or pipeline to evaluate."""
return self._dmaker
@property
def repeat(self):
"""How many times to mutate each suboptimal alternative."""
return self._repeat
@property
def allow_missing_alternatives(self):
"""True if any mutation is allowed that does not possess all the \
alternatives of the original decision matrix."""
return self._allow_missing_alternatives
@property
def last_diff_strategy(self):
"""Since the least preferred alternative has no lower bound (since \
there is nothing immediately below it), this function calculates a \
limit ceiling based on the bounds of all the other suboptimal \
alternatives."""
return self._last_diff_strategy
@property
def random_state(self):
"""Controls the random state to generate variations in the \
sub-optimal alternatives."""
return self._random_state
# LOGIC ===================================================================
def _maximum_abs_noises(self, *, dm, rank):
"""Calculate the absolute difference between the alternatives in order.
This difference is used as a maximum possible noise to worsen an
alternative.
The last alternative in the ranking has no next alternative to compare
against, so the value computed by ``last_diff_strategy`` is used as its
abs difference (the default is ``np.median``).
Parameters
----------
dm : ``skcriteria.core.data.DecisionMatrix``
The decision matrix from which the maximum possible absolute noises
of each alternative are to be extracted.
rank : ``skcriteria.agg.Rank``
Ranking of alternatives.
Returns
-------
maximum_abs_noises: pandas.DataFrame
Each row contains the maximum possible absolute noise to worsen the
current alternative (``mutate``) with respect to the next one
(``mutate_next``).
"""
# TODO: room for improvement: pandas to numpy
# Here we get the alternative names ordered by ranking
alts = rank.to_series().sort_values().index.to_numpy(copy=True)
# We only need all but the best one.
not_best = alts[1:]
# we only need the rows without the best alternative
not_best_df = dm.matrix.loc[not_best]
# we need to create the index
not_best_and_next = dict(zip(not_best, np.roll(not_best, -1)))
not_best_and_next[not_best[-1]] = np.nan
not_best_and_next = list(not_best_and_next.items())
index = pd.MultiIndex.from_tuples(
not_best_and_next, names=["mutate", "mutate_next"]
)
# cleanup
del alts, not_best, not_best_and_next
# we create the differences
maximum_abs_noises = not_best_df.diff(-1).abs()
maximum_abs_noises.set_index(index, inplace=True)
# we apply the last_diff_strategy (median by default) to the last row
maximum_abs_noises.iloc[-1] = maximum_abs_noises.iloc[:-1].apply(
self.last_diff_strategy
)
return maximum_abs_noises
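# A worked illustration of the computation above, with hypothetical values:
# given a ranking A1 > A2 > A3 and a single criterion with values
# A1=9, A2=7, A3=4, the resulting rows are
#   (mutate=A2, mutate_next=A3)  -> |7 - 4| = 3
#   (mutate=A3, mutate_next=NaN) -> last_diff_strategy([3]) = 3
# so A2 may be worsened by at most 3 before falling below A3.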
def _mutate_dm(self, *, dm, mutate, alternative_max_abs_noise, random):
"""Create a new decision matrix by replacing a suboptimal alternative \
with a slightly worse one.
The algorithm operates as follows:
- A random uniform noise [0, b] is generated, where b is the
absolute difference in value of the criterion to be modified
with respect to the immediately worse alternative.
- Negative sign is assigned to criteria to be maximized.
- The noise is applied to the alternative to be worsened
('mutated').
This algorithm is designed in such a way that the 'worsened'
alternative never becomes worse than the immediately worse one in the
original ranking.
Parameters
----------
dm : ``skcriteria.core.data.DecisionMatrix``
The original decision matrix.
mutate : str
The alternative to mutate.
alternative_max_abs_noise: pandas.Series
The maximum possible noise with which the alternative to mutate can
be made worse, without being worse than the immediately worse
alternative.
random: `numpy.random.default_rng`
Random number generator.
Returns
-------
mutated_dm: ``skcriteria.DecisionMatrix``
Decision matrix with the 'mutate' alternative "worsened".
noise: ``pandas.Series``
Noise used to worsen the alternative.
"""
# TODO: room for improvement: pandas to numpy
# matrix with alternatives
df = dm.matrix
noise = 0 # all noises == 0
while np.all(noise == 0): # at least we need one noise > 0
# calculate the noises without sign
noise = alternative_max_abs_noise.apply(
lambda b: random.uniform(0, b)
)
# negate when the objective is to maximize
# onwards the noise is no longer absolute
noise[dm.maxwhere] *= -1
# apply the noise
df.loc[mutate] += noise
# transform the noised matrix into a dm
mutated_dm = dm.copy(matrix=df.to_numpy(copy=True), dtypes=None)
return mutated_dm, noise
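# A worked sketch of the sign convention above (hypothetical numbers): for
# a criterion to maximize with value 7 and noise limit 3, a draw of 1.2 is
# negated and applied as 7 - 1.2 = 5.8, which stays above the next-worst
# value 4; for a criterion to minimize, the draw is added instead, so the
# alternative becomes worse in both cases.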
def _generate_mutations(self, *, dm, orank, repeat, random):
"""Generate all experiments data.
This method yields all the data needed to run the underlying
decision-maker once for every worsened suboptimal alternative.
Parameters
----------
dm : ``skcriteria.core.data.DecisionMatrix``
The decision matrix to mutate in every experiment.
orank : ``skcriteria.agg.Rank``
The original ranking without mutations.
repeat : int
How many times a suboptimal alternative must be mutated.
random: `numpy.random.default_rng`
Random number generator.
Yields
------
iteration: int
Each suboptimal alternative is worsened `repeat` times. This value
keeps a count of which of those times the ``mutated`` alternative
is worsened.
mutates: str
The name of the worsened alternative.
mutated_dm: ``skcriteria.core.data.DecisionMatrix``
The decision matrix with the suboptimal alternative already
worsened.
noise: ``pandas.Series``
Noise used to worsen the 'mutated' alternative.
"""
# check the maximum absolute difference between any alternative and
# the next one in the ranking to establish a worse-limit
maximum_abs_noises = self._maximum_abs_noises(dm=dm, rank=orank)
# we repeat the experiment ``repeat`` times
for iteration in range(repeat):
# we iterate over every suboptimal alternative
for (mutated, _), alt_max_anoise in maximum_abs_noises.iterrows():
# create the new matrix with a worse alternative than mutate
mutated_dm, noise = self._mutate_dm(
dm=dm,
mutate=mutated,
alternative_max_abs_noise=alt_max_anoise,
random=random,
)
yield iteration, mutated, mutated_dm, noise
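# Illustrative shape of the yielded experiments (hypothetical names), for
# three alternatives ranked A1 > A2 > A3 and ``repeat=2``:
#   (0, "A2", <mutated DecisionMatrix>, <noise Series>)
#   (0, "A3", ...), (1, "A2", ...), (1, "A3", ...)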
def _add_mutation_info_to_rank(
self,
*,
rank,
iteration,
mutated,
noise,
full_alternatives,
allow_missing_alternatives,
):
"""Adds information on how an alternative was "worsened" in the \
decision matrix with respect to the original.
All aggregated information is included within the ``rrt1`` (Rank
Reversal Test 1) key in the ``extra_`` attribute.
Parameters
----------
rank: ``skcriteria.agg.Rank``
Ranking to which you want to add information about the executed
test.
iteration: int
Each suboptimal alternative is worsened `repeat` times. This value
keeps a count of which of those times the ``mutated`` alternative
is worsened.
mutated: str
The name of the worsened alternative.
noise: ``pandas.Series``
Noise used to worsen the 'mutated' alternative.
full_alternatives: array-like
The full list of alternatives in the original decision matrix.
allow_missing_alternatives: bool, default ``False``
If the value is ``False``, when a ranking is missing an alternative,
the test will fail with a ``ValueError``; if ``True``, the missing
alternatives are added at the end of the ranking, all with the value
:math:`R_{max} + 1`, where :math:`R_{max}` is the maximum rank obtained
by the alternatives that were not eliminated.
Returns
-------
patched_rank : ``skcriteria.agg.Rank``
Ranking with all the information about the worsened alternative and
the rank reversal test added to the `extra_.rrt1` attribute.
"""
# extract the original data
method = str(rank.method)
alternatives = rank.alternatives.copy()
values = rank.values.copy()
extra = dict(rank.extra_.items())
# we check whether the decision maker eliminated any alternatives
alts_diff = np.setxor1d(alternatives, full_alternatives)
has_missing_alternatives = len(alts_diff) > 0
# check for the missing alternatives
if has_missing_alternatives:
# if missing alternatives are not allowed we must raise an error
if not allow_missing_alternatives:
raise ValueError(
f"Missing alternative/s {set(alts_diff)!r} in mutation "
f"{mutated!r} of iteration {iteration}"
)
# add missing alternatives with the worst ranking + 1
# (np.full avoids inheriting the string dtype of ``alts_diff``)
fill_values = np.full(len(alts_diff), rank.rank_.max() + 1)
# concatenate the missing alternatives and the new rankings
alternatives = np.concatenate((alternatives, alts_diff))
values = np.concatenate((values, fill_values))
# change the method name if this is part of a mutation
if (mutated, iteration) != (None, None):
method = f"{method}+RRT1+{mutated}_{iteration}"
noise = noise.copy()
# patch the new data
extra["rrt1"] = Bunch(
"rrt1",
{
"iteration": iteration,
"mutated": mutated,
"noise": noise,
"missing_alternatives": alts_diff,
},
)
# return the new rank result
patched_rank = RankResult(
method=method,
alternatives=alternatives,
values=values,
extra=extra,
)
return patched_rank
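# Hedged sketch of reading the metadata attached above (alternative names
# are illustrative):
#   patched_rank.extra_["rrt1"]["mutated"]    # e.g. "A2"
#   patched_rank.extra_["rrt1"]["iteration"]  # e.g. 0
#   patched_rank.extra_["rrt1"]["noise"]      # pandas.Series of applied noise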
def evaluate(self, dm):
"""Executes a the invariance test.
Parameters
----------
dm : DecisionMatrix
The decision matrix to be evaluated.
Returns
-------
RanksComparator
An object containing multiple rankings of the alternatives, with
information on any changes made to the original decision matrix in
the ``extra_`` attribute. Specifically, the ``extra_`` attribute
contains an object under the key ``rrt1`` that provides information
on any changes made to the original decision matrix, including the
noise applied to worsen each sub-optimal alternative.
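
A hedged sketch of inspecting the result, continuing the example from
the class docstring and assuming ``RanksComparator`` exposes
``to_dataframe()``:

>>> rcmp = checker.evaluate(dm)
>>> ranks_df = rcmp.to_dataframe()  # one column per ranking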
"""
# FIRST THE DATA THAT WILL BE USED IN ALL THE ITERATIONS ==============
# the test configuration
dmaker = self.dmaker
allow_missing_alternatives = self.allow_missing_alternatives
repeat = self.repeat
random = self.random_state
# all alternatives to be used to check consistency
full_alternatives = dm.alternatives
# we need a first reference ranking
orank = dmaker.evaluate(dm)
patched_orank = self._add_mutation_info_to_rank(
rank=orank,
mutated=None,
noise=None,
iteration=None,
full_alternatives=full_alternatives,
allow_missing_alternatives=allow_missing_alternatives,
)
# Here we create the containers for the rank comparator, starting with
# the original rank
names, results = ["Original"], [patched_orank]
# START EXPERIMENTS ===================================================
mutants_generator = self._generate_mutations(
dm=dm,
orank=patched_orank,
repeat=repeat,
random=random,
)
for it, mutated, mdm, noise in mutants_generator:
# calculate the new rank
mrank = dmaker.evaluate(mdm)
# add info about the mutation to the rank
patched_mrank = self._add_mutation_info_to_rank(
rank=mrank,
mutated=mutated,
noise=noise,
iteration=it,
full_alternatives=full_alternatives,
allow_missing_alternatives=allow_missing_alternatives,
)
# store the information
names.append(f"M.{mutated}")
results.append(patched_mrank)
# manually create a new RanksComparator
named_ranks = unique_names(names=names, elements=results)
return RanksComparator(named_ranks)