# -*- coding: utf-8 -*-
"""Classes and functions for the analysis of PMCTRACK output."""
import operator
import warnings
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
from .decor import ReprTrackRun, get_pbar
from .exceptions import (
ArgumentError,
ConcatenationError,
GridError,
InconsistencyWarning,
LoadError,
MissingConfWarning,
NotCategorisedError,
SelectError,
)
from .grid import cell_bounds, cell_centres, grid_cell_areas
from .misc import _exclude_by_first_day, _exclude_by_last_day
from .params import ARCH_KEY, ARCH_KEY_CAT, COLUMNS, EARTH_RADIUS, FILLVAL, HOUR, KM2M, M2KM
from .parts import TrackSettings
from .utils import (
distance_metric,
great_circle,
point_density_cell,
point_density_rad,
total_dist,
track_density_cell,
track_density_rad,
)
class _OctantSeries(pd.Series):
"""`pandas.Series` subclass used in octant library."""
@property
def _constructor(self):
return _OctantSeries
[docs]class OctantTrack(pd.DataFrame):
"""
Instance of cyclone track.
DataFrame with a bunch of extra methods and properties.
"""
[docs] def __init__(self, *args, **kw):
"""Initialise octant.core.OctantTrack."""
super(OctantTrack, self).__init__(*args, **kw)
@property
def _constructor(self):
return OctantTrack # replace with self.__class__?
_constructor_sliced = _OctantSeries
@property
def gb(self):
"""Shortcut to group by track_idx index."""
return self.groupby("track_idx")
[docs] @classmethod
def from_df(cls, df):
"""Create OctantTrack from pandas.DataFrame."""
return cls.from_records(df.to_records(index=False))
[docs] @classmethod
def from_mux_df(cls, df):
"""Create OctantTrack from a multi-index pandas.DataFrame."""
if df.shape[0] > 0:
return cls.from_records(df.to_records(index=True), index=df.index.names)
else:
return cls(columns=df.columns, index=df.index)
@property
def coord_view(self):
"""Numpy view of track coordinates: longitude, latitude, time."""
return (
self.lon.values.view("double"),
self.lat.values.view("double"),
self.time.values.view("int64"),
)
@property
def lonlat(self):
"""Values of longitude and latitude as 2D numpy array."""
return self[["lon", "lat"]].values
@property
def lonlat_c(self):
"""Values of longitude and latitude as C-ordered 2D numpy array."""
return self.lonlat.astype("double", order="C")
@property
def tridlonlat(self):
"""Values of track index, longitude, latitude as 2D numpy array."""
return self.reset_index("track_idx")[["track_idx", "lon", "lat"]].values
@property
def tridlonlat_c(self):
"""Values of track index, longitude, latitude as C-order 2D array."""
return self.tridlonlat.astype("double", order="C")
@property
def lifetime_h(self):
"""Track duration in hours."""
if self.shape[0] > 0:
return (self.time.values[-1] - self.time.values[0]) / HOUR
else:
return 0
@property
def gen_lys_dist_km(self):
"""Distance between genesis and lysis of the cyclone track in km."""
# TODO: include planet radius
if self.shape[0] > 0:
return (
great_circle(
self.lonlat[0, 0], self.lonlat[-1, 0], self.lonlat[0, 1], self.lonlat[-1, 1]
)
* M2KM
)
else:
return 0
@property
def total_dist_km(self):
"""Total track distance in km."""
return total_dist(self.lonlat_c) * M2KM
@property
def average_speed(self):
"""Average cyclone propagation speed in km per hour."""
if self.lifetime_h == 0:
return np.nan
else:
return self.total_dist_km / self.lifetime_h
@property
def max_vort(self):
"""Maximum vorticity of the cyclone track."""
return np.nanmax(self.vo.values)
[docs] def within_rectangle(self, lon0, lon1, lat0, lat1, time_frac=1):
"""
Check that OctantTrack is within a rectangle for a fraction of its lifetime.
Parameters
----------
lon0, lon1, lat0, lat1: float
Boundaries of longitude-latitude rectangle (lon_min, lon_max, lat_min, lat_max)
time_frac: float, optional
Time fraction threshold.
By default, set to maximum, i.e. track should be within the box entirely.
Returns
-------
bool
Examples
--------
Test that cyclone spends no more than a third of its life time outside the box
>>> bbox = [-10, 25, 68, 78]
>>> ot.within_rectangle(*bbox, time_frac=0.67)
True
See Also
--------
octant.misc.check_far_from_boundaries
"""
_within = self[
(self.lon >= lon0) & (self.lon <= lon1) & (self.lat >= lat0) & (self.lat <= lat1)
]
if self.lifetime_h == 0:
return _within.shape[0] == 1
else:
return _within.lifetime_h / self.lifetime_h >= time_frac
[docs] def plot_track(self, ax=None, **kwargs):
"""
Plot cyclone track using as plot function from plotting submodule.
Filled circle shows the beginning, empty circle - the end of the track.
Parameters
----------
ax: matplotlib axes object, optional
Axes in which to plot the track
If not given, a new figure with cartopy geoaxes is created
transform: matplotlib transform, optional
Default: cartopy.crs.PlateCarree()
kwargs: other keyword arguments
Options to pass to matplotlib plot() function
Returns
-------
ax: matplotlib axes object
The same ax as the input ax (if given), or a newly created axes
"""
from .plotting import plot
return plot(self, ax=ax, **kwargs)
[docs]class TrackRun:
"""
Results of tracking experiment.
Attributes
----------
data: octant.core.OctantTrack
DataFrame-like container of tracking locations, times, and other data
filelist: list
List of source "vortrack" files; see PMCTRACK docs for more info
conf: octant.parts.TrackSettings
Configuration used for tracking
is_categorised: bool
Flag if categorisation has been applied to the TrackRun
cats: None or pandas.DataFrame
DataFrame with the same index as data and the number of columns equal to
the number of categories; None if `is_categorised` is False
"""
_mux_names = ["track_idx", "row_idx"]
[docs] def __init__(self, dirname=None, columns=COLUMNS, **kwargs):
"""
Initialise octant.core.TrackRun.
Parameters
----------
dirname: pathlib.Path, optional
Path to the directory with tracking output
If present, load the data during on init
columns: sequence of str, optional
List of column names. Should contain 'time' to parse datetimes.
kwargs: other keyword arguments
Parameters passed to load_data()
"""
self.dirname = dirname
self.conf = None
mux = pd.MultiIndex.from_arrays([[], []], names=self._mux_names)
self.columns = columns
self.data = OctantTrack(index=mux, columns=self.columns)
self.filelist = []
self.sources = []
self.cats = None
self.is_categorised = False
self.is_cat_inclusive = False
self._cat_sep = "|"
if isinstance(self.dirname, Path):
# Read all files and store in self.all
# as a list of `pandas.DataFrame`s
self.load_data(self.dirname, columns=self.columns, **kwargs)
elif self.dirname is not None:
raise LoadError("To load data, `dirname` should be Path-like object")
if not self.data.empty:
# Define time step
for (_, ot) in self.gb:
if ot.shape[0] > 1:
self.tstep_h = ot.time.diff().values[-1] / HOUR
break
def __len__(self):
"""Get the number of cyclone tracks within TrackRun."""
return self.data.index.get_level_values(0).to_series().nunique()
def __repr__(self): # noqa
rtr = ReprTrackRun(self)
return rtr.str_repr(short=True)
def __str__(self): # noqa
rtr = ReprTrackRun(self)
return rtr.str_repr(short=False)
def _repr_html_(self):
rtr = ReprTrackRun(self)
return rtr.html_repr()
def __add__(self, other):
"""Combine two TrackRun objects together."""
new = self.__class__()
new.extend(self)
new.extend(other)
return new
def __getitem__(self, subset): # noqa
if (subset in [slice(None), None, "all"]) or len(self) == 0:
return self.data
else:
if self.is_categorised:
if isinstance(subset, str):
subsets = [subset]
else:
# if list of several subsets is given
subsets = subset
selected = True
for label in subsets:
if label not in self.cats.columns:
raise SelectError(
f"'{label}' is not among categories: {', '.join(self.cats.columns)}"
)
selected &= self.cats[label]
idx = self.cats[selected].index
return self.data.loc[idx, :]
else:
raise NotCategorisedError
@property
def cat_labels(self):
"""List of category labels."""
try:
return list(self.cats.columns)
except AttributeError:
return []
@property
def _pbar(self):
"""Get progress bar."""
return get_pbar()
@property
def gb(self):
"""Group by track index."""
return self.data.gb
[docs] def size(self, subset=None):
"""Size of subset of tracks."""
return self[subset].index.get_level_values(0).to_series().nunique()
[docs] def rename_cats(self, **mapping):
"""
Rename categories of the TrackRun.
Parameters
----------
mapping: dict
How to rename categories, {old_key: new_key}
"""
if self.is_categorised:
self.cats = self.cats.rename(columns=mapping)
else:
raise NotCategorisedError
[docs] def load_data(
self, dirname, columns=COLUMNS, wcard="vortrack*0001.txt", conf_file=None, scale_vo=1e-3
):
"""
Read tracking results from a directory into `TrackRun.data` attribute.
Parameters
----------
dirname: pathlib.Path
Path to the directory with tracking output
columns: sequence of str, optional
List of column names. Should contain 'time' to parse datetimes.
conf_file: pathlib.Path, optional
Path to the configuration file. If omitted, an attempt is
made to find a .conf file in the `dirname` directory
wcard: str
Wildcard for files to read in. By default, loads only "primary" vortices
and skips merged. See PMCTRACK docs for more info.
scale_vo: float, optional
Scale vorticity values column to SI units (s-1)
By default PMCTRACK writes out vorticity in (x10-3 s-1),
so scale_vo default is 1e-3. To switch off scaling, set it to 1.
"""
if not dirname.is_dir():
raise LoadError(f"No such directory: {dirname}")
# deprecated...
# if primary_only:
# wcard = "vortrack*0001.txt"
# else:
# wcard = "vortrack*.txt"
self.filelist = sorted([*dirname.glob(wcard)])
self.sources.append(str(dirname))
# Load configuration
if conf_file is None:
try:
conf_file = list(dirname.glob("*.conf"))[0]
self.conf = TrackSettings(conf_file)
except (IndexError, AttributeError):
msg = (
"Track settings file (.conf) in the `dirname` directory"
"is missing or could not be read"
)
warnings.warn(msg, MissingConfWarning)
# Load the tracks
self.columns = columns
load_kw = {"delimiter": r"\s+", "names": self.columns, "parse_dates": ["time"]} # noqa
_data = []
for fname in self._pbar(self.filelist):
_data.append(OctantTrack.from_df(pd.read_csv(fname, **load_kw)))
if len(_data) > 0:
self.data = pd.concat(_data, keys=range(len(_data)), names=self._mux_names)
# self.data["cat"] = 0
# Scale vorticity to (s-1)
self.data["vo"] *= scale_vo
del _data
[docs] @classmethod
def from_archive(cls, filename):
"""
Construct TrackRun object from HDF5 file.
Parameters
----------
filename: str
File path to HDF5 file
Returns
-------
octant.core.TrackRun
"""
with pd.HDFStore(filename, mode="r") as store:
df = store[ARCH_KEY]
metadata = store.get_storer(ARCH_KEY).attrs.metadata
if metadata["is_categorised"]:
try:
df_cat = store.get(ARCH_KEY_CAT)
except KeyError:
msg = (
"TrackRun is saved as categorised, but the file does not have"
f" {ARCH_KEY_CAT} with category data;"
" replacing self.cats with an empty DataFrame."
)
warnings.warn(msg, InconsistencyWarning)
df_cat = pd.DataFrame(
index=df[cls._mux_names[0]].unique(), columns=[cls._mux_names[0]]
)
out = cls()
if df.shape[0] > 0:
out.data = OctantTrack.from_mux_df(df.set_index(cls._mux_names))
else:
out.data = OctantTrack.from_mux_df(df)
metadata["conf"] = TrackSettings.from_dict(metadata["conf"])
out.__dict__.update(metadata)
if out.is_categorised:
out.cats = df_cat.set_index(cls._mux_names[0]).astype(bool)
return out
[docs] def to_archive(self, filename):
"""
Save TrackRun and its metadata to HDF5 file.
Parameters
----------
filename: str
File path to HDF5 file
"""
with pd.HDFStore(filename, mode="w") as store:
if self.size() > 0:
df = pd.DataFrame.from_records(self.data.to_records(index=True))
else:
df = pd.DataFrame(columns=self.columns, index=self.data.index)
store.put(ARCH_KEY, df)
metadata = {
k: v
for k, v in self.__dict__.items()
if k not in ["data", "filelist", "conf", "cats"]
}
metadata["conf"] = getattr(self.conf, "to_dict", lambda: {})()
store.get_storer(ARCH_KEY).attrs.metadata = metadata
# Store DataFrame with categorisation data
if self.is_categorised:
df_cat = pd.DataFrame.from_records(self.cats.astype("uint8").to_records(index=True))
store.put(ARCH_KEY_CAT, df_cat)
[docs] def extend(self, other, adapt_conf=True):
"""
Extend the TrackRun by appending elements from another TrackRun.
Parameters
---------
other: octant.core.TrackRun
Another TrackRun
adapt_conf: bool
Merge TrackSettings (.conf attribute) of each of the TrackRuns
This is done by retaining matching values and setting other to None
"""
# Check if category metadata match
if (self.size() > 0) and (other.size() > 0):
for attr in ["is_cat_inclusive", "is_categorised"]:
a, b = getattr(self, attr), getattr(other, attr)
if a != b:
raise ConcatenationError(
f"Categorisation metadata is different for '{attr}': {a} != {b}"
)
elif other.size() > 0:
for attr in ["is_cat_inclusive", "is_categorised"]:
setattr(self, attr, getattr(other, attr))
if getattr(self, "tstep_h", None) is None:
self.tstep_h = getattr(other, "tstep_h", None)
else:
if getattr(other, "tstep_h", None) is not None:
if self.tstep_h != other.tstep_h:
raise ConcatenationError(
"Extending by a TrackRun with different timestep is not allowed"
)
if adapt_conf and other.conf is not None:
if self.conf is None:
self.conf = other.conf.copy()
else:
for field in self.conf._fields:
if getattr(self.conf, field) != getattr(other.conf, field):
setattr(self.conf, field, None)
self.sources.extend(other.sources)
new_data = pd.concat([self.data, other.data], sort=False)
new_track_idx = new_data.index.get_level_values(0).to_series()
new_track_idx = new_track_idx.ne(new_track_idx.shift()).cumsum() - 1
mux = pd.MultiIndex.from_arrays(
[new_track_idx, new_data.index.get_level_values(1)], names=new_data.index.names
)
self.data = new_data.set_index(mux)
# Concatenate categories
if (self.cats is not None) or (other.cats is not None):
new_cats = pd.concat([self.cats, other.cats], sort=False).fillna(False)
new_track_idx = new_cats.index.get_level_values(0).to_series()
new_track_idx = new_track_idx.ne(new_track_idx.shift()).cumsum() - 1
ix = pd.Index(new_track_idx, name=new_cats.index.name)
self.cats = new_cats.set_index(ix)
[docs] def time_slice(self, start=None, end=None):
"""
Subset TrackRun by time using pandas boolean indexing.
Parameters
----------
start: str or datetime.datetime, optional
Start of the slice (inclusive)
stop: str or datetime.datetime, optional
End of the slice (inclusive)
Returns
-------
octant.core.TrackRun
Examples
--------
>>> from octant.core import TrackRun
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> sub_tr = tr.time_slice('2018-09-04', '2018-11-25')
"""
if (start is None) and (end is None):
return self
else:
crit = True
if start is not None:
crit &= self.data.time >= start
if end is not None:
crit &= self.data.time <= end
# Create a copy of this TrackRun
result = self.__class__()
result.extend(self)
# Replace data with TrackRun.data sliced by start or end
result.data = result.data[crit]
# Clear up sources to avoid confusion
result.sources = []
result.dirname = None
result.filelist = []
try:
result.conf.dt_start = None
result.conf.dt_end = None
except AttributeError:
pass
return result
[docs] def classify(self, conditions, inclusive=False, clear=True):
"""
Categorise the loaded tracks.
Parameters
----------
conditions: list
List of tuples. Each tuple is a (label, list) pair containing the category label and
a list of functions each of which has OctantTrack as its only argument.
The method assigns numbers to the labels in the same order
that they are given, starting from number 1 (see examples).
inclusive: bool, optional
If true, next category in the list is a subset of lower category;
otherwise categories are independent.
clear: bool, optional
If true, existing TrackRun categories are deleted.
Examples
--------
Simple check using OctantTrack properties
>>> from octant.core import TrackRun
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> def myfun(x):
bool_flag = (x.vortex_type != 0).sum() / x.shape[0] < 0.2
return bool_flag
>>> conds = [
('category_a', [lambda ot: ot.lifetime_h >= 6]), # only 1 function checking lifetime
('category_b', [myfun,
lambda ot: ot.gen_lys_dist_km > 300.0]) # 2 functions
]
>>> tr.classify(conds)
>>> tr.size('category_a'), tr.size('category_b')
31, 10
For more examples, see example notebooks.
See Also
--------
octant.misc.check_by_mask
"""
if clear:
self.clear_categories()
self.is_cat_inclusive = inclusive
cond_with_new_labels = []
for icond, (label, funcs) in enumerate(conditions):
if label == "all":
raise ArgumentError("'all' is not a permitted label")
if self.is_cat_inclusive:
lab = (
label
+ self._cat_sep * min(icond, 1)
+ self._cat_sep.join([j[0] for j in conditions[:icond][::-1]])
)
else:
lab = label
cond_with_new_labels.append((lab, funcs))
self.cats = pd.concat(
[
self.cats,
pd.DataFrame(
index=self.data.index.get_level_values(0).unique(),
columns=[cond[0] for cond in cond_with_new_labels],
).fillna(False),
],
axis="columns",
)
for i, ot in self._pbar(self.gb):
prev_flag = True
for label, funcs in cond_with_new_labels:
if self.is_cat_inclusive:
_flag = prev_flag
else:
_flag = True
for func in funcs:
_flag &= func(ot)
if _flag:
self.cats.loc[i, label] = True
if self.is_cat_inclusive:
prev_flag = _flag
self.is_categorised = True
[docs] def categorise(self, *args, **kwargs):
"""Alias for classify()."""
return self.classify(*args, **kwargs)
[docs] def categorise_by_percentile(self, by, subset=None, perc=95, oper="ge"):
"""
Categorise by percentile.
Parameters
----------
by: str or tuple
Property of OctantTrack to apply percentile to.
If tuple is passed, it should be of form (label, function), where the function
takes OctantTrack as the only parameter and returns one value for each track.
subset: str, optional
Subset of Trackrun to apply percentile to.
perc: float, optional
Percentile to define a category of cyclones.
E.g. (perc=95, oper='ge') is the top 5% cyclones.
oper: str, optional
Math operator to select track above or below the percentile threshold.
Can be one of (lt|le|gt|ge).
Examples
--------
Existing property: `max_vort`
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> tr.is_categorised
False
>>> tr.categorise_by_percentile("max_vort", perc=90, oper="gt")
>>> tr.cat_labels
['max_vort__gt__90pc']
>>> tr.size()
71
>>> tr.size("max_vort__gt__90pc")
7
Custom function
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> tr.is_categorised
False
>>> tr.categorise_by_percentile(by=("slp_min", lambda x: np.nanmin(x.slp.values)),
perc=20, oper="le")
>>> tr.cat_labels
['slp_min__le__20pc']
>>> tr.size("slp_min__le__10pc")
14
See Also
--------
octant.core.TrackRun.classify
"""
allowed_ops = ["lt", "le", "gt", "ge"]
if oper not in allowed_ops:
raise ArgumentError(f"oper={oper} should be one of {allowed_ops}")
op = getattr(operator, oper)
if subset is None:
label = ""
else:
label = f"{self._cat_sep}{subset}"
if isinstance(by, str):
by_label = by
func = lambda x: getattr(x, by) # noqa
else:
by_label, func = by
label = f"{by_label}__{oper}__{perc}pc" + label
v_per_track = self[subset].gb.apply(func)
if len(v_per_track) > 0:
# If this subset is not empty, create a new column in categories
new_col = pd.DataFrame(
index=self.data.index.get_level_values(0).unique(), columns=[label]
).fillna(False)
# Find numerical threshold with the given percentage
thresh = np.percentile(v_per_track, perc)
# Find all tracks above it
above_thresh = v_per_track[op(v_per_track, thresh)]
# Punch the corresponding elements in cats
new_col.loc[above_thresh.index, label] = True
# Append the new category column to cats
self.cats = pd.concat([self.cats, new_col], axis=1)
self.is_categorised = True
[docs] def clear_categories(self, subset=None, inclusive=None):
"""
Clear TrackRun of its categories.
If categories are inclusive, it destroys child categories
Parameters
----------
subset: str, optional
If None (default), all categories are removed.
inclusive: bool or None, optional
If supplied, is used instead of is_cat_inclusive attribute.
Examples
--------
Inclusive categories
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> tr.is_cat_inclusive
True
>>> tr.cat_labels
['pmc', 'max_vort__ge__90pc|pmc']
>>> tr.clear_categories(subset='pmc')
>>> tr.cat_labels
[]
Non-inclusive
>>> tr = TrackRun(path_to_directory_with_tracks)
>>> tr.cat_labels
['pmc', 'max_vort__ge__90pc|pmc']
>>> tr.clear_categories(subset='pmc', inclusive=False)
>>> tr.cat_labels
['max_vort__ge__90pc|pmc']
"""
if inclusive is not None:
inc = inclusive
else:
inc = self.is_cat_inclusive
if subset is None:
# clear all categories
self.cats = None
else:
# Do not use self[subset].blah = 0 ! - SettingWithCopyWarning
if inc:
self.cats = self.cats.drop(
columns=[col for col in self.cats.columns.values if subset in col]
)
else:
self.cats = self.cats.drop(columns=subset)
if len(self.cat_labels) == 0:
self.is_categorised = False
self.is_cat_inclusive = False
[docs] def match_tracks(
self,
others,
subset=None,
method="simple",
interpolate_to="other",
thresh_dist=250.0,
time_frac=0.5,
return_dist_matrix=False,
beta=100.0,
r_planet=EARTH_RADIUS,
):
"""
Match tracked vortices to a list of vortices from another data source.
Parameters
----------
others: list or octant.core.TrackRun
List of dataframes or a TrackRun instance
subset: str, optional
Subset (category) of TrackRun to match.
If not given, the matching is done for all categories.
method: str, optional
Method of matching (intersection|simple|bs2000)
interpolate_to: str, optional
Interpolate `TrackRun` times to `other` times, or vice versa
thresh_dist: float, optional
Radius (km) threshold of distances between vortices.
Used in 'intersection' and 'simple' methods
time_frac: float, optional
Fraction of a vortex lifetime used as a threshold in 'intersection'
and 'simple' methods
return_dist_matrix: bool, optional
Used when method='bs2000'. If True, the method returns a tuple
of matching pairs and distance matrix used to calculate them
beta: float, optional
Parameter used in 'bs2000' method
E.g. beta=100 corresponds to 10 m/s average steering wind
r_planet: float, optional
Radius of the planet in metres
Default: EARTH_RADIUS
Returns
-------
match_pairs: list
Index pairs of `other` vortices matching a vortex in `TrackRun`
in a form (<index of `TrackRun` subset>, <index of `other`>)
dist_matrix: numpy.ndarray
2D array, returned if return_dist_matrix=True
"""
# Recursive call for each of the available categies
if subset is None:
result = {}
for subset_key in self.cat_labels:
result[subset_key] = self.match_tracks(
others,
subset=subset_key,
method=method,
interpolate_to=interpolate_to,
thresh_dist=thresh_dist,
time_frac=time_frac,
return_dist_matrix=return_dist_matrix,
beta=beta,
r_planet=r_planet,
)
return result
# Select subset
sub_gb = self[subset].gb
if len(sub_gb) == 0 or len(others) == 0:
return []
if isinstance(others, list):
# match against a list of DataFrames of tracks
other_gb = pd.concat(
[OctantTrack.from_df(df) for df in others],
keys=range(len(others)),
names=self._mux_names,
).gb
elif isinstance(others, self.__class__):
# match against another TrackRun
other_gb = others[subset].gb
else:
raise ArgumentError('Argument "others" ' f"has a wrong type: {type(others)}")
match_pairs = []
if method == "intersection":
for idx, ot in self._pbar(sub_gb): # , desc="self tracks"):
for other_idx, other_ot in self._pbar(other_gb, leave=False):
times = other_ot.time.values
time_match_thresh = time_frac * (times[-1] - times[0]) / HOUR
intersect = pd.merge(other_ot, ot, how="inner", left_on="time", right_on="time")
n_match_times = intersect.shape[0]
if n_match_times > 0:
_tstep_h = intersect.time.diff().values[-1] / HOUR
dist = intersect[["lon_x", "lon_y", "lat_x", "lat_y"]].apply(
lambda x: great_circle(*x.values, r_planet=r_planet), axis=1
)
prox_time = (dist < (thresh_dist * KM2M)).sum() * _tstep_h
if (
n_match_times * _tstep_h > time_match_thresh
) and prox_time > time_match_thresh:
match_pairs.append((idx, other_idx))
break
elif method == "simple":
# TODO: explain
ll = ["lon", "lat"]
match_pairs = []
for other_idx, other_ct in self._pbar(other_gb): # , desc="other tracks"):
candidates = []
for idx, ct in self._pbar(sub_gb, leave=False): # , desc="self tracks"):
if interpolate_to == "other":
df1, df2 = ct.copy(), other_ct
elif interpolate_to == "self":
df1, df2 = other_ct, ct.copy()
l_start = max(df1.time.values[0], df2.time.values[0])
e_end = min(df1.time.values[-1], df2.time.values[-1])
if (e_end - l_start) / HOUR > 0:
# df1 = df1.set_index('time')[ll]
# ts = pd.Series(index=df2.time)
# new_df1 = (pd.concat([df1, ts]).sort_index()
# .interpolate(method='values')
# .loc[ts.index])[ll]
tmp_df2 = pd.DataFrame(
data={"lon": np.nan, "lat": np.nan, "time": df2.time}, index=df2.index
)
new_df1 = (
pd.concat([df1[[*ll, "time"]], tmp_df2], ignore_index=True, keys="time")
.set_index("time")
.sort_index()
.interpolate(method="values")
.loc[tmp_df2.time]
)[ll]
new_df1 = new_df1[~new_df1.lon.isnull()]
# thr = (time_frac * 0.5
# * (df2.time.values[-1] - df2.time.values[0]
# + df1.time.values[-1] - df2.time.values[0]))
thr = time_frac * df2.shape[0]
dist_diff = np.full(new_df1.shape[0], FILLVAL)
for i, ((x1, y1), (x2, y2)) in enumerate(
zip(new_df1[ll].values, df2[ll].values)
):
dist_diff[i] = great_circle(x1, x2, y1, y2, r_planet=r_planet)
within_r_idx = dist_diff < (thresh_dist * KM2M)
# if within_r_idx.any():
# if (new_df1[within_r_idx].index[-1]
# - new_df1[within_r_idx].index[0]) > thr:
# candidates.append((idx, within_r_idx.sum()))
if within_r_idx.sum() > thr:
candidates.append((idx, within_r_idx.sum()))
if len(candidates) > 0:
candidates = sorted(candidates, key=lambda x: x[1])
final_idx = candidates[-1][0]
match_pairs.append((final_idx, other_idx))
elif method == "bs2000":
# sub_list = [i[0] for i in list(sub_gb)]
sub_indices = list(sub_gb.indices.keys())
other_indices = list(other_gb.indices.keys())
dist_matrix = np.full((len(sub_gb), len(other_gb)), FILLVAL)
for i, (_, ct) in enumerate(self._pbar(sub_gb, leave=False)): # , desc="self tracks"):
x1, y1, t1 = ct.coord_view
for j, (_, other_ct) in enumerate(self._pbar(other_gb, leave=False)):
x2, y2, t2 = other_ct.coord_view
dist_matrix[i, j] = distance_metric(
x1, y1, t1, x2, y2, t2, beta=float(beta), r_planet=r_planet
)
for i, idx1 in enumerate(np.nanargmin(dist_matrix, axis=0)):
for j, idx2 in enumerate(np.nanargmin(dist_matrix, axis=1)):
if i == idx2 and j == idx1:
match_pairs.append((sub_indices[idx1], other_indices[idx2]))
if return_dist_matrix:
return match_pairs, dist_matrix
else:
raise ArgumentError(f"Unknown method: {method}")
return match_pairs
[docs] def density(
self,
lon1d,
lat1d,
by="point",
subset=None,
method="cell",
dist=222.0,
exclude_first={"m": 10, "d": 1},
exclude_last={"m": 4, "d": 30},
grid_centres=True,
weight_by_area=True,
r_planet=EARTH_RADIUS,
):
"""
Calculate different types of cyclone density for a given lon-lat grid.
- `point`: all points of all tracks
- `track`: each track only once for a given cell or circle
- `genesis`: starting positions (excluding start date of tracking)
- `lysis`: ending positions (excluding final date of tracking)
Parameters
----------
lon1d: numpy.ndarray
Longitude points array of shape (M,)
lat1d: numpy.ndarray
Latitude points array of shape (N,)
by: str, optional
Type of cyclone density (point|track|genesis|lysis)
subset: str, optional
Subset (category) of TrackRun to calculate density from.
If not given, the calculation is done for all categories.
method: str, optional
Method to calculate density (radius|cell)
dist: float, optional
Distance in km
Used when method='radius'
Default: ~2deg on Earth
exclude_first: dict, optional
Exclude start date (month, day)
exclude_last: dict, optional
Exclude end date (month, day)
grid_centres: bool, optional
If true, the function assumes that lon (M,) and lat (N,) arrays are grid centres
and calculates boundaries, arrays of shape (M+1,) and (N+1,) so that the density
values refer to centre points given.
If false, the density is calculated between grid points.
weight_by_area: bool, optional
Weight result by area of grid cells.
r_planet: float, optional
Radius of the planet in metres
Default: EARTH_RADIUS
Returns
-------
dens: xarray.DataArray
Array of track density of shape (M, N) with useful metadata in attrs
"""
# Recursive call for each of the available categies
if subset is None:
result = {}
for subset_key in self.cat_labels:
result[subset_key] = self.density(
lon1d,
lat1d,
by=by,
subset=subset_key,
method=method,
dist=dist,
exclude_first=exclude_first,
exclude_last=exclude_last,
grid_centres=grid_centres,
weight_by_area=weight_by_area,
r_planet=r_planet,
)
return result
# Redefine grid if necessary
if grid_centres:
# Input arrays are centres of grid cells, so cell boundaries need to be calculated
lon = cell_bounds(lon1d)
lat = cell_bounds(lat1d)
# Prepare coordinates for output
xlon = xr.IndexVariable(dims="longitude", data=lon1d, attrs={"units": "degrees_east"})
xlat = xr.IndexVariable(dims="latitude", data=lat1d, attrs={"units": "degrees_north"})
else:
# Input arrays are boundaries of grid cells, so cell centres need to be
# calculated for the output
lon, lat = lon1d, lat1d
# Prepare coordinates for output
xlon = xr.IndexVariable(
dims="longitude", data=cell_centres(lon1d), attrs={"units": "degrees_east"}
)
xlat = xr.IndexVariable(
dims="latitude", data=cell_centres(lat1d), attrs={"units": "degrees_north"}
)
# Create 2D mesh
lon2d, lat2d = np.meshgrid(lon, lat)
# Select subset
sub_df = self[subset]
# Prepare coordinates for cython
lon2d_c = lon2d.astype("double", order="C")
lat2d_c = lat2d.astype("double", order="C")
# Select method
if method == "radius":
# Convert radius to metres
dist_metres = dist * KM2M
units = f"per {round(np.pi * dist**2)} km2"
if by == "track":
cy_func = partial(track_density_rad, dist=dist_metres, r_planet=r_planet)
else:
cy_func = partial(point_density_rad, dist=dist_metres, r_planet=r_planet)
elif method == "cell":
# TODO: make this check more flexible
if (np.diff(lon2d[0, :]) < 0).any() or (np.diff(lat2d[:, 0]) < 0).any():
raise GridError("Grid values must be in an ascending order")
units = "1"
if by == "track":
cy_func = track_density_cell
else:
cy_func = point_density_cell
# Convert dataframe columns to C-ordered arrays
if by == "point":
sub_data = sub_df.lonlat_c
elif by == "track":
sub_data = sub_df.tridlonlat_c
elif by == "genesis":
sub_data = (
sub_df.gb.filter(_exclude_by_first_day, **exclude_first).xs(0, level="row_idx")
).lonlat_c
elif by == "lysis":
sub_data = (
self[subset].gb.tail(1).gb.filter(_exclude_by_last_day, **exclude_last)
).lonlat_c
else:
raise ArgumentError("`by` should be one of point|track|genesis|lysis")
data = cy_func(lon2d_c, lat2d_c, sub_data).base
if weight_by_area:
# calculate area in metres
area = grid_cell_areas(xlon.values, xlat.values, r_planet=r_planet)
data /= area
data *= KM2M * KM2M # convert to km^{-2}
units = "km-2"
dens = xr.DataArray(
data,
name=f"{by}_density",
attrs={"units": units, "subset": subset, "method": method},
dims=("latitude", "longitude"),
coords={"longitude": xlon, "latitude": xlat},
)
return dens