"""
Module for parsing ``@-files`` or user input strings for use
by :py:mod:`stsci.skypac` module.
:Authors: Mihai Cara
:License: :doc:`LICENSE`
"""
import os
import sys
import glob
from copy import copy, deepcopy
from .utils import MultiFileLog, ImageRef, openImageEx, get_ext_list
from stsci.tools import fileutil, parseinput, asnutil
try:
# added to provide interpretation of environment variables
from stsci.tools.fileutil import osfn
except ImportError:
osfn = None
__all__ = ['parse_at_file', 'parse_at_line', 'parse_cs_line',
'FileExtMaskInfo']
# ``os.path.samefile``/``os.path.samestat`` are used directly on POSIX
# platforms. On other platforms files are identified through a simple
# comparison of their absolute path names: the path is stored in the
# ``st_ino`` slot of a synthetic ``os.stat_result`` object.
def _null_stat():
    """Return a placeholder ``os.stat_result`` with all fields set to `None`."""
    return os.stat_result(10 * (None,))


def _name_stat(fn):
    """
    Return a synthetic ``os.stat_result`` identifying file ``fn`` by name.

    The user-expanded absolute path is stored as ``st_ino``; all other
    fields are 0. A `None` file name produces an all-`None` placeholder.
    """
    if fn is None:
        return _null_stat()
    fname = os.path.abspath(os.path.expanduser(fn))
    return os.stat_result((0, fname) + 8 * (0,))


def _name_same_stat(st1, st2):
    """Compare two ``stat`` results field by field."""
    return st1 == st2


def _name_same_file(fn1, fn2):
    """
    Check whether two file *names* resolve to the same absolute path.

    The arguments are file names, so they are first converted to
    name-based ``stat`` signatures and then compared.
    """
    # TODO: A better way would be to use GetFileInformationByHandle as
    #       shown here:
    # http://timgolden.me.uk/python/win32_how_do_i/see_if_two_files_are_the_same_file.html
    #       or as it is implemented in Python 3.3 though _getfinalpathname
    #       (still relying on WinAPI calls). Same applies to _name_stat.
    return _name_same_stat(_name_stat(fn1), _name_stat(fn2))


if os.name == 'posix':
    _sameFile = os.path.samefile
    _sameStat = os.path.samestat

    def _Stat(fn):
        """Return ``os.stat(fn)``, or an all-`None` placeholder for `None`."""
        return _null_stat() if fn is None else os.stat(fn)

else:
    # TODO: not sure if this is needed for cygwin. check this at a later time!
    _sameFile = _name_same_file
    _sameStat = _name_same_stat
    _Stat = _name_stat
class CharAccumulator(object):
    """
    Accumulate the characters of a single, optionally quoted, token.

    Leading white space is dropped. When the first significant character
    is one of the allowed quotation marks, the token becomes *quoted*:
    characters are stored verbatim until the matching closing mark, at which
    point the accumulator closes itself. Once closed, only white space may
    be appended (and it is ignored).

    Parameters
    ----------
    allowed_qmarks: list of str, optional
        Characters recognized as quotation marks. When `None` (default),
        the single and double quote characters are used.

    """
    def __init__(self, allowed_qmarks=None):
        # a None sentinel avoids sharing a mutable default between calls
        if allowed_qmarks is None:
            allowed_qmarks = ['\'', '\"']
        self.quotes = deepcopy(allowed_qmarks)
        self.reset()

    def __str__(self):
        # quoted strings are returned verbatim (this raises SyntaxError if
        # the closing quotation mark is missing); unquoted ones lose their
        # trailing white space
        if self.quoted:
            return self.interpreted_str()
        return self._buff.rstrip()

    def interpreted_str(self):
        """
        Return the accumulated token.

        Returns
        -------
        str or None
            For unquoted input: the buffer without trailing white space, or
            `None` if nothing is left. For quoted (and closed) input: the
            buffer exactly as typed between the quotation marks.

        Raises
        ------
        SyntaxError
            If an opening quotation mark was never closed.

        """
        if not self.quoted:
            stripped = self._buff.rstrip()
            return stripped if len(stripped) > 0 else None
        if self.closed:
            return self._buff
        raise SyntaxError("CharAccumulator object contains a "
                          "string with an opening quotation mark but "
                          "without a corresponding closing quotation "
                          "mark (EOL).")

    def reset(self):
        """Discard accumulated characters and return to the initial state."""
        self._buff = ''
        self._dirty = False
        self._closed = False
        self._quoted = False
        self._qmark = ''

    @property
    def length(self):
        """Number of characters currently stored in the buffer."""
        return len(self._buff)

    @property
    def dirty(self):
        """`True` once a significant (non-leading-space) character was seen."""
        return self._dirty

    @dirty.setter
    def dirty(self, dFlag):
        self._dirty = dFlag

    @property
    def qmark(self):
        """Quotation mark that opened the token ('' if none is active)."""
        return self._qmark

    @qmark.setter
    def qmark(self, quotation_mark):
        assert(quotation_mark in self.quotes)
        self._qmark = quotation_mark
        # an opening quotation mark counts as content
        self.dirty = True
        self.quoted = True

    @property
    def closed(self):
        """`True` if no more (non-space) characters can be appended."""
        return self._closed

    def close(self):
        """Close the accumulator and forget the active quotation mark."""
        self._closed = True
        self._qmark = ''

    @property
    def quoted(self):
        """`True` if the token started with a quotation mark."""
        return self._quoted

    @quoted.setter
    def quoted(self, qFlag):
        self._quoted = qFlag
        if not qFlag:
            self._qmark = ''

    def append(self, ch):
        """
        Append one or more characters to the token.

        Parameters
        ----------
        ch: str
            A non-empty string. Strings longer than one character are
            processed one character at a time.

        Raises
        ------
        BufferError
            If a non-white space character is appended to a closed
            accumulator.

        """
        assert(len(ch) > 0)

        # feed multi-character strings one character at a time:
        if len(ch) > 1:
            for c in ch:
                self.append(c)
            return

        # a closed accumulator silently skips white space and rejects
        # everything else:
        if self.closed:
            if ch.isspace():
                return
            raise BufferError("An attempt was made to add a character to "
                              "a closed CharAccumulator object.")

        # trim leading white spaces:
        if not self.dirty and ch.isspace():
            return

        if ch in self.quotes:
            if not (self.quoted or self.dirty):
                # very first significant character: opening quotation mark
                self.qmark = ch
                return
            if self.quoted and ch == self.qmark:
                # matching closing quotation mark
                self.close()
                return
            # any other quotation mark is ordinary content

        self._buff = self._buff + ch
        self.dirty = True
class ExtSpec(object):
    """
    Character-by-character parser of a single extension specification.

    Three styles are recognized, selected by the first significant
    character:

    * style 0 -- a bare value such as ``1`` or ``sci``;
    * style 1 -- square brackets: ``[extname, extver]``, ``[extname]``,
      or ``[extnumber]``;
    * style 2 -- a tuple: ``(extname, extver)``.

    After :py:meth:`close` the parsed value is available through
    :py:attr:`ext` as an `int`, a `str`, or a ``(str, int or '*')`` tuple.

    Parameters
    ----------
    fname: str, optional
        File name used in error messages.

    """
    _BRACKETS = ('(', ')', '[', ']', '{', '}')

    def __init__(self, fname='Unknown'):
        # guarantee that ``fname`` exists even when None is passed in
        # (``reset`` leaves the attribute untouched for None):
        self.fname = 'Unknown'
        self._extstr = CharAccumulator()
        self.reset(fname)

    def _set_sigle_ext_style(self):
        assert(self._ncomp <= 1)
        self._compound_ext_delim = [None, None]
        self._ext_style = 0
        self._nmin = 1
        self._nmax = 1

    def _set_square_ext_style(self):
        assert(self._ncomp <= 2)
        self._compound_ext_delim = ['[', ']']
        self._ext_style = 1
        self._nmin = 1
        self._nmax = 2

    def _set_tuple_ext_style(self):
        assert(self._ncomp <= 2)
        self._compound_ext_delim = ['(', ')']
        self._ext_style = 2
        self._nmin = 2
        self._nmax = 2

    def reset(self, fname=None):
        """
        Return the parser to its initial state.

        Parameters
        ----------
        fname: str, None, optional
            New file name for error messages; `None` keeps the current one.

        """
        if fname is not None:
            self.fname = fname
        self._ext = None
        self._extcomp = []
        self._ncomp = 0
        self._cntbraket = 0
        self._dirty = False
        self._closed = False
        self._extstr.reset()
        self._set_sigle_ext_style()  # default

    @property
    def ext_style(self):
        """Detected style: 0 (bare), 1 (square brackets), or 2 (tuple)."""
        return self._ext_style

    @property
    def count(self):
        """Number of components collected so far."""
        return self._ncomp

    @property
    def dirty(self):
        """`True` once a significant character has been received."""
        return self._dirty

    def flag_as_dirty(self):
        self._dirty = True

    @property
    def ext(self):
        """Parsed extension (`None` until the specification is closed)."""
        return self._ext

    @property
    def closed(self):
        """`True` once the specification has been closed."""
        return self._closed

    @property
    def can_close(self):
        """
        `True` if the specification may be terminated at this point, i.e.,
        it is pristine, already closed, or a bare (style 0) value that is
        not in the middle of a quoted string.
        """
        if not self.dirty or self.closed:
            return True
        return (self.ext_style == 0 and (not self._extstr.quoted or
                                         self._extstr.closed))

    def _name_version_ext(self):
        # Validate a two-component (extname, extver) specification and
        # return it as a tuple; extver is converted to int unless it is '*'.
        extname, extver = self._extcomp[0], self._extcomp[1]
        if extname is None:
            raise ValueError('Extension name cannot be empty.')
        if extver is None:
            raise ValueError('Extension version cannot be empty.')
        if not isinstance(extname, str):
            raise ValueError('Extension name must be a valid string.')
        if extver == '*':
            return (extname, '*')
        try:
            return (extname, int(extver))
        except ValueError as err:
            raise ValueError("Extension version must be either "
                             "an integer or a wild-card ('*').") from err

    def close(self):
        """
        Finish parsing, validate the specification, and compute
        :py:attr:`ext`. Closing an already closed object is a no-op.

        Raises
        ------
        ValueError
            If brackets are unbalanced, there are too many or too few
            components, a component is empty, or the extension version
            is neither an integer nor ``'*'``.

        """
        if self.closed:
            return
        self._closed = True

        # store the component that is still being accumulated:
        self._extstr.close()
        self._extcomp.append(self._extstr.interpreted_str())
        self._ncomp += 1

        # validate extension specification and convert to one of
        # the following formats: int, str, tuple
        if self._cntbraket != 0:
            raise ValueError("Unbalanced or nested brackets "
                             "have been detected.")

        if self.count > self._nmax:
            raise ValueError("Invalid extension specifier for file "
                             "'{:s}'.".format(self.fname))

        if self.ext_style == 0:
            if self.count < self._nmin or self._extcomp[0] is None:
                raise ValueError("Extension name (or number) cannot be empty.")
            value = self._extcomp[0]
            # a quoted value is always a string; an unquoted one is a
            # number whenever it can be converted to int
            if not self._extstr.quoted:
                try:
                    value = int(value)
                except ValueError:
                    pass
            self._ext = value

        elif self.ext_style == 1:
            # [extname, extver], [extname], or [extnumber]
            if self.count < self._nmin or \
               (self.count == 1 and self._extcomp[0] is None):
                raise ValueError("An extension specifier cannot be empty.")
            if self.count == 2:
                self._ext = self._name_version_ext()
            else:
                # single component: the ``_nmin``/``_nmax`` checks above
                # leave count == 1 as the only other possibility
                try:
                    self._ext = int(self._extcomp[0])  # is an integer?
                except ValueError:
                    self._ext = (self._extcomp[0], 1)  # [sci] == [sci,1]

        elif self.ext_style == 2:  # (extname, extver)
            if self.count < self._nmin:
                raise ValueError(
                    "A tuple extension specifier must contain precisely two "
                    "components: (extname, extver)."
                )
            self._ext = self._name_version_ext()

    def append(self, ch):
        """
        Feed one or more characters to the parser.

        Parameters
        ----------
        ch: str
            A non-empty string; longer strings are processed one character
            at a time.

        Raises
        ------
        ValueError
            On misplaced brackets, too many components, or non-white space
            characters following a closed specification.

        """
        assert(len(ch) > 0)

        # feed multi-character strings one character at a time:
        if len(ch) > 1:
            for c in ch:
                self.append(c)
            return

        # nothing but white space may follow a closed specification:
        if self.closed:
            if ch.isspace():
                return
            if ch in self._BRACKETS:
                raise ValueError("Misplaced, unbalanced, or nested "
                                 "brackets have been detected.")
            raise ValueError("An attempt was made to add a character to "
                             "a closed ExtSpec object. Extension "
                             "specification cannot be followed by "
                             "non-white space characters.")

        if not self.dirty:
            if ch.isspace():
                # trim leading white spaces:
                return
            # the first significant character sets the extension style:
            self.flag_as_dirty()
            if ch == '[':
                self._cntbraket += 1
                self._set_square_ext_style()
                return
            if ch == '(':
                self._cntbraket += 1
                self._set_tuple_ext_style()
                return
            self._cntbraket = 0
            self._set_sigle_ext_style()

        # special characters are only interpreted outside quoted strings:
        outside_quotes = self._extstr.closed or not self._extstr.quoted

        if ch == ',' and outside_quotes:
            if self.ext_style == 0:
                # a comma terminates a bare specification
                self.close()
            else:
                # a comma separates components of a compound specification
                self._extstr.close()
                self._extcomp.append(self._extstr.interpreted_str())
                self._ncomp += 1
                self._extstr.reset()
                if self.count > self._nmax:
                    raise ValueError("Invalid extension specifier for file "
                                     "'{:s}'.".format(self.fname))
            return

        if ch == self._compound_ext_delim[1] and outside_quotes:
            self._cntbraket -= 1
            if self._cntbraket < 0:
                raise ValueError(
                    "Unbalanced or nested brackets have been detected."
                )
            self.close()
            return

        if not self._extstr.quoted and ch in self._BRACKETS:
            raise ValueError("Misplaced, unbalanced, or nested brackets "
                             "have been detected.")

        self._extstr.append(ch)
        return
class MultiExtSpec(object):
    """
    Character-by-character parser of a list of extension specifications
    enclosed in curly brackets, e.g., ``{1, [sci,2], (dq,*)}``.

    Individual specifications are parsed with :py:class:`ExtSpec`. The
    resulting list is available through :py:attr:`ext`.

    Parameters
    ----------
    fname: str, optional
        File name used in error messages.

    default_strextv: optional
        Extension version paired with extensions given as a bare string
        (other than ``'*'``). If it evaluates to `False`, bare strings are
        stored unchanged.

    """
    def __init__(self, fname='Unknown', default_strextv='*'):
        self._compound_ext_delim = ['{', '}']
        # make sure the attribute exists even when ``fname`` is None
        # (``reset`` leaves it untouched in that case):
        self.fname = 'Unknown'
        self._extspec = ExtSpec(fname)
        self._defextv = default_strextv
        self.reset(fname)

    def reset(self, fname=None):
        """
        Return the parser to its initial state.

        Parameters
        ----------
        fname: str, None, optional
            New file name for error messages; `None` keeps the current one.

        """
        if fname is not None:
            self.fname = fname
        # keep the file name reported by the inner parser in sync:
        self._extspec.reset(fname)
        self._extlist = []
        self._next = 0
        self._cntbraket = 0
        self._dirty = False
        self._closed = False

    @property
    def count(self):
        """Number of extensions collected so far."""
        return self._next

    @property
    def dirty(self):
        """`True` once a significant character has been received."""
        return self._dirty

    def flag_as_dirty(self):
        self._dirty = True

    @property
    def ext(self):
        """List of parsed extensions."""
        return self._extlist

    @property
    def closed(self):
        """`True` once the specification has been closed."""
        return self._closed

    def _store_current_ext(self):
        # Close the extension that is being parsed (if any) and add
        # the result to the list of extensions.
        if self._extspec.dirty:
            self._extspec.close()
        ext = self._extspec.ext
        if ext is None:
            return
        # for a string extension: leave a str extension == '*'
        # unchanged, otherwise create a tuple with
        # (str, default extension):
        if isinstance(ext, str) and self._defextv and not ext == '*':
            ext = (ext, self._defextv)
        self._extlist.append(ext)
        self._next += 1

    def close(self):
        """
        Finish parsing and store the last extension. Closing an already
        closed object is a no-op.

        Raises
        ------
        ValueError
            If curly brackets are unbalanced or the last extension
            specification is invalid.

        """
        if self.closed:
            return
        self._closed = True
        self._store_current_ext()

        if self._cntbraket != 0:
            raise ValueError("Unbalanced or nested brackets "
                             "have been detected.")

    def append(self, ch):
        """
        Feed one or more characters to the parser.

        Parameters
        ----------
        ch: str
            A non-empty string; longer strings are processed one character
            at a time.

        Raises
        ------
        ValueError
            If the specification does not start with ``'{'``, brackets are
            misplaced, an extension is invalid, or non-white space
            characters follow a closed specification.

        """
        assert(len(ch) > 0)

        # feed multi-character strings one character at a time:
        if len(ch) > 1:
            for c in ch:
                self.append(c)
            return

        # nothing but white space may follow a closed specification:
        if self.closed:
            if ch.isspace():
                return
            if ch in ['(', ')', '[', ']', '{', '}']:
                raise ValueError("Misplaced, unbalanced, or nested "
                                 "brackets have been detected.")
            raise ValueError("An attempt was made to add a character to "
                             "a closed MultiExtSpec object. Extension "
                             "specification cannot be followed by "
                             "non-white space characters.")

        if not self.dirty:
            if ch.isspace():
                # trim leading white spaces:
                return
            self.flag_as_dirty()
            if ch == self._compound_ext_delim[0]:
                self._cntbraket += 1
                return
            raise ValueError("Multi-extension specification must "
                             "start with left curly ('{') bracket.")

        # a comma outside of a compound extension separates extensions:
        if ch == ',' and self._extspec.can_close:
            self._store_current_ext()
            self._extspec.reset()
            return

        if ch == self._compound_ext_delim[1] and self._extspec.can_close:
            self._cntbraket -= 1
            if self._cntbraket < 0:
                raise ValueError("Unbalanced or nested brackets "
                                 "have been detected.")
            self.close()
            return

        self._extspec.append(ch)
        return
[docs]class FileExtMaskInfo(object):
"""
A class that holds image, dq, user masks, and extensions to be used with
these files. It is designed to facilitate keeping track of user input in
catalog files.
This class is intended to be used primarily for functions such as
:py:func:`parse_at_line` and other related functions as a return value.
It is also used to initialize :py:class:`skypac.skyline.SkyLine` objects.
`FileExtMaskInfo` was designed to be used in a specific ordered
workflow. Below is a typical use of this class:
#. Initialize the object with the desired settings for default
extensions to be used with the files (when a specific extension
for a file is not provided) and the open modes for the files;
#. Add image file using :py:meth:`image`;
#. Add image's extension(s) using :py:meth:`append_ext`;
#. [Optional; can be performed at any **later** stage] Add DQ file and
extensions using :py:meth:`DQimage` and :py:meth:`dqext` methods;
#. Append mask files and extensions using :py:meth:`append_mask`;
#. [Optional] Finalize the :py:meth:`FileExtMaskInfo` object.
Parameters
----------
default_ext: int, tuple, optional
Default extension to be used with image files that do not have
an extension specified.
default_mask_ext: int, tuple, optional
Default extension to be used with image mask files that do not have
an extension specified.
clobber: bool, optional
If a file being appended is in GEIS or WAIVER FITS format, should
any existing MEF files be overwritten?
doNotOpenDQ: bool, optional
If `True`, the DQ files are not opened together with the image
files.
fnamesOnly: bool, optional
Return file names only, or open the files and return
:py:class:`~skypac.utils.ImageRef` objects?
im_fmode: str, optional
File mode to be used to open image FITS file.
See `astropy.io.fits.open` for more details.
dq_fmode: str, optional
File mode to be used to open DQ FITS file. This is valid only if the
DQ model of the image file is 'external' (see documentation for
:py:class:`~skypac.utils.ImageRef` for more details). For 'intrinsic'
DQ model the DQ files will use the same setting as for ``im_fmode``.
msk_fmode: str, optional
File mode to be used to open mask files.
Attributes
----------
clobber: bool
If a file being appended is in GEIS or WAIVER FITS format, should
any existing MEF files be overwritten?
dq_bits: int
Bitmask specifying what pixels in the mask should be removed
(or kept) with the precise interpretation being left to the user.
This flag is not used by this class but was designed to be
passed to other functions that will use :py:class:`FileExtMaskInfo`.
"""
def __init__(self, default_ext=('SCI', '*'), default_mask_ext=0,
clobber=False, doNotOpenDQ=False, fnamesOnly=False,
im_fmode='update', dq_fmode='readonly', msk_fmode='readonly'):
if default_ext is not None:
self._verify_ext(default_ext)
self._verify_ext(default_mask_ext)
if fnamesOnly:
self._im = None
self._dq = None
else:
self._im = ImageRef(None)
self._dq = ImageRef(None)
self._defext = default_ext
self._defmext = default_mask_ext
self._fnamesOnly = fnamesOnly
self._dontopenDQ = doNotOpenDQ
self.clobber = clobber
self.dq_bits = None # can be set from "outside" if needed
self._im_fmode = im_fmode
self._dq_fmode = dq_fmode
self._msk_fmode = msk_fmode
self.clear_ext()
def __copy__(self):
return deepcopy(self)
def clear_ext(self):
# a list to hold file extensions associated with the "main" file:
self._fext = []
# a list to hold file extensions associated with the "DQ" file:
self._dqext = []
# a list to hold file extensions associated with the mask file:
self._maskext = []
#self._maskfname = [] # a list to hold mask file names
# ImageRef list for mask files
self._maskimg = []
# file signatures (store to avoid duplicate ImageRef objects), e.g.,
# file 'stat'
self._filesig = []
# number of extensions (keys in the dictonary) associated with the
# "main" file:
self._nfext = 0
# number of mask files (and extensions) associated with the "main"
# file. _nmask <= _nfext !!!
self._nmask = 0
self._finalized = False
[docs] def clear_masks(self):
"""
Remove all attached mask files and extensions.
"""
if not self._fnamesOnly:
for im in self._maskimg:
if im is not None:
im.release()
self._maskext = []
self._maskimg = []
self._filesig = self._filesig[:2]
self._nmask = 0
self._finalized = False
@property
def count(self):
"""
Number of extensions associated with the image file.
"""
return self._nfext
@property
def finalized(self):
"""
Is the `FileExtMaskInfo` object finalized?
"""
return self._finalized
@property
def fnamesOnly(self):
"""
Was the `FileExtMaskInfo` initialized to return file names or the
:py:class:`~skypac.utils.ImageRef` objects?
"""
return self._fnamesOnly
@property
def imfstat(self):
#"""
#`stat` of the image file.
#
#"""
if len(self._filesig) > 0 and self._filesig[0] is not None:
return self._filesig[0]
return None
@property
def original_image_fname(self):
return 'UNKNOWN' if self._im is None else self._im.original_fname
@property
def image(self):
"""
Image file name or the associated :py:class:`~skypac.utils.ImageRef`
object (depending on the ``fnamesOnly`` value).
:getter: Get the :py:class:`~skypac.utils.ImageRef` image object.
:setter: Set the image file.
:type: str, ImageRef, None
.. note::
Setting the image will re-initialize ``FileExtMaskInfo``. All
previous settings will be lost and previously attached files
will be released/deleted.
"""
return self._im
@image.setter
def image(self, img):
"""
Set the image file.
Parameters
----------
img: ImageRef, str, None
New image file to be attached to ``FileExtMaskInfo`` object.
.. note::
Setting the image will re-initialize ``FileExtMaskInfo``. All
previous settings will be lost and previously attached files
will be released/deleted.
"""
# NOTE: if image is None this will effectively
# initialize the object anew
filesig = []
if img is None:
if self._fnamesOnly:
self._im = None
self._dq = None
else:
self._im.release()
self._dq.release()
self._im = ImageRef()
self._dq = ImageRef()
elif isinstance(img, str):
self._verify_fname(img)
if self._fnamesOnly:
self._im = img
self._dq = None
if os.path.isfile(img):
filesig.append(_Stat(img))
else:
filesig.append(None)
filesig.append(None) # <- DQ image
else:
(im, dq) = openImageEx(
img,
mode=self._im_fmode,
dqmode=self._dq_fmode,
memmap=False, saveAsMEF=True,
output_base_fitsname=None,
clobber=self.clobber,
imageOnly=self._dontopenDQ,
openImageHDU=True,
openDQHDU=not self._dontopenDQ,
preferMEF=True, verbose=True
)
self._im.release()
self._dq.release()
self._im = im
self._dq = dq
filesig.append(_Stat(im.original_fname))
if dq.original_fname is None or not dq.original_exists:
filesig.append(None)
else:
filesig.append(_Stat(dq.original_fname))
elif isinstance(img, ImageRef):
# This is designed to hold imageObjects from astrodrizzle
# and DQ mask will be combined with static masks and user masks
# in astrodrizzle. This is not designed to work as a stand-alone
# app.
assert(not img.closed and not self._fnamesOnly)
self._im.release()
self._dq.release()
self._im = img
self._im.hold()
# as it has been mentioned above, DQ will be dealt with in
# astrodrizzle. However, if DQ is needed here, then look for
# the DQ file name self._im.hdu[1].dqfile. For now,
# provide a dummy DQ image:
self._dq = ImageRef(None)
filesig.append(_Stat(img.original_fname))
filesig.append(None) # <- DQ image
else:
raise TypeError('Image can be None, a valid file name (string), '
'or an *open* ImageRef object.')
self.clear_ext()
self._filesig = filesig
@property
def DQimage(self):
"""
DQ image (file or :py:class:`~skypac.utils.ImageRef`
object depending on the ``fnamesOnly`` value).
:getter: Get the :py:class:`~skypac.utils.ImageRef` DQ image object.
:setter: Set the DQ file.
:type: str, ImageRef
"""
return self._dq
@DQimage.setter
def DQimage(self, dq):
"""
Set DQ image.
"""
if self._im is None:
raise AssertionError("DQ image cannot be set if science image "
"has not been previously set.")
if (self._fnamesOnly and isinstance(dq, str)):
assert(isinstance(self._dq, str)) # DEBUG
self._dq = dq
self._dqext = []
elif (not self._fnamesOnly and isinstance(dq, ImageRef)):
assert(isinstance(self._dq, ImageRef)) # DEBUG
if self._im.DQ_model is None:
raise ValueError("Cannot set DQ image when DQ model of the "
"science image is 'None'.")
self._dq.release()
self._dq = dq
self._dq.hold()
dqext = self._find_DQ_extensions()
self._dqext = dqext
dq_stat = _Stat(self._dq.original_fname)
if len(self._filesig) > 1:
self._filesig[1] = dq_stat
elif len(self._filesig) == 1:
self._filesig.append(dq_stat)
else:
raise RuntimeError("File stat list has an unexpected length."
"Suspected a logical error in the code.")
else:
raise TypeError("Type of the DQ image does not match the "
"value of the 'fnamesOnly' parameter.")
@property
def fext(self):
"""
FITS extensions associated with the image file.
"""
return self._fext
@property
def dqext(self):
"""
FITS extensions associated with the DQ file.
"""
return self._dqext
@dqext.setter
def dqext(self, dqext):
"""
Set DQ extensions.
Parameters
----------
dqext: list
A list of extension specifications to be used with DQ image.
Must be of the same length as the number of extensions
set for the image file (see :py:attr:`count`).
"""
if len(dqext) != self._nfext:
raise ValueError("Length of the DQ extension list must be "
"equal to the length of the science image "
"extension list")
# theck that there are no wildcard characters:
for ext in dqext:
if isinstance(ext, str) or \
(isinstance(ext, tuple) and not isinstance(ext[1], int)):
raise ValueError("DQ extension list may not contain "
"wildcard type extensions.")
self._dqext = deepcopy(dqext)
return
@property
def maskext(self):
"""
FITS extensions associated with the mask files.
"""
return self._maskext
@property
def mask_images(self):
"""
Mask image file names or :py:class:`~skypac.utils.ImageRef`
object depending on the ``fnamesOnly`` value.
"""
return self._maskimg
@staticmethod
def _exnumber2extnv(hdulist, extnum):
try:
hdr = hdulist[extnum].header
extname = hdr['EXTNAME']
extver = hdr['EXTVER']
return (extname, extver)
except Exception:
return None
def _find_DQ_extensions(self):
assert(self._im is not None and self._dq is not None)
if self.fnamesOnly or self._im.DQ_model is None:
return []
if self._im.extname is None or self._dq.extname is None:
return []
dqext = []
for ext in self.fext:
if isinstance(ext, int):
if self._im.DQ_model == 'intrinsic':
# step 1: convert integer extensions to tuples
# (extname, extver)
# (necessary for DQ_model='intrinsic')
tuple_ext = self._exnumber2extnv(self._im.hdu, ext)
if tuple_ext is None:
raise RuntimeError("Unable to compute DQ extensions.")
# step 2: Replace 'SCI' extension names in tuples with 'DQ'
# extension names:
dqext.append((self._dq.extname, tuple_ext[1]))
else:
# nothing to do when ext is 'ini' and DQ_model=='extrinsic'
dqext.append(ext)
else:
# ext is a tuple:
# Replace 'SCI' extension names in tuples with 'DQ'
# extension names:
dqext.append((self._dq.extname, ext[1]))
return dqext
@staticmethod
def _verify_fname(fname):
if not (fname is None or isinstance(fname, str)):
raise ValueError("File name must be a valid string.")
# _verify_ext returns True if an extension has wildcards and False if it
# is already "well" defined. It will raise ValueError if the extension
# does not have the correct format: int, or (str, int), or '*'
@classmethod
def _verify_ext(cls, ext):
if isinstance(ext, int):
return False
if isinstance(ext, str) and ext == '*':
return True
if isinstance(ext, tuple) and len(ext) == 2 and \
isinstance(ext[0], str):
if isinstance(ext[1], int):
return False
elif isinstance(ext[1], str) and ext[1] == '*':
return True
raise ValueError("Invalid extension specification: {}.".format(ext))
def _expand_wildcard_ext(self, hdulist, ext):
if isinstance(ext, tuple) and ext[1] == '*':
expext = get_ext_list(hdulist, extname=ext[0])
elif isinstance(ext, str) and ext == '*':
# any "image-like" ext when default_extname=None:
expext = get_ext_list(hdulist, extname=None)
else:
expext = [ext]
return expext
[docs] def append_ext(self, ext):
"""
*Append* extensions to the list of "selected" extensions for the
image file.
.. note::
This function *appends* the extensions. If it is desired
to *set* the extensions, use :py:meth:`replace_ext` instead.
Parameters
----------
ext: int, tuple, None, list
Extension specification: None, an integer extension *number*,
a tuple (*extension number*, *extension version*)
where extension version can be ``'*'`` which will be replaced with
the extension versions of all extensions having given extension
name. If ext is None, it will be replaced with the default
extension specification set during the initialization of the
:py:class:`FileExtMaskInfo` object.
"""
assert(not self.finalized)
try:
if (ext is not None and
((not self._fnamesOnly and self._im.hdu is None) or
(self._fnamesOnly and self._im is None))):
raise RuntimeError(
"Cannot add extensions to a FileExtMaskInfo object that "
"has not been assigned a file name."
)
if isinstance(ext, list):
for e in ext:
self.append_ext(e)
return
if self._fnamesOnly:
if ext is None:
extlist = [self._defext]
else:
self._verify_ext(ext)
extlist = [ext]
else:
if ext is None:
extlist = self._expand_wildcard_ext(
self._im.hdu, self._defext
)
else:
self._verify_ext(ext)
extlist = self._expand_wildcard_ext(self._im.hdu, ext)
self._fext += extlist
self._nfext += len(extlist)
except Exception as e:
self._im.release()
self._dq.release()
raise e
[docs] def replace_fext(self, ext):
"""
Replace/set image file extension list.
See Also
--------
append_ext
"""
assert(not self.finalized)
self._nfext = 0
self._fext = []
self.append_ext(ext=ext)
if self._im.DQ_model is not None and self._dq is not None and \
not self._dq.closed and not self._fnamesOnly:
self._dqext = self._find_DQ_extensions()
[docs] def append_mask(self, mask, ext, mask_stat=None):
"""
Append a mask image and its extension(s).
.. note::
Mask files and extensions are kept in ordered lists and their
order is significant: the first mask file-extension pair is
associated with the first extension of the science image file
set with :py:meth:`append_ext` and so on.
Parameters
----------
mask: str, ImageRef, None
Mask image file. Can be a string file name,
an :py:class:`~skypac.utils.ImageRef` object (*only* if
:py:attr:`fnamesOnly` is `False`), or `None` (to act as a place
holder in the ordered list of extensions).
ext: int, tuple, None, list
Extension specification: None, an integer extension *number*,
a tuple (*extension number*, *extension version*)
where extension version can be ``'*'`` which will be replaced with
the extension versions of all extensions having given extension
name. If ext is None, it will be replaced with the default
extension specification for mask images set during
the initialization of the :py:class:`FileExtMaskInfo` object.
mask_stat: `os.stat_result`, optional
An `os.stat_result` structure for the input ``mask`` file.
If `None`, then :py:meth:`append_mask` will compute `stat` for
the input ``mask`` file.
Raises
------
RuntimeError
Raised if attempting to add masks when the science image
was not yet set.
AssertionError
Raised if :py:attr:`finalized` is `True`.
TypeError
Raised if `mask` is an :py:class:`~skypac.utils.ImageRef`
object but :py:attr:`fnamesOnly` is `True` or if ``mask``
argument is of incorrect type.
ValueError
If ``mask`` is an :py:class:`~skypac.utils.ImageRef`, it must
*not* be closed.
"""
assert(not self.finalized)
if (not self._fnamesOnly and self._im.hdu is None) or \
(self._fnamesOnly and not self._im):
raise RuntimeError("Cannot add mask and extensions to a "
"FileExtMaskInfo object that has not "
"been assigned a file name.")
if isinstance(ext, list):
for e in ext:
self.append_mask(mask, e)
return
if mask is None or mask == '':
self._maskimg.append(None if self._fnamesOnly else ImageRef(None))
self._filesig.append(None)
self._maskext.append(None)
self._nmask += 1
return
if isinstance(mask, str):
if self._fnamesOnly:
# identification of identical files does not make sense for
# file names only.
self._maskimg.append(mask)
if ext is None:
self._maskext.append(self._defmext)
else:
# next statement raise exception if non-compliant extension
self._verify_ext(ext)
self._maskext.append(ext)
self._nmask += 1
return
# However, if we are going to return
# ImageRef objects, it will be more efficient to return
# the same ImageRef object with accordingly increased
# reference count. Thus we will avoid opening (a time-consuming
# operation) the same FITS file multiple times.
if mask_stat is None:
stat = _Stat(mask)
else:
stat = deepcopy(mask_stat)
findex = None
for i in range(len(self._filesig)):
if self._filesig[i] is None:
continue
if _sameStat(self._filesig[i], stat):
findex = i
break
if findex is not None:
#findex = self._filesig.index(stat)
if findex == 0:
mask = self._im
elif findex == 1:
mask = self._dq
else:
mask = self._maskimg[findex - 2]
mask.hold()
self._filesig.append(stat)
else:
try:
mask, dq = openImageEx(
mask, mode=self._msk_fmode, memmap=False,
saveAsMEF=True, clobber=self.clobber,
imageOnly=True, openImageHDU=True, openDQHDU=False,
preferMEF=True, verbose=False
)
mask.DQ_model = None
self._filesig.append(stat)
except IOError:
raise IOError("Unable to open file: '{:s}'".format(mask))
elif isinstance(mask, ImageRef):
if self._fnamesOnly:
raise TypeError(
"Cannot set an ImageRef mask when "
"FileExtMaskInfo was initialized with fnamesOnly=True."
)
if mask.closed or mask.original_fname is None:
raise ValueError(
"ImageRef mask must not be closed and "
"have a valid 'original_fname' attribute."
)
mask.hold()
if mask_stat is None:
stat = _Stat(mask.original_fname)
else:
stat = deepcopy(mask_stat)
self._filesig.append(stat)
else:
raise TypeError("Argument 'mask' can be a string file name, "
"ImageRef type object, None, or a list "
"with elements of string type or None.")
if self._fnamesOnly:
if ext is None:
extlist = [self._defmext]
else:
self._verify_ext(ext)
extlist = [ext]
else:
if ext is None:
extlist = self._expand_wildcard_ext(mask.hdu, self._defmext)
else:
self._verify_ext(ext)
extlist = self._expand_wildcard_ext(mask.hdu, ext)
nskip = len(extlist)
self._maskimg += nskip * [mask]
for i in range(nskip - 1):
mask.hold()
self._maskext += extlist
self._nmask += nskip
def finalize(self, toImageRef=False):
    """
    Finalize the object by trimming or extending the mask lists so that
    they match the number of science image extensions.

    The number of masks (and mask extensions) need not be equal to the
    number of extensions specified for the science image:

    * if there are fewer masks than science extensions, the mask image
      list is padded with dummy :py:class:`~skypac.utils.ImageRef`
      objects and the mask extension list is padded with `None`;
    * if there are more masks than science extensions, the surplus
      masks are "released" and both lists are trimmed.

    When the object holds file names only (:py:attr:`fnamesOnly` is
    `True`) and ``toImageRef`` is `False`, nothing is done and the object
    is *not* marked as finalized: wildcard extensions cannot be verified
    or expanded without opening the FITS files.

    Calling this method on an already finalized object is a no-op.

    Parameters
    ----------
    toImageRef: bool, optional
        If the object holds file names only, first convert it to
        :py:class:`~skypac.utils.ImageRef`-based form (see
        :py:meth:`convert2ImageRef`) and then finalize it.

    """
    if self.finalized:
        return

    if self._fnamesOnly:
        if not toImageRef:
            # nothing to do if only file names are requested: we cannot
            # verify or expand wildcard extensions if we do not open
            # FITS images.
            return
        self.convert2ImageRef()

    # make mask (fname & ext) of the same length as the "main" image
    # extension list:
    delta = self._nfext - self._nmask

    if delta > 0:
        # One placeholder per padded slot (not `delta * [ImageRef()]`,
        # which would alias a single object across all slots), mirroring
        # what append_mask does for an empty mask.
        self._maskimg += [ImageRef() for _ in range(delta)]
        self._maskext += delta * [None]

    elif delta < 0:
        # drop the reference held on every surplus mask before trimming:
        for surplus in self._maskimg[self._nfext:]:
            surplus.release()
        self._maskimg = self._maskimg[:self._nfext]
        self._maskext = self._maskext[:self._nfext]

    if self._im.DQ_model is not None and self._dq is not None and \
            not self._dq.closed and not self._fnamesOnly:
        self._dqext = self._find_DQ_extensions()

    self._nmask = self._nfext
    self._finalized = True
def convert2ImageRef(self):
    """
    Replace any existing file names with opened
    :py:class:`~skypac.utils.ImageRef`
    objects and change the :py:attr:`fnamesOnly` property to `False`.

    The conversion is performed by building a fresh
    :py:class:`FileExtMaskInfo` object with ``fnamesOnly=False``, feeding
    it the stored image name, extensions and masks, and then adopting
    its attributes. If any of these steps fails, the images already
    opened by the temporary object are released before the exception is
    re-raised and ``self`` is left unchanged.

    .. note::
        The :py:attr:`finalized` property will not be modified.

    .. warning::
        The :py:class:`FileExtMaskInfo` must not have been finalized
        (:py:attr:`finalized` is `False`) and must contain file
        names only (:py:attr:`fnamesOnly` is `True`).

    Raises
    ------
    AssertionError
        Raised if :py:attr:`finalized` is `True` or
        :py:attr:`fnamesOnly` is `False`.

    See Also
    --------
    release_all_images

    """
    assert(not self.finalized and self._fnamesOnly)

    # create a new FileExtMaskInfo object with fnamesOnly=False
    # (this should allow it to take care of opening files and identifying
    # identical files):
    newfi = FileExtMaskInfo(
        default_ext=self._defext,
        default_mask_ext=self._defmext,
        clobber=self.clobber,
        doNotOpenDQ=self._dontopenDQ,
        fnamesOnly=False,
        im_fmode=self._im_fmode,
        dq_fmode=self._dq_fmode,
        msk_fmode=self._msk_fmode
    )

    # populate its file and extension lists:
    try:
        newfi.image = self._im
        newfi.append_ext(self._fext)
        for i in range(self._nmask):
            newfi.append_mask(mask=self._maskimg[i], ext=self._maskext[i])
    except Exception:
        # Do not leak the files that 'newfi' managed to open before the
        # failure: nobody else holds a reference to them.
        newfi.release_all_images()
        raise

    # copy attributes of the new object to self:
    self.__dict__ = copy(newfi.__dict__)
def release_all_images(self):
    """
    Release every image held by the object and fall back to file names.

    If ``fnamesOnly`` is already `True` this method does nothing.
    Otherwise the science image, the DQ image and each entry of the mask
    list that is an `~skypac.utils.ImageRef` is released and replaced
    with its *original* file name; entries of other types are kept
    as they are.

    .. note::
        This will set the :py:attr:`fnamesOnly` property to `True` and
        the :py:attr:`finalized` property to `False`.

    See Also
    --------
    convert2ImageRef

    """
    if self._fnamesOnly:
        return

    # science image first, then the DQ image:
    for attr in ('_im', '_dq'):
        img = getattr(self, attr)
        if isinstance(img, ImageRef):
            img.release()
            setattr(self, attr, img.original_fname)

    # A mask object may appear several times in the list; it is released
    # once per occurrence.
    names = []
    for entry in self._maskimg:
        if isinstance(entry, ImageRef):
            entry.release()
            entry = entry.original_fname
        names.append(entry)
    self._maskimg = names

    self._fnamesOnly = True
    self._finalized = False
def info(self):
    """
    Print information about the state of the object.

    The science image, its extensions, the DQ image and every
    mask/extension pair are written to standard output. When the object
    holds opened images (:py:attr:`fnamesOnly` is `False`) their
    ``original_fname`` is reported; an image that is `None` (e.g., no DQ
    image has been set) is reported as ``None`` instead of raising
    `AttributeError`.

    """
    if self._fnamesOnly:
        def label(img):
            # entries already are file names (or None)
            return img
    else:
        def label(img):
            return None if img is None else img.original_fname

    print("\n--- FileExtMaskInfo: ---")
    print("Image File name: {}".format(label(self._im)))
    print("Image File extensions: {}".format(self.fext))
    print("DQ File name: {}".format(label(self._dq)))
    for i in range(self._nmask):
        print("Mask File: {} \tExtension: {}"
              .format(label(self._maskimg[i]), self._maskext[i]))
def _lookup_open_finfo(fname, flist):
    """Return the first entry of ``flist`` that already holds ``fname``
    open as its science image, or `None` if there is no such entry."""
    fstat = _Stat(fname)
    for entry in flist:
        # parse_at_file stores (FileExtMaskInfo, index) tuples when it is
        # matching against a list of images; unwrap those.
        known = entry[0] if isinstance(entry, tuple) else entry
        if known.fnamesOnly:
            continue
        if _sameStat(known.imfstat, fstat):
            return known
    return None


def _assign_main_image(finfo, fname, flist):
    """Set the science image of ``finfo``, re-using an image that is already
    open in ``flist`` (if any). Return the DQ image of the re-used entry,
    or `None` when ``fname`` was assigned directly."""
    found = _lookup_open_finfo(fname, flist) if flist else None
    if found is None:
        finfo.image = fname
        return None
    finfo.image = found.image
    # The DQ image cannot be set yet because extensions have not been
    # parsed; the caller sets it once parsing is complete.
    return found.DQimage


def _fname_or_none(acc):
    """Return the accumulated file name, or `None` if the (unquoted) name is
    empty or spells ``None`` (case-insensitive)."""
    # NOTE(review): quoted names are always taken literally here, as the
    # parse_at_line documentation states for a quoted 'None' -- confirm.
    name = str(acc)
    if not acc.quoted and name.upper() in ('NONE', ''):
        return None
    return name


def parse_at_line(fstring, default_ext=('SCI', '*'), default_mask_ext=0,
                  clobber=False, fnamesOnly=False, doNotOpenDQ=False,
                  im_fmode='update', dq_fmode='readonly',
                  msk_fmode='readonly', verbose=False,
                  _main_with_nchar=False, _external_flist=None):
    r"""
    parse_at_line(fstring, default_ext=('SCI', '*'), default_mask_ext=0, \
clobber=False, fnamesOnly=False, doNotOpenDQ=False,\
im_fmode='update', dq_fmode='readonly', msk_fmode='readonly',verbose=False)

    Parse a line from a catalog file containing a science image file
    and extension specifications and optionally followed by a
    comma-separated list of mask files and extension specifications
    (or `None`).

    File names will be stripped of leading and trailing white spaces. If it
    is essential to keep these spaces, file names may be enclosed in single
    or double quotation marks. Quotation marks may also be required when file
    names contain special characters used to separate file names and
    extension specifications: ``,[]{}``

    Extension specifications must follow the file name and must be delimited
    by either square or curly brackets. Curly brackets allow specifying
    multiple comma-separated extensions: integer extension numbers and/or
    tuples ('ext name', ext version).

    Some possible ways of specifying extensions:
        [1] -- extension number

        ['sci',2] -- extension name and version

        {1,4,('sci',3)} -- multiple extension specifications, including tuples

        {('sci',*)} -- wildcard extension versions (i.e., all extensions with
        extension name 'sci')

        ['sci'] -- equivalent to ['sci', 1]

        {'sci'} -- equivalent to {('sci',*)}

    For extensions in the science image for which no mask file is provided,
    the corresponding mask file names may be omitted (but a comma must still
    be used to show that no mask is provided in that position) or ``None`` can
    be used in place of the file name. NOTE: ``'None'`` (in quotation marks)
    will be interpreted as a file named ``'None'``.

    Some examples of possible user input:
        ``image1.fits{1,2,('sci',3)},mask1.fits,,mask3.fits[0]``

        In this case:

        ``image1.fits[1]`` is associated with ``mask1.fits[0]``;

        ``image1.fits[2]`` does not have an associated mask;

        ``image1.fits['sci',3]`` is associated with ``mask3.fits[0]``.

        -- Assume ``image2.fits`` has 4 'SCI' extensions:

        ``image2.fits{'sci'},None,,mask3.fits``

        In this case:

        ``image2.fits['sci', 1]`` and ``image2.fits['sci', 2]`` **and**
        ``image2.fits['sci', 4]`` do not have an associated mask;

        ``image2.fits['sci', 3]`` is associated with ``mask3.fits[0]``

    .. note::
        Wildcard extension version in extension specification can be
        expanded *only* when ``fnamesOnly`` is `False`.

    Parameters
    ----------
    fstring: str
        A comma-separated string describing the image file name
        and (optionally) followed by the extension specifier
        (e.g., [sci,1,2], or [sci]). The image file name may be followed
        (comma-separated) by optional mask file names (and their extension
        specifier).
        File and extension names may NOT contain leading and/or trailing
        spaces, commas, and/or square or curly brackets.

    default_ext: int, tuple, optional
        Default extension to be used with image files that do not have
        an extension specified.

    default_mask_ext: int, tuple, optional
        Default extension to be used with image mask files that do not have
        an extension specified.

    clobber: bool, optional
        Passed unchanged to :py:class:`FileExtMaskInfo`.

    fnamesOnly: bool, optional
        Return file names only, or open the files and return
        :py:class:`~skypac.utils.ImageRef` objects?

    doNotOpenDQ: bool, optional
        Should the DQ files be opened simultaneously with the image
        files?

    im_fmode: str, optional
        File mode to be used to open image FITS file.
        See `astropy.io.fits.open` for more details.

    dq_fmode: str, optional
        File mode to be used to open DQ FITS file. This is valid only if the
        DQ model of the image file is 'external' (see documentation for
        :py:class:`~skypac.utils.ImageRef` for more details). For 'intrinsic'
        DQ model the DQ files will use the same setting as for ``im_fmode``.

    msk_fmode: str, optional
        File mode to be used to open mask files.

    verbose: bool, optional
        Specifies whether to print warning messages.

    Raises
    ------
    ValueError
        * Input argument 'fstring' must be a Python string.
        * Input argument 'fstring' contains either unbalanced
          or nested square brackets.
        * Extension specification must be preceded by a valid image
          file name.
        * The science image file name is missing or is ``None``.

    Returns
    -------
    FileExtMaskInfo
        A :py:class:`FileExtMaskInfo` object. (When the private argument
        ``_main_with_nchar`` is `True`, parsing stops after the science
        image and its extension specification and a tuple of this object
        and the number of consumed characters is returned instead.)

    """
    if not isinstance(fstring, str):
        raise ValueError("Input argument 'fstring' must be a Python string.")

    # 0 - main file, 1 - main file extension
    # 2 - mask file, 3 - mask file extension:
    current_item = 0
    fname = CharAccumulator()
    sext = ExtSpec()
    mext = MultiExtSpec()
    finfo = FileExtMaskInfo(
        default_ext=default_ext, default_mask_ext=default_mask_ext,
        clobber=clobber, fnamesOnly=fnamesOnly, doNotOpenDQ=doNotOpenDQ,
        im_fmode=im_fmode, dq_fmode=dq_fmode, msk_fmode=msk_fmode
    )
    current_fname = 'Unknown'
    current_ext = None
    nchar = 0

    # this will be set only if a previous image was already opened:
    dq_image = None

    # Look for already opened copies of the science image only when there
    # is something to look through and files are actually being opened:
    reuse_from = None
    if _external_flist is not None and len(_external_flist) > 0 and \
            not fnamesOnly:
        reuse_from = _external_flist

    for ch in fstring:
        nchar += 1

        if current_item in (0, 2):
            # --- accumulating a file name ---
            # Separators are significant only outside of an open quote:
            if fname.closed or not fname.quoted:

                if ch == ',':
                    # done parsing a file name without extension specifiers:
                    fname.close()
                    name = _fname_or_none(fname)

                    if current_item == 0:
                        if not fname.dirty or name is None:
                            # "main" image cannot be None!
                            finfo.release_all_images()
                            raise ValueError("Image file name must be a "
                                             "vaild string.")
                        sfname = str(fname)
                        dq_image = _assign_main_image(finfo, sfname,
                                                      reuse_from)
                        current_fname = sfname
                        finfo.append_ext(None)
                        if _main_with_nchar:
                            # caller wants only the science image entry
                            fname.reset()
                            break
                        current_item = 2
                    else:
                        # current_item stays 2: keep parsing masks
                        current_fname = name
                        finfo.append_mask(current_fname, None)

                    fname.reset()
                    continue

                elif ch in ('[', '{'):
                    # start of a single ('[') or multiple ('{') extension
                    # specification:
                    if not fname.dirty:
                        msg = "Extension specification must be preceeded " \
                              "by a valid image file name."
                        if current_item == 2:
                            msg += " Use curly brackets to enclose " \
                                   "multiple extension specifiers."
                        finfo.release_all_images()
                        raise ValueError(msg)

                    fname.close()
                    current_fname = str(fname)
                    if current_item == 0:
                        dq_image = _assign_main_image(finfo, current_fname,
                                                      reuse_from)

                    current_item = 1 if current_item == 0 else 3
                    current_ext = sext if ch == '[' else mext
                    current_ext.reset(fname=current_fname)
                    current_ext.append(ch)
                    continue

                elif ch in (']', '}'):
                    finfo.release_all_images()
                    raise ValueError("Misplaced, unbalanced, or nested "
                                     "brackets have been detected.")

            fname.append(ch)

        else:
            # --- accumulating an extension specification ---
            if ch == ',' and current_ext.closed:
                # done parsing an extension specifier:
                if current_item == 1:
                    finfo.append_ext(current_ext.ext)
                    if _main_with_nchar:
                        current_ext.reset(fname='Unknown')
                        fname.reset()
                        break
                else:
                    finfo.append_mask(current_fname, current_ext.ext)

                current_fname = ''
                current_item = 2
                current_ext.reset(fname='Unknown')
                fname.reset()
                continue

            current_ext.append(ch)

    # Flush whatever was still being accumulated when the input ended.
    # (After an early 'break' the accumulators have been reset and are not
    # "dirty", so nothing is added twice.)
    if current_item == 0:
        if fname.dirty:
            fname.close()
            dq_image = _assign_main_image(finfo, str(fname), reuse_from)
            finfo.append_ext(None)

    elif current_item == 1:
        if current_ext.dirty:
            current_ext.close()
            finfo.append_ext(current_ext.ext)

    elif current_item == 2:
        if fname.dirty:
            fname.close()
            # A trailing unquoted 'None' means "no mask", exactly as it does
            # when it is followed by a comma:
            finfo.append_mask(_fname_or_none(fname), None)

    elif current_item == 3:
        if current_ext.dirty:
            current_ext.close()
            finfo.append_mask(current_fname, current_ext.ext)

    if dq_image is not None:
        finfo.DQimage = dq_image

    finfo.finalize()

    if _main_with_nchar:
        return (finfo, nchar)
    else:
        return finfo
# This is a simple version of the comma-separated file list parser
# that does not allow ASN tables or wild-card characters.
# USE parse_cs_line below instead.
#
#TODO: remove after the new version is extensively verified to work as expected
#
def _parse_cs_line(csline, default_ext=('SCI', '*'), clobber=False,
                   fnamesOnly=False, doNotOpenDQ=False, im_fmode='update',
                   dq_fmode='readonly', msk_fmode='readonly', logfile=None,
                   verbose=False):
    """
    This function is similar to :py:func:`parse_at_line`, the main
    difference being the content of the input string: a list of
    comma-separated *science* image file names. No masks can be specified and
    file names must be valid (i.e., None is not allowed). Extension
    specifications are allowed and must follow the same syntax as
    described for :py:func:`parse_at_line`.

    Below we describe only differences between this function and
    :py:func:`parse_at_line`.

    Parameters
    ----------
    csline: str
        Comma-separated list of valid science image file names and
        extension specifications.

    logfile: str, file, MultiFileLog, None, optional
        Specifies the log file to which the messages should be printed.
        It can be a file name, a file object, a ``MultiFileLog`` object, or
        `None`.

    Raises
    ------
    Exception
        Any exception raised by :py:func:`parse_at_line` is logged and
        re-raised after the entries parsed so far have been released.

    Returns
    -------
    list
        A list with one :py:class:`FileExtMaskInfo` object per
        comma-separated entry (holding file names only if ``fnamesOnly``
        is `True`).

    """
    # Set-up log files:
    if isinstance(logfile, MultiFileLog):
        ml = logfile
    else:
        ml = MultiFileLog(console=verbose)
        if logfile not in ('', None, sys.stdout):
            ml.add_logfile(logfile)

    csl = csline.strip()
    fi = []

    try:
        # parse_at_line consumes one entry at a time and reports how many
        # characters it has used:
        while len(csl) > 0:
            f, nch = parse_at_line(
                csl, default_ext=default_ext, default_mask_ext=0,
                clobber=clobber, fnamesOnly=fnamesOnly,
                doNotOpenDQ=doNotOpenDQ, im_fmode=im_fmode, dq_fmode=dq_fmode,
                msk_fmode=msk_fmode, verbose=verbose, _main_with_nchar=True
            )
            fi.append(f)
            csl = csl[nch:]

    except Exception as e:
        ml.error("Unable to parse input comma-separated file name list "
                 "'{0}...'\n"
                 "                   Reported error: \"{1}\".\n",
                 csl[:20], e)
        # The partially built list is not returned to the caller, so the
        # images it holds must be released here:
        for parsed in fi:
            parsed.release_all_images()
        raise

    finally:
        ml.close()

    return fi
def parse_cs_line(csline, default_ext=('SCI', '*'), clobber=False,
                  fnamesOnly=False, doNotOpenDQ=False, im_fmode='update',
                  dq_fmode='readonly', msk_fmode='readonly', logfile=None,
                  verbose=False):
    """
    This function is similar to :py:func:`parse_at_line`, the main
    difference being the content of the input string: a list of
    comma-separated *science* image file names. No masks can be specified and
    file names must be valid (i.e., `None` is not allowed). Extension
    specifications are allowed and must follow the same syntax as
    described for :py:func:`parse_at_line`.

    Below we describe only differences between this function and
    :py:func:`parse_at_line`.

    Parameters
    ----------
    csline: str
        User input string that needs to be parsed containing one of the
        following:

            * a comma-separated list of valid science image file names
              (see note below) and (optionally) extension specifications,
              e.g.: ``'j1234567q_flt.fits[1], j1234568q_flt.fits[sci,2]'``;

            * an @-file name, e.g., ``'@files_to_match.txt'``.

        An empty (or white space only) string results in an empty list.

        .. note::
            **Valid** **science** **image** **file** **names** are:

            * file names of existing FITS, GEIS, or WAIVER FITS files;

            * partial file names containing wildcard characters, e.g.,
              ``'*_flt.fits'``;

            * Association (ASN) tables (must have ``_asn``, or ``_asc``
              suffix), e.g., ``'j12345670_asn.fits'``.

        .. warning::
            @-file names **MAY** **NOT** be followed by an extension
            specification.

        .. warning::
            If an association table or a partial file name with wildcard
            characters is followed by an extension specification, it will be
            considered that this extension specification applies to **each**
            file name in the association table or **each** file name
            obtained after wildcard expansion of the partial file name.

    logfile: str, file, MultiFileLog, None, optional
        Specifies the log file to which the messages should be printed.
        It can be a file name, a file object, a MultiFileLog object, or None.

    Raises
    ------
    ValueError
        Raised if an association table cannot be read.

    Returns
    -------
    list
        For an @-file, whatever :py:func:`parse_at_file` returns.
        Otherwise, a list of :py:class:`FileExtMaskInfo` objects, one per
        file obtained after expansion of ASN tables and wildcards. The
        objects hold file names only if ``fnamesOnly`` is `True` and opened
        images otherwise.

    """
    # Set-up log files:
    if isinstance(logfile, MultiFileLog):
        ml = logfile
    else:
        ml = MultiFileLog(console=verbose)
        if logfile not in ('', None, sys.stdout):
            ml.add_logfile(logfile)

    csl = csline.strip()

    # First, check if user specified an @-file
    # (startswith() is safe for an empty input string, csl[0] is not):
    if csl.startswith('@'):
        # read the @-file and return:
        ml.logentry("Parsing input @-file '{}'.", csl[1:])
        return parse_at_file(
            csl[1:], default_ext=default_ext,
            default_mask_ext=0,
            clobber=clobber,
            fnamesOnly=fnamesOnly,
            doNotOpenDQ=doNotOpenDQ,
            match2Images=None,
            im_fmode=im_fmode,
            dq_fmode=dq_fmode,
            msk_fmode=msk_fmode,
            logfile=logfile,
            verbose=verbose
        )

    # we expect a comma separated list of files:
    ml.logentry("Parsing comma-separated list of input file names.")

    # Now, parse input line generating a new FileExtMaskInfo for each
    # comma-separated entry. This needs to be performed at first
    # with fnamesOnly=True so that we later can expand ASN tables
    # or wildcard characters.
    fi_list = []
    try:
        while len(csl) > 0:
            f, nch = parse_at_line(
                csl, default_ext=default_ext, default_mask_ext=0,
                clobber=clobber, fnamesOnly=True, doNotOpenDQ=doNotOpenDQ,
                im_fmode=im_fmode, dq_fmode=dq_fmode, msk_fmode=msk_fmode,
                verbose=verbose, _main_with_nchar=True
            )
            fi_list.append(f)
            csl = csl[nch:]

    except Exception as e:
        ml.error("Unable to parse input comma-separated file name list "
                 "'{0}...'\n                   Reported error: \"{1}\".\n",
                 csl[:20], e)
        ml.close()
        raise

    # Now walk through each FileExtMaskInfo object and see if the
    # file name is an ASN table or has wildcard characters and expand them:
    fullfi = []
    for fi in fi_list:
        if parseinput.checkASN(fi.image):
            # The input is an association table
            try:
                assocdict = asnutil.readASNTable(
                    fi.image, None, prodonly=False
                )
            except Exception:
                for f in fullfi:
                    f.release_all_images()
                raise ValueError("Unable to read Association file '{}'."
                                 .format(fi.image))
            expanded = [fileutil.buildRootname(member)
                        for member in assocdict['order']]

        else:
            # expand environment variables and wildcard characters (if any)
            pattern = fi.image if osfn is None else osfn(fi.image)
            expanded = glob.glob(pattern)
            if not expanded:
                # if glob did not return any files, likely 'fi' will not
                # be able to open the file. However, we will let it try
                # and throw an error if it fails:
                fullfi.append(fi)
                continue

        for fname in expanded:
            newfi = copy(fi)
            newfi.image = fname
            # the extension specification of the original entry applies
            # to each of the expanded file names:
            newfi.append_ext(fi.fext)
            fullfi.append(newfi)

    # finally, convert file *name*-based FileExtMaskInfo objects to
    # ImageRef-based:
    if not fnamesOnly:
        try:
            for fi in fullfi:
                fi.finalize(toImageRef=True)
        except Exception:
            for f in fullfi:
                f.release_all_images()
            raise

    return fullfi
def parse_at_file(fname, default_ext=('SCI', '*'), default_mask_ext=0,
                  clobber=False, fnamesOnly=False,
                  doNotOpenDQ=False, match2Images=None,
                  im_fmode='update', dq_fmode='readonly',
                  msk_fmode='readonly', logfile=None, verbose=False):
    """
    This function is similar to :py:func:`parse_at_line`, the main
    difference being that it can parse multiple (EOL terminated) lines
    of the format described in the documentation for
    :py:func:`parse_at_line`. Empty lines and lines starting with ``#``
    are skipped. A line that cannot be parsed is reported to the log
    and ignored.

    Below we describe only differences between this function and
    :py:func:`parse_at_line`.

    Parameters
    ----------
    fname: str
        File name of the catalog file.

    match2Images: list of str, list of ImageRef, None, optional
        List of file names or ImageRef objects whose mask specifications
        are to be read from the catalog file. Mask specifications
        for other files in the catalog that do not match the files in the
        ``match2Images`` list will be ignored. If ``match2Images`` is `None`,
        then all files from the catalog will be read.

    logfile: str, file, MultiFileLog, None, optional
        Specifies the log file to which the messages should be printed.
        It can be a file name, a file object, a ``MultiFileLog`` object, or
        `None`.

    Raises
    ------
    TypeError
        Raised if an element of ``match2Images`` is a closed ``ImageRef``
        or is neither a string nor an ``ImageRef``.

    IOError
        Raised if the catalog file cannot be read.

    Returns
    -------
    list
        A list with one element per successfully parsed catalog entry.
        If ``match2Images`` is a list, each element is a tuple of
        a :py:class:`FileExtMaskInfo` object and the index of the matching
        image in ``match2Images``; otherwise each element is
        a :py:class:`FileExtMaskInfo` object (holding file names only
        if ``fnamesOnly`` is `True`).

    """
    # Set-up log files:
    if isinstance(logfile, MultiFileLog):
        ml = logfile
    else:
        ml = MultiFileLog(console=verbose)
        if logfile not in ('', None, sys.stdout):
            ml.add_logfile(logfile)

    # compute file 'stat' if necessary for file matching
    # (we compute it here so that 'parse_at_line' will not have to do
    # this for each line)
    doMatching = match2Images is not None and isinstance(match2Images, list)
    mstat = []
    if doMatching:
        for m in match2Images:
            if isinstance(m, ImageRef):
                if m.closed:
                    ml.close()
                    raise TypeError(
                        "The ImageRef elements of the 'match2Images' list "
                        "must not be 'closed'."
                    )
                mstat.append(_Stat(m.original_fname))
            elif isinstance(m, str):
                mstat.append(_Stat(m))
            else:
                ml.close()
                raise TypeError("Each element of 'match2Images' argument "
                                "must be either a string file name or "
                                "an ImageRef object.")

    try:
        with open(fname) as fh:
            lines = fh.readlines()
    except IOError:
        ml.close()
        raise IOError("Unable to open \"at\" file '{}'.".format(fname))

    fi = []

    # Plain FileExtMaskInfo objects kept so far. This (and not 'fi', which
    # holds tuples when matching) is what parse_at_line searches for images
    # that are already open.
    kept = []

    for nl, rawline in enumerate(lines, start=1):
        # skip empty and comment lines:
        line = rawline.strip()
        if not line or line.startswith('#'):
            continue

        try:
            # create a FileExtMaskInfo for each entry in the @file:
            f = parse_at_line(
                line, default_ext=default_ext,
                default_mask_ext=default_mask_ext, clobber=clobber,
                fnamesOnly=fnamesOnly, doNotOpenDQ=doNotOpenDQ,
                im_fmode=im_fmode, dq_fmode=dq_fmode, msk_fmode=msk_fmode,
                verbose=verbose, _main_with_nchar=False, _external_flist=kept
            )

            if doMatching:
                # find a matching image:
                indx = None
                for i, st in enumerate(mstat):
                    if _sameStat(f.imfstat, st):
                        indx = i
                        break

                if indx is None:
                    # no matching image found... ignore
                    if f.fnamesOnly:
                        img_fname = f.image
                    else:
                        img_fname = f.image.original_fname
                    f.release_all_images()
                    ml.logentry("No matching image was found for the catalog "
                                "entry '{}'. This entry will be ignored.",
                                img_fname)
                    continue

                # a match was found. append a tuple of the file info &
                # index of the found image
                fi.append((f, indx))
            else:
                fi.append(f)

            kept.append(f)

        except Exception as exc:
            ml.error("Unable to parse line #{:d}: '{}'...\n"
                     "                   Reported error: \"{}\".\n"
                     "                   This line will be ignored.",
                     nl, rawline[:20], exc)

    ml.close()

    return fi