Source code for pompy.models

# -*- coding: utf-8 -*-
"""Implementations of puff-based plume model components."""

from __future__ import division

__authors__ = 'Matt Graham'
__license__ = 'MIT'

import numpy as np
import scipy.interpolate as interp

class SlottedIterable(object):
    """Base class for objects with slots which can be used as iterables.

    Allows instances of subclasses to be used as iterables and provides a
    human-readable string representation.

    __slots__ = ()

    def __iter__(self):
        """Iterate through slot attributes in defined order."""
        for name in self.__slots__:
            yield getattr(self, name)

    def __repr__(self):
        """String representation of object."""
        return '{cls}({attr})'.format(
            attr=', '.join(['{0}={1}'.format(
                name, getattr(self, name)) for name in self.__slots__]))

[docs]class Puff(SlottedIterable): """Container for the properties of a single odour puff.""" __slots__ = ('x', 'y', 'z', 'r_sq')
[docs] def __init__(self, x, y, z, r_sq): """ Parameters ---------- x : float x-coordinate of puff centre. y : float y-coordinate of puff centre. z : float z-coordinate of puff centre. r_sq : float Squared radius of puff. """ assert r_sq >= 0., 'r_sq must be non-negative.' self.x = x self.y = y self.z = z self.r_sq = r_sq
[docs]class Rectangle(SlottedIterable): """Axis-aligned rectangular region. Rectangle is defined by two points `(x_min, y_min)` and `(x_max, y_max)` with it required that `x_max > x_min` and `y_max > y_min`. """ __slots__ = ('x_min', 'x_max', 'y_min', 'y_max')
[docs] def __init__(self, x_min, x_max, y_min, y_max): """ Parameters ---------- x_min : float x-coordinate of bottom-left corner of rectangle. y_min : float x-coordinate of bottom-right corner of rectangle. x_max : float x-coordinate of top-right corner of rectangle. y_max : float y-coordinate of top-right corner of rectangle. """ assert x_min < x_max, 'Rectangle x_min must be < x_max.' assert y_min < y_max, 'Rectangle y_min must be < y_max.' self.x_min = x_min self.x_max = x_max self.y_min = y_min self.y_max = y_max
@property def w(self): """Width of rectangle (i.e. distance covered on x-axis).""" return self.x_max - self.x_min @property def h(self): """Height of rectangle (i.e. distance covered on y-axis).""" return self.y_max - self.y_min
[docs] def contains(self, x, y): """Whether `(x, y)`` position is contained within this rectangle. Tests whether the supplied position, an `(x,y)` pair, is contained within the region defined by this `Rectangle` object and returns `True` if so and `False` if not. Parameters ---------- x : float x-coordinate of position to test. y : float y-coordinate of position to test. Returns ------- contains : boolean `True` if `(x, y)` is within the rectangle and `False` otherwise. """ return (x >= self.x_min and x <= self.x_max and y >= self.y_min and y <= self.y_max)
[docs]class PlumeModel(object): """Puff-based odour plume dispersion model from Farrell et. al. (2002). The odour plume is modelled as a series of odour puffs which are released from a fixed source position. The odour puffs are dispersed by a modelled 2D wind velocity field plus a white noise process model of mid-scale puff mass diffusion relative to the plume centre line. The puffs also spread in size over time to model fine-scale diffusive processes. """
[docs] def __init__(self, sim_region=None, source_pos=(5., 0., 0.), wind_model=None, model_z_disp=True, centre_rel_diff_scale=2., puff_init_rad=0.0316, puff_spread_rate=0.001, puff_release_rate=10, init_num_puffs=10, max_num_puffs=1000, rng=None): """ Parameters ---------- sim_region : Rectangle 2D rectangular region of space over which the simulation is conducted. This should be a subset of the simulation region defined for the wind model. source_pos : float iterable Coordinates of the fixed source position within the simulation region from which puffs are released. If a length 2 iterable is passed, the z coordinate will be set a default of 0 (dimension: length). wind_model : WindModel Dynamic model of the large scale wind velocity field in the simulation region. model_z_disp : boolean Whether to model dispersion of puffs from plume centre-line in z direction. If set `True` then the puffs will be modelled as dispersing in the vertical direction by a random walk process (the wind model is limited to 2D hence the vertical wind speed is assumed to be zero), if set `False` the puff z-coordinates will not be updated from their initial value of 0. centre_rel_diff_scale : float or float iterable Scaling for the stochastic process used to model the centre-line relative diffusive transport of puffs. Either a single float value of isotropic diffusion in all directions, or one of a pair of values specifying different scales for the x and y directions respectively if `model_z_disp=False` or a triplet of values specifying different scales for x, y and z scales respectively if `model_z_disp=True` (dimension: length / time**0.5). puff_init_rad: float Initial radius of the puffs (dimension: length). puff_spread_rate : float Constant which determines the rate at which the odour puffs increase in size over time (dimension: length**2 / time). puff_release_rate : float Mean rate at which new puffs are released into the plume. Puff release is modelled as a stochastic Poisson process, with each puff released assumed to be independent and the mean release rate fixed (dimension: count/time). init_num_puffs : integer Initial number of puffs to release at the beginning of the simulation. max_num_puffs : integer Maximum number of puffs to permit to be in existence simultaneously within model, used to limit memory and processing requirements of model. This parameter needs to be set carefully in relation to the puff release rate and simulation region size as if too small it will lead to breaks in puff release when the number of puffs remaining in the simulation region reaches the limit. rng : RandomState Random number generator to use in generating input noise. Defaults to `numpy.random` global generator if set to `None` however a seeded `RandomState` object can be passed if it is desired to have reproducible output. """ if sim_region is None: sim_region = Rectangle(0., 50., -12.5, 12.5) if rng is None: rng = np.random self.sim_region = sim_region if wind_model is None: wind_model = WindModel() self.wind_model = wind_model self.rng = rng self.model_z_disp = model_z_disp self._vel_dim = 3 if model_z_disp else 2 if model_z_disp and hasattr(centre_rel_diff_scale, '__len__'): assert len(centre_rel_diff_scale) == 2, ( 'When model_z_disp=True, centre_rel_diff_scale must be a ' 'scalar or length 1 or 3 iterable.') self.centre_rel_diff_scale = centre_rel_diff_scale assert sim_region.contains(source_pos[0], source_pos[1]), ( 'Specified source position must be within simulation region.') # default to zero height source when source_pos is 2D source_z = 0 if len(source_pos) != 3 else source_pos[2] self._new_puff_params = ( source_pos[0], source_pos[1], source_z, puff_init_rad**2) self.puff_spread_rate = puff_spread_rate self.puff_release_rate = puff_release_rate self.max_num_puffs = max_num_puffs # initialise puff list with specified number of new puffs self.puffs = [ Puff(*self._new_puff_params) for i in range(init_num_puffs)]
[docs] def update(self, dt): """Update plume puff objects by forward intgating one time-step. Performs a single time-step update of plume model using Euler integration scheme. Parameters ---------- dt : float Simulation time-step (dimension: time). """ # add more puffs (stochastically) if enough capacity if len(self.puffs) < self.max_num_puffs: # puff release modelled as Poisson process at fixed mean rate # with number to release clipped if it would otherwise exceed # the maximum allowed num_to_release = min( self.rng.poisson(self.puff_release_rate * dt), self.max_num_puffs - len(self.puffs)) self.puffs += [ Puff(*self._new_puff_params) for i in range(num_to_release)] # initialise empty list for puffs that have not left simulation area alive_puffs = [] for puff in self.puffs: # interpolate wind velocity at Puff position from wind model grid # assuming zero wind speed in vertical direction if modelling # z direction dispersion wind_vel = np.zeros(self._vel_dim) wind_vel[:2] = self.wind_model.velocity_at_pos(puff.x, puff.y) # approximate centre-line relative puff transport velocity # component as being a (Gaussian) white noise process scaled by # constants filament_diff_vel = (self.rng.normal(size=self._vel_dim) * self.centre_rel_diff_scale) vel = wind_vel + filament_diff_vel # update puff position using Euler integration puff.x += vel[0] * dt puff.y += vel[1] * dt if self.model_z_disp: puff.z += vel[2] * dt # update puff size using Euler integration with second puff # growth model described in paper puff.r_sq += self.puff_spread_rate * dt # only keep puff alive if it is still in the simulated region if self.sim_region.contains(puff.x, puff.y): alive_puffs.append(puff) # store alive puffs only self.puffs = alive_puffs
@property def puff_array(self): """NumPy array of the properties of the simulated puffs. Each row corresponds to one puff with the first column containing the puff position x-coordinate, the second the y-coordinate, the third the z-coordinate and the fourth the puff squared radius. """ return np.array([tuple(puff) for puff in self.puffs])
[docs]class WindModel(object): """Wind velocity model to calculate advective transport of odour. A 2D approximation is used as described in the paper, with the wind velocities calculated over a regular 2D grid of points using a finite difference method. The boundary conditions at the edges of the simulated region are for both components of the velocity field constant mean values plus coloured noise. For each of the field components these are calculated for the four corners of the simulated region and then linearly interpolated over the edges. """
[docs] def __init__(self, sim_region=None, n_x=21, n_y=21, u_av=1., v_av=0., k_x=20., k_y=20., noise_gain=2., noise_damp=0.1, noise_bandwidth=0.2, use_original_noise_updates=False, rng=None): """ Parameters ---------- sim_region : Rectangle Two-dimensional rectangular region over which to model wind velocity field. n_x : integer Number of grid points in x direction. n_y : integer Number of grid points in y direction. u_av : float Mean x-component of wind velocity (dimension: length / time). v_av : float Mean y-component of wind velocity (dimension: length / time). k_x : float or array_like Diffusivity constant in x direction. Either a single scalar value across the whole simulated region or an array of size `(n_x, n_y)` defining values for each grid point (dimension: length**2 / time). k_y : float or array_like Diffusivity constant in y direction. Either a single scalar value across the whole simulated region or an array of size `(n_x, n_y)` defining values for each grid point (dimension: length**2 / time). noise_gain : float Input gain constant for boundary condition noise generation (dimensionless). noise_damp : float Damping ratio for boundary condition noise generation (dimensionless). noise_bandwidth : float Bandwidth for boundary condition noise generation (dimension: angle / time). use_original_noise_updates : boolean Whether to use the original non-SDE based updates for the noise process as defined in Farrell et al. (2002), see notes in `ColouredNoiseGenerator` documentation. rng : RandomState Random number generator to use in generating input noise. Defaults to `numpy.random` global generator if set to `None` however a seeded `RandomState` object can be passed if it is desired to have reproducible output. """ if sim_region is None: sim_region = Rectangle(0, 100, -50, 50) if rng is None: rng = np.random self.sim_region = sim_region self.u_av = u_av self.v_av = v_av self.n_x = n_x self.n_y = n_y self.k_x = k_x self.k_y = k_y # set coloured noise generator for applying boundary condition # need to generate coloured noise samples at four corners of boundary # for both components of the wind velocity field so (2,8) state # vector (2 as state includes first derivative) self.noise_gen = ColouredNoiseGenerator( np.zeros((2, 8)), noise_damp, noise_bandwidth, noise_gain, use_original_noise_updates, rng) # compute grid node spacing self.dx = sim_region.w / (n_x - 1) # x grid point spacing self.dy = sim_region.h / (n_y - 1) # y grid point spacing # initialise wind velocity field to mean values # +2s are to account for boundary grid points self._u = np.ones((n_x + 2, n_y + 2)) * u_av self._v = np.ones((n_x + 2, n_y + 2)) * v_av # create views on to field interiors (i.e. not including boundaries) # for notational ease - note this does not copy any data self._u_int = self._u[1:-1, 1:-1] self._v_int = self._v[1:-1, 1:-1] # preassign array of corner means values self._corner_means = np.array([u_av, v_av]).repeat(4) # precompute linear ramp arrays with size of boundary edges for # linear interpolation of corner values self._ramp_x = np.linspace(0., 1., n_x + 2) self._ramp_y = np.linspace(0., 1., n_y + 2) # set up cubic spline interpolator for calculating off-grid wind # velocity field values self._x_points = np.linspace(sim_region.x_min, sim_region.x_max, n_x) self._y_points = np.linspace(sim_region.y_min, sim_region.y_max, n_y) # initialise flag to indicate velocity field interpolators not set self._interp_set = True
def _set_interpolators(self): """ Set spline interpolators using current velocity fields.""" self._interp_u = interp.RectBivariateSpline( self.x_points, self.y_points, self._u_int) self._interp_v = interp.RectBivariateSpline( self.x_points, self.y_points, self._v_int) self._interp_set = True @property def x_points(self): """1D array of the range of x-coordinates of simulated grid points.""" return self._x_points @property def y_points(self): """1D array of the range of y-coordinates of simulated grid points.""" return self._y_points @property def velocity_field(self): """Current calculated velocity field across simulated grid points.""" return np.dstack((self._u_int, self._v_int))
[docs] def velocity_at_pos(self, x, y): """Calculate velocity at a position or positions. Calculates the components of the velocity field at arbitrary point(s) in the simulation region using a bivariate spline interpolation over the calculated grid point values. Parameters ---------- x : float or array x-coordinate of the point(s) to calculate the velocity at (dimension: length). y : float or array y-coordinate of the point(s) to calculate the velocity at (dimension: length). Returns ------- vel : array Velocity field (2D) values evaluated at specified point(s) (dimension: length / time). """ if not self._interp_set: self._set_interpolators() return np.array([float(self._interp_u(x, y)), float(self._interp_v(x, y))])
[docs] def update(self, dt): """Update wind velocity field by forward integrating one time-step. Updates wind velocity field values using finite difference approximations for spatial derivatives and Euler integration for time-step update. Parameters ---------- dt : float Simulation time-step (dimension: time). """ # update boundary values self._apply_boundary_conditions(dt) # approximate spatial first derivatives with centred finite difference # equations for both components of wind field du_dx, du_dy = self._centred_first_diffs(self._u) dv_dx, dv_dy = self._centred_first_diffs(self._v) # approximate spatial second derivatives with centred finite difference # equations for both components of wind field d2u_dx2, d2u_dy2 = self._centred_second_diffs(self._u) d2v_dx2, d2v_dy2 = self._centred_second_diffs(self._v) # compute approximate time derivatives across simulation region # interior from defining PDEs # du/dt = -(u*du/dx + v*du/dy) + 0.5*k_x*d2u/dx2 + 0.5*k_y*d2u/dy2 # dv/dt = -(u*dv/dx + v*dv/dy) + 0.5*k_x*d2v/dx2 + 0.5*k_y*d2v/dy2 du_dt = (-self._u_int * du_dx - self._v_int * du_dy + 0.5 * self.k_x * d2u_dx2 + 0.5 * self.k_y * d2u_dy2) dv_dt = (-self._u_int * dv_dx - self._v_int * dv_dy + 0.5 * self.k_x * d2v_dx2 + 0.5 * self.k_y * d2v_dy2) # perform update with Euler integration self._u_int += du_dt * dt self._v_int += dv_dt * dt # set flag to indicate interpolators no longer valid as fields updated self._interp_set = False
def _apply_boundary_conditions(self, dt): """Applies boundary conditions to wind velocity field.""" # update coloured noise generator self.noise_gen.update(dt) # extract four corner values for each of u and v fields as component # mean plus current noise generator output (u_tl, u_tr, u_bl, u_br, v_tl, v_tr, v_bl, v_br) = ( self.noise_gen.output + self._corner_means) # linearly interpolate along edges self._u[:, 0] = u_tl + self._ramp_x * (u_tr - u_tl) # u top edge self._u[:, -1] = u_bl + self._ramp_x * (u_br - u_bl) # u bottom edge self._u[0, :] = u_tl + self._ramp_y * (u_bl - u_tl) # u left edge self._u[-1, :] = u_tr + self._ramp_y * (u_br - u_tr) # u right edge self._v[:, 0] = v_tl + self._ramp_x * (v_tr - v_tl) # v top edge self._v[:, -1] = v_bl + self._ramp_x * (v_br - v_bl) # v bottom edge self._v[0, :] = v_tl + self._ramp_y * (v_bl - v_tl) # v left edge self._v[-1, :] = v_tr + self._ramp_y * (v_br - v_tr) # v right edge def _centred_first_diffs(self, f): """Calculates centred first-order finite differences.""" return ((f[2:, 1:-1] - f[0:-2, 1:-1]) / (2 * self.dx), (f[1:-1, 2:] - f[1:-1, 0:-2]) / (2 * self.dy)) def _centred_second_diffs(self, f): """Calculates centred second-order finite differences.""" return ( (f[2:, 1:-1] - 2 * f[1:-1, 1:-1] + f[0:-2, 1:-1]) / self.dx**2, (f[1:-1, 2:] - 2 * f[1:-1, 1:-1] + f[1:-1, 0:-2]) / self.dy**2)
[docs]class ColouredNoiseGenerator(object): """Generator of coloured (correlated) Gaussian noise process. Generates a coloured noise output via numerical integration of a stochastic differential equation formulation. The system is assumed to be defined by the system of SDEs:: dx_0 = x_1 * dt dx_1 = -(a * x_0 + b * x_1) * dt + c * dn where `a = bandwidth**2` and `b = 2 * damping * bandwidth`, `c = gain * bandwidth**2` and `dn` is a standard Gaussian white noise process. This is numerically integrated using an Euler-Maruyama scheme:: for t in range(n_timestep): x[t+1,0] = x[t,0] + dt * x[t,1] x[t+1,1] = x[t,1] - dt * (a*x[t,0] + b*x[t,1]) + dt**0.5 * c * n[t] where `x` is an array of shape `(n_timestep, 2)` and `n` is an array of shape `(n_timestep)` filled with random standard normal draws. This differs from the code accompanying Farrell et al. (2002) which applies an Euler integration scheme to a state space formulation of a second-order linear system with Gaussian noise input at each time step, resulting in updates of the form:: for t in range(n_timestep): x[t+1,0] = x[t,0] + dt * x[t,1] x[t+1,1] = x[t,1] + dt * (-a * x[t,0] - b * x[t,1] + c * n[t]) This differs from the scheme implemented here by scaling the noise input by the timestep `dt` rather than its square root `dt**0.5`. This introduces an implicit dependence of the amplitude of the process on `dt`, in particular that the amplitude scales roughly as `dt**0.5`. Updates consistent with the Farrell et al. (2002) implementation can be achieved by setting the `use_original_updates` flat to `True`. """
[docs] def __init__(self, init_state, damping=0.1, bandwidth=0.2, gain=1., use_original_updates=False, rng=None): """ Parameters ---------- init_state : array_like The initial state of system, must be of shape `(2,n)` where `n` is the size of the noise vector to be produced. The first row sets the initial values and the second the initial first derivatives. damping : float Damping ratio for the system, affects system damping, values of < 1 give an underdamped system, = 1 a critically damped system and > 1 an overdamped system (dimensionless). bandwidth : float Bandwidth or equivalently undamped natural frequency of system, affects system reponsiveness to variations in (noise) input (dimension = 1 / time). gain : float Input gain of system, affects scaling of (noise) input. rng : RandomState Random number generator to use in generating input noise. Defaults to `numpy.random` global generator if set to `None` however a seeded `RandomState` object can be passed if it is desired to have reproducible output. use_original_updates : boolean Whether to use the original non-SDE based updates for the noise process as defined in Farrell et al. (2002), see above notes. """ if rng is None: rng = np.random # set up state space matrices self.a_mtx = np.array([ [0., 1.], [-bandwidth**2, -2. * damping * bandwidth]]) self.b_mtx = np.array([[0.], [gain * bandwidth**2]]) # initialise state self.state = init_state self.rng = rng self.use_original_updates = use_original_updates
@property def output(self): """Coloured noise output.""" return self.state[0, :]
[docs] def update(self, dt): """Update state of noise generator. Parameters ---------- dt : float Integrator time step. """ # get normal random input n = self.rng.normal(size=(1, self.state.shape[1])) if self.use_original_updates: # apply Farrell et al. (2002) update self.state += dt * ( + else: # apply update with Euler-Maruyama integration self.state += ( dt * + * dt**0.5)