# Copyright 2020 Patrick C. Tapping
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""
Support for reading and writing Ultrafast Systems .ufs files.
"""
import struct
import numpy as np
def _read_string(ufsdata, cursor):
"""
Read a string from ``ufsdata`` at position given by the cursor.
An unsigned 32 bit integer gives length of string in bytes, then string data follows.
Returns the string and the incremented cursor value as a tuple.
"""
string_length = struct.unpack_from('>I', ufsdata, offset=cursor)
cursor += 4
string = struct.unpack_from('>{}s'.format(string_length[0]), ufsdata, offset=cursor)
cursor += string_length[0]
return (string[0].decode('utf8'), cursor)
def _read_uint(ufsdata, cursor):
"""
Read an unsigned 32 bit integer from ``ufsdata`` at position given by the cursor.
Returns the number and the incremented cursor value as a tuple.
"""
number = struct.unpack_from('>I', ufsdata, offset=cursor)
cursor += 4
return (number[0], cursor)
def _read_double(ufsdata, cursor):
"""
Read an 64 bit floating point number from ``ufsdata`` at position given by the cursor.
Returns the number and the incremented cursor value as a tuple.
"""
number = struct.unpack_from('>d', ufsdata, offset=cursor)
cursor += 8
return (number[0], cursor)
def _read_doubles(ufsdata, cursor, count):
"""
Read a series of 64 bit floating point numbers from ``ufsdata`` at position given by the cursor.
Returns the list of numbers and the incremented cursor value as a tuple.
"""
numbers = struct.unpack_from('>{}d'.format(count), ufsdata, offset=cursor)
cursor += 8*count
return (np.array(numbers), cursor)
def _write_string(ufsstream, string):
"""
Write a string to ``ufsstream`` following the UFS formatting protocol.
"""
ufsstream.write(struct.pack('>I', len(string)))
ufsstream.write(struct.pack('>{}s'.format(len(string)), string))
def _write_uint(ufsstream, value):
"""
Write an unsigned integer to ``ufsstream`` following the UFS formatting protocol.
"""
ufsstream.write(struct.pack('>I', value))
def _write_double(ufsstream, value):
"""
Write a double-precision floating point number to ``ufsstream`` following the UFS formatting protocol.
"""
ufsstream.write(struct.pack('>d', value))
def _write_doubles(ufsstream, values):
"""
Write a list of double-precision floating point number to ``ufsstream`` following the UFS formatting protocol.
"""
ufsstream.write(struct.pack('>{}d'.format(len(values)), *values))
[docs]class UFSData():
def __init__(self, ufsdata=None, ufsfile=None):
"""
Read the byte stream given by ``ufsdata`` and initialise a UFSData instance.
:param ufsdata: Bytes-like object from which to decode UFS data.
"""
#: Version string for the UFS file format.
self.version = None
#: Label string for the first axis.
self.axis1_label = None
#: Unit string for the first axis.
self.axis1_units = None
#: Values for the data points on the first axis.
self.axis1_data = None
#: Label string for the second axis.
self.axis2_label = None
#: Unit string for the second axis.
self.axis2_units = None
#: Values for the data points on the second axis.
self.axis2_data = None
#: Unit string for the matrix data points.
self.data_units = None
#: Data matrix as a 2D array.
self.data_matrix = None
#: String containing metadata information.
self.metadata = None
if not ufsdata is None:
self.read_data(ufsdata)
elif not ufsfile is None:
self.read_file(ufsfile)
[docs] def read_data(self, ufsdata):
"""
Read the byte stream given by ``ufsdata`` and store decoded data.
:param ufsdata: Bytes-like object from which to decode UFS data.
"""
# Keep track of our current location through the file
cursor = 0x0
(self.version, cursor) = _read_string(ufsdata, cursor)
(self.axis1_label, cursor) = _read_string(ufsdata, cursor)
(self.axis1_units, cursor) = _read_string(ufsdata, cursor)
(axis1_count, cursor) = _read_uint(ufsdata, cursor)
(self.axis1_data, cursor) = _read_doubles(ufsdata, cursor, axis1_count)
(self.axis2_label, cursor) = _read_string(ufsdata, cursor)
(self.axis2_units, cursor) = _read_string(ufsdata, cursor)
(axis2_count, cursor) = _read_uint(ufsdata, cursor)
(self.axis2_data, cursor) = _read_doubles(ufsdata, cursor, axis2_count)
(self.data_units, cursor) = _read_string(ufsdata, cursor)
(data_size0, cursor) = _read_uint(ufsdata, cursor)
# Don't actually know how they handle 3D data... might be data_size0 += 1
if data_size0 == 0: data_size0 = 1
(data_size1, cursor) = _read_uint(ufsdata, cursor)
(data_size2, cursor) = _read_uint(ufsdata, cursor)
(self.data_matrix, cursor) = _read_doubles(ufsdata, cursor, (data_size0*data_size1*data_size2))
self.data_matrix =self.data_matrix.reshape((data_size0, data_size1, data_size2))
(self.metadata, cursor) = _read_string(ufsdata, cursor)
# print("Version string: " + self.version)
# print("Axis1: {}, {} values, {} to {} {}".format(self.axis1_label, axis1_count, self.axis1_data[0], self.axis1_data[-1], self.axis1_units))
# print("Axis2: {}, {} values, {} to {} {}".format(self.axis2_label, axis2_count, self.axis2_data[0], self.axis2_data[-1], self.axis2_units))
# print("Data: {} {} x {}".format(self.data_units, data_size1, data_size2))
# print(self.metadata)
[docs] def read_file(self, filename):
"""
Read a ``.ufs`` file.
:param filename: Filename to read.
"""
with open(filename, "rb") as f:
filedata = f.read()
self.read_data(filedata)
[docs] def write_data(self, destination):
"""
Write the raw UFS data out to a stream.
:param destination: Stream to write UFS data to.
"""
_write_string(destination, self.version)
_write_string(destination, self.axis1_label)
_write_string(destination, self.axis1_units)
_write_uint(destination, self.axis1_data.shape[0])
_write_doubles(destination, self.axis1_data)
_write_string(destination, self.axis2_label)
_write_string(destination, self.axis2_units)
_write_uint(destination, self.axis2_data.shape[0])
_write_doubles(destination, self.axis2_data)
_write_string(destination, self.data_units)
_write_uint(destination, self.data_matrix.shape[0] - 1) # ??? or 1 if zero?
_write_uint(destination, self.data_matrix.shape[1])
_write_uint(destination, self.data_matrix.shape[2])
_write_doubles(destination, self.data_matrix.flat)
_write_string(destination, self.metadata)
[docs] def write_file(self, filename):
"""
Write the raw UFS data out to a file.
:param filename: Filename to write UFS data to.
"""
with open(filename, "wb") as f:
self.write_data(f)