Skip to content

Bugfix phase_data unit conversion in correct step #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions input_parameters.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ nan_conversion: 1
# refnx/y: number of search grid points in x/y image dimensions
# refchipsize: size of the data window at each search grid point
# refminfrac: minimum fraction of valid (non-NaN) pixels in the data window
refx: -99.083
refy: 19.441
refx: -99.18
refy: 19.44
refnx: 5
refny: 5
refchipsize: 7
Expand Down
119 changes: 62 additions & 57 deletions pyrate/core/aps.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@
from pyrate.core import shared, ifgconstants as ifc, mpiops
from pyrate.core.covariance import cvd_from_phase, RDist
from pyrate.core.algorithm import get_epochs
from pyrate.core.shared import Ifg, Tile, EpochList
from pyrate.core.shared import Ifg, Tile, EpochList, nan_and_mm_convert
from pyrate.core.timeseries import time_series
from pyrate.merge import assemble_tiles
from pyrate.configuration import MultiplePaths, Configuration


def wrap_spatio_temporal_filter(params: dict) -> None:
def spatio_temporal_filter(params: dict) -> None:
"""
A wrapper for the spatio-temporal filter so it can be tested.
See docstring for spatio_temporal_filter.
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Firstly the incremental
time series is computed using the SVD method, before a cascade of temporal
then spatial Gaussian filters is applied. The resulting APS corrections are
saved to disc before being subtracted from each interferogram.

:param params: Dictionary of PyRate configuration parameters.
"""
if params[C.APSEST]:
Expand All @@ -61,44 +65,38 @@ def wrap_spatio_temporal_filter(params: dict) -> None:
log.debug('Finished APS correction')
return # return if True condition returned

aps_error_files_on_disc = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_error_files_on_disc):
log.warning("Reusing APS errors from previous run!!!")
for ifg_path, a in mpiops.array_split(list(zip(ifg_paths, aps_error_files_on_disc))):
phase = np.load(a)
_save_aps_corrected_phase(ifg_path, phase)
else:
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
aps_paths = [MultiplePaths.aps_error_path(i, params) for i in ifg_paths]
if all(a.exists() for a in aps_paths):
log.warning('Reusing APS errors from previous run')
_apply_aps_correction(ifg_paths, aps_paths, params)
return

spatio_temporal_filter(tsincr, ifg_paths, params, preread_ifgs)
# obtain the incremental time series using SVD
tsincr = _calc_svd_time_series(ifg_paths, params, preread_ifgs, tiles)
mpiops.comm.barrier()
shared.save_numpy_phase(ifg_paths, params)

# get lists of epochs and ifgs
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
epochlist = mpiops.run_once(get_epochs, ifgs)[0]

# first perform temporal high pass filter
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)

def spatio_temporal_filter(tsincr: np.ndarray, ifg_paths: List[str], params: dict,
preread_ifgs: dict) -> None:
"""
Applies a spatio-temporal filter to remove the atmospheric phase screen
(APS) and saves the corrected interferograms. Before performing this step,
the time series is computed using the SVD method. This function then
performs temporal and spatial filtering.

:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param ifg_paths: List of interferogram file paths
:param params: Dictionary of PyRate configuration parameters
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
"""
# second perform spatial low pass filter to obtain APS correction in ts domain
ifg = Ifg(ifg_paths[0]) # just grab any for parameters in slpfilter
ifg.open()
epochlist = mpiops.run_once(get_epochs, preread_ifgs)[0]
ts_hp = temporal_high_pass_filter(tsincr, epochlist, params)
ts_aps = spatial_low_pass_filter(ts_hp, ifg, params)
tsincr -= ts_aps

_ts_to_ifgs(tsincr, preread_ifgs, params)
ifg.close()

# construct APS corrections for each ifg
_make_aps_corrections(ts_aps, ifgs, params)

# apply correction to ifgs and save ifgs to disc.
_apply_aps_correction(ifg_paths, aps_paths, params)

# update/save the phase_data in the tiled numpy files
shared.save_numpy_phase(ifg_paths, params)


def _calc_svd_time_series(ifg_paths: List[str], params: dict, preread_ifgs: dict,
tiles: List[Tile]) -> np.ndarray:
Expand Down Expand Up @@ -150,42 +148,49 @@ def _assemble_tsincr(ifg_paths: List[str], params: dict, preread_ifgs: dict,
return np.dstack([v[1] for v in sorted(tsincr_g.items())])


def _ts_to_ifgs(tsincr: np.ndarray, preread_ifgs: dict, params: dict) -> None:
def _make_aps_corrections(ts_aps: np.ndarray, ifgs: List[Ifg], params: dict) -> None:
"""
Function that converts an incremental displacement time series into
interferometric phase observations. Used to re-construct an interferogram
network from a time series.
:param tsincr: incremental time series array of size (ifg.shape, nepochs-1)
:param preread_ifgs: Dictionary of shared.PrereadIfg class instances
Function to convert the time series APS filter output into interferometric
phase corrections and save them to disc.

:param ts_aps: Incremental APS time series array.
:param ifgs: List of Ifg class objects.
:param params: Dictionary of PyRate configuration parameters.
"""
log.debug('Reconstructing interferometric observations from time series')
ifgs = list(OrderedDict(sorted(preread_ifgs.items())).values())
_, n = mpiops.run_once(get_epochs, ifgs)
# get first and second image indices
_ , n = mpiops.run_once(get_epochs, ifgs)
index_first, index_second = n[:len(ifgs)], n[len(ifgs):]

num_ifgs_tuples = mpiops.array_split(list(enumerate(ifgs)))
num_ifgs_tuples = [(int(num), ifg) for num, ifg in num_ifgs_tuples]

for i, ifg in num_ifgs_tuples:
for i, ifg in [(int(num), ifg) for num, ifg in num_ifgs_tuples]:
# sum time slice data from first to second epoch
ifg_aps = np.sum(ts_aps[:, :, index_first[i]: index_second[i]], axis=2)
aps_error_on_disc = MultiplePaths.aps_error_path(ifg.tmp_path, params)
phase = np.sum(tsincr[:, :, index_first[i]: index_second[i]], axis=2)
np.save(file=aps_error_on_disc, arr=phase)
_save_aps_corrected_phase(ifg.tmp_path, phase)
np.save(file=aps_error_on_disc, arr=ifg_aps) # save APS as numpy array

mpiops.comm.barrier()

def _save_aps_corrected_phase(ifg_path: str, phase: np.ndarray) -> None:

def _apply_aps_correction(ifg_paths: List[str], aps_paths: List[str], params: dict) -> None:
"""
Save (update) interferogram metadata and phase data after
spatio-temporal filter (APS) correction.
Function to read and apply (subtract) APS corrections from interferogram data.
"""
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
ifg.phase_data[~np.isnan(ifg.phase_data)] = phase[~np.isnan(ifg.phase_data)]
# set aps tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
ifg.write_modified_phase()
ifg.close()
for ifg_path, aps_path in mpiops.array_split(list(zip(ifg_paths, aps_paths))):
# read the APS correction from numpy array
aps_corr = np.load(aps_path)
# open the Ifg object
ifg = Ifg(ifg_path)
ifg.open(readonly=False)
# convert NaNs and convert to mm
nan_and_mm_convert(ifg, params)
# subtract the correction from the ifg phase data
ifg.phase_data[~np.isnan(ifg.phase_data)] -= aps_corr[~np.isnan(ifg.phase_data)]
# set meta-data tags after aps error correction
ifg.dataset.SetMetadataItem(ifc.PYRATE_APS_ERROR, ifc.APS_REMOVED)
# write phase data to disc and close ifg.
ifg.write_modified_phase()
ifg.close()


def spatial_low_pass_filter(ts_hp: np.ndarray, ifg: Ifg, params: dict) -> np.ndarray:
Expand Down
29 changes: 7 additions & 22 deletions pyrate/core/covariance.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
radial average of its 2D autocorrelation.

:param str ifg_path: An interferogram file path. OR
:param Ifg class ifg_path: A pyrate.shared.Ifg class object
:param dict params: Dictionary of configuration parameters
:param ndarray r_dist: Array of distance values from the image centre
(See Rdist class for more details)
Expand All @@ -83,13 +82,8 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
:return: alpha: the exponential length-scale of decay factor
:rtype: float
"""

if isinstance(ifg_path, str): # used during MPI
ifg = shared.Ifg(ifg_path)
ifg.open()
else:
ifg = ifg_path

ifg = shared.Ifg(ifg_path)
ifg.open()
shared.nan_and_mm_convert(ifg, params)
# calculate 2D auto-correlation of image using the
# spectral method (Wiener-Khinchin theorem)
Expand All @@ -99,26 +93,17 @@ def cvd(ifg_path, params, r_dist, calc_alpha=False, write_vals=False, save_acg=F
phase = ifg.phase_data

maxvar, alpha = cvd_from_phase(phase, ifg, r_dist, calc_alpha, save_acg=save_acg, params=params)

if write_vals:
_add_metadata(ifg, maxvar, alpha)
ifg.add_metadata(**{
ifc.PYRATE_MAXVAR: str(maxvar),
ifc.PYRATE_ALPHA: str(alpha)
})

if isinstance(ifg_path, str):
ifg.close()
ifg.close()

return maxvar, alpha


def _add_metadata(ifg, maxvar, alpha):
"""
Convenience function for saving metadata to ifg
"""
md = ifg.meta_data
md[ifc.PYRATE_MAXVAR] = str(maxvar)
md[ifc.PYRATE_ALPHA] = str(alpha)
ifg.write_modified_phase()


def _save_cvd_data(acg, r_dist, ifg_path, outdir):
"""
Function to save numpy array of autocorrelation data to disk
Expand Down
3 changes: 2 additions & 1 deletion pyrate/core/dem_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def _per_tile_setup(ifgs: list) -> Tuple[np.ndarray, np.ndarray, int, int, np.nd
nifgs = len(ifgs)
ifg_data = np.zeros((nifgs, nrows, ncols), dtype=np.float32)
for ifg_num in range(nifgs):
ifg_data[ifg_num] = ifgs[ifg_num].phase_data
ifg_data[ifg_num] = ifgs[ifg_num].phase_data # phase data is read from numpy files
mst = ~np.isnan(ifg_data)
ifg_time_span = np.zeros((nifgs))
for ifg_num in range(nifgs):
Expand Down Expand Up @@ -287,6 +287,7 @@ def _write_dem_errors(ifg_paths: list, params: dict, preread_ifgs: dict) -> None
for ifg_path in ifg_paths:
ifg = Ifg(ifg_path)
ifg.open()
shared.nan_and_mm_convert(ifg, params) # ensure we have phase data in mm
# read dem error correction file from tmpdir
dem_error_correction_ifg = assemble_tiles(shape, params[C.TMPDIR], tiles, out_type='dem_error_correction',
index=idx)
Expand Down
9 changes: 3 additions & 6 deletions pyrate/core/orbital.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,13 @@ def independent_orbital_correction(ifg_path, params):
fullres_dm = get_design_matrix(ifg0, degree, intercept=intercept)

ifg = shared.dem_or_ifg(ifg_path) if isinstance(ifg_path, str) else ifg_path
ifg_path = ifg.data_path
multi_path = MultiplePaths(ifg.data_path, params)
orb_on_disc = MultiplePaths.orb_error_path(ifg.data_path, params)

multi_path = MultiplePaths(ifg_path, params)
fullres_ifg = ifg # keep a backup
orb_on_disc = MultiplePaths.orb_error_path(ifg_path, params)
if not ifg.is_open:
ifg.open()

shared.nan_and_mm_convert(ifg, params)
fullres_ifg = ifg # keep a backup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the idea is to keep a copy of the numpy phase data before the orbital correction, this line won't achieve that - x = y for numpy arrays just creates a new pointer to the same data, so any changes to y will also show up in x. I think this still applies to attributes of the Ifg class, but it should be easy to test this in a python prompt by importing the pyrate shared module, creating an Ifg and checking its behaviour. I'm not sure what this "backup" is used for, but it seems this problem has been here since before the PR so it's worth checking it isn't breaking anything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe the intent is to have two separate yet identical versions of the data at this point.

fullres_phase = fullres_ifg.phase_data

if orb_on_disc.exists():
Expand Down Expand Up @@ -270,7 +268,6 @@ def independent_orbital_correction(ifg_path, params):

# set orbfit meta tag and save phase to file
_save_orbital_error_corrected_phase(fullres_ifg, params)
fullres_ifg.close()


def __orb_correction(fullres_dm, mlooked_dm, fullres_phase, mlooked_phase, offset=False):
Expand Down
11 changes: 5 additions & 6 deletions pyrate/core/phase_closure/closure_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pyrate.configuration import Configuration, MultiplePaths
from pyrate.core.phase_closure.sum_closure import sum_phase_closures
from pyrate.core.phase_closure.plot_closure import plot_closure
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.logger import pyratelogger as log


Expand All @@ -34,23 +34,22 @@ def mask_pixels_with_unwrapping_errors(ifgs_breach_count: NDArray[(Any, Any, Any
params: dict) -> None:
"""
Find pixels in the phase data that breach closure_thr, and mask
(assign nans) to those pixels in those ifgs.
(assign NaNs) to those pixels in those ifgs.
:param ifgs_breach_count: unwrapping issues at pixels in all loops
:param num_occurrences_each_ifg: frequency of ifgs appearing in all loops
:param params: params dict
"""
log.debug("Updating phase data of retained ifgs")
log.debug("Masking phase data of retained ifgs")

for i, m_p in enumerate(params[C.INTERFEROGRAM_FILES]):
pix_index = ifgs_breach_count[:, :, i] == num_occurrences_each_ifg[i]
ifg = Ifg(m_p.tmp_sampled_path)
ifg.open()
ifg.nodata_value = params[C.NO_DATA_VALUE]
ifg.convert_to_nans()
nan_and_mm_convert(ifg, params)
ifg.phase_data[pix_index] = np.nan
ifg.write_modified_phase()

log.info(f"Updated phase data of {i + 1} retained ifgs after phase closure")
log.info(f"Masked phase data of {i + 1} retained ifgs after phase closure")
return None


Expand Down
7 changes: 3 additions & 4 deletions pyrate/core/ref_phs_est.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import pyrate.constants as C
from pyrate.core import ifgconstants as ifc, shared
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg
from pyrate.core.shared import joblib_log_level, nanmedian, Ifg, nan_and_mm_convert
from pyrate.core import mpiops
from pyrate.configuration import Configuration
from pyrate.core.logger import pyratelogger as log
Expand Down Expand Up @@ -172,9 +172,8 @@ def _update_phase_and_metadata(ifgs, ref_phs, params):
"""
def __inner(ifg, ref_ph):
ifg.open()
# nan-convert before subtracting ref phase
ifg.nodata_value = params["noDataValue"]
ifg.convert_to_nans()
# nan-convert and mm-convert before subtracting ref phase
nan_and_mm_convert(ifg, params)
# add 1e-20 to avoid 0.0 values being converted to NaN downstream (Github issue #310)
# TODO: implement a more robust way of avoiding this issue, e.g. using numpy masked
# arrays to mark invalid pixel values rather than directly changing values to NaN
Expand Down
11 changes: 3 additions & 8 deletions pyrate/core/refpixel.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import pyrate.constants as C
from pyrate.core import ifgconstants as ifc
from pyrate.core import mpiops
from pyrate.core.shared import Ifg
from pyrate.core.shared import Ifg, nan_and_mm_convert
from pyrate.core.shared import joblib_log_level
from pyrate.core.logger import pyratelogger as log
from pyrate.core import prepifg_helper
Expand All @@ -53,12 +53,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifg = Ifg(ifg_file)
log.debug("Open dataset")
ifg.open(readonly=True)
log.debug("Set no data value")
ifg.nodata_value = params["noDataValue"]
log.debug("Update no data values in dataset")
ifg.convert_to_nans()
log.debug("Convert mm")
ifg.convert_to_mm()
nan_and_mm_convert(ifg, params)
half_patch_size = params["refchipsize"] // 2
x, y = refx, refy
log.debug("Extract reference pixel windows")
Expand All @@ -76,7 +71,7 @@ def update_refpix_metadata(ifg_paths, refx, refy, transform, params):
ifc.PYRATE_MEAN_REF_AREA: str(mean_ref_area),
ifc.PYRATE_STDDEV_REF_AREA: str(stddev_ref_area)
})

ifg.write_modified_phase()
ifg.close()


Expand Down
Loading