Source code for stsci.skypac.utils

"""
This module provides utility functions for use
by :py:mod:`stsci.skypac` module.

:Authors: Mihai Cara

:License: :doc:`LICENSE`

"""
import sys
import os
import weakref
import tempfile
from os import path
from copy import copy, deepcopy

import numpy as np
from astropy.io import fits
from stsci.tools import fileutil, readgeis, convertwaiveredfits
from .hstinfo import (supported_telescopes, supported_instruments,
                      counts_only_instruments, rates_only_instruments)


__all__ = ['is_countrate', 'ext2str', 'MultiFileLog', 'ResourceRefCount',
           'ImageRef', 'openImageEx', 'count_extensions', 'get_ext_list',
           'get_extver_list', 'file_name_components', 'temp_mask_file',
           'get_instrument_info', 'almost_equal', 'skyval2txt']


[docs]def file_name_components(fname, detect_HST_FITS_suffix=True): """ Splits base file name into a root, suffix, and extension. Given a full path, this function extracts the base name, and splits it into three components: root name, suffix, and file extension. Parameters ---------- fname: str file name detect_HST_FITS_suffix: bool, optional If True, detects the suffix of most HST files by looking for the rightmost occurence of the underscore ('_') in the file name. Returns ------- root: str Root name of the file. When ``detect_HST_FITS_suffix`` is `True`, this is the part of the file name *preceding* the rightmost suffix separator ('_'). Otherwise, it is the base file name without file extension. suffix: str If ``detect_HST_FITS_suffix`` is `True`, this field will contain the suffix of most HST files, i.e., the part of the file name contained between the rightmost suffix separator ('_') and file extension separator. This return value will be an empty string if If ``detect_HST_FITS_suffix`` is `False` or if the file name has no extension separator. fext: str File extension Examples -------- >>> file_name_components('/data/m87/ua0x5001m_c0f.fits') ('ua0x5001m', 'c0f', '.fits') >>> file_name_components('/data/m87/ua0x5001m_c0f.fits',False) ('ua0x5001m_c0f', '', '.fits') """ # get root name of the image file: (root, fname) = path.split(fname) (base, fext) = path.splitext(fname) if detect_HST_FITS_suffix and fext.lower() in ['.fits', '.fit']: ind = base.rfind('_') if ind >= 0: root = base[:ind] suffix = base[ind + 1:] #TODO: Additional check may be performed here to see if # the 'suffix' is in the list of registered HST suffixes, # e.g., '_raw', '_flt', '_c0f', etc. return (root, suffix, fext) root = base suffix = '' return (root, suffix, fext)
def in_memory_mask(data): """ Creates an `ImageRef` object with that embeds an in memory (i.e., not attached to a file) ``HDUList`` containing the supplied mask data. Parameters ---------- data: numpy.ndarray Data to be used to create an in-memory FITS file. Data will be written in the primary HDU. Returns ------- ImageRef An open :py:class:`~skypac.utils.ImageRef` object of a temporary and in-memory (not attached to a file on disk) FITS file. Mask data will be in the Primary HDU. Examples -------- >>> import numpy np >>> from stsci import skypac >>> skypac.utils.in_memory_mask(data=mask) <skypac.utils.ImageRef object at 0x101f5a3d0> """ # create HDUList object hdu = fits.PrimaryHDU(data) hdulist = fits.HDUList([hdu]) imageref = ImageRef(ResourceRefCount(hdulist)) imageref.memmap = False imageref.can_reload_data = False return imageref ###################################################### ## Returned mask file is a simple FITS and the data are in the ## primary HDU (ext=0).
[docs]def temp_mask_file(data, rootname, prefix='tmp', suffix='mask', ext=('sci', 1), randomize_prefix=True, sep='_', dir=os.path.curdir, fnameOnly=False): """ temp_mask_file(rootname, suffix, ext, data, dir=os.path.curdir, \ fnameOnly=False) Saves 2D data array to temporary simple FITS file. The name of the emporary file is generated based on the input parameters. Parameters ---------- data: numpy array Data to be written to the temporary FITS file. Data will be written in the primary HDU. rootname: str Root name of the file. prefix: str, optional Prefix to be added in front of the root name. If ``randomize_prefix`` is `True`, then a random string will be added to the right of the string specified by ``prefix`` (with no separator between them). Prefix (or the randomized prefix) will be separated from the root name by the string specified in ``sep``. If ``prefix`` is an empty string (``''``) then no prefix will be prepended to the root file name. suffix: str, optional Suffix to be added to the root name. Suffix will be separated from the root name by the string specified in ``sep``. ext: int, str, or tuple of the form (str, int), optional Extention to be added to the temporary file *after* the suffix. Extension name string will be separated from the suffix by the string specified in ``sep``. sep: str, optional Separator string to be inserted between (randomized) prefix and root name, root name and suffix, and suffix and extension. randomize_prefix: bool, optional Specifies whether to add (postpend) a random string to string specified by ``prefix``. dir: str, optional Directory to which the temporary file should be written. If directory ``dir`` is `None` then the file will be written to the default (for more details, see the explanation for argument ``dir`` to the `tempfile.mstemp <http://docs.python.org/2/library/ tempfile.html#tempfile.mkstemp>`_ function). fnameOnly: bool, optional Specifies what should `temp_mask_file` return: file name of the created file (if ``fnameOnly`` is `True`), or a tuple with the file name of the created file and an open :py:class:`~skypac.utils.ImageRef` object of that file. Returns ------- fname: str File name of the temporary file. mask: ImageRef An open :py:class:`~skypac.utils.ImageRef` object of the temporary FITS file. This is returned as a tuple together with the file name only when ``fnameOnly`` is `False`. .. note:: Mask data will be in the Primary HDU. Raises ------ TypeError Extension specifier must be either an integer, a string, or a tuple of the form (str, int). Examples -------- >>> import numpy as np >>> from stsci import skypac >>> mask=np.ones((800,800),dtype=np.uint8) >>> skypac.utils.temp_mask_file(mask, 'ua0x5001m', ... suffix='skymatch_mask', ext=('sci',4), dir='/data/m87', ... fnameOnly=True) '/data/m87/tmp39gCpw_ua0x5001m_skymatch_mask_sci4.fits' >>> skypac.utils.temp_mask_file(mask, 'ua0x5001m', ... suffix='skymatch_mask', ext=('sci',4), dir='.', fnameOnly=True) 'tmpxl7LTO_ua0x5001m_skymatch_mask_sci4.fits' >>> skypac.utils.temp_mask_file(mask, 'ua0x5001m', ... suffix='skymatch_mask', ext=('sci',4), dir='.',fnameOnly=False) ('tmpxMcL5g_ua0x5001m_skymatch_mask_sci4.fits', \ <skypac.utils.ImageRef object at 0x101f5a3d0>) """ # convert extension to a string: try: strext = ext2str(ext, compact=True, default_extver=None) except Exception: raise TypeError("Extension specifier must be either an integer, " "a string, or a tuple of the form (\'str\', int).") if prefix is None: prefix = '' if not (prefix.isupper() and rootname.isupper() and suffix.isupper()): strext = strext.lower() dir = os.path.expanduser(dir) if randomize_prefix: # new suffix for the temporary file: newbase = "{0:s}{1:s}{0:s}{2:s}{0:s}{3:s}{4:s}{5:s}" \ .format(sep, rootname, suffix, strext, os.extsep, 'fits') # new temporary file handle: fh = tempfile.NamedTemporaryFile(prefix=prefix, suffix=newbase, dir=dir, delete=False, mode='wb') else: if prefix == '': fname = "{1:s}{0:s}{2:s}{0:s}{3:s}{4:s}{5:s}" \ .format(sep, rootname, suffix, strext, os.extsep, 'fits') else: fname = "{1:s}{0:s}{2:s}{0:s}{3:s}{0:s}{4:s}{5:s}{6:s}" \ .format(sep, prefix, rootname, suffix, strext, os.extsep, 'fits') full_name = os.path.join(dir, fname) fh = open(full_name, 'wb') # simplify path if 'dir' is current working directory: (pth, fname) = os.path.split(os.path.relpath(fh.name)) if dir != os.path.curdir and not (pth == os.path.curdir or pth == ''): fname = fh.name # create HDUList object hdu = fits.PrimaryHDU(data) hdulist = fits.HDUList([hdu]) # write data to the "temporary" file hdulist.writeto(fh) # clean-up hdulist.close() fh.close() del hdu, hdulist, fh if fnameOnly: return fname # open the "temporary" file and create a new ImageRef object: mask, dummy = openImageEx( fname, mode='readonly', memmap=False, saveAsMEF=False, clobber=False, imageOnly=True, openImageHDU=True, openDQHDU=False, preferMEF=True, verbose=False ) return (fname, mask)
[docs]def get_extver_list(img, extname='SCI'): """ Return a list of all extension versions with ``extname`` extension names. If ``extname`` is `None`, return extension **numbers** of all image-like extensions. .. note:: If input image is a `~skypac.utils.ImageRef`, this function will **not** modify its reference count. Parameters ---------- img: str, `astropy.io.fits.HDUList`, or `~skypac.utils.ImageRef` Input image object. If ``img`` is a string object (file name) then that file will be opened. If the file pointed to by the file name is a GEIS or WAIVER FITS file, it will be converted to a simple/MEF FITS format if ``clobber`` is `True`. extname: str, optional Indicates extension *name* for which all existing extension *versions* should be found. If ``extname`` is `None`, then `~skypac.utils.get_extver_list` will return a list of extension *numbers* of all image-like extensions. Returns ------- extver: list List of extension versions corresponding to the input ``extname``. If ``extname`` is `None`, it will return a list of extension *numbers* of all image-like extensions. Raises ------ IOError Unable to open input image file. TypeError Argument `img` must be either a file name (str), an `~.utils.ImageRef`, or a `astropy.io.fits.HDUList` object. TypeError Argument `extname` must be either a string or `None`. See Also -------- get_ext_list, count_extensions Examples -------- >>> get_extver_list('j9irw1rqq_flt.fits',extname='sci') [1, 2] >>> get_extver_list('j9irw1rqq_flt.fits',extname=None) [1, 2, 3, 4, 5, 6, 8, 9, 10, 11 """ doRelease = False if isinstance(img, fits.HDUList): hdulist = img elif isinstance(img, str): try: (img, dq) = openImageEx( img, mode='readonly', memmap=False, saveAsMEF=False, output_base_fitsname=None, clobber=False, imageOnly=True, openImageHDU=True, openDQHDU=False, preferMEF=False, verbose=False ) except IOError: raise IOError("Unable to open file: \'{:s}\'".format(img)) hdulist = img.hdu doRelease = True elif isinstance(img, ImageRef): assert(img.hdu is not None) img.hold() hdulist = img.hdu doRelease = True else: raise TypeError("Argument 'img' must be either a file name (string), " "an ImageRef, or a `astropy.io.fits.HDUList` object.") # when extver is None - return the range of all 'image'-like FITS # extensions if extname is None: extn = [] for i in range(len(hdulist)): hdr = hdulist[i].header if not ('NAXIS' in hdr and hdr['NAXIS'] == 2): continue if 'XTENSION' in hdr and \ hdr['XTENSION'].upper().strip() == 'IMAGE': extn.append(i) elif 'SIMPLE' in hdr: extn.append(i) if doRelease: img.release() return extn if not isinstance(extname, str): if doRelease: img.release() raise TypeError( "Argument 'extname' must be either a string indicating the value" "of the 'EXTNAME' keyword of the extensions whose versions are to " "be returned or None to return extension numbers of all HDUs in " "the 'img' FITS file." ) extname = extname.upper() extver = [] for e in hdulist: #if not isinstance(e, fits.ImageHDU): continue #hkeys = map(str.upper, e.header.keys()) if 'EXTNAME' in e.header and e.header['EXTNAME'].upper() == extname: extver.append(e.header['EXTVER'] if 'EXTVER' in e.header else 1) if doRelease: img.release() return extver
[docs]def get_ext_list(img, extname='SCI'): """ Return a list of all extension versions of ``extname`` extensions. ``img`` can be either a file name or a `astropy.io.fits.HDUList` object. This function is similar to :py:func:`get_extver_list`, the main difference being that it returns a list of fully qualified extensions: either tuples of the form `(extname, extver)` or integer extension numbers (when ``extname`` is `None`). See Also -------- get_extver_list, count_extensions Examples -------- >>> get_ext_list('j9irw1rqq_flt.fits',extname='SCI') [('SCI', 1), ('SCI', 2)] >>> get_ext_list('j9irw1rqq_flt.fits',extname=None) [1, 2, 3, 4, 5, 6, 8, 9, 10, 11] """ extver = get_extver_list(img=img, extname=extname) if extname is None: return extver extlist = [(extname, extv) for extv in extver] return extlist
[docs]def count_extensions(img, extname='SCI'): """ Return the number of `extname` extensions in the input image ``img``. If ``extname`` is `None`, return the number of all image-like extensions. Input parameters are identical to those of :py:func:`get_extver_list`. See Also -------- get_extver_list, get_ext_list Examples -------- >>> count_extensions('j9irw1rqq_flt.fits',extname='SCI') 2 >>> count_extensions('j9irw1rqq_flt.fits',extname=None) 10 """ return len(get_extver_list(img=img, extname=extname))
[docs]def ext2str(ext, compact=False, default_extver=1): """ Return a string representation of an extension specification. Parameters ---------- ext: tuple, int, str Extension specification can be a tuple of the form (str,int), e.g., ('sci',1), an integer (extension number), or a string (extension name). compact: bool, optional If ``compact`` is `True` the returned string will have extension name quoted and separated by a comma from the extension number, e.g., ``"'sci',1"``. If ``compact`` is `False` the returned string will have extension version immediately follow the extension name, e.g., ``'sci1'``. default_extver: int, optional Specifies the extension version to be used when the ``ext`` parameter is a string (extension name). Returns ------- strext: str String representation of extension specification ``ext``. Raises ------ TypeError Unexpected extension type. Examples -------- >>> ext2str('sci',compact=False,default_extver=6) "'sci',6" >>> ext2str(('sci',2)) "'sci',2" >>> ext2str(4) '4' >>> ext2str('dq') "'dq',1" >>> ext2str('dq',default_extver=2) "'dq',2" >>> ext2str('sci',compact=True,default_extver=2) 'sci2' """ if isinstance(ext, tuple) and len(ext) == 2 and \ isinstance(ext[0], str) and isinstance(ext[1], int): if compact: return "{:s}{:d}".format(ext[0], ext[1]) else: return "\'{:s}\',{:d}".format(ext[0], ext[1]) elif isinstance(ext, int): return "{:d}".format(ext) elif isinstance(ext, str): if default_extver is None: extver = '' else: extver = '{:d}'.format(default_extver) if compact: return "{:s}{:s}".format(ext, extver) else: return "\'{:s}\',{:s}".format(ext, extver) else: raise TypeError("Unexpected extension type.")
[docs]def is_countrate(hdulist, ext, units_kwd='BUNIT', guess_if_missing=True, telescope=None, instrument=None, verbose=True, flog=None): """ Infer the units of the data of the input image from the input image. Specifically, it tries to infer whether the units are counts (or count-like) or if the units are count-``rate``. The units of data are determined from the ``BUNIT`` header keyword by searching its value for the division sign ``'/'``. If the division sign is not found, then the units are assumed to be "counts". If the division sign is found in the ``BUNIT`` value and if the numerator is one of the following: 'ELECTRONS','COUNTS', or 'DN', and denumerator is either 'S','SEC', or 'SECOND', then the units are assumed to be count-rate. Parameters ---------- hdulist: `astropy.io.fits.HDUList` `astropy.io.fits.HDUList` of the image. ext: tuple, int, str Extension specification for whose data the units need to be inferred. An int ``ext`` specifies extension number. A tuple in the form (str, int) specifies extension name and number. A string ``ext`` specifies extension name and the extension version is assumed to be 1. See documentation for `astropy.io.fits.getData` for examples. units_kwd: str, optional FITS header keyword describing data units of the image. This keyword is assumed to be in the header of the extension specified by the ``ext`` parameter. guess_if_missing: bool, optional Instructs to try make best guess on image units when the keyword specified by ``units_kwd`` is not found in the image header. The first action will be to look for this keyword in the primary header, and if not found, infer the units based on the telescope, instrument, and detector information. telescope: str, None, optional Specifies the telescope from which the data came. If not specified, the value specified in the ``'TELESCOP'`` keyword in the primary header will be used. instrument: str, None, optional Specifies the instrument used for acquiring data. If not specified, the value specified in the ``'INSTRUME'`` keyword in the primary header will be used. verbose: bool, optional Specifies whether to print warning messages. flog: str, file, MultiFileLog, None, optional Specifies the log file to which the messages should be printed. It can be a file name, a file object, a `MultiFileLog` object, or `None`. Returns ------- bool, None Returns `True` if the units of the input image for a given extension are count-rate-like and `False` if the units are count-like. Returns `None` if the units cannot be inferred from the header. """ # Set-up log files: if isinstance(flog, MultiFileLog): ml = flog else: ml = MultiFileLog(console=verbose) if flog not in ('', None): ml.add_logfile(flog) ml.skip(2) data_units = ['ELECTRONS', 'COUNTS', 'DN'] time_units = ['S', 'SEC', 'SECOND'] fname = os.path.basename(hdulist.filename()) sci_header = hdulist[ext].header primary_header = hdulist[0].header if units_kwd is None: guess_if_missing = True bunit = None else: if units_kwd in sci_header: bunit = sci_header[units_kwd] elif guess_if_missing and units_kwd in primary_header: # look for units in the primary header bunit = primary_header[units_kwd] else: bunit = None if bunit is not None: bunit_parts = list(map(str.upper, list(map(str.strip, bunit.split('/'))))) nbunit_parts = len(bunit_parts) if nbunit_parts == 1 and bunit_parts[0] in data_units: ml.close() return False if nbunit_parts == 2 and bunit_parts[0] in data_units and \ bunit_parts[1] in time_units: ml.close() return True # else: ml.warning("Unable to infer units of data from the " "\'{:s}\' keyword in file \'{:s}\'.", units_kwd, fname) ml.close() return None if not guess_if_missing: ml.warning("Unable to infer units of data from the " "\'{:s}\' keyword in file \'{:s}\': Keyword not found.", units_kwd, fname) ml.close() return None # Now, try to guess the units from the 'INSTRUME' (and 'DETECTOR', # if necessary) keyword values. Return the default for a specific # 'INSTRUME'/'DETECTOR' combination. if telescope is None: if 'TELESCOP' in primary_header: telescope = primary_header['TELESCOP'].strip().upper() else: ml.warning("Unable to infer units of data for file " "\'{:s}\': Missing telescope information.", fname) ml.close() return None if telescope not in supported_telescopes: ml.warning("Unable to infer units of data for non-HST file " "\'{:s}\': Unsupported telescope.", fname) ml.close() return None if instrument is None: if 'INSTRUME' in primary_header: instrument = primary_header['INSTRUME'].strip().upper() else: ml.warning("Unable to infer units of data for file " "\'{:s}\': Missing instrument information.", fname) ml.close() return None if instrument not in supported_instruments: ml.warning("Unable to infer units of data for file " "\'{:s}\': Unsupported instrument.", fname) return None if instrument in counts_only_instruments: ml.close() return False if instrument in rates_only_instruments: ml.close() return True if instrument == 'WFC3': if 'DETECTOR' in sci_header: detector = sci_header['DETECTOR'].strip().upper() elif 'DETECTOR' in primary_header: detector = primary_header['DETECTOR'].strip().upper() else: ml.warning("Unable to infer units of data for WFC3 file " "\'{:s}\': Missing detector information.", fname) ml.close() return None if detector == 'UVIS': ml.close() return False elif detector == 'IR': ml.close() return True else: ml.warning("Unable to infer units of data for WFC3 file " "\'{:s}\': Unrecognized detector.", fname) ml.close() return None ml.close() if instrument == 'NICMOS': if 'UNITCORR' in sci_header and \ sci_header['UNITCORR'].strip().upper() == 'OMIT': return False else: # quietly assume units are in count-rate even when 'UNITCORR' # is not present: return True # Code should never reach this line raise RuntimeError("Logical error in \'is_countrate(...)\'.")
def get_instrument_info(image, ext): sci_header = image.hdu[ext].header primary_header = image.hdu[0].header telescope = None # check if ImageRef object has telescope info: if image.telescope is not None: telescope = image.telescope.strip().upper() if telescope in ['', 'UNKNOWN']: telescope = None # get telescope info from the primary header: if telescope is None and 'TELESCOP' in primary_header: telescope = primary_header['TELESCOP'].strip().upper() instrument = None # check if ImageRef object has instrument info: if image.instrument is not None: instrument = image.instrument.strip().upper() if instrument in ['', 'UNKNOWN']: instrument = None # get instrument info from the primary header: if instrument is None and 'INSTRUME' in primary_header: instrument = primary_header['INSTRUME'].strip().upper() # get detector info: detector = None if 'DETECTOR' in sci_header: if isinstance(sci_header['DETECTOR'], str): detector = sci_header['DETECTOR'].strip().upper() else: detector = sci_header['DETECTOR'] elif 'DETECTOR' in primary_header: if isinstance(primary_header['DETECTOR'], str): detector = primary_header['DETECTOR'].strip().upper() else: detector = primary_header['DETECTOR'] return (telescope, instrument, detector)
[docs]class MultiFileLog(object): """ This is a class that facilitates writting to multiple files. `MultiFileLog` stores multiple file objects and can write the same log entry to all of them. It also facilitates controlling when a function can close a log file. Finally, it provides some utility functions that automate such things as appending EOL at the end of the log entry, flushing the files (to avoid losing log entries in case of uncaught exceptions), displaying ``WARNING``, ``ERROR``, etc. in bold on standard streams (`sys.stdout`, etc.) Parameters ---------- console: bool, optional Enables writting to the standard output. enableBold: bool, optional Enable or disable writing bold text to console, e.g., ``'WARNING:'`` ``'ERROR:'``, etc. flog: str, file, None, list of str or file objects, optional File name or file object to be added to `MultiFileLog` during the initialization. More files can be added lated with :py:meth:`add_logfile`. append: bool, optional Default open mode for the files that need to be opened (e.g., that are passed as file names). If ``append`` is `True`, new files added to a `MultiFileLog` object will be opened in the "append" mode: new log entries will be appended to existing files -- same as mode 'a' of standard function :py:func:`open`. When ``append`` is `False`, any existing files will be overwritten. autoflush: bool, optional Indicates whether or not to flush files after each log entry. appendEOL: bool, optional Indicates whether or not to add EOL at the end of each log entry. """ def __init__(self, console=True, enableBold=True, flog=None, append=True, autoflush=True, appendEOL=True): self._console = sys.stdout if console else None self._eol = '\n' if appendEOL else '' self._fmode = 'a' if append else 'w' self._logfh = [] self._close = [] self._autoflush = autoflush self._enableBold = enableBold if flog: self.add_logfile(flog)
[docs] def unclose_copy(self): """ Return a copy of the `MultiFileLog` object with all the attached files marked as "keep open", that is, the :py:meth:`close` will not close these files. This is useful before passing the `MultiFileLog` object to a function that may add its own log files, and then attempt to close the files with the :py:meth:`close` method. Thus, by passing an "unclose copy", one can be sure that the files opened at the top level will not be closed by other functions to which the `MultiFileLog` object may be passed. """ newMultiFileLog = copy(self) newclose = self.count * [False] newMultiFileLog._close = newclose return newMultiFileLog
@property def count(self): """ Return the number of files attached to the `MultiFileLog` object excluding the `sys.stdout` file. """ return len(self._logfh)
[docs] def enable_console(self, enable=True, enableBold=True): """ Enable output to the standard console -- `sys.stdout`. Parameters ---------- enable: bool, optional Enable or disable output to `sys.stdout`. enableBold: bool, optional Enable or disable writing bold text to console, e.g., ``'WARNING:'``, ``'ERROR:'``, etc. """ if enable: if self._console is not None: return else: self._console = sys.stdout else: self._console = None self._enableBold = enableBold
[docs] def add_logfile(self, flog, initial_skip=2, can_close=None, mode=None): """ Add (and open, if necessary) a log file to the `MultiFileLog` object. Parameters ---------- flog: str, file, None, list of str or file objects, optional File name or file object to be added. initial_skip: int, optional The number of blank line to be written to the file *if* *not* at the beginning of the file. can_close: bool, None, optional Indicates whether the file object can be closed by the :py:meth:`~skypac.utils.MultiFileLog.close` function: * ``can_close`` is `True` -- file can be closed by the :py:meth:`~skypac.utils.MultiFileLog.close` function; * ``can_close`` is `False` -- file will not be closed by the :py:meth:`~skypac.utils.MultiFileLog.close` function; * ``can_close`` is `None` -- automatic selection based on the type of the `flog` argument: `True` if flog is a string (e.g., file name), `False` otherwise. mode: str, None, optional File open mode: same meaning as the ``mode`` parameter of the Python's built-in :py:func:`open` function. If `None`, the mode will be inherited from file open mode set during initialization. Returns ------- flog: file object File object of newly opened (or attached) file. """ if isinstance(flog, list): fhlist = [] for f in flog: fhlist.append(self.add_logfile(f)) return fhlist if flog is None: return None if flog == sys.stdout: self.enable_console(True) return None if isinstance(flog, str): fmode = self._fmode if mode is None else mode fh = open(flog, fmode) self._logfh.append(fh) self._close.append(True if can_close is None else can_close) # skip 'initial_skip' number of lines # if not at the beginning of the file: if fh.tell() > 0: fh.write(initial_skip * '\n') return fh if flog not in self._logfh: self._logfh.append(flog) self._close.append(False if can_close is None else can_close) # skip 'initial_skip' number of lines # if not at the beginning of the file: # TODO: may not work well in Windows if fgets was used previously # on the handle (see Python doc) if flog.tell() > 0: flog.write(initial_skip * '\n') return flog else: return None
[docs] def set_close_flag(self, fh, can_close=True): """ Modify the "can be closed" status of a given file. Parameters ---------- fh: file object can_close: bool, optional Indicates if the file can be closed by the :py:meth:`close` function. .. note:: File object ``fh`` **must** have been already added to the `MultiFileLog` object. """ if fh is None or fh not in self._logfh: return index = self._logfh.index(fh) self._close[index] = can_close
[docs] def close(self): """ Close all files *opened by* `~MultiFileLog`. It will close all files *opened by* `~MultiFileLog` - essentially, all files added as file name. It will *not* close files added as file handles. """ n = len(self._logfh) todel = [] assert(n == len(self._close)) for i in range(n): if self._close[i]: self._logfh[i].close() todel.append(i) # delete closed files from the list of file handles: for i in todel: del self._logfh[i] del self._close[i]
[docs] def flush(self): """ Flush all files. """ for fh in self._logfh: fh.flush()
[docs] def print_endlog_msg(self): """ Print a message ("Log written to...") indicating the end of the log. """ if self._console: for fh in self._logfh: if hasattr(fh, 'name'): self._console.write("Log written to \'{:s}\'.{}" .format(fh.name, self._eol)) else: self._console.write("Log written to an object of \'{}\'.{}" .format(type(fh), self._eol))
[docs] def logentry(self, msg, *fmt, **skip): """ Write a log entry to the log files. Parameters ---------- msg: str String to be printed. Can contain replacement fields delimited by braces ``{}``. fmt Parameters to be passed to :py:meth:`str.format` method for formatting the string ``msg``. skip: int Number of empty lines that should follow the log message. Examples -------- >>> logentry('Sky background for chip {} is {}', 'SCI1', 10.0, skip=2) will print: Sky background for chip SCI1 is 10.0 followed by two blank lines. """ if 'skip' in skip: nskip = skip['skip'] + 1 nskip = max(nskip, 0) else: nskip = 1 m = msg.format(*fmt) + nskip * self._eol # output to console if self._console: self._console.write(m) # output to disk log files for fh in self._logfh: fh.write(m) if self._autoflush: fh.flush()
[docs] def skip(self, nlines=1): """ Skip a specified number of blank lines. Parameters ---------- nlines: int, optional """ msg = nlines * '\n' if self._console: self._console.write(msg) for fh in self._logfh: fh.write(msg)
def eol(self): # output to console if self._console: self._console.write(self._eol) # output to disk log files for fh in self._logfh: fh.write(self._eol) if self._autoflush: fh.flush() def bold(self, bmsg, msg, *fmt): m = msg.format(*fmt) + self._eol if self._console: if self._enableBold: bm = "\x1B[1m{:s}: \x1B[0m".format(bmsg) else: bm = "*{:s}:* ".format(bmsg) self._console.write(bm + m) bm = "*{:s}:* ".format(bmsg) m = bm + m for fh in self._logfh: fh.write(m)
[docs] def warning(self, msg, *fmt): """ Prints a warning message. The word 'WARNING:' will be printed as bold on the console and enclosed in asterisks (*) in a disk file. """ self.bold("WARNING", msg, *fmt)
[docs] def error(self, msg, *fmt): """ Prints an error message. The word 'ERROR:' will be printed as bold on the console and enclosed in asterisks (*) in a disk file. """ self.bold("ERROR", msg, *fmt)
[docs] def important(self, msg, *fmt): """ Prints an "important" message. The word 'IMPORTANT:' will be printed as bold on the console and enclosed in asterisks (*) in a disk file. """ self.bold("IMPORTANT", msg, *fmt)
[docs]class ResourceRefCount(object): """ A class that implements reference counting for various resources: file objects, FITS HDU lists, etc. It is intended to be used as a mechanism of controlling the "lifespan" of resources that can be used in different parts of the code "indipendently". The resource is kept open as long as the reference count is larger than zero. Once the reference count is decreased to 0, the resource is automatically closed. .. note:: The reference count of the newly created `ResourceRefCount` object is set to 0. It is user's responsibility to increment the reference count of this object through the call to :py:meth:`hold` function. Parameters ---------- resource An object who must be kept open or closed based on reference count. close_args: tuple Positional arguments to be passed to the :py:meth:`resource_close_fnct` function. resource_close_fnct: function, optional The function (usually a method of the attached resource), that can "close" the resource. close_kwargs: dict Keyword arguments to be passed to the :py:meth:`resource_close_fnct` function. """ def __init__(self, resource, *close_args, resource_close_fnct=None, **close_kwargs): self._resource = resource self._count = 0 self._close_args = close_args self._close_kwargs = close_kwargs if resource_close_fnct is not None: self._res_close = resource_close_fnct elif hasattr(resource, 'close'): self._res_close = resource.close else: self._res_close = (lambda: None) self._close_args = () self._close_kwargs = {} self._close_notify = weakref.WeakKeyDictionary()
[docs] def subscribe_close_notify(self, obj, callback=None): """ Set the object and its method that need to be called when the resource is about to be closed. """ if not callback: self.unsubscribe_close_notify(obj) return self._close_notify[obj] = callback
[docs] def unsubscribe_close_notify(self, obj): """ Remove the object (and its method) from the list of callbacks that need to be notified of impending closing of the resource. """ if obj in self._close_notify: del self._close_notify[obj]
[docs] def is_subscribed_on_close(self, obj): """ Check if the object is subscribed to on close notifications. """ return obj in self._close_notify
def _notify_subscribers(self): for n in self._close_notify: if not n: continue cb = self._close_notify[n] getattr(n, cb)() @property def resource(self): """ Resource attached to a `ResourceRefCount` object. """ return self._resource
[docs] def hold(self): """ Increment the reference count of the attached resource. """ if self._resource is not None: self._count += 1
[docs] def release(self): """ Decrement reference count of the attached resource. If the reference count reaches zero, call all registered "on close" notify callbacks, and then call the "close" method on the resource which was set at initialization of the `ResourceRefCount` object. Finally, the `resource` property of the `ResourceRefCount` object will be set to `None`. """ if self._count > 0: self._count -= 1 if self._count == 0: self._notify_subscribers() if self._resource is not None: self._res_close() self._res_close = None del self._resource self._resource = None self._close_notify = {}
@property def refcount(self): """ Reference count. """ return self._count @property def closed(self): """ Indicates if the resource is "closed". """ return self._resource is None # Copying is not allowed for ResourceRefCount instances: def __copy__(self): return self def __deepcopy__(self, memo): return self
[docs]class ImageRef(object): """ A lightweight class that supports reference counting for FITS images and holds a :py:class:`ResourceRefCount` object. It provides several attributes that describe main characteristics of the image (file) and essential functions for manipulating reference count. Parameters ---------- hdulist_refcnt: ResourceRefCount A :py:class:`ResourceRefCount` object holding a `astropy.io.fits.HDUList` object. Attributes ---------- filename: str Name of the opened file. Can be `None` for in-memory created `astropy.io.fits.HDUList` objects. can_reload_data: bool `True` for files attached to a physical file, `False` for in-memory `astropy.io.fits.HDUList` objects. original_fname: str Original name of the file as requested by the user. *Note:* may be different from ``filename`` if the orininal file was in GEIS or WAIVER FITS format and subsequently was converted to a MEF FITS format. In that case this attribute will show the name of the original GEIS or WAIVER FITS file. original_ftype: str Type of the original file. Can take one of the following values: 'MEF', 'SIMPLE', 'GEIS', 'WAIVER', or 'UNKNOWN'. original_exists: bool Indicates if the physical file exists. It is `False` for in-memory images. mef_fname: str, None Name of the MEF FITS file if exists, `None` otherwise. mef_exists: bool Indicates whether the MEF FITS file exists. DQ_model: str, None Type of the DQ model: 'external' for WFPC, WFPC2, and FOC instruments (or non-HST data if set so) that have DQ data in a separate (from image) file and 'intrinsic' for ACS, etc. images that have DQ extensions in the image file. It is `None` if the image does not have DQ data. telescope: str Telescope that acquired the image. instrument: str Instrument used to acquire data. fmode: str File mode used to open FITS file. See `astropy.io.fits.open` for more details. memmap: bool Is the `astropy.io.fits.HDUList` memory mapped? """ def __init__(self, hdulist_refcnt=None): # loaded file name. This can be None for "pure" in-memory objects. # However, it will indicate the "original" file on the disk if the # "in-memory" HDUList was created with 'readgeis' or # 'convertwaiveredfits' self.filename = None # True for not-in-memory HDULists (e.g., loaded with fits.open() # from existing 'simple' or 'mef' FITS files) self.can_reload_data = False if hdulist_refcnt is not None: if hdulist_refcnt.closed: raise ValueError( "Resource reference counter must not be closed." ) hdulist_refcnt.hold() hdulist_refcnt.subscribe_close_notify( self, '_refcnt_about_to_close' ) self._hdulist_refcnt = hdulist_refcnt # EXTNAME of the standard image extensions. When it is obtained # from openImageEx, this is valid only for HST products and # is computed based on file name suffix, file type, and instrument. self._img_extname = None self.original_fname = None # input file name supplied to openImageEx # Possible values for original_ftype: 'MEF', 'SIMPLE', 'GEIS', # 'WAIVER', 'UNKNOWN' self.original_ftype = 'UNKNOWN' # original_exists can be False for DQ images that do not exist when the # science image is opened with openImageEx self.original_exists = False self.mef_fname = None self.mef_exists = False # Possible values for DQ_model: # - 'external' for WFPC, WFPC2, and FOC instruments or non-HST data # if set so; # - 'intrinsic' for ACS, etc. that have DQ extensions in the image # file; # - None if the image does not have DQ data. self.DQ_model = None self.telescope = 'UNKNOWN' self.instrument = 'UNKNOWN' self.fmode = 'readonly' self.memmap = True def __copy__(self): cp = ImageRef(self._hdulist_refcnt) cp.__dict__ = self.__dict__.copy() return cp def __deepcopy__(self, memo): cp = ImageRef(self._hdulist_refcnt) memo[id(self)] = cp for k, v in self.__dict__.items(): setattr(cp, k, deepcopy(v, memo)) return cp @property def extname(self): """ Extension name of the first extension. """ return self._img_extname @extname.setter def extname(self, extn): self._img_extname = extn @property def refcount(self): """ Reference count of the attached `astropy.io.fits.HDUList`. """ if self._hdulist_refcnt is not None: return self._hdulist_refcnt.refcount else: return None # callback for when the resource reference counter is about to be # closed def _refcnt_about_to_close(self): self.can_reload_data = False self.filename = None self._hdulist_refcnt = None
[docs] def set_HDUList_RefCount(self, hdulist_refcnt=None): """ Set (attach) a new :py:class:`ResourceRefCount` object that holds a `astropy.io.fits.HDUList` object. This is allowed only if the already attached :py:class:`ResourceRefCount` can be closed. The reference count of the :py:class:`ResourceRefCount` being attached will be incremented. Parameters ---------- hdulist_refcnt: ResourceRefCount, None A :py:class:`ResourceRefCount` object containing a `astropy.io.fits.HDUList` object. If it is `None`,it will release and close the attached :py:class:`ResourceRefCount` object. Raises ------ ValueError The (already) attached :py:class:`ResourceRefCount` must have reference count <= 1 so that it can be closed before being replaced with a new resource. ValueError The resource being attached must be in an open state. """ # for now, allow setting a new RefCount object only if the # existing counter can be closed. see comments below... if self._hdulist_refcnt is not None and self._hdulist_refcnt.refcount: raise ValueError("set_HDUList_RefCount cannot replace the " "assigned ResourceRefCount object when it " "its reference count is larger than one.") if hdulist_refcnt is self._hdulist_refcnt: return if self._hdulist_refcnt is not None: if hdulist_refcnt.closed: raise ValueError("The new resource reference counter must " "not be closed.") #TODO: this will 'release' _hdulist_refcnt only once. #However there may be more copies of this instance of ImageRef # that "hold" the original (before replacement) _hdulist_refcnt. # Therefore, the original resource might still be open. # # Should we forbid replacing the resource counter if it is not # "closed"? # -- see RuntimeError below... # # In addition, wouldn't it be better to set callbacks to the # reference counter so that "subscribed" images could be "notified" # that the resource counter is about to be replaced? # # Also, there is the issue of dealing with multiple # copies/instances of this class: we should 'release' the refcount # as many times as it was held by a given instance of the ImageRef # class. At this moment this is not implemented. # self._hdulist_refcnt.release() if not self._hdulist_refcnt.closed: raise ValueError( "An attempt was made to replace an open resource reference" " counter. The existing resource counter must be closed " "first before it can be replaced." ) self._hdulist_refcnt = hdulist_refcnt if hdulist_refcnt is not None: self._hdulist_refcnt.hold() hdulist_refcnt.subscribe_close_notify( self, '_refcnt_about_to_close' )
@property def closed(self): """ Is the :py:class:`ImageRef` object closed? """ # if self._hdulist_refcnt.resource is not None: # return self._hdulist_refcnt.resource._HDUList__file.closed # else: # return True return self._hdulist_refcnt is None or self._hdulist_refcnt.closed
[docs] def release(self): """ Decrement reference count of the attached resource. If the reference count reaches zero, the attached :py:class:`ResourceRefCount` object will be closed and set to `None`. """ if self._hdulist_refcnt is not None: self._hdulist_refcnt.release() # check for self._hdulist_refcnt is not None below is # necessary as the callback (if set) may set # self._hdulist_refcnt to None if self._hdulist_refcnt is not None and \ self._hdulist_refcnt.refcount <= 0: self.can_reload_data = False self.filename = None self._hdulist_refcnt = None
[docs] def hold(self): """ Increment the reference count of the attached resource. """ if self._hdulist_refcnt is not None: self._hdulist_refcnt.hold()
@property def hdu(self): """ `astropy.io.fits.HDUList` of the attached image. """ if self._hdulist_refcnt is not None: return self._hdulist_refcnt.resource else: return None
[docs] def info(self, fh=sys.stdout): """ info(self, fh=sys.stdout) Print a summary of the object attributes. """ fh.write("Loaded file name: '{}'\n".format(self.filename)) fh.write("Original file name: '{}'\n".format(self.original_fname)) fh.write("Original file type: '{}'\n".format(self.original_ftype)) fh.write("Original file exists: '{}'\n".format(self.original_exists)) fh.write("MEF file name: '{}'\n".format(self.mef_fname)) fh.write("MEF file exists: '{}'\n".format(self.mef_exists)) fh.write("Can reload data: '{}'\n".format(self.can_reload_data)) fh.write("EXTNAME: '{}'\n".format(self.extname)) fh.write("DQ model: '{}'\n".format(self.DQ_model)) fh.write("Telescope: '{}'\n".format(self.telescope)) fh.write("Instrument: '{}'\n".format(self.instrument)) fh.write("File open mode: '{}'\n".format(self.fmode)) fh.write("File memory mapped: '{}'\n".format(self.memmap)) fh.write("Reference counter object: '{}'\n" .format(self._hdulist_refcnt)) fh.write("Reference counter value: '{}'\n".format(self.refcount))
def _extract_instr_info(header): tel = None instr = None ftype = None if 'TELESCOP' in header: tel = header['TELESCOP'].strip().upper() if 'INSTRUME' in header: instr = header['INSTRUME'].strip().upper() if 'FILETYPE' in header: ftype = header['FILETYPE'].strip().upper() return (tel, instr, ftype) def basicGEIScheck(fname): """ Perform a *quick* but *very basic* check if the input file possibly might be a GEIS file. Parameters ---------- fname: str, file object Returns ------- bool `True` if the file appears to be a GEIS file, and `False` otherwise. """ if isinstance(fname, str): # if fname is a string - open the file fh = open(fname, 'rb') closefh = True else: # assume it is a file handle: fh = fname closefh = False cardLen = fits.Card.length readbytes = cardLen + 1 try: # process first "card": card = fh.read(readbytes) if not isinstance(card, str): try: card = card.decode(encoding='ascii') except UnicodeDecodeError: if closefh: fh.close() return False if len(card) != readbytes or card[-1] != '\n' or \ card[:29].upper() != 'SIMPLE = ': if closefh: fh.close() return False endcard_found = False while True: card = fh.read(readbytes) if not isinstance(card, str): try: card = card.decode(encoding='ascii') except UnicodeDecodeError: break cardn = len(card) if (cardn == cardLen or cardn == readbytes) and \ card[:cardLen].upper() == 'END' + (cardLen - 3) * ' ': # we have reached the 'END' card endcard_found = True break if cardn != readbytes: # card does not have the standard length break if card[-1] != '\n': # card does not end with a new line character break # # EXTRA CHECK (these errors would make fits.verify to fail): # if card[0].isspace(): # if card[:8] != ' ': # break # elif (card[:8].upper() not in ['COMMENT ', 'HISTORY '] and # card[8:10] != '= '): # break finally: if closefh: fh.close() return endcard_found def basicFITScheck(fname): """ Perform a *quick* but *very basic* check if the input file possibly might be a FITS file. Parameters ---------- fname: str, file object Returns ------- bool `True` if the file appears to be a FITS file, and `False` otherwise. """ if isinstance(fname, str): # if fname is a string - open the file fh = open(fname, 'rb') closefh = True else: # assume it is a file handle: fh = fname closefh = False cardLen = fits.Card.length try: # process first "card": card = fh.read(cardLen) if not isinstance(card, str): try: card = card.decode(encoding='ascii') except UnicodeDecodeError: if closefh: fh.close() return False if len(card) != cardLen or \ card[:29].upper() != 'SIMPLE = ': if closefh: fh.close() return False # process second "card": card = fh.read(cardLen) if len(card) != cardLen: if closefh: fh.close() return False # test whether card is a byte or a string # Required to support Python3 binary file operations vs # Python2 string file results if not isinstance(card, str): try: card = card.decode(encoding='ascii') except UnicodeDecodeError: if closefh: fh.close() return False if len(card) != cardLen or \ card[:26].upper() != 'BITPIX = ': if closefh: fh.close() return False except Exception: return False finally: if closefh: fh.close() return True def _geis2FITSName(rootname, geisext): return rootname + '_' + geisext[1:] + os.extsep + 'fits' # def _waiver2FITSName(rootname, waiverext): # if len(rootname) > 0: # ch = 'h' if rootname[-1].islower() else 'H' # return rootname[:-1] + ch + waiverext # else: # return rootname[:-1] + waiverext def _getExtname(hdulist): if len(hdulist) < 2: return 'PRIMARY' if 'EXTNAME' in hdulist[1].header: return hdulist[1].header['EXTNAME'] else: return None def _openHDU(imageRef, doOpen, preferMEF, rc_orig, fmode, memmap): if doOpen: assert(rc_orig.refcount > 0) if imageRef.original_ftype in ['MEF', 'SIMPLE']: rc_new = rc_orig rc_new.hold() imageRef.extname = _getExtname(rc_orig.resource) imageRef.filename = imageRef.mef_fname imageRef.can_reload_data = True imageRef.memmap = memmap imageRef.fmode = fmode else: if preferMEF and imageRef.mef_exists: rc_new = ResourceRefCount(None) try: hdulist = fits.open(imageRef.mef_fname, mode=fmode, memmap=memmap) rc_new = ResourceRefCount(hdulist) rc_new.hold() imageRef.extname = _getExtname(hdulist) imageRef.filename = imageRef.mef_fname imageRef.can_reload_data = True imageRef.memmap = memmap imageRef.fmode = fmode except Exception: imageRef.can_reload_data = False if not rc_new.closed: rc_new.release() rc_new = rc_orig rc_new.hold() else: rc_new = rc_orig rc_new.hold() else: rc_new = ResourceRefCount(None) imageRef.filename = None imageRef.extname = None imageRef.can_reload_data = False return rc_new def _getDQHDUList(dqimage, dqfile, doOpen, ftype, basicChk, openOrigFn, kw_openOrigFn, verbose): dqimage.original_exists = path.isfile(dqfile) dqimage.original_fname = dqfile dqimage.original_ftype = ftype dqimage.mef_exists = dqimage.mef_fname is not None \ and path.isfile(dqimage.mef_fname) if doOpen and not dqimage.original_exists and verbose: print("Data quality file \'{}\' not found.".format(dqfile)) dqimage.filename = None return None if dqimage.original_exists and doOpen: try: if basicChk(dqfile): dqhdulist = openOrigFn(dqfile, **kw_openOrigFn) dqhdr = dqhdulist[0].header dqimage.filename = dqfile dqimage.original_exists = True dqimage.extname = _getExtname(dqhdulist) if 'FILETYPE' in dqhdr and \ dqhdr['FILETYPE'].strip().upper() == 'SDQ': return dqhdulist else: dqhdulist.close() if verbose: print("Unsupported \'FILETYPE\' value in " "data quality file \'{}\'".format(dqfile)) except Exception: if verbose: print("Could not read data quality file " "\'{}\'".format(dqfile)) dqimage.original_exists = False dqimage.original_ftype = 'UNKNOWN' dqimage.filename = None return None
[docs]def openImageEx(filename, mode='readonly', dqmode='readonly', memmap=True, saveAsMEF=True, output_base_fitsname=None, clobber=True, imageOnly=False, openImageHDU=True, openDQHDU=False, preferMEF=True, verbose=False): """ Open an image file and (if requested) the associated DQ file and return corresponding :py:class:`ImageRef` objects. This function is an enhanced version of :py:func:`stsci.tools.fileutil.openImage` function in that it can open both the image file and the associated DQ image. It also provides additional inormation about the opened files: file type, original file name, DQ model ("intrinsic", where DQ data are placed in the same file as the science data, or "extrinsic" when DQ data are in a separate file from the science data), etc. Because of the way it was implemented, it requires half of the number of calls to `astropy.io.fits.open` thus making it almost twice as fast as the :py:func:`~stsci.tools.fileutil.openImage` function. Parameters ---------- filename: str File name of the file to be opened. The image file formats are recognized: simple/MEF FITS, HST GEIS format, or WAIVER FITS format. mode: str, optional File mode used to open main image FITS file. See `astropy.io.fits.open` for more details. dqmode: str, optional File mode used to open DQ image FITS file. See parameter ``mode`` above for more details. memmap: bool, optional Should memory mapping to be used whe opening simple/MEF FITS? saveAsMEF: bool, optional Should an input GEIS or WAIVER FITS be converted to simple/MEF FITS? output_base_fitsname: str, None, optional The base name of the output simple/MEF FITS when ``saveAsMEF`` is `True`. If it is `None`, the file name of the converted file will be determined according to HST file naming conventions. clobber: bool, optional If ``saveAsMEF`` is `True`, should any existing output files be overwritten? imageOnly: bool, optional Should this function open the image file only? If `True`, then the DQ-related attributes will not be valid. openImageHDU: bool, optional Indicates whether the returned :py:class:`ImageRef` object corresponding to the science image file should be in an open or closed state. openDQHDU: bool, optional Indicates whether the returned :py:class:`ImageRef` object corresponding to the DQ image file should be in an open or closed state. preferMEF: bool, optional Should this function open an existing MEF file that complies with HST naming convention when the input file is in GEIS or WAIVER FITS format, even when ``saveAsMEF`` is `False` or ``clobber`` is `False`? verbose: bool, optional If `True`, some addition information will be printed out to standard output. Returns ------- (img, dqimg): (ImageRef, ImageRef) A tuple of :py:class:`ImageRef` objects corresponding to the science image and to the DQ image. Each object in the returned tuple open or closed depending on the input arguments. .. note:: If the returned object is *open*, then its reference count will be at least 1. The caller is responsible for "releasing" the object when it is no longer needed. .. note:: If the DQ model of the opened file is "intrinsic", then the `dqimg` component of the returned tuple will hold a reference counter to the same image. Thus, for "intrinsic" DQ data models, the reference count of the returned objects may be 2 (if both science ``openImageHDU`` and ``openDQHDU`` are `True`). Raises ------ ValueError If input file is neither a GEIS file nor a FITS file. ValueError Errors occured while accessing/reading the file possibly due to corrupted file, non-compliant file format, etc.s """ # Initialize output dictionary: sci_image = ImageRef() dq_image = ImageRef() # Image file: hdulist = None # Data Quality (DQ) file (valid only if 'associated_DQ_file' is True): dqhdulist = None # Extra Info: telescope = 'UNKNOWN' instrument = 'UNKNOWN' # supported GEISS/WAIVER instruments (for DQ detection): supported_extern_DQ = ['WFPC2', 'WFPC', 'FOC', 'HRS'] # Insure that the filename is always fully expanded # This will not affect filenames without paths or # filenames specified with extensions. filename = fileutil.osfn(filename) # Extract the rootname and extension specification # from input image name _fname, _iextn = fileutil.parseFilename(filename) sci_image.original_fname = _fname (_root, fext) = path.splitext(_fname) if output_base_fitsname is None: _outroot = _root else: fpath, oldbase = path.split(_root) _outroot = path.join(fpath, output_base_fitsname) # readgeis may hang on incorrect (non-GEIS) files. To avoid this, # do a basic GEIS file check: if len(fext) == 4 and fext[-1].lower() == 'h' and basicGEIScheck(_fname): # input file might be in GEIS format so try to read it: hdulist = readgeis.readgeis(_fname) ftype = 'GEIS' convert2MEF = saveAsMEF sci_image.original_exists = True sci_image.original_ftype = ftype sci_image.filename = _fname sci_image.mef_fname = _geis2FITSName(_outroot, fext) sci_image.mef_exists = path.isfile(sci_image.mef_fname) sci_image.extname = _getExtname(hdulist) sci_image.fmode = 'readonly' sci_image.memmap = False (telescope, instrument, hstftype) = _extract_instr_info( hdulist[0].header ) has_DQ = (telescope == 'HST' and hstftype == 'SCI' and instrument in supported_extern_DQ and fext[-2] == '0') if has_DQ: sci_image.DQ_model = 'external' _dqfext = fext[:2] + '1' + fext[-1] _dqfname = _root + _dqfext dq_image.mef_fname = _geis2FITSName(_outroot, _dqfext) dqhdulist = _getDQHDUList( dq_image, _dqfname, not imageOnly, ftype, basicGEIScheck, readgeis.readgeis, {}, verbose ) else: sci_image.DQ_model = 'intrinsic' else: # Assume the input file is a FITS file if not basicFITScheck(_fname): raise ValueError( "Input file \'{}\' is neither a GEIS file nor a FITS file." .format(_fname) ) hdulist = fits.open(_fname, mode=mode, memmap=memmap) hdr = hdulist[0].header (telescope, instrument, hstftype) = _extract_instr_info(hdr) #TODO: the check for existance of data in the primary header should be # replaced with a more efficient check of NAXIS, GCOUNT, etc. as it # is not very efficient to load data just to check that data exist. data0 = hdulist[0].data if data0 is None: if len(hdulist) > 1: ftype = 'MEF' else: raise ValueError( "Input file \'{}\' has either an unsupported FITS format, " "contains no image data, or is from an unsupported " "instrument.".format(_fname) ) else: if len(hdulist) > 1 and isinstance(hdulist[1], fits.TableHDU): ftype = 'WAIVER' if (not (telescope == 'HST' and instrument in supported_extern_DQ) and verbose): print("Input file \'{}\' appears to have \'WAIVER\'-FITS " "like structure but originates from an unsupported " "instrument.".format(_fname)) else: ftype = 'SIMPLE' if ftype in ['MEF', 'SIMPLE']: # STANDARD SIMPLE OR MEF FITS FILE: convert2MEF = False sci_image.original_exists = True sci_image.original_ftype = ftype sci_image.filename = _fname sci_image.mef_fname = _fname sci_image.mef_exists = True sci_image.extname = _getExtname(hdulist) has_DQ = (telescope == 'HST' and instrument in supported_extern_DQ and hstftype == 'SCI' and len(_root) > 2 and _root[-2] == '0') if has_DQ: sci_image.DQ_model = 'external' _dqfname = _root[:-2] + '1' + _root[-1] + fext dq_image.mef_fname = _dqfname dq_image.fmode = dqmode dqhdulist = _getDQHDUList( dq_image, _dqfname, not imageOnly, ftype, (lambda x: True), fits.open, {'mode': dqmode, 'memmap': memmap}, verbose ) if dqhdulist is None: dq_image.mef_exists = False else: sci_image.DQ_model = 'intrinsic' else: # "WAIVER"-FITS FORMAT: convert2MEF = saveAsMEF if len(_root) == 0: ch = '' else: ch = 'h' if _root[-1].islower() else 'H' whdulist = hdulist try: hdulist = convertwaiveredfits.convertwaiveredfits(whdulist) except Exception as e: hdulist = None raise e finally: whdulist.close() del whdulist sci_image.original_exists = True sci_image.original_ftype = ftype sci_image.filename = _fname sci_image.mef_fname = _root[:-1] + ch + fext sci_image.mef_exists = path.isfile(sci_image.mef_fname) sci_image.extname = _getExtname(hdulist) sci_image.fmode = 'readonly' sci_image.memmap = False telescope, instrument, hstftype = _extract_instr_info( hdulist[0].header ) has_DQ = (telescope == 'HST' and hstftype == 'SCI' and instrument in supported_extern_DQ and len(_root) > 2 and _root[-2] == '0') if has_DQ: sci_image.DQ_model = 'external' _dqfname = _root[:-2] + '1' + _root[-1] + fext dq_image.mef_fname = _outroot[:-2] + '1' + ch + fext dqhdulist = _getDQHDUList( dq_image, _dqfname, not imageOnly, ftype, basicFITScheck, convertwaiveredfits.convertwaiveredfits, {}, verbose ) else: sci_image.DQ_model = 'intrinsic' sci_image.telescope = telescope sci_image.instrument = instrument sci_image.ftype = hstftype dq_image.telescope = telescope dq_image.instrument = instrument dq_image.ftype = hstftype if convert2MEF: # convert GEIS or WAIVER to MEF: if (sci_image.mef_exists and clobber) or not sci_image.mef_exists: if verbose: print("Writing out {} as MEF to \'{}\'" .format(ftype, sci_image.mef_fname)) hdulist.writeto(sci_image.mef_fname, overwrite=clobber) sci_image.mef_exists = True if dq_image.original_exists and not imageOnly and \ ((dq_image.mef_exists and clobber) or not dq_image.mef_exists): if verbose: print("Writing out DQ {} as MEF to \'{}\'" .format(ftype, dq_image.mef_fname)) dqhdulist.writeto(dq_image.mef_fname, overwrite=clobber) dq_image.mef_exists = True rcim_orig = ResourceRefCount(hdulist) rcim_orig.hold() if sci_image.DQ_model == 'intrinsic': rcdq_orig = rcim_orig rcdq_orig.hold() fmode = mode if openImageHDU else dqmode if openImageHDU or (openDQHDU and not imageOnly): rcim = _openHDU(sci_image, True, preferMEF, rcim_orig, fmode, memmap) dq_image = copy(sci_image) if openDQHDU: rcdq = rcim rcdq.hold() else: rcdq = _openHDU(dq_image, False, preferMEF, rcdq_orig, fmode, memmap) if not openImageHDU: rcim.release() rcim = _openHDU(sci_image, False, preferMEF, rcim_orig, mode, memmap) else: dq_image = copy(sci_image) rcim = _openHDU(sci_image, False, preferMEF, rcim_orig, mode, memmap) rcdq = _openHDU(dq_image, False, preferMEF, rcdq_orig, fmode, memmap) # try to "guess" DQ extension name if count_extensions(hdulist, 'DQ') > 0: dq_image.extname = 'DQ' else: dq_image.extname = None else: rcdq_orig = ResourceRefCount(dqhdulist) rcdq_orig.hold() rcim = _openHDU( sci_image, openImageHDU, preferMEF, rcim_orig, mode, memmap ) rcdq = _openHDU( dq_image, openDQHDU and not imageOnly and not rcdq_orig.closed, preferMEF, rcdq_orig, dqmode, memmap ) dq_image.fmode = dqmode sci_image.set_HDUList_RefCount(None if rcim.closed else rcim) dq_image.set_HDUList_RefCount(None if rcdq.closed else rcdq) dq_image.DQ_model = None rcim_orig.release() rcdq_orig.release() rcim.release() rcdq.release() return (sci_image, dq_image)
[docs]def almost_equal(arr1, arr2, fp_accuracy=None, fp_precision=None): r""" Compares two values, or values of `numpy.ndarray` and verifies that these values are close to each other. For exact type (integers and boolean) the comparison is exact. For inexact types (`float`, `numpy.float32`, etc.) it checks that the values (or *all* values in a `numpy.ndarray`) satisfy the following inequality: .. math:: |v1-v2| \leq a + 10^{-p} \cdot |v2|, where ``a`` is the accuracy ("absolute error") and ``p`` is precision ("relative error"). Parameters ---------- arr1: float, int, bool, str, numpy.ndarray, None, etc. First value or array of values to be compared. arr2: float, int, bool, str, numpy.ndarray, None, etc. Second value or array of values to be compared. fp_accuracy: int, float, None, optional Accuracy to withing values should be close. Default value will use twice the value of the machine accuracy (machine epsilon) for the input type. This parameter has effect only when the values to be compared are of inexact type (e.g., `float`). fp_precision: int, float, None, optional Accuracy to withing values should be close. Default value will use twice the value of the machine precision (resolution) for the input type. This parameter has effect only when the values to be compared are of inexact type (e.g., `float`). Returns ------- almost_equal: bool Returns `True` if input values are close enough to each other or `False` otherwise. Raises ------ ValueError If `fp_accuracy` is negative. TypeError If `fp_precision` is not `int`, `float`, or `None`. """ if fp_accuracy is not None: if not (isinstance(fp_accuracy, float) or isinstance(fp_accuracy, int)) \ or ((isinstance(fp_accuracy, float) or isinstance(fp_accuracy, int)) and fp_accuracy < 0): raise ValueError("Accuracy must be either a non-negative " "number or None") if fp_precision is not None and not (isinstance(fp_precision, float) or isinstance(fp_precision, int)): raise ValueError("Precision must be either None, an integer, or " "a floating-point number") a1 = np.asarray(arr1) a2 = np.asarray(arr2) if a1.shape != a2.shape: return False t1 = a1.dtype.type t2 = a2.dtype.type if issubclass(t1, np.inexact): fi = np.finfo(t1) acc = 2.0 * fi.eps if fp_accuracy is None else fp_accuracy prec = 2.0 * fi.resolution if fp_precision is None \ else 10**(-fp_precision) elif issubclass(t1, np.integer) or issubclass(t1, np.bool_): acc = 0 prec = 0 else: acc = None prec = None if issubclass(t2, np.inexact): if acc is None: return False fi = np.finfo(t2) if fp_accuracy is None: acc = max(2.0 * fi.eps, acc) if fp_precision is None: prec = max(2.0 * fi.resolution, acc) elif (issubclass(t2, np.integer) or issubclass(t2, np.bool_)): if acc is None: return False acc = 0 prec = 0 if acc is None or (acc == 0 and prec == 0): return np.all(a1 == a2) else: return np.all(np.abs(a1 - a2) <= acc + prec * np.abs(a2))
[docs]def skyval2txt(files='*_flt.fits', skyfile='skyfile.txt', skykwd='SKYUSER', default_ext=('SCI', '*')): """ A convenience function that allows retrieving computed sky background values from image headers and storing them in a text file that can be re-used by ``drizzlepac.astrodrizzle.AstroDrizzle()``. This is particularly useful when performing sky *matching* on a large number of images which takes considerable time. Saving computed sky values to a text file allows re-running ``AstroDrizzle()`` without re-computing sky values. .. warning:: The file specified by ``skyfile`` is overwritten without warning. .. note:: Images that do not have specified extensions will be ignored. Parameters ---------- files: str File name(s), including extension specification if necessary, from which sky values should be retrieved: * a comma-separated list of valid science image file names (see note below) and (optionally) extension specifications, e.g.: ``'j1234567q_flt.fits[1], j1234568q_flt.fits[sci,2]'``; * an @-file name, e.g., ``'@fits_files_with_skyvals.txt'``. .. note:: **Valid** **science** **image** **file** **names** are: * file names of existing FITS, GEIS, or WAIVER FITS files; * partial file names containing wildcard characters, e.g., ``'*_flt.fits'``; * Association (ASN) tables (must have ``_asn``, or ``_asc`` suffix), e.g., ``'j12345670_asn.fits'``. .. warning:: @-file names **MAY** **NOT** be followed by an extension specification. .. warning:: If an association table or a partial file name with wildcard characters is followed by an extension specification, it will be considered that this extension specification applies to **each** file name in the association table or **each** file name obtained after wildcard expansion of the partial file name. skyfile: str, optional Output "skyfile" to which sky values from the image headers should be written out. skykwd: str, optional Header keyword holding the value of the computed sky background. default_ext: int, tuple, optional Default extension to be used with image files that to not have an extension specified. """ from .parseat import parse_cs_line files = parse_cs_line(files, doNotOpenDQ=True, im_fmode='readonly', default_ext=default_ext) with open(skyfile, 'w') as f: for file in files: if len(file.fext) < 1: # ignore images that do not have required extension(s): file.release_all_images() continue f.write('{:s}'.format(file.image.filename)) for ext in file.fext: skyval = file.image.hdu[ext].header.get(skykwd, 0.0) f.write('\t{:.16g}'.format(skyval)) f.write('\n') file.release_all_images()