Source code for trspectrometer.plugins.interface.dummyinterface
# Copyright 2021 Patrick C. Tapping
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
from threading import Thread, Event
from time import sleep
import numpy as np
import configuration as config
from . import Interface
[docs]class DummyInterface(Interface):
"""
An :data:`~trspectrometer.plugins.interface` class which simulates the presence of an interface device.
To use this interface driver class, ensure ``"interface"`` is present in the :ref:`configuration
file`'s ``load=[...]`` list inside the :ref:`plugins` section, then include a section such as
this in the :ref:`configuration file` under the ``[hardware]`` section:
.. code-block:: toml
[[hardware.interface]]
name = "Interface"
class = "DummyInterface"
Note that multiple interfaces may be added, indicated by the double square brackets around the
section header. The same class type may be initialised multiple times with different values for
its options. Acquisition methods may then select which interface entry to use.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
#: Description of this interface device.
self.description = "Dummy Interface"
# Simulated encoder count tracking
self._enc_count = 0
# Background thread to acquire simulated data
self._acq_thread = None
self._acq_stop = Event()
[docs] def start(self, count=0) -> None:
# Stop any existing thread
self.stop()
self._acq_thread = Thread(target=self._start_acq, args=(count,))
self._acq_stop.clear()
self._acq_thread.start()
[docs] def arm(self) -> None:
self.start(count=10000)
[docs] def stop(self) -> None:
if self._acq_thread:
self._acq_stop.set()
self._acq_thread.join()
[docs] def set_encoder_count(self, value: int) -> None:
self._enc_count = value
def _start_acq(self, n):
cycletime = 1.0/config.data["hardware"]["laser_reprate"]
chop_state = np.zeros(n, dtype=bool)
chop_state[::2] = True
quad_pos = np.arange(self._enc_count, self._enc_count + n*1000, n*1000, dtype=np.uint32)
sleep(n*cycletime)
for cb in self._data_callbacks.copy():
cb(quad_pos, chop_state)