Skip to content

Bugfix phase_data unit conversion in correct step #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions input_parameters.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ nan_conversion: 1
# refnx/y: number of search grid points in x/y image dimensions
# refchipsize: size of the data window at each search grid point
# refminfrac: minimum fraction of valid (non-NaN) pixels in the data window
refx: -99.083
refy: 19.441
refx: -99.18
refy: 19.44
refnx: 5
refny: 5
refchipsize: 7
Expand Down
119 changes: 62 additions & 57 deletions pyrate/core/aps.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@
from pyrate.core import shared, ifgconstants as ifc, mpiops
from pyrate.core.covariance import cvd_from_phase, RDist
from pyrate.core.algorithm import get_epochs
from pyrate.core.shared import Ifg, Tile, EpochList
from pyrate.core.shared import Ifg, Tile, EpochList, nan_and_mm_convert
from pyrate.core.timeseries import time_series
from pyrate.merge import assemble_tiles
from pyrate.configuration import MultiplePaths, Configuration


def wrap_spatio_temporal_filter(params: dict) -> None:
def spatio_temporal_filter(params: dict) -> None:
"""
A wrapper for the spatio-temporal filter so it can be tested.
See docstring for spatio_temporal_filter.
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Firstly the incremental
time series is computed using the SVD method, before a cascade of temporal
then spatial Gaussian filters is applied. The resulting APS corrections are
saved to disc before being subtracted from each interferogram.

:param params: Dictionary of PyRate configuration parameters.
"""
if params[C.APSEST]:
Expand All @@ -61,44 +65,38 @@ def wrap_spatio_temporal_filter(params: dict) -> None:
log.debug('Finished APS correction')
return # return if True condition returned

aps_error_files_on_disc = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_error_files_on_disc):
log.warning("Reusing APS errors from previous run!!!")
for ifg_path, a in mpiops.array_split(list(zip(ifg_paths, aps_error_files_on_disc))):
phase = np.load(a)
_save_aps_corrected_phase(ifg_path, phase)
else:
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
aps_paths = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_paths):
log.warning('Reusing APS errors from previous run')
_apply_aps_correction(ifg_paths, aps_paths, params)
return

spatio_temporal_filter(tsincr, ifg_paths, params, preread_ifgs)
# obtain the incremental time series using SVD
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
shared.save_numpy_phase(ifg_paths, params)

# get lists of epochs and ifgs
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
epochlist = mpiops.run_once(get_epochs, ifgs)[0]

# first perform temporal high pass filter
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)

def spatio_temporal_filter(tsincr: np.ndarray, ifg_paths: List[str], params: dict,
preread_ifgs: dict) -> None:
"""
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Before performing this step,
the time series is computed using the SVD method. This function then
performs temporal and spatial filtering.

:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param ifg_paths: List of interferogram file paths
:param params: Dictionary of PyRate configuration parameters
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
"""
# second perform spatial low pass filter to obtain APS correction in ts domain
ifg = Ifg(ifg_paths[0]) # just grab any for parameters in slpfilter
ifg.open()
epochlist = mpiops.run_once(get_epochs, preread_ifgs)[0]
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)
ts_aps = spatial_low_pass_filter(ts_hp, ifg, params)
tsincr -= ts_aps

_ts_to_ifgs(tsincr, preread_ifgs, params)
ifg.close()

# construct APS corrections for each ifg
_make_aps_corrections(ts_aps, ifgs, params)

# apply correction to ifgs and save ifgs to disc.
_apply_aps_correction(ifg_paths, aps_paths, params)

# update/save the phase_data in the tiled numpy files
shared.save_numpy_phase(ifg_paths, params)


def _calc_svd_time_series(ifg_paths: List[str], params: dict, preread_ifgs: dict,
tiles: List[Tile]) -> np.ndarray:
Expand Down Expand Up @@ -150,42 +148,49 @@ def _assemble_tsincr(ifg_paths: List[str], params: dict, preread_ifgs: dict,
return np.dstack([v[1] for v in sorted(tsincr_g.items())])


def _ts_to_ifgs(tsincr: np.ndarray, preread_ifgs: dict, params: dict) -> None:
def _make_aps_corrections(ts_aps: np.ndarray, ifgs: List[Ifg], params: dict) -> None:
"""
Function that converts an incremental displacement time series into
interferometric phase observations. Used to re-construct an interferogram
network from a time series.
:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
Function to convert the time series APS filter output into interferometric
phase corrections and save them to disc.

:param ts_aps: Incremental APS time series array.
:param ifgs: List of Ifg class objects.
:param params: Dictionary of PyRate configuration parameters.
"""
log.debug('Reconstructing interferometric observations from time series')
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
_, n = mpiops.run_once(get_epochs, ifgs)
# get first and second image indices
_ , n = mpiops.run_once(get_epochs, ifgs)
index_first, index_second = n[:len(ifgs)], n[len(ifgs):]

num_ifgs_tuples = mpiops.array_split(list(enumerate(ifgs)))
num_ifgs_tuples = [(int(num), ifg) for num, ifg in num_ifgs_tuples]

for i, ifg in num_ifgs_tuples:
for i, ifg in [(int(num), ifg) for num, ifg in num_ifgs_tuples]:
# sum time slice data from first to second epoch
ifg_aps = np.sum(ts_aps[:, :, index_first[i]: index_second[i]], axis=2)
aps_error_on_disc = MultiplePaths.aps_error_path(ifg.tmp_path, params)
phase = np.sum(tsincr[:, :, index_first[i]: index_second[i]], axis=2)
np.save(file=aps_error_on_disc, arr=phase)
_save_aps_corrected_phase(ifg.tmp_path, phase)
np.save(file=aps_error_on_disc, arr=ifg_aps) # save APS as numpy array

mpiops.comm.barrier()

def _save_aps_corrected_phase(ifg_path: str, phase: np.ndarray) -> None:

def _apply_aps_correction(ifg_paths: List[str], aps_paths: List[str], params: dict) -> None:
"""
Save (update) interferogram metadata and phase data after
spatio-temporal filter (APS) correction.
Function to read and apply (subtract) APS corrections from interferogram data.
"""
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
ifg.phase_data[~np.isnan(ifg.phase_data)] = phase[~np.isnan(ifg.phase_data)]
# set aps tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
ifg.write_modified_phase()
ifg.close()
for ifg_path, aps_path in mpiops.array_split(list(zip(ifg_paths, aps_paths))):
# read the APS correction from numpy array
aps_corr = np.load(aps_path)
# open the Ifg object
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
# convert NaNs and convert to mm
nan_and_mm_convert(ifg, params)
# subtract the correction from the ifg phase data
ifg.phase_data[~np.isnan(ifg.phase_data)] -= aps_corr[~np.isnan(ifg.phase_data)]
# set meta-data tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
# write phase data to disc and close ifg.
ifg.write_modified_phase()
ifg.close()


def spatial_low_pass_filter(ts_hp: np.ndarray, ifg: Ifg, params: dict) -> np.ndarray:
Expand Down
3 changes: 2 additions & 1 deletion pyrate/core/dem_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def _per_tile_setup(ifgs: list) -> Tuple[np.ndarray, np.ndarray, int, int, np.nd
nifgs = len(ifgs)
ifg_data = np.zeros((nifgs, nrows, ncols), dtype=np.float32)
for ifg_num in range(nifgs):
ifg_data[ifg_num] = ifgs[ifg_num].phase_data
ifg_data[ifg_num] = ifgs[ifg_num].phase_data # phase data is read from numpy files
mst = ~np.isnan(ifg_data)
ifg_time_span = np.zeros((nifgs))
for ifg_num in range(nifgs):
Expand Down Expand Up @@ -287,6 +287,7 @@ def _write_dem_errors(ifg_paths: list, params: dict, preread_ifgs: dict) -> None
for ifg_path in ifg_paths:
ifg = Ifg(ifg_path)
ifg.open()
shared.nan_and_mm_convert(ifg, params) # ensure we have phase data in mm
# read dem error correction file from tmpdir
dem_error_correction_ifg = assemble_tiles(shape, params[C.TMPDIR], tiles, out_type='dem_error_correction',
index=idx)
Expand Down
9 changes: 3 additions & 6 deletions pyrate/core/orbital.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,13 @@ def independent_orbital_correction(ifg_path, params):
fullres_dm = get_design_matrix(ifg0, degree, intercept=intercept)

ifg = shared.dem_or_ifg(ifg_path) if isinstance(ifg_path, str) else ifg_path
ifg_path = ifg.data_path
multi_path = MultiplePaths(ifg.data_path, params)
orb_on_disc = MultiplePaths.orb_error_path(ifg.data_path, params)

multi_path = MultiplePaths(ifg_path, params)
fullres_ifg = ifg # keep a backup
orb_on_disc = MultiplePaths.orb_error_path(ifg_path, params)
if not ifg.is_open:
ifg.open()

shared.nan_and_mm_convert(ifg, params)
fullres_ifg = ifg # keep a backup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the idea is to keep a copy of the numpy phase data before the orbital correction, this line won't achieve that - x = y for numpy arrays just creates a new pointer to the same data, so any changes to y will also show up in x. I think this still applies to attributes of the Ifg class, but it should be easy to test this in a python prompt by importing the pyrate shared module, creating an Ifg and checking its behaviour. I'm not sure what this "backup" is used for, but it seems this problem has been here since before the PR so it's worth checking it isn't breaking anything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe the intent is to have two separate yet identical versions of the data at this point.

fullres_phase = fullres_ifg.phase_data

if orb_on_disc.exists():
Expand Down Expand Up @@ -270,7 +268,6 @@ def independent_orbital_correction(ifg_path, params):

# set orbfit meta tag and save phase to file
_save_orbital_error_corrected_phase(fullres_ifg, params)
fullres_ifg.close()


def __orb_correction(fullres_dm, mlooked_dm, fullres_phase, mlooked_phase, offset=False):
Expand Down
11 changes: 5 additions & 6 deletions pyrate/core/phase_closure/closure_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core.phase_closure.sum_closure import sum_phase_closures
from pyrate.core.phase_closure.plot_closure import plot_closure
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.logger import pyratelogger as log


Expand All @@ -34,23 +34,22 @@ def mask_pixels_with_unwrapping_errors(ifgs_breach_count: NDArray[(Any, Any, Any
params: dict) -> None:
"""
Find pixels in the phase data that breach closure_thr, and mask
(assign nans) to those pixels in those ifgs.
(assign NaNs) to those pixels in those ifgs.
:param ifgs_breach_count: unwrapping issues at pixels in all loops
:param num_occurrences_each_ifg: frequency of ifgs appearing in all loops
:param params: params dict
"""
log.debug("Updating phase data of retained ifgs")
log.debug("Masking phase data of retained ifgs")

for i, m_p in enumerate(params[C.INTERFEROGRAM_FILES]):
pix_index = ifgs_breach_count[:, :, i] == num_occurrences_each_ifg[i]
ifg = Ifg(m_p.tmp_sampled_path)
ifg.open()
ifg.nodata_value = params[C.NO_DATA_VALUE]
ifg.convert_to_nans()
nan_and_mm_convert(ifg, params)
ifg.phase_data[pix_index] = np.nan
ifg.write_modified_phase()

log.info(f"Updated phase data of {i + 1} retained ifgs after phase closure")
log.info(f"Masked phase data of {i + 1} retained ifgs after phase closure")
return None


Expand Down
7 changes: 3 additions & 4 deletions pyrate/core/ref_phs_est.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import pyrate.constants as C
from pyrate.core import ifgconstants as ifc, shared
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg, nan_and_mm_convert
from pyrate.core import mpiops
from pyrate.configuration import Configuration
from pyrate.core.logger import pyratelogger as log
Expand Down Expand Up @@ -172,9 +172,8 @@ def _update_phase_and_metadata(ifgs, ref_phs, params):
"""
def __inner(ifg, ref_ph):
ifg.open()
# nan-convert before subtracting ref phase
ifg.nodata_value = params["noDataValue"]
ifg.convert_to_nans()
# nan-convert and mm-convert before subtracting ref phase
nan_and_mm_convert(ifg, params)
# add 1e-20 to avoid 0.0 values being converted to NaN downstream (Github issue #310)
# TODO: implement a more robust way of avoiding this issue, e.g. using numpy masked
# arrays to mark invalid pixel values rather than directly changing values to NaN
Expand Down
11 changes: 3 additions & 8 deletions pyrate/core/refpixel.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import pyrate.constants as C
from pyrate.core import ifgconstants as ifc
from pyrate.core import mpiops
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.shared import joblib_log_level
from pyrate.core.logger import pyratelogger as log
from pyrate.core import prepifg_helper
Expand All @@ -53,12 +53,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifg = Ifg(ifg_file)
log.debug("Open dataset")
ifg.open(readonly=True)
log.debug("Set no data value")
ifg.nodata_value = params["noDataValue"]
log.debug("Update no data values in dataset")
ifg.convert_to_nans()
log.debug("Convert mm")
ifg.convert_to_mm()
nan_and_mm_convert(ifg, params)
half_patch_size = params["refchipsize"] // 2
x, y = refx, refy
log.debug("Extract reference pixel windows")
Expand All @@ -76,7 +71,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifc.PYRATE_MEAN_REF_AREA: str(mean_ref_area),
ifc.PYRATE_STDDEV_REF_AREA: str(stddev_ref_area)
})

ifg.write_modified_phase()
ifg.close()


Expand Down
34 changes: 21 additions & 13 deletions pyrate/core/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,42 +405,49 @@ def phase_data(self):

def convert_to_mm(self):
"""
Convert phase data units from radians to millimetres.
Convert phase_data units from radians to millimetres.
Note: converted phase_data held in memory and not written to disc
(see shared.write_modified_phase)
"""
self.mm_converted = True
if self.dataset.GetMetadataItem(ifc.DATA_UNITS) == MILLIMETRES:
msg = '{}: ignored as previous phase unit conversion ' \
'already applied'.format(self.data_path)
self.mm_converted = True
msg = '{}: ignored as phase units are already ' \
'millimetres'.format(self.data_path)
log.debug(msg)
self.phase_data = self.phase_data
return
elif self.dataset.GetMetadataItem(ifc.DATA_UNITS) == RADIANS:
self.phase_data = convert_radians_to_mm(self.phase_data, self.wavelength)
self.meta_data[ifc.DATA_UNITS] = MILLIMETRES
self.mm_converted = True
# self.write_modified_phase()
# otherwise NaN's don't write to bytecode properly
# and numpy complains
# self.dataset.FlushCache()
msg = '{}: converted phase units to millimetres'.format(self.data_path)
log.debug(msg)
return
else: # pragma: no cover
msg = 'Phase units are not millimetres or radians'
raise IfgException(msg)

def convert_to_radians(self):
"""
return mm converted phase data into radians
In memory conversion but don't write on disc
Convert phase_data units from millimetres to radians.
Note: converted phase_data held in memory and not written to disc
(see shared.write_modified_phase)
"""
if self.meta_data[ifc.DATA_UNITS] == MILLIMETRES:
msg = '{}: ignored as previous phase unit conversion ' \
'already applied'.format(self.data_path)
log.debug(msg)
self.phase_data = convert_mm_to_radians(self.phase_data, wavelength=self.wavelength)
self.meta_data[ifc.DATA_UNITS] = RADIANS
self.mm_converted = False
msg = '{}: converted phase units to radians'.format(self.data_path)
log.debug(msg)
return
elif self.meta_data[ifc.DATA_UNITS] == RADIANS:
self.mm_converted = False
msg = '{}: ignored as phase units are already ' \
'radians'.format(self.data_path)
log.debug(msg)
return
else: # pragma: no cover
msg = 'Phase units are not millimetres or radians'
Expand Down Expand Up @@ -1256,13 +1263,13 @@ def __init__(self, path, tmp_path, nan_fraction, first, second, time_span,

def save_numpy_phase(ifg_paths, params):
"""
Save interferogram phase data as numpy array file on disk.
Split interferogram phase data in to tiles (if they exist in the params
dict) and save as numpy array files on disk.

:param list ifg_paths: List of strings for interferogram paths
:param list tiles: List of pyrate.shared.Tile instances
:param dict params: Dictionary of configuration parameters

:return: None, file saved to disk
:return: None, numpy file saved to disk
"""
tiles = params['tiles']
outdir = params[C.TMPDIR]
Expand All @@ -1281,6 +1288,7 @@ def save_numpy_phase(ifg_paths, params):
arr=p_data)
ifg.close()
mpiops.comm.barrier()
log.debug(f'Finished writing phase_data to numpy files in {outdir}')


def get_geotiff_header_info(ifg_path):
Expand Down
Loading