Source code for pompy.processors

# -*- coding: utf-8 -*-
"""Helper classes to process outputs of models."""

from __future__ import division

__authors__ = 'Matt Graham'
__license__ = 'MIT'

import math
import numpy as np


[docs]class ConcentrationValueCalculator(object): """Calculates odour concentration values in simulation region."""
[docs] def __init__(self, puff_molecular_amount): """ Parameters ---------- puff_molecular_amount : float Molecular content of each puff (e.g. in moles or raw number of molecules). This is conserved as the puff is transported within the plume but the puff becomes increasingly diffuse as it's radius grows due to diffusion. """ # precompute constant used to scale Gaussian amplitude self._ampl_const = puff_molecular_amount / (8 * np.pi**3)**0.5
def _puff_conc_dist(self, x, y, z, px, py, pz, r_sq): """Calculate Gaussian puff concentration distribution.""" return ( self._ampl_const / r_sq**1.5 * np.exp(-((x - px)**2 + (y - py)**2 + (z - pz)**2) / (2 * r_sq)) )
[docs] def calc_conc_point(self, puff_array, x, y, z=0): """Calculate concentration at a single point. Parameters ---------- puff_array : array 2D array of puff properties of shape `(n_puffs, 4)` with `n_puffs` being the number of puffs and the four columns of the array corresponding in order to the puff x-coordinates, y-coordinates, z-coordinates and squared radii. x : float x-coordinate of point. y : float y-coordinate of point. z : float z-coordinate of point. """ # filter for non-nan puff entries and separate properties for # convenience px, py, pz, r_sq = puff_array[~np.isnan(puff_array[:, 0]), :].T return self._puff_conc_dist(x, y, z, px, py, pz, r_sq).sum(-1)
[docs] def calc_conc_list(self, puff_array, x, y, z=0): """Calculate concentrations across a 1D list of points in a xy-plane. Parameters ---------- puff_array : array 2D array of puff properties of shape `(n_puffs, 4)` with `n_puffs` being the number of puffs and the four columns of the array corresponding in order to the puff x-coordinates, y-coordinates, z-coordinates and squared radii. x : array 1D array of x-coordinates of points. y : array 1D array of y-coordinates of points. z : float z-coordinate (height) of plane. """ # filter for non-nan puff entries and separate properties for # convenience px, py, pz, r_sq = puff_array[~np.isnan(puff_array[:, 0]), :].T return self._puff_conc_dist( x[:, None], y[:, None], z, px[None], py[None], pz[None], r_sq[None] ).sum(-1)
[docs] def calc_conc_grid(self, puff_array, x, y, z=0): """Calculate concentrations across a 2D grid of points in a xy-plane. Parameters ---------- puff_array : array 2D array of puff properties of shape `(n_puffs, 4)` with `n_puffs` being the number of puffs and the four columns of the array corresponding in order to the puff x-coordinates, y-coordinates, z-coordinates and squared radii. x : array 2D array of x-coordinates of grid points. y : array 2D array of y-coordinates of grid points. z : float z-coordinate (height) of grid plane. """ # filter for non-nan puff entries and separate properties for # convenience px, py, pz, r_sq = puff_array[~np.isnan(puff_array[:, 0]), :].T return self._puff_conc_dist( x[..., None], y[..., None], z, px[None, None], py[None, None], pz[None, None], r_sq[None, None]).sum(-1)
[docs]class ConcentrationArrayGenerator(object): """Produces odour concentration field arrays from puff property arrays. Instances of this class can take single or multiple arrays of puff properties outputted from a `PlumeModel` and process them to produce an array of the concentration values across the a specified region using a Gaussian model for the individual puff concentration distributions. Compared to the `ConcentrationValueCalculator` class, this class should be more efficient for calculating large concentration field arrays for real-time graphical display of odour concentration fields for example at the expense of less accurate values due to the truncation of spatial extent of each puff. Notes ----- The returned array values correspond to the point concentration measurements across a regular grid of sampling points - i.e. the equivalent to convolving the true continuous concentration distribution with a regular 2D grid of Dirac delta / impulse functions. An improvement in some ways would be to instead calculate the integral of the concentration distribution over the (square) region around each grid point however this would be extremely computationally costly and due to the lack of a closed form solution for the integral of a Gaussian also potentially difficult to implement without introducing other numerical errors. An integrated field can be approximated with this class by generating an array at a higher resolution than required and then filtering with a suitable kernel and down-sampling. This implementation estimates the concentration distribution puff kernels with sub-grid resolution, giving improved accuracy at the cost of increased computational cost versus using a precomputed radial field aligned with the grid to compute kernel values or using a library of precomputed kernels. For cases where the array region cover the whole simulation region the computational cost could also be reduced by increasing the size of the region the array corresponds to outside of the simulation region such that when adding the puff concentration kernels to the concentration field array, checks do not need to be made to restrict to the overlapping region for puffs near the edges of the simulation region which have a concentration distribution which extends beyond its extents. """
[docs] def __init__(self, array_xy_region, array_z, n_x, n_y, puff_mol_amount, kernel_rad_mult=3): """ Parameters ---------- array_region : Rectangle Two-dimensional rectangular region defined in world coordinates over which to calculate the concentration field. array_z : float Height on the vertical z-axis at which to calculate the concentration field over. n_x : integer Number of grid points to sample at across x-dimension. n_y : integer Number of grid points to sample at across y-dimension. puff_mol_amount : float Molecular content of each puff (e.g. in moles or raw number of molecules). This is conserved as the puff is transported within the plume but the puff becomes increasingly diffuse as its radius grows due to diffusion. (dimensionality:molecular amount) kernel_rad_mult : float Multiplier used to determine to within how many puff radii from the puff centre to truncate the concentration distribution kernel calculated to. The default value of 3 will truncate the Gaussian kernel at (or above) the point at which the concentration has dropped to 0.004 of the peak value at the puff centre. """ self.array_xy_region = array_xy_region self.array_z = array_z self.n_x = n_x self.n_y = n_y self.dx = array_xy_region.w / n_x # calculate x grid point spacing self.dy = array_xy_region.h / n_y # calculate y grid point spacing # precompute constant used to scale Gaussian kernel amplitude self._ampl_const = puff_mol_amount / (8*np.pi**3)**0.5 self.kernel_rad_mult = kernel_rad_mult
def _puff_kernel(self, shift_x, shift_y, z_offset, r_sq, even_w, even_h): """Computes a 2D array representing puff concentration distribution.""" # kernel is truncated to min +/- kernel_rad_mult * effective puff # radius from centre i.e. Gaussian kernel with >= kernel_rad_mult * # standard deviation span # (effective puff radius is (r_sq - (z_offset/k_r_mult)**2)**0.5 to # account for the cross sections of puffs with centres out of the # array plane being 'smaller') # the truncation will introduce some errors shape = (2 * (r_sq * self.kernel_rad_mult**2 - z_offset**2)**0.5 / np.array([self.dx, self.dy])) # depending on whether centre is on grid points or grid centres # kernel dimensions will need to be forced to odd/even respectively shape[0] = self._round_up_to_next_even_or_odd(shape[0], even_w) shape[1] = self._round_up_to_next_even_or_odd(shape[1], even_h) # generate x and y grids with required shape [x_grid, y_grid] = 0.5 + np.mgrid[-shape[0] // 2:shape[0] // 2, -shape[1] // 2:shape[1] // 2] # apply shifts to correct for offset of true centre from nearest # grid-point / centre x_grid = x_grid * self.dx + shift_x y_grid = y_grid * self.dy + shift_y # compute square radial field r_sq_grid = x_grid**2 + y_grid**2 + z_offset**2 # output scaled Gaussian kernel return (self._ampl_const / r_sq**1.5) * np.exp(-r_sq_grid / (2 * r_sq)) @staticmethod def _round_up_to_next_even_or_odd(value, to_even): """Round value up to first even or odd number `>= value`. Returns value rounded up to first even number `>= value` if `to_even==True` and to first odd number `>= value` if `to_even==False`. """ value = math.ceil(value) if to_even: if value % 2 == 1: value += 1 else: if value % 2 == 0: value += 1 return value
[docs] def generate_single_array(self, puff_array): """Generates a single concentration field array from a puff array. Parameters ---------- puff_array : array 2D array of puff properties of shape `(n_puffs, 4)` with `n_puffs` being the number of puffs and the four columns of the array corresponding in order to the puff x-coordinates, y-coordinates, z-coordinates and squared radii. Returns ------- conc_array : array 2D array of shape `(n_x, n_y)` containing concentration field values at specified grid points. """ # initialise concentration array conc_array = np.zeros((self.n_x, self.n_y)) # loop through all the puffs for (puff_x, puff_y, puff_z, puff_r_sq) in puff_array: # to begin with check this a real puff and not a placeholder nan # entry as puff arrays may have been pre-allocated with nan # at a fixed size for efficiency and as the number of puffs # existing at any time interval is variable some entries in the # array will be unallocated, placeholder entries should be # contiguous (i.e. all entries after the first placeholder will # also be placeholders) therefore break out of loop completely # if one is encountered if np.isnan(puff_x): break # check puff centre is within region array is being calculated # over otherwise skip if not self.array_xy_region.contains(puff_x, puff_y): continue # finally check that puff z-coordinate is within # kernel_rad_mult*r_sq of array evaluation height otherwise skip puff_z_offset = (self.array_z - puff_z) if abs(puff_z_offset) / puff_r_sq**0.5 > self.kernel_rad_mult: continue # calculate (float) row index corresponding to puff x coord p = (puff_x - self.array_xy_region.x_min) / self.dx # calculate (float) column index corresponding to puff y coord q = (puff_y - self.array_xy_region.y_min) / self.dy # calculate nearest integer or half-integer row index to p u = math.floor(2 * p + 0.5) / 2 # calculate nearest integer or half-integer row index to q v = math.floor(2 * q + 0.5) / 2 # generate puff kernel array of appropriate scale and taking # into account true centre offset from nearest half-grid # points (u,v) kernel = self._puff_kernel( (p - u) * self.dx, (q - v) * self.dy, puff_z_offset, puff_r_sq, u % 1 == 0, v % 1 == 0) # compute row and column slices for source kernel array and # destination concentration array taking in to the account # the possibility of the kernel being partly outside the # extents of the destination array (w, h) = kernel.shape r_rng_arr = slice(int(max(0, u - w / 2.)), int(max(min(u + w / 2., self.n_x), 0))) c_rng_arr = slice(int(max(0, v - h / 2.)), int(max(min(v + h / 2., self.n_y), 0))) r_rng_knl = slice(int(max(0, -u + w / 2.)), int(min(-u + w / 2. + self.n_x, w))) c_rng_knl = slice(int(max(0, -v + h / 2.)), int(min(-v + h / 2. + self.n_y, h))) # add puff kernel values to concentration field array conc_array[r_rng_arr, c_rng_arr] += kernel[r_rng_knl, c_rng_knl] return conc_array
[docs] def generate_multiple_arrays(self, puff_arrays): """Generates multiple concentration field arrays from puff arrays. Parameters ---------- puff_arrays : array iterable Iterable sequence of 2D arrays of puff properties, with each array of shape `(n_puffs, 4)` with `n_puffs` being the number of puffs and the four columns of the array corresponding in order to the puff x-coordinates, y-coordinates, z-coordinates and squared radii. Returns ------- conc_arrays : array iterable List of 2D arrays of shape `(n_x, n_y)` containing concentration field values at specified grid points, with each array in list corresponding to the puff properties array at the corresponding index in `puff_arrays`. """ conc_arrays = [] for puff_array in puff_arrays: conc_arrays.append(self.generate_single_frame(puff_array)) return conc_arrays