Source code for stsci.skypac.skyline

This module provides support for working with footprints
on the sky. Primary use case would use the following
generalized steps:

#. Initialize `SkyLine` objects for each input image.
   This object would be the union of all the input
   image's individual chips WCS footprints.

#. Determine overlap between all images. The
   determination would employ a recursive operation
   to return the extended list of all overlap values
   computed as [img1 vs [img2,img3,...,imgN],img2 vs

#. Select the pair with the largest overlap, or the
   pair which produces the largest overlap with the
   first input image. This defines the initial
   reference `SkyLine` object.

#. Perform some operation on the 2 images: for example,
   match sky in intersecting regions, or aligning
   second image with the first (reference) image.

#. Update the second image, either apply the sky value
   or correct the WCS, then generate a new `SkyLine`
   object for that image.

#. Create a new reference `SkyLine` object as the union
   of the initial reference object and the newly
   updated `SkyLine` object.

#. Repeat Steps 2-6 for all remaining input images.

This process will work reasonably fast as most operations
are performed using the `SkyLine` objects and WCS information
solely, not image data itself.

:Authors: Mihai Cara, Warren Hack, Pey-Lian Lim

:License: :doc:`LICENSE`

import sys
import os
from copy import copy, deepcopy
from os.path import basename
import numpy as np
import math

from stwcs import wcsutil
from astropy import wcs as pywcs
from stwcs.distortion.utils import output_wcs
from spherical_geometry.polygon import SphericalPolygon
    from import bitfield_to_boolean_mask
except ImportError:
    from import bitmask2mask as bitfield_to_boolean_mask

from .utils import (is_countrate, ext2str, MultiFileLog, ImageRef,
                    file_name_components, temp_mask_file, in_memory_mask,
from .parseat import FileExtMaskInfo, _Stat
from .hstinfo import supported_telescopes, supported_instruments, photcorr_kwd


__all__ = ['SkyLineMember', 'SkyLine']

[docs]class SkyLineMember(object): """ Container for :py:class:`SkyLine` members that holds information about properties of a *single* extension (chip) in a FITS image such as: * WCS of the chip image; * bounding spherical polygon; * file name and extension from which the chip's image has originated; * information required for unit conversions (``EXPTIME``, ``PHOTFLAM``, ``BUNIT``, etc.); * user mask and DQ array associated with chip's image data. """ def __init__(self, image, ext, dq_bits=0, dqimage=None, dqext=None, usermask=None, usermask_ext=None): """ Parameters ---------- image: ImageRef An :py:class:`~stsci.skypac.utils.ImageRef` object that refers to an open FITS file ext: tuple, int, str Extension specification in the `image` the `SkyLineMember` object will be associated with. An int `ext` specifies extension number. A tuple in the form (str, int) specifies extension name and number. A string `ext` specifies extension name and the extension version is assumed to be 1. See documentation for `` for examples. dq_bits: int, None (Default = 0) Integer sum of all the DQ bit values from the input `image`'s DQ array that should be considered "good" when building masks for sky computations. For example, if pixels in the DQ array can be combinations of 1, 2, 4, and 8 flags and one wants to consider DQ "defects" having flags 2 and 4 as being acceptable for sky computations, then `dq_bits` should be set to 2+4=6. Then a DQ pixel having values 2,4, or 6 will be considered a good pixel, while a DQ pixel with a value, e.g., 1+2=3, 4+8=12, etc. will be flagged as a "bad" pixel. | Default value (0) will make *all* non-zero pixels in the DQ mask to be considered "bad" pixels, and the corresponding image pixels will not be used for sky computations. | Set `dq_bits` to `None` to turn off the use of image's DQ array for sky computations. .. note:: DQ masks (if used), *will be* combined with user masks specified by the `usermask` parameter. dqimage: ImageRef An :py:class:`~stsci.skypac.utils.ImageRef` object that refers to an open FITS file that has DQ data of the input `image`. .. note:: When DQ data are located in the same FITS file as the science image data (e.g., HST/ACS, HST/WFC3, etc.), `dqimage` may point to the same :py:class:`~stsci.skypac.utils.ImageRef` object. In this case the reference count of the :py:class:`~stsci.skypac.utils.ImageRef` object must be increased adequately. dqext: tuple, int, str Extension specification of the `dqimage` that contains `image`'s DQ information. See help for `ext` for more details on acceptable formats for this parameter. usermask: ImageRef An :py:class:`~stsci.skypac.utils.ImageRef` object that refers to an open FITS file that has user mask data that indicate what pixels in the input `image` should be used for sky computations (``1``) and which pixels should **not** be used for sky computations (``0``). usermask_ext: tuple, int, str Extension specification of the `usermask` mask file that contains user's mask data that should be associated with the input `image` and `ext`. See help for `ext` for more details on acceptable formats for this parameter. """ assert(hasattr(self.__class__, '_initialized') and self.__class__._initialized) self._reset() # check that input images and extensions are valid -- # either integers or tuples of strings and integers, e.g., ('sci',1): _check_valid_imgext(image, 'image', ext, 'ext', can_img_be_None=False) if dq_bits is not None: if dqimage is None: dq_bits = 0 else: _check_valid_imgext(dqimage, 'dqimage', dqext, 'dqext') _check_valid_imgext(usermask, 'usermask', usermask_ext, 'usermask_ext') # get telescope, instrument, and detector info: self.telescope, self.instrument, self.detector = get_instrument_info( image, ext) # check dq_bits: if dq_bits is not None and not isinstance(dq_bits, int): if image: dqimage.release() if usermask: usermask.release() if dqimage: dqimage.release() raise TypeError( "Argument 'dq_bits' must be either an integer or None." ) # buld mask: self._buildMask(image.original_fname, ext, dq_bits, dqimage, dqext, usermask, usermask_ext) if dqimage: dqimage.release() if usermask: usermask.release() # save file, user mask, and DQ extension info: self._fname = image.original_fname self._basefname = basename(self._fname) self._image = image self._ext = ext self._can_free_image = (image.can_reload_data and self.optimize != 'speed') # check extension and create a string representation: try: extstr = ext2str(ext) except ValueError: raise ValueError("Unexpected extension type '{}' for file {}.". format(ext, self._basefname)) self._id = "{:s}[{:s}]".format(self._basefname, extstr) # extract WCS for bounding-box computation try: if hasattr(image.hdu[ext], 'wcs'): self._wcs = image.hdu[ext].wcs else: if self.telescope in supported_telescopes: self._wcs = wcsutil.HSTWCS(image.hdu, ext) else: self._wcs = pywcs.WCS(image.hdu[ext].header, image.hdu) if self._wcs is None: raise Exception("Invalid WCS.") except Exception as e: msg = "Unable to obtain WCS information for the file {:s}." \ .format(self._id) self._ml.error(msg) self._ml.flush() self._release_all() raise e # determine pixel scale: self._get_pixel_scale() # see if image data are in counts or count-rate # and compute count(-rate) to flux (per arcsec^2) conversion factor: self._brightness_conv_from_hdu(image.hdu, self._idcscale) # process Sky user's keyword and its value: self._init_skyuser(image.hdu[ext].header) # Set polygon to be the bounding box of the chip: self._polygon = SphericalPolygon.from_wcs(self.wcs, steps=1) @classmethod def init_class(cls, skyuser_kwd='SKYUSER', units_kwd='BUNIT', invsens_kwd=None, verbose=True, logfile=sys.stdout, clobber=False, optimize='balanced'): cls._skyuser_header_keyword = skyuser_kwd cls._units_header_keyword = units_kwd cls._inv_sensitivity_kwd = invsens_kwd cls._verbose = verbose cls._clobber = clobber if isinstance(optimize, str): cls._optimize = optimize.lower() else: raise TypeError("The 'optimize' argument must be a string object.") if cls._optimize not in ['balanced', 'speed', 'inmemory']: raise ValueError("Currently supported values for the " "'optimize' argument are: " "'balanced', 'speed', or 'inmemory'.") # Set-up log files: if isinstance(logfile, MultiFileLog): cls._ml = logfile else: cls._ml = MultiFileLog(console=verbose) if logfile not in ('', None): cls._ml.add_logfile(logfile) cls._ml.skip(2) cls._initialized = True def _clean(self): if len(self._files2clean) > 0: for f in self._files2clean: try: os.remove(f) except Exception: self._ml.warning("Unable to remove temporary file " "'{:s}'.", f) def _reset(self): self._image = None self._ext = None self._mask = None self._maskext = None self._files2clean = [] self._can_free_mask = False self._mask_is_imref = False self._id = '' self._pixscale = 1.0 self._is_countrate = False self._skyuser = 0.0 self._skyuser_delta = 0.0 self._polygon = None self._wcs = None self._inv_sensitivity = None self._exptime = 1.0 self._data2brightness_conv = 1.0 def _release_all(self): if self._image is not None: self._image.release() if self._mask_is_imref: self._mask.release() def close(self, clean=True): self._release_all() if clean: self._clean() self._reset() def _buildMask(self, image_fname, ext, dq_bits, dq, dqext, msk, mskext): if dq_bits is None or dq is None: # we will use only the user mask: if msk is not None: if (self._optimize == 'balanced' and msk.can_reload_data) or \ self._optimize == 'speed' or self._optimize == 'inmemory': # nothing to do: simply re-use the user mask: self._mask = msk self._maskext = mskext self._mask.hold() else: # self._optimize == 'balanced' but the mask cannot be freed # so we will create a temporary fits file to hold mask # data: maskdata = (msk.hdu[mskext].data != 0).astype(np.uint8) root, suffix, fext = file_name_components(image_fname) mfname, self._mask = temp_mask_file( maskdata, root, prefix='', suffix='skymatch_mask', ext=ext, randomize_prefix=False) self._files2clean.append(mfname) self._maskext = 0 self._can_free_mask = self._mask.can_reload_data self._mask_is_imref = True else: # no mask will be used in sky computations: self._mask = None self._can_free_mask = False self._mask_is_imref = False else: # compute a new mask by: # 1. applying dq_bits to DQ array # 2. combining previous array with the user mask data # # If dq_bits show the "bad" bits then DQ mask should be computed # using: # # dqmskarr = np.logical_not( # np.bitwise_and(dq.hdu[dqext].data,dq_bits) # ) # # However, to keep the same convention with astrodrizzle, dq_bits # will show the "good" bits that should be removed from the DQ # array: dqmskarr = bitfield_to_boolean_mask(dq.hdu[dqext].data, dq_bits, dtype=np.bool_) # 2. combine with user mask: if msk is not None: maskdata = (msk.hdu[mskext].data != 0) dqmskarr = np.logical_and(dqmskarr, maskdata) # create a temporary file with the combined mask: (root, suffix, fext) = file_name_components(image_fname) if self._optimize == 'inmemory': self._mask = in_memory_mask(dqmskarr.astype(np.uint8)) # strext = ext2str(ext, compact=True, default_extver=None) self._mask.original_fname = "{1:s}{0:s}{2:s}{0:s}{3:s}" \ .format('_', root, suffix, 'in-memory_skymatch_mask') else: mfname, self._mask = temp_mask_file( dqmskarr.astype(np.uint8), root, prefix='', suffix='skymatch_mask', ext=ext, randomize_prefix=False ) self._files2clean.append(mfname) self._maskext = 0 self._can_free_mask = self._mask.can_reload_data self._mask_is_imref = True def _init_skyuser(self, image_header): skyuser_kwd = self.get_skyuser_kwd() if skyuser_kwd in image_header: self._skyuser = image_header[skyuser_kwd] else: self._skyuser = 0.0 self._skyuser_delta = 0.0 def reset_skyuser_from_header(self): self._init_skyuser(self._image.hdu[self._ext].header) def _get_pixel_scale(self): self._idcscale = None nominal_pscale = None if hasattr(self._wcs, 'idcscale') and self._wcs.idcscale is not None: self._idcscale = self._wcs.idcscale nominal_pscale = 'IDCSCALE' elif 'PAMSCALE' in self._image.hdu[self._ext].header: self._idcscale = float(self._image.hdu[self._ext] .header['PAMSCALE']) nominal_pscale = 'PAMSCALE' elif 'IDCSCALE' in self._image.hdu[self._ext].header: self._idcscale = float(self._image.hdu[self._ext] .header['IDCSCALE']) nominal_pscale = 'IDCSCALE' elif 'PAMSCALE' in self._image.hdu[0].header: self._idcscale = float(self._image.hdu[0].header['PAMSCALE']) nominal_pscale = 'PAMSCALE' elif 'IDCSCALE' in self._image.hdu[0].header: self._idcscale = float(self._image.hdu[0].header['IDCSCALE']) nominal_pscale = 'IDCSCALE' # try to compute "actual" pixel scale from the CD matrix. #TODO: Add support for more WCS representations (PC, CDELT, CROTA) self._pixscale = None if self._wcs.wcs.has_cd(): self._pixscale = 3600.0 * math.sqrt( np.abs(np.linalg.det( ) else: if self._idcscale is not None: self._pixscale = self._idcscale self._ml.warning( "Unable to compute \"actual\" pixel scale for image " "{:s}[{:s}].\nWCS object does not have a CD matrix.\n" "Using the value of '{:s}' pixel scale for actual " "pixel scale: {:g}", self._basefname, ext2str(self._ext), nominal_pscale, self._idcscale) else: self._pixscale = 1.0 self._idcscale = 1.0 self._ml.warning( "Unable to determine pixel scale of image in file " "{:s}[{:s}].\nSetting pixel scale to 1.", self._basefname, ext2str(self._ext)) if self._idcscale is None: self._idcscale = self._pixscale self._ml.warning( "Unable to determine \"nominal\" pixel scale of image in " "file {:s}[{:s}].\nSetting pixel scale to \"actual\" pixel " "scale computed from CD matrix: {:g}", self._basefname, ext2str(self._ext), self._pixscale) def _brightness_conv_from_hdu(self, hdulist, pscale, primHDUname='PRIMARY'): #TODO: remove primHDUname from the argument list and # replace it with 0 once the bug in fileutil.getExtn() is fixed. # The bug causes imageObject[0] to return first image HDU instead # of the PRIMARY HDU. assert(pscale > 0.0) sci_header = hdulist[self.ext].header primary_header = hdulist[primHDUname].header inv_sensitivity_kwd = self.get_inv_sensitivity_kwd() # start with pixel-area scaling and add other conversion factors later self._data2brightness_conv = 1.0 / (pscale**2) # check if image data are in counts or count-rate self._is_countrate = is_countrate( hdulist, self.ext, units_kwd=self.get_units_kwd(), guess_if_missing=True, verbose=self.is_verbose(), flog=self._ml.unclose_copy() ) # Check that all the necessary information for conversion # to flux (brightness) units is available: if self.is_countrate is None: self._ml.warning( "Unable to determine units of data in file {0}.\n" "*ASSUMING* image data are \"COUNT-RATE\".", self._basefname ) # retrieve PHOTFLAM: # Initially assume that PHOTCORR was performed. For non-HST images # or unsupported instruments, we assume that PHOTCORR was performed, # and in the end we will check if PHOTFLAM (inverse sensitivity) # has a reasonable value (>0) if present. photcorr = inv_sensitivity_kwd is not None if 'TELESCOP' in primary_header: telescope = primary_header['TELESCOP'].strip().upper() else: telescope = None if telescope not in supported_telescopes: self._ml.warning("Skipping check of photometric correction " "step for\nnon-HST file '{:s}': " "Unsupported telescope.", self._basefname) telescope = None if telescope is not None: if 'INSTRUME' in primary_header: instrument = primary_header['INSTRUME'].strip().upper() if instrument not in supported_instruments: self._ml.warning("Skipping check of photometric " "correction step for\nfile '{:s}': " "Unsupported instrument '{:s}'.", self._basefname, instrument) instrument = None else: # For HST instruments we expect 'INSTRUME' to be present # in the primary header. errmsg = "Missing instrument information in '{:s}' data " \ "file '{:s}'.".format(telescope, self._basefname) self._ml.error(errmsg) self._ml.close() raise KeyError(errmsg) # see if photometric correction step was performed: if telescope is not None and instrument is not None and \ inv_sensitivity_kwd is not None: phot_switch = photcorr_kwd[instrument][0] phot_done = photcorr_kwd[instrument][1] if phot_switch in primary_header: phot_switch_val = primary_header[phot_switch].strip() if phot_switch_val.upper() != phot_done: photcorr = False self._ml.warning( "Photometric correction was not performed for data " "file {:s}.\nVariations in detector sensitivity " "WILL NOT be accounted for.", self._basefname ) self._inv_sensitivity = None else: # For HST instruments we expect ~'PHOTCORR' to be present # in the primary header. errmsg = "Missing photometry switch keyword '{:s}' in " \ "{:s}-{:s} data file '{:s}'." \ .format(phot_switch, telescope, instrument, self._basefname) self._ml.error(errmsg) self._ml.close() raise KeyError(errmsg) if photcorr: if inv_sensitivity_kwd in sci_header: self._inv_sensitivity = sci_header[inv_sensitivity_kwd] if self._inv_sensitivity > 0.0: self._data2brightness_conv *= self._inv_sensitivity else: self._ml.warning( "'{3:s}' value must be a *strictly* positive " "number.\nFound: '{3:s}' = {0} in extension " "{1:s} in data file '{2:s}'.\n'{3:s}' " "value will be ignored.\nVariations in detector " "sensitivity WILL NOT be accounted for.", self._inv_sensitivity, ext2str(self.ext), self._basefname, inv_sensitivity_kwd) self._inv_sensitivity = None elif inv_sensitivity_kwd in primary_header: self._inv_sensitivity = primary_header[inv_sensitivity_kwd] if self._inv_sensitivity > 0.0: self._data2brightness_conv *= self._inv_sensitivity else: self._ml.warning( "'{2:s}' value must be a *strictly* positive " "number.\nFound: '{2:s}' = {0} in the primary " "header in data file '{1:s}'.\n'{2:s}' " "value will be ignored.\nVariations in detector " "sensitivity WILL NOT be accounted for.", self._inv_sensitivity, self._basefname, inv_sensitivity_kwd) self._inv_sensitivity = None else: self._inv_sensitivity = None self._ml.warning( "Keyword '{2:s}' not found neither in the " "Primary header nor\nin the header of " "extension ({0:s}) in file {1:s}.\n" "Variations in detector sensitivity WILL NOT " "be accounted for.", ext2str(self.ext), self._basefname, inv_sensitivity_kwd) # retrieve EXPTIME: if 'EXPTIME' in primary_header: self._exptime = primary_header['EXPTIME'] if not self.is_countrate: if self._exptime > 0.0: self._data2brightness_conv /= self._exptime else: self._ml.warning( "'EXPTIME' value must be a *strictly* positive " "number.\nFound: 'EXPTIME' = {} in the primary " "header in data file '{:s}'.\n'EXPTIME' " "value will be ignored.\nVariations in exposure " "time WILL NOT be accounted for.", self._exptime, self._basefname ) self._exptime = None elif 'EXPTIME' in sci_header: self._exptime = sci_header['EXPTIME'] if not self.is_countrate: if self._exptime > 0.0: self._data2brightness_conv /= self._exptime else: self._ml.warning( "'EXPTIME' value must be a *strictly* positive " "number.\nFound: 'EXPTIME' = {} in extension " "{:s} in data file '{:s}'.\n'EXPTIME' " "value will be ignored.\nVariations in exposure " "time WILL NOT be accounted for.", self._exptime, ext2str(self.ext), self._basefname ) self._exptime = None else: self._exptime = None if not self.is_countrate: self._ml.warning( "Keyword 'EXPTIME' not found neither in the " "Primary header nor\nin the header of " "extension ({:s}) in file {:s}.\n" "Variations in exposure time WILL NOT be accounted for.", ext2str(self.ext), self._basefname ) def __repr__(self): return '%s(%r, %r, %r, %r)' % (self.__class__.__name__, self.fname, self.ext, self.wcs, self.polygon) @classmethod def get_skyuser_kwd(cls): return cls._skyuser_header_keyword @classmethod def set_skyuser_kwd(cls, kwd): cls._skyuser_header_keyword = kwd @classmethod def get_units_kwd(cls): return cls._units_header_keyword @classmethod def set_units_kwd(cls, kwd): cls._units_header_keyword = kwd @classmethod def get_inv_sensitivity_kwd(cls): return cls._inv_sensitivity_kwd @classmethod def set_inv_sensitivity_kwd(cls, kwd): cls._inv_sensitivity_kwd = kwd @classmethod def is_verbose(cls): return cls._verbose @classmethod def set_verbose(cls, verbose): cls._verbose = verbose @classmethod def optimize(cls): return cls._optimize @property def fname(self): return self._fname @property def basefname(self): return self._basefname @property def image_hdulist(self): return self._image.hdu @property def image_header(self): return self._image.hdu[self._ext].header @property def image_data(self): data = self._image.hdu[self._ext].data return data @image_data.setter def image_data(self, newdata): self._image.hdu[self._ext].data = newdata def free_image_data(self): if self._can_free_image: del self._image.hdu[self._ext].data @property def ext(self): return self._ext @property def maskext(self): return self._maskext @property def mask_data(self): if self._mask_is_imref: data = self._mask.hdu[self._maskext].data return data else: return self._mask def free_mask_data(self): if self._mask_is_imref and self._can_free_mask: del self._mask.hdu[self._maskext].data @property def wcs(self): return self._wcs @property def polygon(self): return self._polygon @property def inv_sensitivity(self): return self._inv_sensitivity @property def exptime(self): return self._exptime @property def data2brightness_conv(self): return self._data2brightness_conv @property def id(self): return self._id @property def skyuser(self): return self._skyuser @property def skyuser_delta(self): return self._skyuser_delta @property def skyuser_total(self): return self._skyuser + self._skyuser_delta @property def is_countrate(self): return self._is_countrate def update_skyuser(self, delta_skyval=None): if delta_skyval is None: self._skyuser += self._skyuser_delta else: self._skyuser += delta_skyval self._skyuser_delta = 0.0 def update_skydelta(self, delta_skyval): self._skyuser_delta += delta_skyval def set_skydelta(self, delta_skyval): self._skyuser_delta = delta_skyval def set_skyuser(self, skyval): self._skyuser = skyval self._skyuser_delta = 0.0 def data2brightness(self, data): return (self.data2brightness_conv * data) def brightness2data(self, brightness): return (brightness / self.data2brightness_conv)
[docs]class SkyLine(object): """ Manage outlines on the sky. Skylines are designed to capture and manipulate HST WCS image information as spherical polygons. They are represented by the :py:class:`~stsci.skypac.skyline.SkyLine` class, which is an extension of :py:class:`~spherical_geometry.polygon.SphericalPolygon` class. Each skyline has a list of members, `~stsci.skypac.skyline.SkyLine.members`, and a composite spherical polygon, `~stsci.skypac.skyline.SkyLine.polygon`, members. The polygon has all the functionalities of `~spherical_geometry.polygon.SphericalPolygon`. Each `SkyLine` has a list of `~SkyLine.members` and a composite `~SkyLine.polygon` with all the functionalities of `~spherical_geometry.polygon.SphericalPolygon`. Each member in `~stsci.skypac.skyline.SkyLine.members` belongs to the `~stsci.skyline.skyline.SkyLineMember` class, which contains image name (with path if given), science extension(s), and composite WCS and polygon of the extension(s). All skylines start out with a single member from a single image. When operations are used to find composite or intersecting skylines, the resulting skyline can have multiple members. For example, a skyline from an ACS/WFC full-frame image would give 1 member, which is a composite of extensions 1 and 4. A skyline from the union of 2 such images would have 2 members, and so forth. """ def __init__(self, mlist): """ Parameters ---------- fname: str FITS image. `None` to create empty `SkyLine`. ext: a list of tuples ('extname',extver). """ if isinstance(mlist, FileExtMaskInfo): self.init_from_imextmask_info(mlist) else: self.members = mlist self._skydiff = None def init_from_imextmask_info(self, fi): if fi.fnamesOnly: fi.convert2ImageRef() if not fi.finalized: fi.finalize() n = fi.count if n < 1: raise ValueError("Input 'FileExtMaskInfo' object must contain " "at least one valid extension specification.") im = fi.image if im.closed: raise ValueError("Input 'FileExtMaskInfo' object must contain " "at an opened image object.") if fi.DQimage.closed: dq = None dqext = n * [None] else: dq = fi.DQimage dqext = fi.dqext slm = [] for i in range(n): um = fi.mask_images[i] ume = fi.maskext[i] if um.closed: um = None ume = None im.hold() if dq is not None: dq.hold() if um is not None: um.hold() m = SkyLineMember(im, fi.fext[i], dq_bits=fi.dq_bits, dqimage=dq, dqext=dqext[i], usermask=um, usermask_ext=ume) if um is not None: um.release() slm.append(m) # SkyLine does not need im, dq anymore: im.release() if dq is not None: dq.release() self.members = slm def __getattr__(self, what): """ Control attribute access to `~spherical_geometry.polygon.SphericalPolygon`. """ if what in ('from_radec', 'from_cone', 'from_wcs', 'multi_union', 'multi_intersection', '_find_new_inside',): raise AttributeError("'%s' object has no attribute '%s'" (self.__class__.__name__, what)) else: return getattr(self.polygon, what) def __copy__(self): return deepcopy(self) def __repr__(self): return '%s(%r, %r)' % (self.__class__.__name__, self.polygon, self.members) def close(self, clean=True): for m in self.members: m.close(clean=clean) @property def skyuser_hdr_keyword(self): return self._skyuser_hdr_keyword @property def is_mf_mosaic(self): """ returns *True* if `SkyLine` members are from distinct image files (multi-file mosaic) and *False* otherwise. """ return self._is_mf_mosaic @property def polygon(self): """ `~spherical_geometry.polygon.SphericalPolygon` portion of `SkyLine` that contains the composite skyline from `members` belonging to *self*. """ return self._polygon @polygon.setter def polygon(self, value): assert isinstance(value, SphericalPolygon) self._polygon = copy(value) # Deep copy @property def skydiff(self): return self._skydiff @skydiff.setter def skydiff(self, sky): self._skydiff = sky @property def id(self): return self._id @property def members(self): """ List of `SkyLineMember` objects that belong to *self*. Duplicate members are discarded. Members are kept in the order of their additions to *self*. """ return self._members @members.setter def members(self, mlist): self.set_members(mlist=mlist, polygon=None) def set_members(self, mlist, polygon): if mlist is None: member_list = [] elif isinstance(mlist, list): member_list = mlist if [1 for m in mlist if not isinstance(m, SkyLineMember)]: raise ValueError("The 'mlist' argument must be either " "a single 'SkyLineMember' object or a " "Python list of 'SkyLineMember' objects.") elif isinstance(mlist, SkyLineMember): member_list = [mlist] else: raise ValueError("The 'mlist' argument must be either " "a single 'SkyLineMember' object or a " "Python list of 'SkyLineMember' objects.") self._members = [] # Not using set to preserve order n = len(member_list) if n == 0: if polygon is None: self.polygon = SphericalPolygon([]) else: self.polygon = deepcopy(polygon) self._id = '' self._is_mf_mosaic = False elif n == 1: assert isinstance(member_list[0], SkyLineMember) if polygon is None: self.polygon = deepcopy(member_list[0].polygon) else: self.polygon = deepcopy(polygon) self._id = member_list[0].id self._members.append(member_list[0]) self._is_mf_mosaic = False else: assert isinstance(member_list[0], SkyLineMember) if polygon is None: mpol = deepcopy(member_list[0].polygon) else: mpol = deepcopy(polygon) self._members.append(member_list[0]) for m in member_list[1:]: # Report corrupted members list instead of skipping assert isinstance(m, SkyLineMember) if m not in self._members: self._members.append(m) if polygon is None: mpol = mpol.union(m.polygon) self.polygon = mpol self._update_mosaic_flag_id() def _update_mosaic_flag_id(self, val=None): # updates _is_mf_mosaic flag and recomputes SkyLine's _id # val: # True - set to mosaic # False - set _is_mf_mosaic=False and recompute _id. # NOTE: No check will be done to see if the SkyLine is a "true" # mosaic. Therefore computed _id may be innacurate since it # will be computed based on the first member's file name. # None - autodetect status # if val is True: self._is_mf_mosaic = True self._id = 'mosaic' return elif val is False: self._is_mf_mosaic = False if len(self.members) == 0: self._id = '' return extlist = [] for m in self.members: if m.ext not in extlist: extlist.append(m.ext) self._id = self._id_from_fname_ext(self.members[0].basefname, extlist) return else: # autodetect status basefname = None fstats = [] nfnames = 0 extlist = [] for m in self.members: mstat = _Stat(m.fname) if mstat not in fstats: if basefname is None: basefname = m.basefname # store the first occurence fstats.append(mstat) nfnames += 1 if nfnames > 1: break if m.ext not in extlist: extlist.append(m.ext) if nfnames == 0: self._is_mf_mosaic = False self._id = '' elif nfnames == 1: self._is_mf_mosaic = False self._id = self._id_from_fname_ext(basefname, extlist) else: self._is_mf_mosaic = True self._id = "mosaic" def _indv_mem_wcslist(self): """List of original HSTWCS from each EXT in each member.""" wcs_list = [] for m in self.members: wcs_list.append(m.wcs) return wcs_list
[docs] def to_wcs(self): """ Combine `HSTWCS` objects from all `members` and return a new `HSTWCS` object. If no `members`, return `None`. .. warning:: This cannot return WCS of intersection. """ wcs_list = self._indv_mem_wcslist() n = len(wcs_list) if n > 1: wcs = output_wcs(wcs_list) elif n == 1: wcs = wcs_list[0] else: wcs = None return wcs
@staticmethod def _id_from_fname_ext(fname, ext_list): extdic = {} for ext in ext_list: if isinstance(ext, tuple): extname = ext[0].upper() extver = ext[1] elif isinstance(ext, int): extname = None extver = ext if extname in extdic: extdic[extname].append(extver) else: extdic.update({extname: [extver]}) strext = [] disctinct_ext = list(extdic.keys()) disctinct_ext.sort() for extname in disctinct_ext: if extname is None: strext.append("{:s}".format( ','.join(map(str, extdic[extname])))) else: strext.append("'{:s}',{:s}".format(extname, ','.join(map(str, extdic[extname])))) fextid = "{:s}[{:s}]".format(fname, ';'.join(strext)) return fextid def _draw_members(self, map, **kwargs): """ Draw individual extensions in members. Useful for debugging. Parameters ---------- map: Basemap axes object **kwargs: Any plot arguments to pass to basemap """ wcs_list = self._indv_mem_wcslist() for wcs in wcs_list: poly = SphericalPolygon.from_wcs(wcs) poly.draw(map, **kwargs) def _find_members(self, given_members): """ Find `SkyLineMember` in *given_members* that is in *self*. This is used for intersection. Parameters ---------- self: obj `SkyLine` instance. given_members: list List of `SkyLineMember` to consider. Returns ------- new_members: list List of `SkyLineMember` belonging to *self*. """ if len(list(self.points)) > 3: out_mem = [m for m in given_members if self.intersects_poly(m.polygon)] else: out_mem = [] return out_mem
[docs] def add_image(self, other): """ Return a new `SkyLine` that is the union of *self* and *other*. .. warning:: `SkyLine.union` only returns `polygon` without `members`. Parameters ---------- other: `SkyLine` object Examples -------- >>> s1 = SkyLine('image1.fits') >>> s2 = SkyLine('image2.fits') >>> s3 = s1.add_image(s2) """ newcls = self.__class__(None) newcls.polygon = self.union(other) newcls._members = [] for v in self.members: newcls._members.append(v) for v in other.members: if v not in newcls._members: newcls._members.append(v) if self.is_mf_mosaic or other.is_mf_mosaic: newcls._update_mosaic_flag_id(True) else: newcls._update_mosaic_flag_id(None) return newcls
[docs] def find_intersection(self, other): """ Return a new `SkyLine` that is the intersection of *self* and *other*. .. warning:: `SkyLine.intersection` only returns `polygon` without `members`. Parameters ---------- other: `SkyLine` object Examples -------- >>> s1 = SkyLine('image1.fits') >>> s2 = SkyLine('image2.fits') >>> s3 = s1.find_intersection(s2) """ newcls = self.__class__(None) mlist = newcls._find_members(self.members + other.members) poly = self.intersection(other) newcls.set_members(mlist, poly) return newcls
[docs] def find_max_overlap(self, skylines): """ Find `SkyLine` from a list of *skylines* that overlaps the most with *self*. Parameters ---------- skylines: list A list of `SkyLine` instances. Returns ------- max_skyline: `SkyLine` instance or `None` `SkyLine` that overlaps the most or `None` if no overlap found. This is *not* a copy. max_overlap_area: float Area of intersection. """ # from mpl_toolkits.basemap import Basemap # from matplotlib import pyplot as plt max_skyline = None max_overlap_area = 0.0 for next_s in skylines: try: intersect_poly = self.intersection(next_s) overlap_area = intersect_poly.area() except (ValueError, AssertionError): # m = Basemap() # self.polygon.draw(m) # next_s.polygon.draw(m) if SKYLINE_DEBUG: print('WARNING: Intersection failed for {0} and {1}. ' 'Ignoring {1}...'.format(, overlap_area = 0.0 else: raise if overlap_area > max_overlap_area: max_overlap_area = overlap_area max_skyline = next_s return max_skyline, max_overlap_area
[docs] @staticmethod def max_overlap_pair(skylines): """ Find a pair of skylines with maximum overlap. Parameters ---------- skylines: list A list of `SkyLine` instances. Returns ------- max_pair: tuple Pair of `SkyLine` objects with max overlap among given *skylines*. If no overlap found, return `None`. These are *not* copies. """ max_pair = None max_overlap_area = 0.0 for i in range(len(skylines) - 1): curr_s = skylines[i] next_s, i_area = curr_s.find_max_overlap(skylines[i + 1:]) if i_area > max_overlap_area: max_overlap_area = i_area max_pair = (curr_s, next_s) return max_pair
def _is_valid_ext(ext): if isinstance(ext, int): return True v = isinstance(ext, tuple) and len(ext) == 2 and \ isinstance(ext[0], str) and isinstance(ext[1], int) return v def _check_valid_imgext(image, imgargname, ext, extargname, can_img_be_None=True): # check image object: if (image is None and not can_img_be_None) or \ (image is not None and not isinstance(image, ImageRef)): raise ValueError("Input argument '{:s}' must be a valid " "'ImageRef' object.".format(imgargname)) # check extension: if image is not None and (ext is None or not _is_valid_ext(ext)): raise TypeError("Input argument '{:s}' must be either an " "integer or a tuple of the form ('str', int)." .format(extargname))