Source code for trspectrometer.plugins.acquisition

# Copyright 2021 Patrick C. Tapping
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Module for management of data acquisition methods.

This acquisition plugin module is implemented as a :ref:`hardware` :ref:`plugin <plugins>`, even
though the built-in acquisition methods only indirectly depend on other hardware plugins to be
present. This results in the acquisition methods being displayed in the :ref:`hardware_status`, with
their availability changing depending on the presence or absence of any required hardware devices.

The implementation of the actual data acquisition is abstracted from the software interface. The
python `pluginlib <https://pluginlib.readthedocs.io/en/latest/index.html>`__ is used to find classes
which implement and extend the :class:`~Acquisition` class and actually control the hardware,
collect data, process it, and store into an appropriate :data:`data structure
<trspectrometer.datastorage>` for display by the :data:`~trspectrometer.datapanel.DataPanel`.

Acquisition plugin classes which are built-in include:

- :data:`~trspectrometer.plugins.acquisition.ta_stepped.TA_SteppedAcquisition`, which performs a
  traditional transient absorption acquisition using a "change delay, acquire" loop. This mode is
  compatible with all delay types.

- :data:`~trspectrometer.plugins.acquisition.ta_swept.TA_SweptAcquisition`, which is a rapid
  transient absorption acquisition mode that performs continuous data collection while sweeping the
  delay range. It requires a delay which can provide raw encoder (quadrature) signals to the
  :data:`~trspectrometer.plugins.interface` hardware.

- :data:`~trspectrometer.plugins.acquisition.ta_dummy.TA_DummySteppedAcquisition`, which simulates a
  stepped transient absorption acquisition for demonstration and testing purposes. It does not
  require any additional hardware devices.

To load the acquisition plugin module, ensure ``"acquisition"`` is present in the
:ref:`configuration file`'s ``load=[...]`` list inside the :ref:`plugins` section. To configure the
module to use a specific driver class, ensure a ``[[hardware.acquisition]]`` section is present such as
the default:

.. code-block:: toml

    # Acquisition configuration
    # Each entry specifies a acquisition device
    #   name (string) : Friendly name for the acquisition method to use in the application
    #   class (string) : Name of specific class to load to drive the acquisition
    #   options (dict) : Dictionary of key=value pairs to pass to the class init() method
    [[hardware.acquisition]]
    name = "Dummy Stepped"
    class = "TA_DummySteppedAcquisition"
    options = {}

Multiple acquisition methods can be specified by including multiple ``[[hardware.acquisition]]``
sections. The double rectangular brackets around (``[[hardware.acquisition]]``) indicate that
multiple sections are permitted. The same class type may be initialised multiple times with
different values for its options.
"""


import os
import sys
import logging

import numpy as np
import tomlkit
import pluginlib
from PySide6 import QtCore, QtWidgets
from PySide6.QtCore import Signal

import configuration as config
import hardware as hw


#: Default settings to place in configuration file if not existing already.
_default_config = \
"""
# Acquisition configuration
# Each entry specifies a acquisition device
#   name (string) : Friendly name for the acquisition method to use in the application
#   class (string) : Name of specific class to load to drive the acquisition
#   options (dict) : Dictionary of key=value pairs to pass to the class init() method
[[hardware.acquisition]]
name = "Dummy Stepped"
class = "TA_DummySteppedAcquisition"
options = {}
"""

_log = logging.getLogger(__name__)

#: QWidget class type to use for displaying hardware status.
statuspanel = None

#: List of instances of acquisition method classes.
#: The number of entries in the list should correspond to the ``[[hardware.acquisition]]``
#: entries in the configuration file. If a device is missing or not initialised,
#: a value of ``None`` should be used as a placeholder.
devices = []

#: List of functions to call when devices are refreshed.
_change_callbacks = set()

#: Plugin class loader
_loader = None

[docs]def init(): """ Initialise the acquisition devices specified in the configuration file. """ global devices # Close any existing devices close() devices = [] for d in config.data["hardware"]["acquisition"]: device = None # Attempt to initialise the acquisition class specified in the configuration file try: device_class = _loader.get_plugin("Acquisition", d["class"]) if device_class is None: _log.error(f"Can't find plugin for {d['name']} class {d['class']}") else: # Get options dictionary from configuration file opts = {} try: opts = d["options"] if not isinstance(opts, dict): _log.warning(f"Options entry for {d['name']} is not a dictionary type.") opts = {} except: pass device = device_class(**opts) device.name = d["name"] _log.info(f"Initialised {d['name']} using class {d['class']}") except Exception as ex: _log.error(f"Unable to initialise {d['name']} using class {d['class']}: {ex}") # Add the device (or None placeholder) to list of devices devices.append(device) # Notify callbacks of device refresh for cb in _change_callbacks.copy(): cb()
[docs]def add_change_callback(callback_function): """ Register a function to be called when the devices are refreshed. :param callback_function: Function to call when devices are refreshed. """ if callable(callback_function): _change_callbacks.add(callback_function)
[docs]def remove_change_callback(callback_function): """ Unregister a callback function added using ``add_change_callback()``. :param callback_function: Function to unregister. """ if callback_function in _change_callbacks: _change_callbacks.remove(callback_function)
[docs]def close(): """ Close any open acquisition devices. """ global devices for i, device in enumerate(devices): if device: try: device.close() except: _log.exception(f"Error closing acquisition {type(device).__name__} device for {config.data['hardware']['acquisition'][i]}.") # None is a placeholder for closed/missing devices devices = [None for _ in config.data["hardware"]["acquisition"]] # Notify callbacks of device close for cb in _change_callbacks.copy(): cb()
[docs]@pluginlib.Parent(group="acquisition") class Acquisition(): """ Parent class of all acquisition class plugins. Every acquisition driver class should derive from this parent, so that the automatic device class discovery can function. """ def __init__(self, **kwargs): #: Friendly name to identify this unit. self.name = "Acquisition" #: Description of this acquisition device. self.description = "Unknown Acquisition Method"
[docs] def close(self) -> None: """ Close the connection to the device. """ pass
[docs] def is_initialised(self) -> bool: """ Test whether the acquisition hardware is present and initialised, and not in any error state. """ return True
[docs] def start(self) -> None: """ Begin acquisition of data. It is up to the sub-classes implementing this method to determine what and how much data should be obtained in whatever manner is appropriate. For example, the data range and number of scans can be extracted from the :class:`DataPanel` user interface. During acquisition, the presence of new data points can be communicated to other components by emitting the :data:`~trspectrometer.signals.raw_data_updated` Signal. Parameters can be used to specify which points in the data set have changed to allow more efficient re-drawing for example. Upon completion of the acquisition, the :data:`~trspectrometer.signals.acquisition_stopped` Signal should be emitted to notify other components. If an error occurred, an Exception instance can be passed to provide further information. For example: .. code-block:: python from signalstorage import signals from utils import AcquisitionError if error: # Acquisition completed with some error signals.acquisition_stopped.emit(AcquisitionError("Something bad happened.")) else: # Acquisition completed OK signals.acquisition_stopped.emit(None) """ pass
[docs] def stop(self) -> None: """ Abort the acquisition of data. """ pass
[docs]class AcquisitionStatusPanel(QtWidgets.QFrame): """ Panel to insert into the Hardware Status window to show status of the acquisition method. """ #: Qt Signal to indicate the devices have changed in some way and the panel requires refreshing. devices_changed = Signal() def __init__(self, parent=None): super().__init__(parent) self.setLayout(QtWidgets.QVBoxLayout()) self.horizontalLayout = QtWidgets.QHBoxLayout() self.label = QtWidgets.QLabel("Acquisition:") self.label.setStyleSheet("QLabel { font-weight: bold }") self.horizontalLayout.addWidget(self.label) self.resetPushButton = QtWidgets.QPushButton("Reset") self.horizontalLayout.addWidget(self.resetPushButton) self.grid = QtWidgets.QGridLayout() self.grid.setContentsMargins(10, 0, 10, 0) self.layout().addLayout(self.horizontalLayout) self.layout().addLayout(self.grid) # Reset button triggers hardware re-init self.resetPushButton.clicked.connect(init) self.devices_changed.connect(self.refresh) self.refresh()
[docs] def showEvent(self, event): """ Handler for widget show events. """ # Refresh UI when devices change add_change_callback(self._devices_changed)
[docs] def hideEvent(self, event): """ Handler for widget hide events. """ # Remove change callback when not visible remove_change_callback(self._devices_changed)
def _devices_changed(self): # Wrap signal emission so update happens in Qt thread self.devices_changed.emit()
[docs] def refresh(self): """ Refresh the state of the hardware connection and update the panel's display. """ global devices # Remove all widgets from grid and destroy them while True: w = self.grid.takeAt(0) if w is None: break w.widget().deleteLater() # List the devices from configuration file for i, d in enumerate(config.data["hardware"]["acquisition"]): self.grid.addWidget(QtWidgets.QLabel(d["name"]), i, 0) if devices[i]: self.grid.addWidget(QtWidgets.QLabel(devices[i].description), i, 1) if devices[i].is_initialised(): w = QtWidgets.QLabel("OK") w.setStyleSheet("QLabel { color : green; }") else: w = QtWidgets.QLabel("ERROR") w.setStyleSheet("QLabel { color : red; }") self.grid.addWidget(w, i, 2, QtCore.Qt.AlignRight) else: self.grid.addWidget(QtWidgets.QLabel(d["class"]), i, 1) w = QtWidgets.QLabel("Missing") w.setStyleSheet("QLabel { color : orange; }") self.grid.addWidget(w, i, 2, QtCore.Qt.AlignRight)
# Point to the correct widget class for displaying the hardware status panel statuspanel = AcquisitionStatusPanel # Create default config entries for device(s) if required config.add_defaults(tomlkit.parse(_default_config)) # None is a placeholder for closed/missing devices devices = [None for _ in config.data["hardware"]["acquisition"]] # Bypass this code block if module was imported by sphinx documentation generation utility if not "sphinx" in sys.argv[0]: # Look for valid plugin classes try: plugin_paths = [ os.path.join(d, "acquisition") for d in config.data["directories"]["plugins"] ] plugin_paths = [ d for d in plugin_paths if os.access(d, os.R_OK) ] _loader = pluginlib.PluginLoader(group="acquisition", modules=["acquisition"], paths=plugin_paths) except Exception as ex: raise RuntimeError(f"Error loading acquisition plugin classes: {ex}") _log.info(f"Available acquisition plugin classes: {', '.join(list(_loader.plugins['Acquisition']))}") # Add ourselves to the list of active hardware modules hw.modules[__name__] = sys.modules[__name__] # The DataPanel will want to know when acquisition hardware changes state add_change_callback(config.mainwindow.dataPanel.acquisition_plugins_changed)