import sys
import tarfile
from pathlib import Path
from astropy.io import fits
from astropy.table import Table
from ndcube import NDCollection
from sunpy import log
from irispy.io.sji import read_sji_lvl2
from irispy.io.spectrograph import read_spectrograph_lvl2
__all__ = ["fits_info", "read_files"]
def _get_simple_metadata(file):
"""
Get simple metadata from a FITS file.
Parameters
----------
file : `pathlib.Path`
The FITS file to read.
Returns
-------
`tuple`
A tuple containing the instrument name and description.
"""
if not file.name.endswith(".fits") and not file.name.endswith(".fits.gz"):
return "", ""
header = fits.getheader(file)
instrume = header.get("INSTRUME", "")
describe = header.get("TDESC1", "")
return instrume, describe
def _extract_tarfile(filenames):
"""
Extracts a tar file to the same location as the tar file.
Parameters
----------
filenames : `list of str`
The filenames of the tar files to extract.
"""
expanded_files = []
for fname in filenames:
filename = Path(fname)
if tarfile.is_tarfile(filename):
extract_dir = filename.with_suffix("").with_suffix("") # removes .tar.gz or .tar
extract_dir.mkdir(parents=True, exist_ok=True)
with tarfile.open(filename, "r") as tar:
tar.extractall(extract_dir, filter="data")
expanded_files.extend([extract_dir / member.name for member in tar.getmembers() if member.isfile()])
else:
expanded_files.append(filename)
return expanded_files
def _get_spec_group_key(file):
"""
Group spectrograph FITS files that belong to the same observation.
Parameters
----------
file : `pathlib.Path`
The FITS file to inspect.
Returns
-------
`tuple`
Key built from stable observation-identifying header metadata.
Falls back to a per-file key if the header cannot be read or if
the observation-grouping metadata is missing.
"""
try:
header = fits.getheader(file)
except OSError:
return file, None
obsid = header.get("OBSID")
startobs = header.get("STARTOBS")
if obsid is None or not startobs:
return file, None
return obsid, startobs
def _get_spec_return_key(file_group, describe, returns):
key = f"{describe}"
if key not in returns:
return key
group_key = _get_spec_group_key(file_group[0])
obsid, startobs = group_key
if startobs is None:
return f"{describe} ({Path(obsid).stem})"
key = f"{describe} ({obsid})"
if key not in returns:
return key
return f"{describe} ({obsid}, {startobs})"
[docs]
def fits_info(filename: str) -> None:
"""
Prints information about the extension of a raster or SJI level 2 data file.
Parameters
----------
filename : str
Filename to load.
"""
def get_description(idx, idx_mod):
# The last two extensions are reserved for auxiliary data.
auxiliary_extension_indices = [num_extensions - 2, num_extensions - 1]
if idx == 0 and idx_mod == 0:
return "Primary Header (no data)"
if idx not in auxiliary_extension_indices:
text_modifier = (
f" ({header[f'TDET{idx + idx_mod}'][:3]})" if "SPEC" in header[f"TDET{idx + idx_mod}"] else ""
)
return f"{header[f'TDESC{idx + idx_mod}'].replace('_', ' ')} ({header[f'TWMIN{idx + idx_mod}']:.0f} - {header[f'TWMAX{idx + idx_mod}']:.0f} AA{text_modifier})"
return "Auxiliary data"
results = [
f"Filename: {Path(filename).absolute()}",
f"Observation: {fits.getval(filename, 'OBS_DESC')}",
f"OBS ID: {fits.getval(filename, 'OBSID')}",
]
table = Table(
names=("No.", "Name", "Ver", "Type", "Cards", "Dimensions", "Format", "Description"),
dtype=("S8", "S32", "S8", "S16", "S8", "S40", "S16", "U200"),
)
with fits.open(filename) as hdulist:
hdu_info = hdulist.info(output=False)
header = hdulist[0].header
num_extensions = len(hdulist)
for idx in range(num_extensions):
description = get_description(idx, 0 if "SPEC" in header["INSTRUME"] else 1)
hdu_info[idx] = (*hdu_info[idx][:-1], description)
table.add_row(list(map(str, hdu_info[idx])))
sys.stdout.write("\n".join(results) + "\n")
table.pprint()
sys.stdout.flush()
[docs]
def read_files(filenames, *, spectral_windows=None, uncertainty=False, memmap=False, allow_errors=False, **kwargs):
"""
A wrapper function to read any number of raster, SJI or IRIS-aligned AIA data files.
The goal is be able to download an entire IRIS observation and read it
in one go, without having to worry about the type of file.
Parameters
----------
filename : `list` of `str`, `str`, `pathlib.Path`
Filename(s) to load.
spectral_windows: iterable of `str` or `str`
Spectral windows to extract from files. Default=None, implies, extract all
spectral windows.
uncertainty : `bool`, optional
If `True` (not the default), will compute the uncertainty for the data (slower and
uses more memory). If ``memmap=True``, the uncertainty is never computed.
memmap : `bool`, optional
If `True` (not the default), FITS data are opened using Astropy/NumPy
memory mapping rather than being fully read into memory at once. This
can keep memory usage low when working with many files, since array
data are accessed from disk on demand. In this mode FITS image scaling
is disabled, so the returned data are unscaled/raw FITS values rather
than automatically scaled physical values. If ``memmap=True``, the
uncertainty is never computed.
allow_errors : `bool`, optional
Will continue loading the files if one fails to load.
Defaults to `False`.
kwargs : `dict`, optional
Additional keyword arguments to pass to the reader functions.
Returns
-------
`ndcube.NDCollection`
With keys being the value of TDESC1, the values being the cube.
"""
if isinstance(filenames, (str, Path)):
filenames = [filenames]
filenames = sorted(filenames)
filenames = [Path(f) for f in filenames]
returns = {}
spec_groups = {}
for filename in filenames:
try:
sdo_tarfile = bool(filename.name.endswith("SDO.tar.gz"))
raster_tarfile = bool(filename.name.endswith("_raster.tar.gz"))
instrume, describe = _get_simple_metadata(filename)
log.debug(f"Processing file: {filename} with instrume: {instrume}")
if sdo_tarfile or instrume in ["IRIS", "SJI"] or instrume.startswith("AIA"):
file = _extract_tarfile([filename]) if sdo_tarfile else [filename]
for f in file:
instrume, describe = _get_simple_metadata(f)
returns[f"{describe}"] = read_sji_lvl2(f, memmap=memmap, uncertainty=uncertainty, **kwargs)
elif raster_tarfile:
file = _extract_tarfile([filename]) if raster_tarfile else [filename]
instrume, describe = _get_simple_metadata(file[0])
returns[f"{describe}"] = read_spectrograph_lvl2(
file, spectral_windows=spectral_windows, memmap=memmap, uncertainty=uncertainty, **kwargs
)
elif instrume == "SPEC":
group_key = _get_spec_group_key(filename)
spec_groups.setdefault(group_key, []).append(filename)
else:
log.warning(f"INSTRUME: {instrume} was not recognized and not loaded")
except Exception as e:
if allow_errors:
log.warning(f"File {filename} failed to load with {e}")
continue
raise
for file_group in spec_groups.values():
try:
instrume, describe = _get_simple_metadata(file_group[0])
key = _get_spec_return_key(file_group, describe, returns)
returns[key] = read_spectrograph_lvl2(
file_group, spectral_windows=spectral_windows, memmap=memmap, uncertainty=uncertainty, **kwargs
)
except Exception as e:
if allow_errors:
log.warning(f"File group {file_group} failed to load with {e}")
continue
raise
if not returns:
msg = f"No supported IRIS files were loaded from {filenames}."
raise ValueError(msg)
return NDCollection(returns.items()) if len(returns) > 1 else next(iter(returns.values()))