Source code for faultmap.infodynamics

"""Methods used in the calculation of transfer entropy. A JIDT wrapper."""

import logging
from pathlib import Path

import jpype  # type: ignore
import numpy as np

from faultmap.type_definitions import EntropyMethods, MutualInformationMethods

logger = logging.getLogger(__name__)


[docs] def check_jvm(infodynamics_path: Path): """ Check if the Java Virtual Machine (JVM) is started and start it if it is not. Args: infodynamics_path (str): The file path to the infodynamics jar file. Returns: None """ if not jpype.isJVMStarted(): jpype.startJVM( jpype.getDefaultJVMPath(), "-Xms32M", "-Xmx512M", "-ea", "-Djava.class.path=" + str(infodynamics_path.absolute()), convertStrings=True, )
[docs] def setup_te(infodynamics_path: Path, method: MutualInformationMethods, **parameters): """Prepares the teCalc class of the Java Infodynamics Toolkit (JIDT) in order to calculate transfer entropy according to the kernel or Kraskov estimator method. Also supports discrete transfer entropy calculation. """ check_jvm(infodynamics_path) if method == "kernel": calculator_class = jpype.JPackage( "infodynamics.measures.continuous.kernel" ).TransferEntropyCalculatorKernel te_calculator = calculator_class() # Normalisation is performed before this step, set property to false to # prevent accidental data standardisation te_calculator.setProperty("NORMALISE", "false") # Parameter definitions - refer to JIDT Javadocs # k - destination embedded history length (Schreiber k=1) # kernelWidth - if NORMALISE_PROP_NAME property has been set, # then this kernel width corresponds to the number of # standard deviations from the mean (otherwise it is an absolute value) k = parameters.get("k", 1) kernel_width = parameters.get("kernel_width", 0.25) te_calculator.initialise(k, kernel_width) elif method == "kraskov": # The Kraskov method is the recommended method and also provides methods for # auto-embedding. The max corr AIS auto-embedding method will be enabled as the # default. calculator_class = jpype.JPackage( "infodynamics.measures.continuous.kraskov" ).TransferEntropyCalculatorKraskov te_calculator = calculator_class() # Parameter definitions - refer to JIDT javadocs # k - embedding length of destination past history to consider # k_tau - embedding delay for the destination variable # l - embedding length of source past history to consider # l_tau - embedding delay for the source variable # delay - time lag between last element of source and destination # next value # Normalisation is performed before this step, set property to false to # prevent accidental data standardisation te_calculator.setProperty("NORMALISE", "false") # Allow for manual override if "k_history" in parameters: k_history = parameters["k_history"] te_calculator.setProperty("k_HISTORY", str(k_history)) if "k_tau" in parameters: k_tau = parameters["k_tau"] te_calculator.setProperty("k_TAU", str(k_tau)) if "l_history" in parameters: l_history = parameters["l_history"] te_calculator.setProperty("l_HISTORY", str(l_history)) if "l_tau" in parameters: l_tau = parameters["l_tau"] te_calculator.setProperty("l_TAU", str(l_tau)) if "auto_embed" in parameters: auto_embed = parameters["auto_embed"] if auto_embed is True: # Enable the Ragwitz or max AIS method criterion # TODO: Enable flag to determine which one # Enable source as well as destination embedding due to the nature of # our data. # Use a maximum history and tau search of 5 # teCalc.setProperty("AUTO_EMBED_METHOD", "RAGWITZ") te_calculator.setProperty("AUTO_EMBED_METHOD", "MAX_CORR_AIS") k_search_max = parameters.get("k_search_max", 5) te_calculator.setProperty("AUTO_EMBED_K_SEARCH_MAX", str(k_search_max)) tau_search_max = parameters.get("tau_search_max", 5) te_calculator.setProperty( "AUTO_EMBED_TAU_SEARCH_MAX", str(tau_search_max) ) # Note: If setting the delay is needed to be changed on each iteration, # it may be best to do this outside the loop and initialise teCalc # after each change. if "delay" in parameters: delay = parameters["delay"] te_calculator.setProperty("DELAY", str(delay)) if "use_gpu" in parameters: if parameters["use_gpu"]: te_calculator.setProperty("USE_GPU", "true") else: te_calculator.setProperty("USE_GPU", "false") te_calculator.initialise() elif method == "discrete": calculator_class = jpype.JPackage( "infodynamics.measures.discrete" ).TransferEntropyCalculatorDiscrete # Parameter definitions - refer to JIDT javadocs # base - number of quantisation levels for each variable # binary variables are in base-2 # destHistoryEmbedLength - embedded history length of the destination to # condition on - this is k in Schreiber's notation # sourceHistoryEmbeddingLength - embedded history length of the source # to include - this is l in Schreiber's notation # TODO: Allow these settings to be defined by configuration file # Base default of 2 (binary) is used" base = parameters.get("base", 2) dest_history_embed_length = parameters.get("destHistoryEmbedLength", 1) # source_history_embed_length = None # not used at the moment te_calculator = calculator_class(base, dest_history_embed_length) te_calculator.initialise() else: raise ValueError(f"Transfer entropy method name not recognized: {method}") return te_calculator
[docs] def calc_te(infodynamics_path, calc_method, affected_data, causal_data, **parameters): """Calculates the transfer entropy for a specific time lag (equal to prediction horizon) between two sets of time series data. This implementation makes use of the infodynamics toolkit: https://jlizier.github.io/jidt/ The transfer entropy should have a maximum value when time lag = delay used to generate an autoregressive dataset, or will otherwise indicate the dead time between data indicating a causal relationship. """ te_calc = setup_te(infodynamics_path, calc_method, **parameters) mi_calc = setup_mi_calculator(infodynamics_path, calc_method, **parameters) test_significance = parameters.get("test_significance", False) significance_permutations = parameters.get("significance_permutations", 30) if len(causal_data) != len(affected_data): raise ValueError( f"The source and destination arrays are of different lengths " f"(source={len(causal_data)}, destination={len(affected_data)})" ) if calc_method == "discrete": source = map(int, causal_data) destination = map(int, affected_data) te_calc.addObservations(source, destination) mi_calc.addObservations(source, destination) else: te_calc.setObservations(causal_data, affected_data) mi_calc.setObservations(causal_data, affected_data) transfer_entropy = te_calc.computeAverageLocalOfObservations() mutual_information = mi_calc.computeAverageLocalOfObservations() # Convert nats to bits if necessary if calc_method == "kraskov": transfer_entropy = transfer_entropy / np.log(2.0) mutual_information = mutual_information / np.log(2.0) elif calc_method in ("kernel", "discrete"): pass else: raise ValueError(f"Infodynamics method name not recognized: {calc_method}") if test_significance: te_significance = te_calc.computeSignificance(significance_permutations) mi_significance = mi_calc.computeSignificance(significance_permutations) else: te_significance = None mi_significance = None # Get all important properties from used te_calc if calc_method != "discrete": k_history = te_calc.getProperty("k_HISTORY") k_tau = te_calc.getProperty("k_TAU") l_history = te_calc.getProperty("l_HISTORY") l_tau = te_calc.getProperty("l_TAU") delay = te_calc.getProperty("DELAY") properties = [ k_history, k_tau, l_history, l_tau, delay, ] else: properties = [None] return ( transfer_entropy, [[te_significance, mi_significance], properties, mutual_information], )
[docs] def setup_mi_calculator( infodynamics_path: Path, method: MutualInformationMethods, **parameters ): """Instantiates a mutual information class of the Java Infodynamics Toolkit (JIDT) to calculate mutual information according to the kernel or Kraskov estimator method. Also supports discrete mutual information calculation. The Kraskov method is the recommended method and also provides methods for auto-embedding. The max corr AIS auto-embedding method will be enabled as the default. """ check_jvm(infodynamics_path) if method == "kernel": mi_calc_class = jpype.JPackage( "infodynamics.measures.continuous.kernel" ).MutualInfoCalculatorMultiVariateKernel mi_calc = mi_calc_class() # Normalisation is performed before this step, set property false to # prevent accidental data standardisation mi_calc.setProperty("NORMALISE", "false") # Parameter definitions - refer to JIDT Javadocs # k - destination embedded history length (Schreiber k=1) # kernelWidth - if NORMALISE_PROP_NAME property has been set, # then this kernel width corresponds to the number of # standard deviations from the mean (otherwise it is an absolute value) # k = parameters.get("k", 1) kernel_width = parameters.get("kernel_width", 0.25) mi_calc.setProperty("KERNEL_WIDTH", str(kernel_width)) mi_calc.initialise() elif method == "kraskov": mi_calc_class = jpype.JPackage( "infodynamics.measures.continuous.kraskov" ).MutualInfoCalculatorMultiVariateKraskov1 mi_calc = mi_calc_class() # Parameter definitions - refer to JIDT javadocs # k - embedding length of destination past history to consider # delay - time lag between last element of source and destination # next value # Normalisation is performed before this step, set property false to # prevent accidental data standardisation mi_calc.setProperty("NORMALISE", "false") # Note: If setting the delay is needed to be changed on each iteration, # it may be best to do this outside the loop and initialise teCalc # after each change. if "delay" in parameters: delay = parameters["delay"] mi_calc.setProperty("TIME_DIFF", str(delay)) mi_calc.initialise() elif method == "discrete": mi_calc_class = jpype.JPackage( "infodynamics.measures.discrete" ).MutualInformationCalculatorDiscrete # Parameter definitions - refer to JIDT javadocs # base - number of quantisation levels for each variable # binary variables are in base 2 # TODO: Allow these settings to be defined by configuration file base = parameters.get("base", 2) # print "base default of 2 (binary) is used" mi_calc = mi_calc_class(base, base, 0) mi_calc.initialise() else: raise ValueError(f"Mutual information method name not recognized: {method}") return mi_calc
[docs] def setup_entropy_calculator( infodynamics_path: Path, estimator: EntropyMethods = "kernel", kernel_bandwidth: float = 0.1, multivariate: bool = False, ): """Instantiates an entropy calculator from a class of the Java Infodynamics Toolkit (JIDT) to calculate differential entropy (continuous signals) according to the estimation method specified. Parameters ---------- infodynamics_path : path Location of infodynamics.jar estimator : string, default='kernel' Either 'kernel' or 'gaussian'. Specifies the estimator to use in determining the required probability density functions. kernel_bandwidth : float The width of the kernels for the kernel method. If normalisation is performed, these are in terms of standard deviation, otherwise absolute. multivariate : bool, default=False Indicates whether the entropy is to be calculated on a univariate or multivariate signal. Returns ------- entropy_calc : EntropyCalculator JIDT object Args: estimator: """ check_jvm(infodynamics_path) if estimator == "kernel": if multivariate: entropy_calc_class = jpype.JPackage( "infodynamics.measures.continuous.kernel" ).EntropyCalculatorMultiVariateKernel else: entropy_calc_class = jpype.JPackage( "infodynamics.measures.continuous.kernel" ).EntropyCalculatorKernel entropy_calculator = entropy_calc_class() # Normalisation is performed before this step, set property false to # prevent accidental data standardisation entropy_calculator.setProperty("NORMALISE", "false") entropy_calculator.initialise(kernel_bandwidth) elif estimator == "gaussian": if multivariate: entropy_calc_class = jpype.JPackage( "infodynamics.measures.continuous.gaussian" ).EntropyCalculatorMultiVariateGaussian else: entropy_calc_class = jpype.JPackage( "infodynamics.measures.continuous.gaussian" ).EntropyCalculatorGaussian entropy_calculator = entropy_calc_class() entropy_calculator.initialise() elif estimator == "kozachenko": entropy_calc_class = jpype.JPackage( "infodynamics.measures.continuous.kozachenko" ).EntropyCalculatorMultiVariateKozachenko entropy_calculator = entropy_calc_class() entropy_calculator.initialise() else: raise ValueError(f"Estimator not recognized: {estimator}") return entropy_calculator
[docs] def calc_entropy(entropy_calculator, data, estimator: EntropyMethods) -> float: """Estimates the entropy of a single signal. Parameters ---------- entropy_calculator : EntropyCalculator JIDT object The estimation method is determined during initialisation of this object beforehand. data : one-dimensional numpy.ndarray The uni-variate signal. Returns ------- entropy : float The entropy of the signal. Notes ----- The entropy calculated with the Gaussian estimator is in nats, while that calculated by the kernel estimator is in bits. Nats can be converted to bits by division with ln(2). """ data_array = data.tolist() data_array_java = jpype.JArray(jpype.JDouble, 1)(data_array) # type: ignore[operator] entropy_calculator.setObservations(data_array_java) entropy = entropy_calculator.computeAverageLocalOfObservations() if estimator == "gaussian": # Convert nats to bits entropy = entropy / np.log(2.0) elif estimator == "kernel": pass elif estimator == "kozachenko": entropy = entropy / np.log(2.0) else: raise ValueError(f"Estimator not recognized: {estimator}") return entropy