"""
This module provides support for working with footprints
on the sky. Primary use case would use the following
generalized steps:
#. Initialize `SkyLine` objects for each input image.
This object would be the union of all the input
image's individual chips WCS footprints.
#. Determine overlap between all images. The
determination would employ a recursive operation
to return the extended list of all overlap values
computed as [img1 vs [img2,img3,...,imgN],img2 vs
[img3,...,imgN],...]
#. Select the pair with the largest overlap, or the
pair which produces the largest overlap with the
first input image. This defines the initial
reference `SkyLine` object.
#. Perform some operation on the 2 images: for example,
match sky in intersecting regions, or aligning
second image with the first (reference) image.
#. Update the second image, either apply the sky value
or correct the WCS, then generate a new `SkyLine`
object for that image.
#. Create a new reference `SkyLine` object as the union
of the initial reference object and the newly
updated `SkyLine` object.
#. Repeat Steps 2-6 for all remaining input images.
This process will work reasonably fast as most operations
are performed using the `SkyLine` objects and WCS information
solely, not image data itself.
:Authors: Mihai Cara, Warren Hack, Pey-Lian Lim
:License: :doc:`LICENSE`
"""
# STDLIB
import sys
import os
from copy import copy, deepcopy
from os.path import basename
import numpy as np
import math
# THIRD-PARTY
from stwcs import wcsutil
from astropy import wcs as pywcs
from stwcs.distortion.utils import output_wcs
from spherical_geometry.polygon import SphericalPolygon
try:
from stsci.tools.bitmask import bitfield_to_boolean_mask
except ImportError:
from stsci.tools.bitmask import bitmask2mask as bitfield_to_boolean_mask
# LOCAL
from .utils import (is_countrate, ext2str, MultiFileLog, ImageRef,
file_name_components, temp_mask_file, in_memory_mask,
get_instrument_info)
from .parseat import FileExtMaskInfo, _Stat
from .hstinfo import supported_telescopes, supported_instruments, photcorr_kwd
# DEBUG
SKYLINE_DEBUG = False
__all__ = ['SkyLineMember', 'SkyLine']
[docs]class SkyLineMember(object):
"""
Container for :py:class:`SkyLine` members that holds information about
properties of a *single* extension (chip) in a FITS image such as:
* WCS of the chip image;
* bounding spherical polygon;
* file name and extension from which the chip's image has originated;
* information required for unit conversions (``EXPTIME``,
``PHOTFLAM``, ``BUNIT``, etc.);
* user mask and DQ array associated with chip's image data.
"""
def __init__(self, image, ext, dq_bits=0, dqimage=None, dqext=None,
usermask=None, usermask_ext=None):
"""
Parameters
----------
image: ImageRef
An :py:class:`~stsci.skypac.utils.ImageRef` object that refers
to an open FITS file
ext: tuple, int, str
Extension specification in the `image` the `SkyLineMember`
object will be associated with.
An int `ext` specifies extension number. A tuple in the form
(str, int) specifies extension name and number. A string `ext`
specifies extension name and the extension version is assumed
to be 1. See documentation for `astropy.io.fits.getData`
for examples.
dq_bits: int, None (Default = 0)
Integer sum of all the DQ bit values from the
input `image`'s DQ array that should be considered "good"
when building masks for sky computations. For example,
if pixels in the DQ array can be combinations of 1, 2, 4,
and 8 flags and one wants to consider DQ "defects" having
flags 2 and 4 as being acceptable for sky computations,
then `dq_bits` should be set to 2+4=6. Then a DQ pixel
having values 2,4, or 6 will be considered a good pixel,
while a DQ pixel with a value, e.g., 1+2=3, 4+8=12, etc.
will be flagged as a "bad" pixel.
| Default value (0) will make *all* non-zero
pixels in the DQ mask to be considered "bad" pixels,
and the corresponding image pixels will not be used
for sky computations.
| Set `dq_bits` to `None` to turn off the use of
image's DQ array for sky computations.
.. note::
DQ masks (if used), *will be* combined with user masks
specified by the `usermask` parameter.
dqimage: ImageRef
An :py:class:`~stsci.skypac.utils.ImageRef` object that refers
to an open FITS file that has DQ data of the input `image`.
.. note::
When DQ data are located in the same FITS file as the
science image data (e.g., HST/ACS, HST/WFC3, etc.),
`dqimage` may point to the
same :py:class:`~stsci.skypac.utils.ImageRef` object.
In this case the reference count of the
:py:class:`~stsci.skypac.utils.ImageRef` object must be
increased adequately.
dqext: tuple, int, str
Extension specification of the `dqimage` that contains
`image`'s DQ information. See help for `ext` for more
details on acceptable formats for this parameter.
usermask: ImageRef
An :py:class:`~stsci.skypac.utils.ImageRef` object that refers
to an open FITS file that has user mask data that indicate
what pixels in the input `image` should be used for sky
computations (``1``) and which pixels should **not** be used
for sky computations (``0``).
usermask_ext: tuple, int, str
Extension specification of the `usermask` mask file that
contains user's mask data that should be associated with
the input `image` and `ext`. See help for `ext` for more
details on acceptable formats for this parameter.
"""
assert(hasattr(self.__class__, '_initialized') and
self.__class__._initialized)
self._reset()
# check that input images and extensions are valid --
# either integers or tuples of strings and integers, e.g., ('sci',1):
_check_valid_imgext(image, 'image', ext, 'ext', can_img_be_None=False)
if dq_bits is not None:
if dqimage is None:
dq_bits = 0
else:
_check_valid_imgext(dqimage, 'dqimage', dqext, 'dqext')
_check_valid_imgext(usermask, 'usermask', usermask_ext, 'usermask_ext')
# get telescope, instrument, and detector info:
self.telescope, self.instrument, self.detector = get_instrument_info(
image, ext)
# check dq_bits:
if dq_bits is not None and not isinstance(dq_bits, int):
if image:
dqimage.release()
if usermask:
usermask.release()
if dqimage:
dqimage.release()
raise TypeError(
"Argument 'dq_bits' must be either an integer or None."
)
# buld mask:
self._buildMask(image.original_fname, ext, dq_bits,
dqimage, dqext, usermask, usermask_ext)
if dqimage:
dqimage.release()
if usermask:
usermask.release()
# save file, user mask, and DQ extension info:
self._fname = image.original_fname
self._basefname = basename(self._fname)
self._image = image
self._ext = ext
self._can_free_image = (image.can_reload_data and
self.optimize != 'speed')
# check extension and create a string representation:
try:
extstr = ext2str(ext)
except ValueError:
raise ValueError("Unexpected extension type '{}' for file {}.".
format(ext, self._basefname))
self._id = "{:s}[{:s}]".format(self._basefname, extstr)
# extract WCS for bounding-box computation
try:
if hasattr(image.hdu[ext], 'wcs'):
self._wcs = image.hdu[ext].wcs
else:
if self.telescope in supported_telescopes:
self._wcs = wcsutil.HSTWCS(image.hdu, ext)
else:
self._wcs = pywcs.WCS(image.hdu[ext].header, image.hdu)
if self._wcs is None:
raise Exception("Invalid WCS.")
except Exception as e:
msg = "Unable to obtain WCS information for the file {:s}." \
.format(self._id)
self._ml.error(msg)
self._ml.flush()
self._release_all()
raise e
# determine pixel scale:
self._get_pixel_scale()
# see if image data are in counts or count-rate
# and compute count(-rate) to flux (per arcsec^2) conversion factor:
self._brightness_conv_from_hdu(image.hdu, self._idcscale)
# process Sky user's keyword and its value:
self._init_skyuser(image.hdu[ext].header)
# Set polygon to be the bounding box of the chip:
self._polygon = SphericalPolygon.from_wcs(self.wcs, steps=1)
@classmethod
def init_class(cls, skyuser_kwd='SKYUSER', units_kwd='BUNIT',
invsens_kwd=None, verbose=True, logfile=sys.stdout,
clobber=False, optimize='balanced'):
cls._skyuser_header_keyword = skyuser_kwd
cls._units_header_keyword = units_kwd
cls._inv_sensitivity_kwd = invsens_kwd
cls._verbose = verbose
cls._clobber = clobber
if isinstance(optimize, str):
cls._optimize = optimize.lower()
else:
raise TypeError("The 'optimize' argument must be a string object.")
if cls._optimize not in ['balanced', 'speed', 'inmemory']:
raise ValueError("Currently supported values for the "
"'optimize' argument are: "
"'balanced', 'speed', or 'inmemory'.")
# Set-up log files:
if isinstance(logfile, MultiFileLog):
cls._ml = logfile
else:
cls._ml = MultiFileLog(console=verbose)
if logfile not in ('', None):
cls._ml.add_logfile(logfile)
cls._ml.skip(2)
cls._initialized = True
def _clean(self):
if len(self._files2clean) > 0:
for f in self._files2clean:
try:
os.remove(f)
except Exception:
self._ml.warning("Unable to remove temporary file "
"'{:s}'.", f)
def _reset(self):
self._image = None
self._ext = None
self._mask = None
self._maskext = None
self._files2clean = []
self._can_free_mask = False
self._mask_is_imref = False
self._id = ''
self._pixscale = 1.0
self._is_countrate = False
self._skyuser = 0.0
self._skyuser_delta = 0.0
self._polygon = None
self._wcs = None
self._inv_sensitivity = None
self._exptime = 1.0
self._data2brightness_conv = 1.0
def _release_all(self):
if self._image is not None:
self._image.release()
if self._mask_is_imref:
self._mask.release()
def close(self, clean=True):
self._release_all()
if clean:
self._clean()
self._reset()
def _buildMask(self, image_fname, ext, dq_bits, dq, dqext, msk, mskext):
if dq_bits is None or dq is None:
# we will use only the user mask:
if msk is not None:
if (self._optimize == 'balanced' and msk.can_reload_data) or \
self._optimize == 'speed' or self._optimize == 'inmemory':
# nothing to do: simply re-use the user mask:
self._mask = msk
self._maskext = mskext
self._mask.hold()
else:
# self._optimize == 'balanced' but the mask cannot be freed
# so we will create a temporary fits file to hold mask
# data:
maskdata = (msk.hdu[mskext].data != 0).astype(np.uint8)
root, suffix, fext = file_name_components(image_fname)
mfname, self._mask = temp_mask_file(
maskdata, root, prefix='',
suffix='skymatch_mask',
ext=ext, randomize_prefix=False)
self._files2clean.append(mfname)
self._maskext = 0
self._can_free_mask = self._mask.can_reload_data
self._mask_is_imref = True
else:
# no mask will be used in sky computations:
self._mask = None
self._can_free_mask = False
self._mask_is_imref = False
else:
# compute a new mask by:
# 1. applying dq_bits to DQ array
# 2. combining previous array with the user mask data
#
# If dq_bits show the "bad" bits then DQ mask should be computed
# using:
#
# dqmskarr = np.logical_not(
# np.bitwise_and(dq.hdu[dqext].data,dq_bits)
# )
#
# However, to keep the same convention with astrodrizzle, dq_bits
# will show the "good" bits that should be removed from the DQ
# array:
dqmskarr = bitfield_to_boolean_mask(dq.hdu[dqext].data, dq_bits,
dtype=np.bool_)
# 2. combine with user mask:
if msk is not None:
maskdata = (msk.hdu[mskext].data != 0)
dqmskarr = np.logical_and(dqmskarr, maskdata)
# create a temporary file with the combined mask:
(root, suffix, fext) = file_name_components(image_fname)
if self._optimize == 'inmemory':
self._mask = in_memory_mask(dqmskarr.astype(np.uint8))
# strext = ext2str(ext, compact=True, default_extver=None)
self._mask.original_fname = "{1:s}{0:s}{2:s}{0:s}{3:s}" \
.format('_', root, suffix, 'in-memory_skymatch_mask')
else:
mfname, self._mask = temp_mask_file(
dqmskarr.astype(np.uint8), root,
prefix='', suffix='skymatch_mask', ext=ext,
randomize_prefix=False
)
self._files2clean.append(mfname)
self._maskext = 0
self._can_free_mask = self._mask.can_reload_data
self._mask_is_imref = True
def _init_skyuser(self, image_header):
skyuser_kwd = self.get_skyuser_kwd()
if skyuser_kwd in image_header:
self._skyuser = image_header[skyuser_kwd]
else:
self._skyuser = 0.0
self._skyuser_delta = 0.0
def reset_skyuser_from_header(self):
self._init_skyuser(self._image.hdu[self._ext].header)
def _get_pixel_scale(self):
self._idcscale = None
nominal_pscale = None
if hasattr(self._wcs, 'idcscale') and self._wcs.idcscale is not None:
self._idcscale = self._wcs.idcscale
nominal_pscale = 'IDCSCALE'
elif 'PAMSCALE' in self._image.hdu[self._ext].header:
self._idcscale = float(self._image.hdu[self._ext]
.header['PAMSCALE'])
nominal_pscale = 'PAMSCALE'
elif 'IDCSCALE' in self._image.hdu[self._ext].header:
self._idcscale = float(self._image.hdu[self._ext]
.header['IDCSCALE'])
nominal_pscale = 'IDCSCALE'
elif 'PAMSCALE' in self._image.hdu[0].header:
self._idcscale = float(self._image.hdu[0].header['PAMSCALE'])
nominal_pscale = 'PAMSCALE'
elif 'IDCSCALE' in self._image.hdu[0].header:
self._idcscale = float(self._image.hdu[0].header['IDCSCALE'])
nominal_pscale = 'IDCSCALE'
# try to compute "actual" pixel scale from the CD matrix.
#TODO: Add support for more WCS representations (PC, CDELT, CROTA)
self._pixscale = None
if self._wcs.wcs.has_cd():
self._pixscale = 3600.0 * math.sqrt(
np.abs(np.linalg.det(self._wcs.wcs.cd))
)
else:
if self._idcscale is not None:
self._pixscale = self._idcscale
self._ml.warning(
"Unable to compute \"actual\" pixel scale for image "
"{:s}[{:s}].\nWCS object does not have a CD matrix.\n"
"Using the value of '{:s}' pixel scale for actual "
"pixel scale: {:g}",
self._basefname, ext2str(self._ext), nominal_pscale,
self._idcscale)
else:
self._pixscale = 1.0
self._idcscale = 1.0
self._ml.warning(
"Unable to determine pixel scale of image in file "
"{:s}[{:s}].\nSetting pixel scale to 1.",
self._basefname, ext2str(self._ext))
if self._idcscale is None:
self._idcscale = self._pixscale
self._ml.warning(
"Unable to determine \"nominal\" pixel scale of image in "
"file {:s}[{:s}].\nSetting pixel scale to \"actual\" pixel "
"scale computed from CD matrix: {:g}",
self._basefname, ext2str(self._ext), self._pixscale)
def _brightness_conv_from_hdu(self, hdulist, pscale,
primHDUname='PRIMARY'):
#TODO: remove primHDUname from the argument list and
# replace it with 0 once the bug in fileutil.getExtn() is fixed.
# The bug causes imageObject[0] to return first image HDU instead
# of the PRIMARY HDU.
assert(pscale > 0.0)
sci_header = hdulist[self.ext].header
primary_header = hdulist[primHDUname].header
inv_sensitivity_kwd = self.get_inv_sensitivity_kwd()
# start with pixel-area scaling and add other conversion factors later
self._data2brightness_conv = 1.0 / (pscale**2)
# check if image data are in counts or count-rate
self._is_countrate = is_countrate(
hdulist, self.ext, units_kwd=self.get_units_kwd(),
guess_if_missing=True, verbose=self.is_verbose(),
flog=self._ml.unclose_copy()
)
# Check that all the necessary information for conversion
# to flux (brightness) units is available:
if self.is_countrate is None:
self._ml.warning(
"Unable to determine units of data in file {0}.\n"
"*ASSUMING* image data are \"COUNT-RATE\".",
self._basefname
)
# retrieve PHOTFLAM:
# Initially assume that PHOTCORR was performed. For non-HST images
# or unsupported instruments, we assume that PHOTCORR was performed,
# and in the end we will check if PHOTFLAM (inverse sensitivity)
# has a reasonable value (>0) if present.
photcorr = inv_sensitivity_kwd is not None
if 'TELESCOP' in primary_header:
telescope = primary_header['TELESCOP'].strip().upper()
else:
telescope = None
if telescope not in supported_telescopes:
self._ml.warning("Skipping check of photometric correction "
"step for\nnon-HST file '{:s}': "
"Unsupported telescope.", self._basefname)
telescope = None
if telescope is not None:
if 'INSTRUME' in primary_header:
instrument = primary_header['INSTRUME'].strip().upper()
if instrument not in supported_instruments:
self._ml.warning("Skipping check of photometric "
"correction step for\nfile '{:s}': "
"Unsupported instrument '{:s}'.",
self._basefname, instrument)
instrument = None
else:
# For HST instruments we expect 'INSTRUME' to be present
# in the primary header.
errmsg = "Missing instrument information in '{:s}' data " \
"file '{:s}'.".format(telescope, self._basefname)
self._ml.error(errmsg)
self._ml.close()
raise KeyError(errmsg)
# see if photometric correction step was performed:
if telescope is not None and instrument is not None and \
inv_sensitivity_kwd is not None:
phot_switch = photcorr_kwd[instrument][0]
phot_done = photcorr_kwd[instrument][1]
if phot_switch in primary_header:
phot_switch_val = primary_header[phot_switch].strip()
if phot_switch_val.upper() != phot_done:
photcorr = False
self._ml.warning(
"Photometric correction was not performed for data "
"file {:s}.\nVariations in detector sensitivity "
"WILL NOT be accounted for.", self._basefname
)
self._inv_sensitivity = None
else:
# For HST instruments we expect ~'PHOTCORR' to be present
# in the primary header.
errmsg = "Missing photometry switch keyword '{:s}' in " \
"{:s}-{:s} data file '{:s}'." \
.format(phot_switch, telescope, instrument,
self._basefname)
self._ml.error(errmsg)
self._ml.close()
raise KeyError(errmsg)
if photcorr:
if inv_sensitivity_kwd in sci_header:
self._inv_sensitivity = sci_header[inv_sensitivity_kwd]
if self._inv_sensitivity > 0.0:
self._data2brightness_conv *= self._inv_sensitivity
else:
self._ml.warning(
"'{3:s}' value must be a *strictly* positive "
"number.\nFound: '{3:s}' = {0} in extension "
"{1:s} in data file '{2:s}'.\n'{3:s}' "
"value will be ignored.\nVariations in detector "
"sensitivity WILL NOT be accounted for.",
self._inv_sensitivity, ext2str(self.ext),
self._basefname, inv_sensitivity_kwd)
self._inv_sensitivity = None
elif inv_sensitivity_kwd in primary_header:
self._inv_sensitivity = primary_header[inv_sensitivity_kwd]
if self._inv_sensitivity > 0.0:
self._data2brightness_conv *= self._inv_sensitivity
else:
self._ml.warning(
"'{2:s}' value must be a *strictly* positive "
"number.\nFound: '{2:s}' = {0} in the primary "
"header in data file '{1:s}'.\n'{2:s}' "
"value will be ignored.\nVariations in detector "
"sensitivity WILL NOT be accounted for.",
self._inv_sensitivity, self._basefname,
inv_sensitivity_kwd)
self._inv_sensitivity = None
else:
self._inv_sensitivity = None
self._ml.warning(
"Keyword '{2:s}' not found neither in the "
"Primary header nor\nin the header of "
"extension ({0:s}) in file {1:s}.\n"
"Variations in detector sensitivity WILL NOT "
"be accounted for.",
ext2str(self.ext), self._basefname, inv_sensitivity_kwd)
# retrieve EXPTIME:
if 'EXPTIME' in primary_header:
self._exptime = primary_header['EXPTIME']
if not self.is_countrate:
if self._exptime > 0.0:
self._data2brightness_conv /= self._exptime
else:
self._ml.warning(
"'EXPTIME' value must be a *strictly* positive "
"number.\nFound: 'EXPTIME' = {} in the primary "
"header in data file '{:s}'.\n'EXPTIME' "
"value will be ignored.\nVariations in exposure "
"time WILL NOT be accounted for.",
self._exptime, self._basefname
)
self._exptime = None
elif 'EXPTIME' in sci_header:
self._exptime = sci_header['EXPTIME']
if not self.is_countrate:
if self._exptime > 0.0:
self._data2brightness_conv /= self._exptime
else:
self._ml.warning(
"'EXPTIME' value must be a *strictly* positive "
"number.\nFound: 'EXPTIME' = {} in extension "
"{:s} in data file '{:s}'.\n'EXPTIME' "
"value will be ignored.\nVariations in exposure "
"time WILL NOT be accounted for.",
self._exptime, ext2str(self.ext), self._basefname
)
self._exptime = None
else:
self._exptime = None
if not self.is_countrate:
self._ml.warning(
"Keyword 'EXPTIME' not found neither in the "
"Primary header nor\nin the header of "
"extension ({:s}) in file {:s}.\n"
"Variations in exposure time WILL NOT be accounted for.",
ext2str(self.ext), self._basefname
)
def __repr__(self):
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.fname,
self.ext, self.wcs, self.polygon)
@classmethod
def get_skyuser_kwd(cls):
return cls._skyuser_header_keyword
@classmethod
def set_skyuser_kwd(cls, kwd):
cls._skyuser_header_keyword = kwd
@classmethod
def get_units_kwd(cls):
return cls._units_header_keyword
@classmethod
def set_units_kwd(cls, kwd):
cls._units_header_keyword = kwd
@classmethod
def get_inv_sensitivity_kwd(cls):
return cls._inv_sensitivity_kwd
@classmethod
def set_inv_sensitivity_kwd(cls, kwd):
cls._inv_sensitivity_kwd = kwd
@classmethod
def is_verbose(cls):
return cls._verbose
@classmethod
def set_verbose(cls, verbose):
cls._verbose = verbose
@classmethod
def optimize(cls):
return cls._optimize
@property
def fname(self):
return self._fname
@property
def basefname(self):
return self._basefname
@property
def image_hdulist(self):
return self._image.hdu
@property
def image_header(self):
return self._image.hdu[self._ext].header
@property
def image_data(self):
data = self._image.hdu[self._ext].data
return data
@image_data.setter
def image_data(self, newdata):
self._image.hdu[self._ext].data = newdata
def free_image_data(self):
if self._can_free_image:
del self._image.hdu[self._ext].data
@property
def ext(self):
return self._ext
@property
def maskext(self):
return self._maskext
@property
def mask_data(self):
if self._mask_is_imref:
data = self._mask.hdu[self._maskext].data
return data
else:
return self._mask
def free_mask_data(self):
if self._mask_is_imref and self._can_free_mask:
del self._mask.hdu[self._maskext].data
@property
def wcs(self):
return self._wcs
@property
def polygon(self):
return self._polygon
@property
def inv_sensitivity(self):
return self._inv_sensitivity
@property
def exptime(self):
return self._exptime
@property
def data2brightness_conv(self):
return self._data2brightness_conv
@property
def id(self):
return self._id
@property
def skyuser(self):
return self._skyuser
@property
def skyuser_delta(self):
return self._skyuser_delta
@property
def skyuser_total(self):
return self._skyuser + self._skyuser_delta
@property
def is_countrate(self):
return self._is_countrate
def update_skyuser(self, delta_skyval=None):
if delta_skyval is None:
self._skyuser += self._skyuser_delta
else:
self._skyuser += delta_skyval
self._skyuser_delta = 0.0
def update_skydelta(self, delta_skyval):
self._skyuser_delta += delta_skyval
def set_skydelta(self, delta_skyval):
self._skyuser_delta = delta_skyval
def set_skyuser(self, skyval):
self._skyuser = skyval
self._skyuser_delta = 0.0
def data2brightness(self, data):
return (self.data2brightness_conv * data)
def brightness2data(self, brightness):
return (brightness / self.data2brightness_conv)
[docs]class SkyLine(object):
"""
Manage outlines on the sky.
Skylines are designed to capture and manipulate HST WCS image
information as spherical polygons. They are represented by
the :py:class:`~stsci.skypac.skyline.SkyLine` class, which is an
extension of :py:class:`~spherical_geometry.polygon.SphericalPolygon`
class.
Each skyline has a list of members,
`~stsci.skypac.skyline.SkyLine.members`, and a composite spherical polygon,
`~stsci.skypac.skyline.SkyLine.polygon`, members. The polygon has all
the functionalities of `~spherical_geometry.polygon.SphericalPolygon`.
Each `SkyLine` has a list of `~SkyLine.members` and
a composite `~SkyLine.polygon` with all the
functionalities of `~spherical_geometry.polygon.SphericalPolygon`.
Each member in `~stsci.skypac.skyline.SkyLine.members` belongs
to the `~stsci.skyline.skyline.SkyLineMember` class, which contains
image name (with path if given), science extension(s),
and composite WCS and polygon of the extension(s). All skylines start
out with a single member from a single image. When operations are used
to find composite or intersecting skylines, the
resulting skyline can have multiple members.
For example, a skyline from an ACS/WFC full-frame image would give 1
member, which is a composite of extensions 1 and 4. A skyline from the
union of 2 such images would have 2 members, and so forth.
"""
def __init__(self, mlist):
"""
Parameters
----------
fname: str
FITS image. `None` to create empty `SkyLine`.
ext: a list of tuples ('extname',extver).
"""
if isinstance(mlist, FileExtMaskInfo):
self.init_from_imextmask_info(mlist)
else:
self.members = mlist
self._skydiff = None
def init_from_imextmask_info(self, fi):
if fi.fnamesOnly:
fi.convert2ImageRef()
if not fi.finalized:
fi.finalize()
n = fi.count
if n < 1:
raise ValueError("Input 'FileExtMaskInfo' object must contain "
"at least one valid extension specification.")
im = fi.image
if im.closed:
raise ValueError("Input 'FileExtMaskInfo' object must contain "
"at an opened image object.")
if fi.DQimage.closed:
dq = None
dqext = n * [None]
else:
dq = fi.DQimage
dqext = fi.dqext
slm = []
for i in range(n):
um = fi.mask_images[i]
ume = fi.maskext[i]
if um.closed:
um = None
ume = None
im.hold()
if dq is not None:
dq.hold()
if um is not None:
um.hold()
m = SkyLineMember(im, fi.fext[i], dq_bits=fi.dq_bits,
dqimage=dq, dqext=dqext[i],
usermask=um, usermask_ext=ume)
if um is not None:
um.release()
slm.append(m)
# SkyLine does not need im, dq anymore:
im.release()
if dq is not None:
dq.release()
self.members = slm
def __getattr__(self, what):
"""
Control attribute access to
`~spherical_geometry.polygon.SphericalPolygon`.
"""
if what in ('from_radec', 'from_cone', 'from_wcs',
'multi_union', 'multi_intersection',
'_find_new_inside',):
raise AttributeError("'%s' object has no attribute '%s'"
(self.__class__.__name__, what))
else:
return getattr(self.polygon, what)
def __copy__(self):
return deepcopy(self)
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.polygon,
self.members)
def close(self, clean=True):
for m in self.members:
m.close(clean=clean)
@property
def skyuser_hdr_keyword(self):
return self._skyuser_hdr_keyword
@property
def is_mf_mosaic(self):
"""
returns *True* if `SkyLine` members are from distinct image files
(multi-file mosaic) and *False* otherwise.
"""
return self._is_mf_mosaic
@property
def polygon(self):
"""
`~spherical_geometry.polygon.SphericalPolygon` portion of `SkyLine`
that contains the composite skyline from `members`
belonging to *self*.
"""
return self._polygon
@polygon.setter
def polygon(self, value):
assert isinstance(value, SphericalPolygon)
self._polygon = copy(value) # Deep copy
@property
def skydiff(self):
return self._skydiff
@skydiff.setter
def skydiff(self, sky):
self._skydiff = sky
@property
def id(self):
return self._id
@property
def members(self):
"""
List of `SkyLineMember` objects that belong to *self*.
Duplicate members are discarded. Members are kept in
the order of their additions to *self*.
"""
return self._members
@members.setter
def members(self, mlist):
self.set_members(mlist=mlist, polygon=None)
def set_members(self, mlist, polygon):
if mlist is None:
member_list = []
elif isinstance(mlist, list):
member_list = mlist
if [1 for m in mlist if not isinstance(m, SkyLineMember)]:
raise ValueError("The 'mlist' argument must be either "
"a single 'SkyLineMember' object or a "
"Python list of 'SkyLineMember' objects.")
elif isinstance(mlist, SkyLineMember):
member_list = [mlist]
else:
raise ValueError("The 'mlist' argument must be either "
"a single 'SkyLineMember' object or a "
"Python list of 'SkyLineMember' objects.")
self._members = []
# Not using set to preserve order
n = len(member_list)
if n == 0:
if polygon is None:
self.polygon = SphericalPolygon([])
else:
self.polygon = deepcopy(polygon)
self._id = ''
self._is_mf_mosaic = False
elif n == 1:
assert isinstance(member_list[0], SkyLineMember)
if polygon is None:
self.polygon = deepcopy(member_list[0].polygon)
else:
self.polygon = deepcopy(polygon)
self._id = member_list[0].id
self._members.append(member_list[0])
self._is_mf_mosaic = False
else:
assert isinstance(member_list[0], SkyLineMember)
if polygon is None:
mpol = deepcopy(member_list[0].polygon)
else:
mpol = deepcopy(polygon)
self._members.append(member_list[0])
for m in member_list[1:]:
# Report corrupted members list instead of skipping
assert isinstance(m, SkyLineMember)
if m not in self._members:
self._members.append(m)
if polygon is None:
mpol = mpol.union(m.polygon)
self.polygon = mpol
self._update_mosaic_flag_id()
def _update_mosaic_flag_id(self, val=None):
# updates _is_mf_mosaic flag and recomputes SkyLine's _id
# val:
# True - set to mosaic
# False - set _is_mf_mosaic=False and recompute _id.
# NOTE: No check will be done to see if the SkyLine is a "true"
# mosaic. Therefore computed _id may be innacurate since it
# will be computed based on the first member's file name.
# None - autodetect status
#
if val is True:
self._is_mf_mosaic = True
self._id = 'mosaic'
return
elif val is False:
self._is_mf_mosaic = False
if len(self.members) == 0:
self._id = ''
return
extlist = []
for m in self.members:
if m.ext not in extlist:
extlist.append(m.ext)
self._id = self._id_from_fname_ext(self.members[0].basefname,
extlist)
return
else:
# autodetect status
basefname = None
fstats = []
nfnames = 0
extlist = []
for m in self.members:
mstat = _Stat(m.fname)
if mstat not in fstats:
if basefname is None:
basefname = m.basefname # store the first occurence
fstats.append(mstat)
nfnames += 1
if nfnames > 1:
break
if m.ext not in extlist:
extlist.append(m.ext)
if nfnames == 0:
self._is_mf_mosaic = False
self._id = ''
elif nfnames == 1:
self._is_mf_mosaic = False
self._id = self._id_from_fname_ext(basefname, extlist)
else:
self._is_mf_mosaic = True
self._id = "mosaic"
def _indv_mem_wcslist(self):
"""List of original HSTWCS from each EXT in each member."""
wcs_list = []
for m in self.members:
wcs_list.append(m.wcs)
return wcs_list
[docs] def to_wcs(self):
"""
Combine `HSTWCS` objects from all `members` and return
a new `HSTWCS` object. If no `members`, return `None`.
.. warning:: This cannot return WCS of intersection.
"""
wcs_list = self._indv_mem_wcslist()
n = len(wcs_list)
if n > 1:
wcs = output_wcs(wcs_list)
elif n == 1:
wcs = wcs_list[0]
else:
wcs = None
return wcs
@staticmethod
def _id_from_fname_ext(fname, ext_list):
extdic = {}
for ext in ext_list:
if isinstance(ext, tuple):
extname = ext[0].upper()
extver = ext[1]
elif isinstance(ext, int):
extname = None
extver = ext
if extname in extdic:
extdic[extname].append(extver)
else:
extdic.update({extname: [extver]})
strext = []
disctinct_ext = list(extdic.keys())
disctinct_ext.sort()
for extname in disctinct_ext:
if extname is None:
strext.append("{:s}".format(
','.join(map(str, extdic[extname]))))
else:
strext.append("'{:s}',{:s}".format(extname,
','.join(map(str, extdic[extname]))))
fextid = "{:s}[{:s}]".format(fname, ';'.join(strext))
return fextid
def _draw_members(self, map, **kwargs):
"""
Draw individual extensions in members.
Useful for debugging.
Parameters
----------
map: Basemap axes object
**kwargs: Any plot arguments to pass to basemap
"""
wcs_list = self._indv_mem_wcslist()
for wcs in wcs_list:
poly = SphericalPolygon.from_wcs(wcs)
poly.draw(map, **kwargs)
def _find_members(self, given_members):
"""
Find `SkyLineMember` in *given_members* that is in
*self*. This is used for intersection.
Parameters
----------
self: obj
`SkyLine` instance.
given_members: list
List of `SkyLineMember` to consider.
Returns
-------
new_members: list
List of `SkyLineMember` belonging to *self*.
"""
if len(list(self.points)) > 3:
out_mem = [m for m in given_members if
self.intersects_poly(m.polygon)]
else:
out_mem = []
return out_mem
[docs] def add_image(self, other):
"""
Return a new `SkyLine` that is the union of *self*
and *other*.
.. warning::
`SkyLine.union` only returns `polygon` without `members`.
Parameters
----------
other: `SkyLine` object
Examples
--------
>>> s1 = SkyLine('image1.fits')
>>> s2 = SkyLine('image2.fits')
>>> s3 = s1.add_image(s2)
"""
newcls = self.__class__(None)
newcls.polygon = self.union(other)
newcls._members = []
for v in self.members:
newcls._members.append(v)
for v in other.members:
if v not in newcls._members:
newcls._members.append(v)
if self.is_mf_mosaic or other.is_mf_mosaic:
newcls._update_mosaic_flag_id(True)
else:
newcls._update_mosaic_flag_id(None)
return newcls
[docs] def find_intersection(self, other):
"""
Return a new `SkyLine` that is the intersection of
*self* and *other*.
.. warning:: `SkyLine.intersection` only returns
`polygon` without `members`.
Parameters
----------
other: `SkyLine` object
Examples
--------
>>> s1 = SkyLine('image1.fits')
>>> s2 = SkyLine('image2.fits')
>>> s3 = s1.find_intersection(s2)
"""
newcls = self.__class__(None)
mlist = newcls._find_members(self.members + other.members)
poly = self.intersection(other)
newcls.set_members(mlist, poly)
return newcls
[docs] def find_max_overlap(self, skylines):
"""
Find `SkyLine` from a list of *skylines* that overlaps
the most with *self*.
Parameters
----------
skylines: list
A list of `SkyLine` instances.
Returns
-------
max_skyline: `SkyLine` instance or `None`
`SkyLine` that overlaps the most or `None` if no
overlap found. This is *not* a copy.
max_overlap_area: float
Area of intersection.
"""
# from mpl_toolkits.basemap import Basemap
# from matplotlib import pyplot as plt
max_skyline = None
max_overlap_area = 0.0
for next_s in skylines:
try:
intersect_poly = self.intersection(next_s)
overlap_area = intersect_poly.area()
except (ValueError, AssertionError):
# m = Basemap()
# self.polygon.draw(m)
# next_s.polygon.draw(m)
if SKYLINE_DEBUG:
print('WARNING: Intersection failed for {0} and {1}. '
'Ignoring {1}...'.format(self.id, next_s.id))
overlap_area = 0.0
else:
raise
if overlap_area > max_overlap_area:
max_overlap_area = overlap_area
max_skyline = next_s
return max_skyline, max_overlap_area
[docs] @staticmethod
def max_overlap_pair(skylines):
"""
Find a pair of skylines with maximum overlap.
Parameters
----------
skylines: list
A list of `SkyLine` instances.
Returns
-------
max_pair: tuple
Pair of `SkyLine` objects with max overlap
among given *skylines*. If no overlap found,
return `None`. These are *not* copies.
"""
max_pair = None
max_overlap_area = 0.0
for i in range(len(skylines) - 1):
curr_s = skylines[i]
next_s, i_area = curr_s.find_max_overlap(skylines[i + 1:])
if i_area > max_overlap_area:
max_overlap_area = i_area
max_pair = (curr_s, next_s)
return max_pair
def _is_valid_ext(ext):
if isinstance(ext, int):
return True
v = isinstance(ext, tuple) and len(ext) == 2 and \
isinstance(ext[0], str) and isinstance(ext[1], int)
return v
def _check_valid_imgext(image, imgargname, ext, extargname,
can_img_be_None=True):
# check image object:
if (image is None and not can_img_be_None) or \
(image is not None and not isinstance(image, ImageRef)):
raise ValueError("Input argument '{:s}' must be a valid "
"'ImageRef' object.".format(imgargname))
# check extension:
if image is not None and (ext is None or not _is_valid_ext(ext)):
raise TypeError("Input argument '{:s}' must be either an "
"integer or a tuple of the form ('str', int)."
.format(extargname))