From 9d552de22635148c2e338cbd1285e795afe61675 Mon Sep 17 00:00:00 2001 From: Qianyang Chen <163785477+qianyangchen@users.noreply.github.com> Date: Tue, 17 Sep 2024 17:34:24 +1000 Subject: [PATCH] Upload modelling file Simulation run with L=50 lattice size --- IsingModel.ipynb | 1105 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1105 insertions(+) create mode 100644 IsingModel.ipynb diff --git a/IsingModel.ipynb b/IsingModel.ipynb new file mode 100644 index 0000000..f6fdfdc --- /dev/null +++ b/IsingModel.ipynb @@ -0,0 +1,1105 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f0af85a1-5faf-44bf-9742-208123e14c4e", + "metadata": {}, + "source": [ + "# Ising model as Perception-Action Loop\n", + "\n", + "Description: Interpreting Ising model as perception-action loop to compare different intrinsic utility measures. Measures include:\n", + "\n", + "- predictive information\n", + "- empowerment\n", + "- variational free energy (active inference), intrinsic component only\n", + "- thermodynamic efficiency" + ] + }, + { + "cell_type": "markdown", + "id": "f2a1d8f7-33ca-42e3-9edb-1663b2165f12", + "metadata": {}, + "source": [ + "## 1. Simulation\n", + "Simulate the process of magnetization based on 2D Ising model, assume no external magnetic field. Use Metroplolis Algorithm." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "603d4982-e3e5-4515-8be6-df9bc32c0fad", + "metadata": {}, + "outputs": [], + "source": [ + "# -*- coding: utf-8 -*-\n", + "from jpype import *\n", + "import sys\n", + "import numpy as np\n", + "from collections import defaultdict # to compute pdf\n", + "import matplotlib.pyplot as plt\n", + "import numba\n", + "from numba import njit\n", + "from scipy.ndimage import convolve, generate_binary_structure\n", + "import plotly.graph_objs as go\n", + "from plotly.offline import plot\n", + "from scipy.signal import savgol_filter\n", + "import pandas as pd\n", + "from joblib import Parallel, delayed\n", + "from scipy.interpolate import interp1d\n", + "import json # for loading config file and exporting intermediate results\n", + "\n", + "# Load configuration from a JSON file\n", + "with open('config.json') as config_file:\n", + " config = json.load(config_file)\n", + "\n", + "sys.path.append(config['infodynamics_path'])\n", + "jarLocation = config['jar_location']\n", + "\n", + "if not isJVMStarted():\n", + " # Start the JVM with the jarLocation path from config\n", + " startJVM(getDefaultJVMPath(), \"-ea\", \"-Djava.class.path=\" + jarLocation, convertStrings=True)\n", + "\n", + "# Constant setting\n", + "EPSILON = 1e-6 # to avoid log(p) getting too large due to random fluctuation\n", + "CRITICAL_VALUE = np.log(1+2**0.5)/2 # critical value for J\n", + "\n", + "# Plot setting\n", + "plt.rcParams['font.family'] = 'serif'\n", + "plt.rcParams['font.serif'] = 'cmr10' # Use the Computer Modern Roman font\n", + "plt.rcParams['mathtext.fontset'] = 'cm' # Use Computer Modern for math text\n", + "plt.rcParams['axes.formatter.use_mathtext'] = True\n", + "plt.rcParams['axes.labelsize'] = 16 # Axis labels\n", + "plt.rcParams['axes.titlesize'] = 20 # Title\n", + "plt.rcParams['xtick.labelsize'] = 14 # X tick labels\n", + "plt.rcParams['ytick.labelsize'] = 14 # Y tick labels\n", + "# plt.rcParams['legend.fontsize'] = 12 # Legend" + ] + }, + { + "cell_type": "markdown", + "id": "9a73f683-94e7-4eb1-9ef3-613da51708eb", + "metadata": {}, + "source": [ + "### Description of process\n", + "Consider a set $\\Lambda$ of lattice sites in 2D, each site $i$ has a spin $\\sigma_i\\in\\{-1,+1\\}$. For each adjacent site $i,j$ there is an interaction $J_{ij}$ between them. Assume interaction strength is the same everywhere in the lattice, and hence $J_{ij}=J$. Consider only energy in the links between the nearest 4 neighbors. Without external magnetic field, the energy of a specific configuration $\\underline{\\sigma}=\\{\\sigma_1, ..., \\sigma_N\\}$ is therefore:\n", + "\n", + "$E(\\underline{\\sigma}) = -\\sum_{}{J\\sigma_i\\sigma_j} = -\\sum_{}{J\\mu_{i,j}}$\n", + "\n", + "Where $$ denotes sum over all the adjacent neighbors $i,j$, and $\\mu_{i,j}$ represents the interaction between two neighboring sites, with +1 represents alignment and -1 represents misalignment. Note that this formualtion of energy favours alignment between sites (i.e. lower energy when sites are aligned).\n", + "\n", + "The Boltzmann distribution describes the probability of configuration $\\underline{\\sigma}$ as:\n", + "\n", + "$p_{\\beta}(\\underline{\\sigma}) = \\frac{e^{-\\beta E(\\underline{\\sigma})}}{Z_{\\beta}}$\n", + "\n", + "Where $\\beta = 1/(k_B T)$ is the inverse of temperature.\n", + "\n", + "#### The metropolis algorithm:\n", + "This algorithm evolves the system to equilibrium:\n", + "1. start with a square LxL lattice\n", + "2. randomly choose a site i, now we need to decide wether or not the flip the spin of this site. Let initial energy be E_i and the energy of the flipped state be E_f.\n", + "3. if E_i>E_f, flipping results in a lower energy state, hence flip spin(i)\n", + "4. if E_i(sigma_i, sigma_j)\n", + " # applies the nearest neighbours summation, ignore beta*J\n", + " kern = generate_binary_structure(2, 1) \n", + " kern[1][1] = False\n", + " arr = -lattice * convolve(lattice, kern, mode='constant', cval=0)\n", + " # total energy should be divided by 2 as each of the neighboring pair contributed once to the energy\n", + " return arr.sum()/2\n", + "\n", + "def get_magnetisation(lattice):\n", + " # average spin\n", + " return lattice.sum()/lattice.size\n", + "\n", + "@numba.njit(\"UniTuple(f8[:], 5)(i8[:,:], i8, f8, f8)\", nopython=True, nogil=True)\n", + "def metropolis(lattice, time, J, mu):\n", + " ''' Run Metropolis algorithem for a period of time, using coupling parameter J.\n", + " Perform flip on one random site. each time, assume beta=1 or T = 1/kB for Boltzmann distribution \n", + " Return S, A, SNext, magnetisations(i.e. W) of each timestep: for intrinsic utility\n", + " mus for each timestep: for sense checking \n", + " '''\n", + " beta = 1 # beta = 1/(kB*T), assume temperature is constant 1/kB\n", + " L = lattice.shape[0]\n", + "\n", + " # save energies and magnetisation for checking\n", + " mus = np.zeros(time)\n", + " magnetisations = np.zeros(time)\n", + " \n", + " # save values for perception-action loop\n", + " S = np.zeros(time) # sensory values at time t, sum of sigma_i*sigma_j for four neighbours\n", + " A = np.zeros(time) # actions at time t\n", + " SNext = np.zeros(time) # sensory values at time t+1, sum of sigma_i*sigma_j for four neighbours\n", + "\n", + " for t in range(time):\n", + " mus[t] = mu # net interactions mu = E/J = -sum{sigma_i*sigma_j} across the whole lattice\n", + " \n", + " # at each time step, randomly choose a site, compute energy before and after flipping\n", + " x,y = np.random.randint(L), np.random.randint(L)\n", + " spin_i = lattice[x][y] # initial spin\n", + " spin_f = -1*spin_i # spin after flipping\n", + " \n", + " # compute change in Energy/J, E/J=sum{-si*sj}\n", + " mu_i = -spin_i*(lattice[x-1][y] + lattice[(x+1)%L][y] + lattice[x][y-1] + lattice[x][(y+1)%L])\n", + " mu_f = -spin_f*(lattice[x-1][y] + lattice[(x+1)%L][y] + lattice[x][y-1] + lattice[x][(y+1)%L])\n", + " dmu = mu_f - mu_i # change in energy after flipping\n", + "\n", + " S[t] = mu_i # \"energy\" sensor (senses E/J), when particles align S is negative\n", + " if dmu<0 or np.random.rand()" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Sense check: simulation for one J value\n", + "\n", + "time = 20000\n", + "j = 2\n", + "bias = 0.5\n", + "L = 50\n", + "\n", + "plot_lattice_evolution(time, L, j, bias)" + ] + }, + { + "cell_type": "markdown", + "id": "7b1a54d8-1a5c-40b4-8841-c11cee9e7750", + "metadata": {}, + "source": [ + "### Run simulation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "577674fb", + "metadata": {}, + "outputs": [], + "source": [ + "def run_single_simulation(L, time, j, sampleSize, bias=0.5):\n", + " \"\"\" \n", + " Run a single simulation of the Metropolis algorithm.\n", + " \n", + " Parameters:\n", + " L (int): The size of the lattice.\n", + " time (int): The number of time steps to run the simulation.\n", + " j (float): The coupling strength.\n", + " bias (float, optional): The bias for initialising the lattice. Defaults to 0.5.\n", + " \n", + " Returns:\n", + " tuple: A tuple containing the arrays S, A, SNext, magnetisations, and the last lattice.\n", + " \"\"\"\n", + " lattice = initialise(L, bias=bias) # initialize lattice\n", + " senses, actions, sensesNext, magnetisations, _ = metropolis(lattice, time, j, get_mu(lattice)) # run one simulation\n", + " return senses[-sampleSize:], actions[-sampleSize:], sensesNext[-sampleSize:], magnetisations[-sampleSize:], lattice\n", + "\n", + "def run_multi_simulation(L, time, j, sampleSize, numSims, bias=0.5, n_jobs=-1):\n", + " results = Parallel(n_jobs=n_jobs)(delayed(run_single_simulation)(L, time, j, sampleSize, bias=bias) for _ in range(numSims))\n", + " return results" + ] + }, + { + "cell_type": "markdown", + "id": "c53a4895-7c6a-4380-8fd6-5b106e65edb7", + "metadata": {}, + "source": [ + "## 2. Compute intrinsic utilities" + ] + }, + { + "cell_type": "markdown", + "id": "d9d73da0", + "metadata": {}, + "source": [ + "### 2.1 Predictive information\n", + "Compute predictive information and plot values w.r.t. J\n", + "\n", + "Each simulation:\n", + "- the transient period will be disregarded.\n", + "- the last k timesteps are sampled (assuming at equilibrium). Returning k pairs of (St, St+1).\n", + "- I(St,St+1) are computed based on the k samples from this simulation.\n", + "\n", + "The estimate of $\\mathcal{I}(J) = \\sum_{sims}(\\mathcal{I}(J))/num\\_sims$" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "258fbb16", + "metadata": {}, + "outputs": [], + "source": [ + "def mutual_information(source, destination, sourceBase, destinationBase):\n", + " # 1. Construct the calculator:\n", + " calcClass = JPackage(\"infodynamics.measures.discrete\").MutualInformationCalculatorDiscrete\n", + " calc = calcClass(sourceBase, destinationBase, 0) # use timeDiff = 0\n", + " # 2. No other properties to set for discrete calculators.\n", + " # 3. Initialise the calculator for (re-)use:\n", + " calc.initialise()\n", + "\n", + " # 4. Supply the sample data:\n", + " # check input dimension:\n", + " if source.ndim >1:\n", + " for k in range(source.shape[1]):\n", + " src = JArray(JInt, 1)(source[:,k].tolist())\n", + " dest = JArray(JInt, 1)(destination[:,k].tolist())\n", + " calc.addObservations(src, dest)\n", + " else:\n", + " src = JArray(JInt, 1)(source.tolist())\n", + " dest = JArray(JInt, 1)(destination.tolist())\n", + " calc.addObservations(src, dest) \n", + " \n", + " # 5. Compute the estimate:\n", + " result = calc.computeAverageLocalOfObservations()\n", + " return result\n", + "\n", + "def entropy(source, sourceBase):\n", + " # 1. Construct the calculator:\n", + " calcClass = JPackage(\"infodynamics.measures.discrete\").EntropyCalculatorDiscrete\n", + " calc = calcClass(sourceBase)\n", + " # 2. No other properties to set for discrete calculators.\n", + " # 3. Initialise the calculator for (re-)use:\n", + " calc.initialise()\n", + "\n", + " # 4. Supply the sample data:\n", + " # check input dimension:\n", + " if source.ndim >1:\n", + " for k in range(source.shape[1]):\n", + " # 0. Load/prepare the data:\n", + " src = JArray(JInt, 1)(source[:,k].tolist())\n", + " calc.addObservations(src)\n", + " else:\n", + " src = JArray(JInt, 1)(source.tolist())\n", + " calc.addObservations(src) \n", + " \n", + " # 5. Compute the estimate:\n", + " result = calc.computeAverageLocalOfObservations() #unit = bits\n", + " return result\n", + "\n", + "def conditional_entropy(source, condition, sourceBase, conditionBase):\n", + " entropySource = entropy(source, sourceBase) #H(S)\n", + " miSourceCondition = mutual_information(source, condition, sourceBase, conditionBase) #I(S;C)\n", + " condEntropy = entropySource - miSourceCondition # H(S|C) = H(S) - I(S;C)\n", + " return condEntropy\n", + "\n", + "def get_pred_info(sensory, nextSensory):\n", + " offset = 4 # need to offset {-4,-2,0,2,4} to {0,1,2,3,4} for the base\n", + " base = 2 * offset + 1\n", + " predInfo = mutual_information(nextSensory + offset, sensory + offset, base, base)\n", + " return predInfo\n", + "\n", + "def get_sensory_entropy(sensory):\n", + " offset = 4 # need to offset {-4,-2,0,2,4} to {0,1,2,3,4} for the base\n", + " base = 2 * offset + 1\n", + " sensoryEntropy = entropy(sensory + offset, base)\n", + " return sensoryEntropy\n" + ] + }, + { + "cell_type": "markdown", + "id": "2dd4ff0f", + "metadata": {}, + "source": [ + "Check if PI makes sense:\n", + "where J is very low (assume beta=1), all the sites starts to jiggle. As J->0, p(flip)->1, and we shall observe $(S_t,S_{t+1}) = (s,-s)$ almost all of the time. $I(S_t,S_{t+1})$ is therefore maximized, and approximately $H(S_t)$ = log2(#values of s) = log2(5) as the majority of sites are internal (i.e. with 4 neigbours), so $\\sum_{}{\\sigma_i\\sigma_j}\\in\\{-4,-2,0,2,4\\}$. The conditional entropy is zero in this case." + ] + }, + { + "cell_type": "markdown", + "id": "2ec05bb8", + "metadata": {}, + "source": [ + "### 2.2 Empowerment\n", + "Compute average empowerment (channel capacity) and plot values w.r.t J\n", + "\n", + "Each simulation:\n", + "- the transient period will be disregarded.\n", + "- the last k timesteps are sampled (assuming at equilibrium). Returning k pairs of (At, St+1).\n", + "- I(At,St+1) are computed based on the k samples from this simulation.\n", + "\n", + "The estimate of $\\mathcal{E}(J) = \\sum_{sims}(\\mathcal{E}(J))/num\\_sims$" + ] + }, + { + "cell_type": "markdown", + "id": "66374fda", + "metadata": {}, + "source": [ + "\n", + " #### Channel capacity\n", + " Considering an arbitrary site i in the lattice. $\\sigma_i$ denotes its spin and $\\mu_i=\\sum_{}{-\\sigma_i\\sigma_j}=\\frac{E}{J}$ measures its energy (scaled by a factor J). Lower energy (i.e. negative $\\mu$) or equivalently, alignment with neighbors, is preferred.\n", + "\n", + "For this single site, let sensory state $S_t = \\mu_i(t)$ and $A_t\\in\\{flip, no-flip\\}$. The action channel of the site is characterised by the conditional probability p(s'|a,s), where s' is the value of the sensory state following an action, given that the site starts from S=s. If $S_t=s$, after flip/no-flip action, the only possible values for $S_{t+1}$ are {s, -s} (except that when s=0 then only one value is possible). Therefore we essentially have a channel looks like {flip, no-flip}$\\rightarrow${-s, s}, and the mapping is deterministic. This is the embodiment of the site (agent).\n", + "\n", + "With a binary channel like this, the channel capacity (maximum mutual information max$I(S_{t+1}, A_t)$) C(s)=1 bit, and it's achieved when the distribution between {flip, no-flip} is uniform (50/50 chance). This is true for all values of $s\\in\\{-4,-3,-2,-1,1,2,3,4\\}$. Therefore all these states are equally empowered, if the site is free to designate its action distributions.\n", + "\n", + "The channel capacity of s=0, C(0)=0, because no matter how the site acts (flip or not), it can only perceive s=0. \n", + "\n", + "Therefore, a more empowered state is where the lattice has net magnetisation $\\neq0$, i.e. empoewrment is high when sites are aligned.\n", + "\n", + "Let the subset of S states where channel capacity is nonzero be $\\Gamma=\\{-4,-3,-2,-1,1,2,3,4\\}$. The average empowerment given control parameter J is: $\\mathcal{E}(S;J) = \\sum{p(s;J)\\mathcal{E}(s)}=\\sum_{s\\in\\Gamma}{p(s;J)\\times1}$\n", + "\n", + "#### Mutual information\n", + "On the other hand, we can also compute the actual mutual information (instead of its potential maximum) under the constraint of the \"controller\" that regulates the action distribution of the site. For a given channel s, the action distribution p(a) is:\n", + "\n", + "- When s < 0: p(flip) = $e^{2\\beta Js}$, p(no flip) = 1- p(flip)\n", + "- When s >= 0: p(flip) = 1, p(np flip) = 0\n", + "\n", + "Hence mutual information for a given channel s: $I(S_{t+1};A_t|S_t=s) = H(S_{t+1}|S_t=s) - H(S_{t+1}|A_t, S_t=s) = H(S_{t+1}|S_t=s) = -\\sum{p*logp}$ (since flip$\\rightarrow$-s and no-flip$\\rightarrow$s is deterministic, the conditional entropy is zero)\n", + "\n", + "- When s< 0: $I(S_{t+1};A_t|S_t=s) = p(flip)*logp(flip) + p(noflip)*logp(noflip))$ \n", + "- When s>=0: 0\n", + "\n", + "Let the subset of S states where mutual information is nonzero be $\\Theta = \\{-4,-3,-2,-1\\}$. The average mutual information given control parameter J is:\n", + "$I(S_{t+1};A_t|J) = \\sum{p(s;J)I(S_{t+1};A_t|J,s)} = -\\sum_{s\\in\\Theta}{p(s;J)\\{e^{2\\beta Js}*log2(e^{2\\beta Js}) + (1-e^{2\\beta Js})*log2(1-e^{2\\beta Js})\\}}$ bits\n", + "\n", + "Compared with empowerment, average mutual information gives smaller weight to full alignments $H(S_{t+1}|S=-4) << 1$ when J is large (as we have a very skewed distribution of $S_{t+1}$). The larger is J, the further away is $p(a)$ from optimal $p^*(a)$ (which is 1/2, 1/2)" + ] + }, + { + "cell_type": "markdown", + "id": "d23213ec", + "metadata": {}, + "source": [ + " is zero)\n", + "\n", + "- When s< 0: $I(S_{t+1};A_t|S_t=s) = p(flip)*logp(flip) + p(noflip)*logp(noflip))$ \n", + "- When s>=0: 0\n", + "\n", + "Let the subset of S states where mutual information is nonzero be $\\Theta = \\{-4,-3,-2,-1\\}$. The average mutual information given control parameter J is:\n", + "$I(S_{t+1};A_t|J) = \\sum{p(s;J)I(S_{t+1};A_t|J,s)} = -\\sum_{s\\in\\Theta}{p(s;J)\\{e^{2\\beta Js}*log2(e^{2\\beta Js}) + (1-e^{2\\beta Js})*log2(1-e^{2\\beta Js})\\}}$ bits\n", + "\n", + "Compared with empowerment, average mutual information gives smaller weight to full alignments $H(S_{t+1}|S=-4) << 1$ when J is large (as we have a very skewed distribution of $S_{t+1}$). The larger is J, the further away is $p(a)$ from optimal $p^*(a)$ (which is 1/2, 1/2)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "035862c3", + "metadata": {}, + "outputs": [], + "source": [ + "def get_empowerment(actions, nextSensory):\n", + " # all non-zero states are equally empowered (1 bit) as explained in \"Channel capacity\" \n", + " return sum(nextSensory!=0)/len(nextSensory)" + ] + }, + { + "cell_type": "markdown", + "id": "23c70a13", + "metadata": {}, + "source": [ + "### 2.3 Conditional entropy (active inference intrinsic only)\n", + "\n", + "Each simulation:\n", + "- the transient period will be disregarded.\n", + "- the last k timesteps are sampled (assuming at equilibrium). Returning k pairs of (At, Wt+1, St+1), where W is the global magnetisation\n", + "- H(St+1,Wt+1|At=at) are computed based on the k samples from this simulation.\n", + "\n", + "The estimate of $\\mathcal{F}(J) = \\sum_{sims}(\\mathcal{F}(J))/num\\_sims$" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "8af2b130", + "metadata": {}, + "outputs": [], + "source": [ + "def get_pdf(samples):\n", + " sigma, counts = np.unique(samples, return_counts=True)\n", + " p = {}\n", + " for sig, cnt in zip(sigma, counts):\n", + " p[sig] = cnt/samples.size\n", + " return p\n", + "\n", + "def get_free_energy(nextSensory, nextEnvironment, action, numBins=10):\n", + " \"\"\"\n", + " Calculate the free energy as conditional entropy H(St+1, Wt+1|At=at).\n", + " Parameters:\n", + " nextSensory (numpy.ndarray): The sensory input for the next state.\n", + " nextEnvironment (numpy.ndarray): The environment input for the next state.\n", + " action (numpy.ndarray): The action taken.\n", + " numBins (int, optional): The number of bins to use for digitizing the next environment. Defaults to 10.\n", + " Returns:\n", + " float: The calculated free energy.\n", + " \"\"\"\n", + "\n", + " offset = 4\n", + " base = 4 * 2 + 1\n", + " nextSensory_offset = nextSensory + offset\n", + " # work out the distribution of different actions\n", + " pA = get_pdf(action)\n", + " # put next environment into bins\n", + " nextEnvDigitized = np.digitize(nextEnvironment, np.linspace(-1,1,numBins), right=True) # put magnetisation [-1,1] into bins\n", + " # compute negative conditional entropy for given A=a, then take the weighted sum across all a\n", + " freeEnergy = 0\n", + " for a in pA.keys():\n", + " SNext = nextSensory_offset[action==a] # already offest to 0-8\n", + " RNext = nextEnvDigitized[action==a] \n", + " freeEnergy -= pA[a] * conditional_entropy(SNext, RNext, base, numBins)\n", + " return freeEnergy" + ] + }, + { + "cell_type": "markdown", + "id": "92101f0b", + "metadata": {}, + "source": [ + "### 2.4 Thermodynamic efficiency\n", + "\n", + "$\\eta$ is the ratio between rate of entropy reduction (increasing order in the collective system) and the amount of work done by changing the control parameter $\\theta$, it has a unit of bit/joule. It is defined as follow:\n", + "\n", + "$\\eta(\\theta) = -\\frac{d S/d \\theta}{d W/d \\theta} = - \\frac{d S/d \\theta}{\\int F(\\theta)d\\theta}$\n", + "\n", + "where S is entropy of the system, $\\theta$ is control parameter, W is work done by changing the control parameter. The denominator is equivalent to integrating fisher information w.r.t the control parameter.\n", + "\n", + "To compute $\\eta$, we need the following quantities:\n", + "\n", + "- configuration entropy of the system $S(\\underline{\\sigma}; \\theta)$, where $\\underline{\\sigma} = \\{\\sigma_1, ..., \\sigma_{N*N}\\}$\n", + "- derivative of entropy w.r.t. $\\theta$\n", + "- Fisher information of control parameter \\theta $F(\\theta) = 4*\\sum_{x}{(\\frac{\\sqrt{p(x;\\theta+\\delta \\theta)} - \\sqrt{p(x;\\theta-\\delta \\theta)}}{2\\delta \\theta})^2}$\n", + "- integral of fisher information w.r.t. $\\theta$\n", + "\n", + "Each simulation:\n", + "- the transient period will be disregarded.\n", + "- the last k timesteps are sampled (assuming at equilibrium). Returning the average of k magnetisation values $\\bar{m}$.\n", + "- Configuration entropy of this simulation $\\mathcal{S}_n$ and $p(x;\\theta)$ is computed based on the final configuration at the end of the simulation\n", + "\n", + "The estimate of $\\mathcal{S}(\\theta) = \\sum_{sims}(\\mathcal{S}(\\theta))/num\\_sims$. $\\eta$ is computed given configuration entropy $\\mathcal{S}(\\theta)$ and equilibrium distribution $p(x;\\theta)$ as functions of $\\theta$." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "31f8e3a8", + "metadata": {}, + "outputs": [], + "source": [ + "# Compute thermodynamic efficiency\n", + "def compute_probability_distribution(lattice, n, m):\n", + " \"\"\"\n", + " Compute the probability distribution of configurations in a given nxm area\n", + " in a wrapped around square lattice.\n", + " \n", + " Parameters:\n", + " lattice (np.ndarray): A 10x10 numpy array with values -1 or 1.\n", + " n (int): The number of rows in the area.\n", + " m (int): The number of columns in the area.\n", + " \n", + " Returns:\n", + " dict: A dictionary with configurations as keys and their probabilities as values.\n", + " \"\"\"\n", + " L = lattice.shape[0]\n", + " count_dict = defaultdict(int)\n", + " \n", + " for i in range(L):\n", + " for j in range(L):\n", + " # Extract the nxm sub-lattice starting at (i, j)\n", + " sub_lattice = tuple(tuple(lattice[(i + x) % L, (j + y) % L] for y in range(m)) for x in range(n))\n", + " count_dict[sub_lattice] += 1\n", + "\n", + " # Compute the probability distribution\n", + " total_counts = sum(count_dict.values())\n", + " prob_distribution = {k: v / total_counts for k, v in count_dict.items()}\n", + " \n", + " return prob_distribution\n", + "\n", + "def compute_entropy(prob_distribution):\n", + " \"\"\"\n", + " Compute the entropy of a given probability distribution.\n", + " \n", + " Parameters:\n", + " prob_distribution (dict): A dictionary with probabilities.\n", + " \n", + " Returns:\n", + " float: The entropy value.\n", + " \"\"\"\n", + " entropy = -sum(p * np.log2(p) for p in prob_distribution.values() if p > EPSILON)\n", + " return entropy\n", + "\n", + "def get_entropy_kikuchi(lattice):\n", + " \"\"\" \n", + " Compute configuration entropy using kikuchi approximation S = S1-2*S2+S4.\n", + "\n", + " Parameters:\n", + " lattice (np.array): A 2D integer array of the LxL squre lattice. Values +/-1.\n", + " \n", + " Returns:\n", + " float: The entropy value.\n", + " \"\"\"\n", + " # compute kikuchi approx \n", + " entp1 = compute_entropy(compute_probability_distribution(lattice, 1, 1))\n", + " entp2 = compute_entropy(compute_probability_distribution(lattice, 1, 2))\n", + " entp4 = compute_entropy(compute_probability_distribution(lattice, 2, 2))\n", + " return entp1 - 2 * entp2 + entp4\n", + "\n", + "def get_entropy_meanfield(pdf):\n", + " \"\"\" \n", + " Compute configuration entropy using meanfield approximation S = sum(-1,1) -p*log(p).\n", + "\n", + " Parameters:\n", + " pdf (dict): Proability distribution {spin:p(spin)}. This can be an average distribution computed over \n", + " a period of time and (or) mutltiple simulations.\n", + " \n", + " Returns:\n", + " float: The entropy value.\n", + " \"\"\"\n", + " # compute mean-field approx \n", + " return -sum(p * np.log2(p) for p in pdf.values() if p > EPSILON)\n", + " \n", + "def get_fisher(pdf, method = 'sqrt'):\n", + " \"\"\"\n", + " Compute fisher information using two different methods. Default is square-root approximation because it is more stable when p is small.\n", + "\n", + " Parameters:\n", + " pdf (dict): A dictionary {theta: f(x)}, where f(x) is also a dictionary ({x:p(x)}). Density function takes the form f(x;theta), \n", + " and theta is the parameter with respect to which we compute the fisher information. Assume theta is uniformly spaced. \n", + " For continuous distribution make sure samples are binned and normalised.\n", + " \n", + " Returns:\n", + " dict: Fisher information of different Js.\n", + " \"\"\"\n", + " thetas = np.array(list(pdf.keys()))\n", + " fisher = {}\n", + "\n", + " # Convert pdf dictionaries to arrays\n", + " x_values = np.array(list(pdf[thetas[0]].keys()))\n", + " p_values = np.array([[pdf[theta].get(x, 0) for x in x_values] for theta in thetas])\n", + "\n", + " if method == 'sqrt':\n", + " # Compute the square root of p_values\n", + " sqrt_p_values = np.sqrt(p_values)\n", + " \n", + " # Compute the gradient of sqrt_p_values with respect to theta\n", + " dsqrtp_dtheta = np.gradient(sqrt_p_values, thetas, axis=0)\n", + " \n", + " # Compute Fisher Information for each theta\n", + " for i, theta in enumerate(thetas):\n", + " fisher[theta] = 4* np.sum(dsqrtp_dtheta[i]**2)\n", + " else: \n", + " # using original sum_x {(dp_dtheta)^2 / p}\n", + " # Compute the gradient of p_values with respect to theta. Use dp_dtheta is more stable than dlogp_dtheta when p is small\n", + " dp_dtheta = np.gradient(p_values, thetas, axis=0)\n", + " \n", + " # Compute Fisher Information for each theta\n", + " for i, theta in enumerate(thetas):\n", + " # Avoid division by zero, sum over x\n", + " valid_mask = p_values[i] > 0\n", + " fisher[theta] = np.sum((dp_dtheta[i][valid_mask]**2) / p_values[i][valid_mask])\n", + " return fisher\n", + " \n", + "def get_efficiency(configEntropy, magnetisations, fisherMethod='sqrt', entropyFilt=False, derivativeFilt=False, fisherFilt=False, integralFilt=False, pdfFilter=False, window=15, theta_star=None, threshold=None):\n", + " \"\"\"\n", + " Compute thermodynamic efficiency. Interpolate from where fisherInfo < threshold to theta_star. Change return eta to np.array.\n", + " \n", + " Parameters:\n", + " configEntropy (dict): A dictionary with parameter and configuration entropy.\n", + " fisherInfo (dict): A dictionary with parameter and Fisher information.\n", + " entropyFilt, derivativeFilt, fisherFilt, integralFilt (bool): switches for filters (Savitzky-Golay filter).\n", + " window (int): window size for the filter.\n", + " \n", + " Returns:\n", + " np.array: thermodynamic efficiency eta;\n", + " np.array: configuration entropy values extracted from configEntropy, for checking;\n", + " np.array: numerators for each eta, for checking;\n", + " np.array: fisher information values extracted from fisherInfo, for checking;\n", + " np.array: denominators for each eta, for checking.\n", + " \"\"\"\n", + " theta = np.array(list(configEntropy.keys()))\n", + " hx = np.array(list(configEntropy.values()))\n", + "\n", + " # Check if smoothing magnetisation is on:\n", + " if pdfFilter:\n", + " mask = theta > CRITICAL_VALUE # mask for J > Jc\n", + " filtered_section = savgol_filter(magnetisations[mask], window_length=20, polyorder=1)\n", + " mm = np.copy(magnetisations)\n", + " mm[mask] = filtered_section\n", + " else:\n", + " mm = np.copy(magnetisations)\n", + "\n", + " # Retrieve average pdf for each J \n", + " pdfs = {} # pdf for each J\n", + " for theta_idx, theta_val in enumerate(theta):\n", + " Mbar = mm[theta_idx]\n", + " pdfs[theta_val] = {1:(1 + Mbar) / 2, -1:(1 - Mbar) / 2}\n", + "\n", + " # Compute fisher information given pdfs for different Js\n", + " fisherInfo = get_fisher(pdfs, method=fisherMethod)\n", + " fisher = np.array(list(fisherInfo.values()))\n", + " \n", + " # Values to keep\n", + " denominator = np.empty(len(hx)) \n", + "\n", + " # Check if sum to theta_star is on\n", + " if (theta_star != None) and (theta_star > theta.max()):\n", + " theta = np.append(theta, theta_star)\n", + " fisher = np.append(fisher, 0) # assume theta_star correspond to zero fisher information\n", + "\n", + " # interpolate between fi_small and fi_thetastar (to remove noise)\n", + " if (threshold != None) and (threshold > 0):\n", + " start_index = np.where(fisher > threshold)[0][-1] # find the last index where fisher > threshold\n", + " x = np.append(theta[start_index], theta[-1]) # define starting and end point for interpolation\n", + " y = np.append(fisher[start_index], fisher[-1]) # define starting and end point for interpolation\n", + " interp_func = interp1d(x, y, kind='linear') # Create the interpolation function\n", + " x_interp = theta[start_index:] # Define the range of x values to interpolate over\n", + " y_interp = interp_func(x_interp) # Perform the interpolation\n", + " fisher = np.append(fisher[:start_index], y_interp)\n", + "\n", + " # apply filters: entropy, fisher\n", + " if entropyFilt:\n", + " hx = savgol_filter(hx, window_length=window, polyorder=1)\n", + " if fisherFilt:\n", + " fisher = savgol_filter(fisher, window_length=window, polyorder=1)\n", + " \n", + " # compute numerator, denominator\n", + " numerator = np.gradient(hx,theta[:len(hx)])\n", + " for i in range(len(hx)):\n", + " denominator[i] = np.trapz(fisher[i:], theta[i:]) # integrate from theta0 to point to zero-response 10\n", + " \n", + " # apply filters: derivative, integral\n", + " if derivativeFilt:\n", + " numerator = savgol_filter(numerator, window_length=window, polyorder=1)\n", + " if integralFilt:\n", + " denominator = savgol_filter(denominator, window_length=window, polyorder=1)\n", + " \n", + " # compute eta\n", + " mask = (abs(denominator) < EPSILON) | (abs(numerator) < EPSILON)\n", + " eta = np.zeros_like(numerator)\n", + " eta[~mask] = -numerator[~mask] / denominator[~mask]\n", + " return eta, hx, numerator, fisher, denominator, mm" + ] + }, + { + "cell_type": "markdown", + "id": "33e0a372", + "metadata": {}, + "source": [ + "### 2.5 Compute all utilities" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "0c7541a2", + "metadata": {}, + "outputs": [], + "source": [ + "def unpack_results(results):\n", + " senses_array = np.array([result[0].astype(int) for result in results])\n", + " actions_array = np.array([result[1].astype(int) for result in results])\n", + " sensesNext_array = np.array([result[2].astype(int) for result in results])\n", + " magnetisations_array = np.array([result[3] for result in results])\n", + " lattice_array = np.array([result[4].astype(int) for result in results])\n", + " return senses_array, actions_array, sensesNext_array, magnetisations_array, lattice_array\n", + "\n", + "def compute_all_intrinsic_utilities(results, subSample, method='kikuchi', absolute_m=True):\n", + " \"\"\"\n", + " Compute all intrinsic utilities for a given j value.\n", + " \n", + " Parameters:\n", + " results (list): A list of simulation results, where each result is a tuple containing the arrays of sensory input, actions, next sensory input, magnetisations, and lattice at the end of each simulation. \n", + " method (str): The method to use for computing configuration entropy (thermodynamic efficiency). Defaults to 'kikuchi'.\n", + " absolute_m (bool): Whether to take the absolute value of magnetisation for Fisher Information (thermodynamic efficiency). Defaults to True.\n", + " \n", + " Returns:\n", + " tuple: A tuple containing the mean values of predictive information, empowerment, free energy, configuration entropy, and average magnetisation across all simulations.\n", + " \"\"\"\n", + " senses_array, actions_array, sensesNext_array, magnetisations_array, lattice_array = unpack_results(results)\n", + " numSims = len(senses_array) # each element in the array is a list of samples collected for one simulation. Length of the array is the number of simulations.\n", + " pi = np.zeros(numSims) # predictive information for each simulation\n", + " se = np.zeros(numSims) # sensory entropy for each simulation\n", + " ep = np.zeros(numSims) # empowerment for each simulation\n", + " fe = np.zeros(numSims) # free energy for each simulation\n", + " hx = np.zeros(numSims) # configuration entropy for each simulation\n", + " mm = np.zeros(numSims) # average magnetisation for each simulation\n", + " \n", + " for i in range(numSims):\n", + " pi[i] = get_pred_info(senses_array[i], sensesNext_array[i])\n", + " se[i] = get_sensory_entropy(sensesNext_array[i]) \n", + " ep[i] = get_empowerment(actions_array[i], sensesNext_array[i])\n", + " fe[i] = get_free_energy(sensesNext_array[i], magnetisations_array[i], actions_array[i])\n", + " # average magnetisation each simulation\n", + " # magnetisation is sampled every subSample time steps because it is highly correlated\n", + " if absolute_m:\n", + " mm[i] = np.mean(abs(magnetisations_array[i][:][::subSample]))\n", + " else:\n", + " mm[i] = np.mean(magnetisations_array[i][:][::subSample])\n", + " # configuration entropy each simulation\n", + " if method == 'kikuchi':\n", + " hx[i] = get_entropy_kikuchi(lattice_array[i])\n", + " else: \n", + " pdf = {1:(1 + mm[i]) / 2, -1:(1 - mm[i]) / 2}\n", + " hx[i] = get_entropy_meanfield(pdf)\n", + " return np.mean(pi), np.mean(se), np.mean(ep), np.mean(fe), np.mean(hx), np.mean(mm)" + ] + }, + { + "cell_type": "markdown", + "id": "5ebe4bc8-02d2-4ad5-acaf-1288b20138f6", + "metadata": {}, + "source": [ + "## 3. Analysis" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "d912c50f-2fb3-4877-926c-ec09cbed942e", + "metadata": {}, + "outputs": [], + "source": [ + "# Run simulations and analysis\n", + "# parameters for simulation\n", + "L = 50\n", + "time = 20200000 # perform 20.2mil flips\n", + "Js = np.linspace(0.01,2,100)\n", + "sampleSize = 200000 # take 200k observations to compute pi, emp, fe\n", + "subSample = L*L # sample every sweep for magnetisation\n", + "numSims = 20 # number of simulations\n", + "bias = 0.5\n", + "\n", + "# parameters for computing intrinsic utilities\n", + "absolute_m = True # use absolute value of magnetisation for fisher information\n", + "theta_star = 10 # theta_star for thermodynamic efficiency upper bound\n", + "threshold = 5e-2 # threshold for interpolation in fisher information integral\n", + "\n", + "# initialise arrays to store results\n", + "pred_info = np.zeros_like(Js) # predictive information\n", + "sens_entp = np.zeros_like(Js) # sensory entropy\n", + "empw_vals = np.zeros_like(Js) # empowerment\n", + "free_engy = np.zeros_like(Js) # free energy\n", + "cnfg_entp = np.zeros_like(Js) # configuration entropy\n", + "avrg_magt = np.zeros_like(Js) # average magnetisation\n", + "pdf_dict = {} # pdf for each J\n", + "cnfg_entp_dict = {} # average configuration entropy for each J\n", + "\n", + "# run simulations for different J values\n", + "for i, J in enumerate(Js):\n", + " results = run_multi_simulation(L, time, J, sampleSize, numSims, bias=bias)\n", + " pred_info[i], sens_entp[i], empw_vals[i], free_engy[i], cnfg_entp[i], avrg_magt[i] = compute_all_intrinsic_utilities(results, subSample, method='kikuchi', absolute_m=absolute_m)\n", + " cnfg_entp_dict[J] = cnfg_entp[i]\n", + "\n", + "# compute thermodynamic efficiency\n", + "result_eta,_,_,_,_, result_mag = get_efficiency(cnfg_entp_dict, avrg_magt, fisherMethod='sqrt', derivativeFilt=True, pdfFilter=True, window=15, theta_star=theta_star, threshold=threshold)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "856efef6", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Make plots\n", + "# predictive information\n", + "plt.figure(figsize=(8, 6)) # Define the figure size\n", + "plt.scatter(Js, pred_info, s=5, color='k', alpha=0.9);\n", + "plt.axvline(x=CRITICAL_VALUE, color='r', linestyle='--', label='Indicates $J_c$');\n", + "plt.title(r'Predictive information $\\mathcal{I}$ vs J');\n", + "plt.ylabel(r'$\\mathcal{I}$ [bits]');\n", + "plt.xlabel('J');\n", + "plt.savefig('pi_v_J.png', dpi=300) # dpi=300 for high resolution" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "cc07b137", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# decomposition of predictive information\n", + "plt.figure(figsize=(8, 6)) # Define the figure size\n", + "plt.scatter(Js, sens_entp, s=20,color='k', marker='.', alpha=0.9, label=r'Entropy $H(S\\prime)$');\n", + "plt.plot(Js, sens_entp - pred_info,c='k', linewidth=3, linestyle='--', alpha=0.9, label=r'Conditional Entropy $H(S\\prime|S)$');\n", + "plt.axvline(x=CRITICAL_VALUE, color='r', linestyle='--', label='Indicates $J_c$');\n", + "plt.title('Entropy and conditional entropy of S vs J');\n", + "plt.ylabel(r'Entropy or conditional entropy [bits]');\n", + "plt.xlabel('J');\n", + "plt.legend();\n", + "plt.savefig('hs_v_J.png', dpi=300) # dpi=300 for high resolution" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "0387678f", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# empowerment\n", + "plt.figure(figsize=(8, 6)) # Define the figure size\n", + "plt.scatter(Js, empw_vals,s=5, color='k', alpha=0.9);\n", + "plt.axvline(x=CRITICAL_VALUE, color='r', linestyle='--', label='Indicates $J_c$');\n", + "plt.title(r'Empowerment $\\mathfrak{E}$ vs J');\n", + "plt.ylabel(r'$\\mathfrak{E}$ [bits]');\n", + "plt.xlabel('J');\n", + "plt.savefig('emp_v_J.png', dpi=300) # dpi=300 for high resolution" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "c6ad35dd", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# free energy\n", + "plt.figure(figsize=(8, 6)) # Define the figure size\n", + "plt.scatter(Js, free_engy, s=5, color='k', alpha=0.9);\n", + "plt.axvline(x=CRITICAL_VALUE, color='r', linestyle='--', label='Indicates $J_c$');\n", + "plt.title(r'Negative free energy $-\\mathcal{F}$ vs J');\n", + "plt.ylabel(r'$-\\mathcal{F}$ [bits]');\n", + "plt.xlabel('J');\n", + "plt.savefig('nfe_v_J.png', dpi=300) # dpi=300 for high resolution" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "8dcf947a", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Thermodynamic efficiency\n", + "plt.figure(figsize=(8, 6)) # Define the figure size\n", + "plt.scatter(Js, result_eta,s=5, color='k', alpha=0.9);\n", + "plt.axvline(x=CRITICAL_VALUE, color='r', linestyle='--', label='Indicates $p_c$');\n", + "plt.title(r'Thermodynamic efficiency $\\eta$ vs J');\n", + "plt.ylabel(r'$\\eta$ [bit/joule]');\n", + "plt.xlabel('J');\n", + "plt.savefig('eta_v_J.png', dpi=300) # dpi=300 for high resolution" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "22d7593e", + "metadata": {}, + "outputs": [], + "source": [ + "# Export data \n", + "combined_data = {\n", + " 'L' : L,\n", + " 'time' : time,\n", + " 'Js' : Js.tolist(),\n", + " 'sampleSize' : sampleSize,\n", + " 'subSample' : subSample,\n", + " 'numSims' : numSims,\n", + " 'bias' : bias,\n", + " 'absolute_m' : absolute_m,\n", + " 'theta_star' : theta_star,\n", + " 'threshold' : threshold,\n", + " 'pred_info' : pred_info.tolist(),\n", + " 'sens_entp' : sens_entp.tolist(),\n", + " 'empw_vals' : empw_vals.tolist(),\n", + " 'free_engy' : free_engy.tolist(),\n", + " 'cnfg_entp' : cnfg_entp.tolist(),\n", + " 'avrg_magt' : avrg_magt.tolist()\n", + "}\n", + "\n", + "# Save combined data to a JSON file\n", + "with open('output.json', 'w') as json_file:\n", + " json.dump(combined_data, json_file)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}