Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions sunkit_spex/fitting/fitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
"""
This module contains functions to carry out astropy fitting with spectral models
"""

import astropy.units as u
from astropy.modeling import fitting
from astropy.modeling import models
from matplotlib import pyplot as plt

import numpy as np

from sunkit_spex.models.physical.thermal import ThermalEmission
from sunkit_spex.models.physical.nonthermal import ThickTarget
from sunkit_spex.models.physical.albedo import Albedo
from sunkit_spex.models.scaling import InverseSquareFluxScaling
from sunkit_spex.models.instrument_response import MatrixModel
# from sunkit_spex.visualisation.plotter import plot

__all__ = ["fitter"]


class Fitter:

def __init__(
self,
model,
spectrum_object,
fitting_method = fitting.TRFLSQFitter(calc_uncertainties=True),
fit_range=None):

self._model = model
self._spectrum_object = spectrum_object
self._fitting_method = fitting_method
self._fit_range = fit_range
self._fitted_model = None
# self._PIPELINE_COMPONENTS = {'SRM', 'Albedo', 'InverseSquareFluxScaling'}

@property
def model(self):
return self._model



def _set_abledo_angle(self):

if 'Albedo' in self.model.submodel_names:

# print(len(self._spectrum_object.meta['ph_axis']))

replacement_albedo = Albedo(energy_edges=self._spectrum_object.meta['ph_axis'],
theta=self._spectrum_object.meta['angle'])
replacement_albedo.theta.fixed = True

self._model = self._model.replace_submodel('Albedo',replacement_albedo)
Comment on lines +44 to +54
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be derived from the Spectrum object e.g. spec.observer_location and spec.flare_location which would be SkyCoords


def _set_observer_distance(self):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Derived from spec.observer_location


match = np.where(np.array(self._model.submodel_names)=='InverseSquareFluxScaling')[0]

if np.shape(match) != 0:
param_names = [f'observer_distance_{str(ind)}' for ind in match]

for param_name in param_names:
setattr(self._model, param_name, self._spectrum_object.meta['distance'])
getattr(self._model, param_name).fixed = True

def _set_srm(self):

if 'SRM' in self.model.submodel_names:

self._model = self._model.replace_submodel('SRM',MatrixModel(matrix= np.array(self._spectrum_object.meta['srm']),
spectrum_object=self._spectrum_object,
model_spec_units=u.ph * u.keV**-1 * u.s**-1 * u.cm**-2))
@property
def fitting_method(self):
return self._fitting_method

@fitting_method.setter
def fitting_method(self, value):
self._fitting_method = value

@property
def fitted_model(self):
"""Return the fitted model. None until do_fit() has been called."""
if self._fitted_model is None:
raise RuntimeError("No fitted model available — call do_fit() first.")
return self._fitted_model

@property
def fit_range(self):
return self._fit_range


@fit_range.setter
def fit_range(self, value):
"""
value : tuple
(emin, emax) in same units as spectral_axis
"""

if value is None:
self._fit_range = None
return

emin, emax = value
edges = self._spectrum_object.spectral_axis.bin_edges

# Determine bins fully inside range
lower = edges[:-1]
upper = edges[1:]

indices = np.where((lower >= emin) & (upper <= emax))[0]

self._fit_range = value
self._fit_mask = indices

def _apply_fit_range(self):

if self._fit_range is None:
return

mask = self._fit_mask

self._spectrum_object = self._spectrum_object[mask[0]:mask[-1]+1]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using the mask / or subscripting the final data before it goes into the fitter? Also what about the SRM?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SRM is clipped just below this


# print(self._spectrum_object.spectral_axis.bin_edges.shape)

self._spectrum_object.spectral_axis._bin_edges = np.array(self._spectrum_object.spectral_axis.bin_edges[mask[0]:mask[-1]+2])


# print(self._spectrum_object.spectral_axis.bin_edges.shape)

if 'srm' in self._spectrum_object.meta:
self._spectrum_object.meta['srm'] = \
self._spectrum_object.meta['srm'][:,mask[0]:mask[-1]+1]


def _fit_prep(self):

self._apply_fit_range()

self._set_abledo_angle()
self._set_observer_distance()
self._set_srm()



def do_fit(self):


self._fit_prep()


w = np.array(1/self._spectrum_object.uncertainty.array) << self._spectrum_object.uncertainty.unit
data = np.array(self._spectrum_object.data) << self._spectrum_object.unit


# Store on the instance; access via the fitted_model property
self._fitted_model = self._fitting_method(
model=self._model,
x=self._spectrum_object.meta['ph_axis'],
y=data,
weights=w,
estimate_jacobian=True)

# return fitted_model


# def plot_fit_results(self,save=True):


# if save:
# plot(self._spectrum_object.spectral_axis._bin_edges*u.keV,
# self._spectrum_object.meta['ph_axis'],
# self._spectrum_object.data << self._spectrum_object.unit,
# self._spectrum_object.uncertainty.array << self._spectrum_object.unit,
# self.fitted_model,
# f'{self._spectrum_object.meta['time_range'][0]}_{self._spectrum_object.meta['time_range'][1]}_sunkit_spex_fit.png',
# f'{self._spectrum_object.meta['time_range'][0]} - {self._spectrum_object.meta['time_range'][1]}',
# self.fitting_method.fit_info['param_cov'],
# self._spectrum_object)
# else:
# plot(self._spectrum_object.spectral_axis._bin_edges*u.keV,
# self._spectrum_object.meta['ph_axis'],
# self._spectrum_object.data << self._spectrum_object.unit,
# self._spectrum_object.uncertainty.array << self._spectrum_object.unit,
# self.fitted_model,
# False,
# f'{self._spectrum_object.meta['time_range'][0]} - {self._spectrum_object.meta['time_range'][1]}',
# self.fitting_method.fit_info['param_cov'],
# self._spectrum_object)




# 'here we perform the fitting'

# def plot_fit_results(self):
# 'here we plot the fitting results'

# def chi_squared(self):
# 'here we calculate the chi^2'

# def get_fit_results(self):
# 'here we return fit results and uncertainties'

# def get_fit_components(self):
# 'here we return the fitted components'

# def run_mcmc(self):
# 'run_mcmc'
153 changes: 147 additions & 6 deletions sunkit_spex/models/instrument_response.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,156 @@
"""Module for model components required for instrument response models."""

import numpy as np

from astropy.modeling import Fittable1DModel, Parameter
import astropy.units as u

__all__ = ["MatrixModel"]


class MatrixModel(Fittable1DModel):
def __init__(self, matrix):
self.matrix = Parameter(default=matrix, description="The matrix with which to multiply the input.", fixed=True)
super().__init__()
name = "SRM"
conversion_factor = Parameter(fixed=True)
_input_units_allow_dimensionless = True

def __init__(self, matrix=None,
model_spec_units=u.dimensionless_unscaled,
data_spec_units=u.dimensionless_unscaled,
conversion_factor=1*u.dimensionless_unscaled,
Comment on lines +18 to +19
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually these should probably live the the SRM object (an NDCude with 2 spectral axes in input or photon and one output, count or data)

spectrum_object= None,
spectral_model=True):

self.spectral_model = spectral_model

if not self.spectral_model:

self.model_spec_units = model_spec_units
self.data_spec_units = data_spec_units
self.matrix = matrix
conversion_factor = 1 << (data_spec_units / model_spec_units)
else:
# self.matrix = matrix
self.spectrum_object = spectrum_object
self.model_spec_units = model_spec_units

if spectrum_object:
self.data_spec_units = spectrum_object.unit
conversion_factor = 1* u.ct / u.ph
# conversion_factor = 1 << (self.data_spec_units / self.model_spec_units)
# print(conversion_factor)
else:
self.data_spec_units = data_spec_units
# conversion_factor = 1 << (data_spec_units / model_spec_units)
conversion_factor = 1* u.ct / u.ph

super().__init__(conversion_factor=conversion_factor)

def evaluate(self, x, conversion_factor):

# matrix = self.matrix

if self.spectral_model:

matrix = self.spectrum_object.meta['srm']
input_axis = np.array(self.spectrum_object.spectral_axis.bin_edges)
input_widths = np.diff(input_axis)
output_widths = np.diff(self.spectrum_object.meta['ph_axis'])

# print('IR SRM = ',self.spectrum_object.meta['srm'].shape)
# print('IR SRM = ',self.spectrum_object.spectral_axis.bin_edges.shape)

geo_area = self.spectrum_object.meta['geo_area']
exposure_time = self.spectrum_object.meta['exposure_time']
norm = input_widths * exposure_time * geo_area

# print('input_widths = ',input_widths)
# print('exposure_time = ',exposure_time)
# print('geo_area = ',geo_area)

# print(x.unit)
# print(conversion_factor.unit)
# print(norm.unit)

# flux = (x @ matrix) * conversion_factor * norm
flux = (((x*output_widths*exposure_time)@ (matrix*geo_area*u.cm**2)) * conversion_factor )

else:
flux = x @ matrix * conversion_factor * (geo_area*u.cm**2)

# print('HHEERRREEEE')

if hasattr(conversion_factor,"unit"):
return flux
else:
return flux.value

def set_spectrum_object(self, new_spectrum_object):
self.spectrum_object = new_spectrum_object

# @property
# def model_spec_units(self):
# return self._model_spec_units

# @model_spec_units.setter
# def model_spec_units(self, new_unit):
# self._model_spec_units = new_unit

# if hasattr(self,"data_spec_units"):

# if self.data_spec_units != u.dimensionless_unscaled:

# new_param_unit = self.data_spec_units / new_unit

# self.conversion_factor = self.conversion_factor.value * new_param_unit

# else:

# self.conversion_factor = self.conversion_factor * u.dimensionless_unscaled


# @property
# def data_spec_units(self):
# return self._data_spec_units

# @data_spec_units.setter
# def data_spec_units(self, new_unit):
# self._data_spec_units = new_unit

# if hasattr(self,"model_spec_units"):


# if self.data_spec_units != u.dimensionless_unscaled:

# new_param_unit = new_unit / self.model_spec_units

# self.conversion_factor = self.conversion_factor.value * new_param_unit

# else:

# self.conversion_factor = self.conversion_factor * u.dimensionless_unscaled

@property
def input_units(self):
# return {"x": self.model_spec_units }SS
return {"x": u.ph * u.keV**-1 * u.s**-1 * u.cm**-2 }

@property
def return_units(self):
# return {"y": self.data_spec_units}
return {"y": u.ct}

def _parameter_units_for_data_units(self, inputs_unit, outputs_unit):
return {"conversion_factor": self.conversion_factor.unit}

# @property
# def input_units(self):
# # return {"x": self.model_spec_units }SS
# return {"x": u.ph * u.keV**-1 * u.s**-1 * u.cm**-2 }

# @property
# def return_units(self):
# # return {"y": self.data_spec_units}
# return {"y": u.ct* u.keV**-1 * u.s**-1}

def evaluate(self, model_y):
# Requires input must have a specific dimensionality
return model_y @ self.matrix
# def _parameter_units_for_data_units(self, inputs_unit, outputs_unit):
# return {"conversion_factor": self.conversion_factor.unit}
Loading