# Copyright 2021 Patrick C. Tapping
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
__all__ = ["APTDevice", "find_device", "list_devices"]
import logging
import asyncio
from threading import Thread
import atexit
import re
import serial
from serial.tools import list_ports, list_ports_common
from .. import protocol as apt
from ..enums import EndPoint, LEDMode
[docs]
class APTDevice():
"""
Initialise and open serial device for the ThorLabs APT controller.
If the ``serial_port`` parameter is ``None`` (default), then an attempt to detect an APT device
will be performed.
The first device found will be initialised.
If multiple devices are present on the system, then the use of the ``serial_number`` parameter
will specify a particular device by its serial number.
This is a `regular expression <https://docs.python.org/3/library/re.html>`_ match, for example
``serial_number="83"`` would match devices with serial numbers
starting with 83, while ``serial_number=".*83$"`` would match devices ending in 83.
Status updates can be obtained automatically from the device by setting ``status_updates="auto"``,
which will request the controller to send regular updates, as well as sending the required "keepalive"
acknowledgement messages to maintain the connection to the controller.
In this case, ensure the :data:`keepalive_message` property is set correctly for the controller.
To instead query the device for status updates on a regular basis, set ``status_updates="polled"``,
in which case ensure the :data:`update_message` property is set correctly for the controller.
The default setting of ``status_updates="none"`` will mean that no status updates will be
performed, leaving the task up to sub-classes to implement.
:param serial_port: Serial port device the device is connected to.
:param vid: Numerical USB vendor ID to match.
:param pid: Numerical USB product ID to match.
:param manufacturer: Regular expression to match to a device manufacturer string.
:param product: Regular expression to match to a device product string.
:param serial_number: Regular expression to match to a device serial number.
:param location: Regular expression to match to a device physical location (eg. USB port).
:param controller: The destination :class:`EndPoint <thorlabs_apt_device.enums.EndPoint>` for the controller.
:param bays: Tuple of :class:`EndPoint <thorlabs_apt_device.enums.EndPoint>`\\ (s) for the populated controller bays.
:param channels: Tuple of indices (1-based) for the controller bay's channels.
:param status_updates: Set to ``"auto"``, ``"polled"`` or ``"none"``.
"""
def __init__(self, serial_port=None, vid=None, pid=None, manufacturer=None, product=None, serial_number=None, location=None, controller=EndPoint.RACK, bays=(EndPoint.BAY0,), channels=(1,), status_updates="none"):
# If serial_port not specified, search for a device
if serial_port is None:
serial_port = find_device(vid=vid, pid=pid, manufacturer=manufacturer, product=product, serial_number=serial_number, location=location)
# Accept a serial.tools.list_ports.ListPortInfo object (which we may have just found)
if isinstance(serial_port, list_ports_common.ListPortInfo):
serial_port = serial_port.device
if serial_port is None:
raise RuntimeError("No Thorlabs APT devices detected matching the selected criteria.")
self._log = logging.getLogger(__name__)
self._log.info(f"Initialising serial port ({serial_port}).")
# Open and configure serial port settings for ThorLabs APT controllers
self._port = serial.Serial(serial_port,
baudrate=115200,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.1,
)
self._port.reset_input_buffer()
self._port.reset_output_buffer()
self._log.info("Opened serial port OK.")
# APT protocol unpacker for decoding received messages
self._unpacker = apt.Unpacker(self._port, on_error="warn")
# ID numbers for controller, bay device and channel identification
self.controller = controller
"""ID code for the controller message :class:`EndPoint <thorlabs_apt_device.enums.EndPoint>`."""
self.bays = bays
"""Tuple of ID codes for the controller's card bay :class:`EndPoint <thorlabs_apt_device.enums.EndPoint>`\\ (s)."""
self.channels = channels
"""Tuple of indexes for the the channels in card bays."""
# List of functions to call when error notifications received
self._error_callbacks = set()
# Create a new event loop for ourselves to queue and send commands
self._loop = asyncio.new_event_loop()
# Schedule the first check for incoming data on the serial port
self._loop.call_soon(self._schedule_reads)
self.read_interval = 0.01
"""Time to wait between read attempts on the serial port, in seconds."""
self.keepalive_message = apt.mot_ack_dcstatusupdate
"""
Function to generate the keepalive message which are sent at regular intervals when status
updates are configured as ``"auto"``.
Examples are ``mot_ack_dcstatusupdate``, ``pz_ack_pzstatusupdate``, ``pzmot_ack_statusupdate``
or similar from :class:`thorlabs_apt_device.protocol.functions`, and are device specific.
"""
self.keepalive_interval = 0.9
"""Time interval between sending of keepalive messages, in seconds."""
self.update_message = apt.mot_req_statusupdate
"""
Function to generate the status update request message which are sent at regular intervals
when status updates are configured as ``"polled"``.
Examples are ``mot_req_statusupdate``, ``mot_req_dcstatusupdate``, ``mot_req_statusbits``,
``rack_req_statusbits``, ``la_req_statusupdate`` or similar from
:class:`thorlabs_apt_device.protocol.functions`, and are device specific.
"""
self.update_interval = 0.01
"""
Time interval between sending of status update requests, in seconds. Note that this is in
fact the delay to use after completing a previous status update, thus a value of 0.01 s does
not mean that status updates will be performed 100 times a second, depending on the time it
takes to complete a command, or the presence of other commands on the message queue.
"""
if status_updates == "auto":
# Request the controller to start sending regular status updates
# Wait a little while though so sub-class init stuff can take effect first
for bay in self.bays:
self._loop.call_later(0.25, self._write, apt.hw_start_updatemsgs(source=EndPoint.HOST, dest=bay))
# Schedule sending of the "keep alive" acknowledgement commands
self._loop.call_later(0.75, self._schedule_keepalives)
elif status_updates == "polled":
# Schedule sending of the update request
self._loop.call_later(0.25, self._schedule_updates)
# Create a new thread to run the event loop in
self._thread = Thread(target=self._run_eventloop)
# Set as daemon thread so it can be killed automatically at program exit
self._thread.daemon = True
self._thread.start()
# Close the serial port at exit in case close() wasn't called
atexit.register(self._atexit)
def _run_eventloop(self):
"""
Entry point for the event loop thread.
"""
self._log.debug("Starting event loop.")
asyncio.set_event_loop(self._loop)
try:
self._loop.run_forever()
finally:
self._loop.close()
self._loop = None
self._log.debug("Event loop stopped.")
self._close_port()
def _close_port(self):
"""
Stop status messages and then close the serial port connection to the controller.
"""
if self._port is not None:
self._log.debug("Stopping hardware update messages.")
try:
for bay in self.bays:
self._write(apt.hw_stop_updatemsgs(source=EndPoint.HOST, dest=bay))
# Don't know if the disconnect message actually does anything, but might as well send it
if self.controller is not None:
self._log.debug("Sending disconnect notification.")
self._write(apt.hw_disconnect(source=EndPoint.HOST, dest=self.controller))
except:
# Something wrong writing to the port, ignore
self._log.debug("Unable to send disconnect messages.")
pass
self._log.info("Closing serial connection.")
self._port.close()
self._port = None
def _atexit(self):
"""
Catch exit signal and attempt to close everything gracefully.
"""
# Request the event loop to stop
self.close()
# Wait for event loop thread to finish
self._thread.join()
def _write(self, command_bytes):
"""
Write a command out the the serial port.
:param command_bytes: Command to send to the device as raw byte array.
"""
self._log.debug(f"Writing command bytes: {command_bytes.hex()}")
self._port.write(command_bytes)
self._port.flush()
def _schedule_reads(self):
"""
Check for any incoming messages and process them at regular intervals.
"""
#self._log.debug(f"Checking for data on serial port.")
for msg in self._unpacker:
#self._log.debug(f"Received message: {msg}")
self._process_message(msg)
# Schedule next check
self._loop.call_later(self.read_interval, self._schedule_reads)
def _schedule_keepalives(self):
"""
Send the "keep alive" acknowledgement command at regular intervals if status updates configured as ``"auto"``.
"""
#self._log.debug(f"Sending keep alive acknowledgement.")
for bay in self.bays:
self._loop.call_soon(self._write, self.keepalive_message(source=EndPoint.HOST, dest=bay))
# Schedule next send
self._loop.call_later(self.keepalive_interval, self._schedule_keepalives)
def _schedule_updates(self):
"""
Send the status update request at regular intervals if status updates configured as ``"polled"``.
"""
for bay in self.bays:
for channel in self.channels:
self._loop.call_soon(self._write, self.update_message(source=EndPoint.HOST, dest=bay, chan_ident=channel))
# Schedule next send
self._loop.call_later(self.update_interval, self._schedule_updates)
def _process_message(self, m):
"""
Process a single response message from the controller.
:param m: The unpacked message from the controller.
"""
# TODO: Process any messages common to all APT controllers (which ones?)
if m.msg == "hw_response":
# Should there be an error code? The documentation is a little unclear
self._log.warn(f"Received unknown event notification from APT device {m.source}.")
for callback in self._error_callbacks:
callback(source=m.source, msgid=0, code=-1, notes="unknown")
elif m.msg == "hw_rich_response":
self._log.warn(f"Received event notification code {m.code} from APT device {m.source}: {m.notes}")
for callback in self._error_callbacks:
callback(source=m.source, msgid=m.msgid, code=m.code, notes=m.notes)
[docs]
def register_error_callback(self, callback_function):
"""
Register a function to be called in the case of an error being reported by the device.
The function passed in should have the signature ``callback_function(source, msgid, code, notes)``,
where ``source`` is the :class:`enums.EndPoint` where the message originated, ``msgid`` is the
type of message which triggered the error (or 0 if unknown or a spontaneous error),
``code`` is a numerical error code and ``notes`` is a string description.
:params callback_function: Function to call in case of device error.
"""
if callable(callback_function):
self._error_callbacks.add(callback_function)
else:
self._log.warn("Attempted to register a non-callable object as a callback function.")
[docs]
def unregister_error_callback(self, callback_function):
"""
Unregister a previously registered error callback function.
The function passed in should have been previously registered using :meth:`register_error_callback`.
:params callback_function: Function to unregister.
"""
if callback_function not in self._error_callbacks:
self._log.warn("Attemped to unregister an unknown function.")
else:
self._error_callbacks.pop(callback_function)
[docs]
def close(self):
"""
Close the serial connection to the ThorLabs APT controller.
"""
if self._loop is not None:
self._log.debug("Stopping event loop.")
self._loop.call_soon_threadsafe(self._loop.stop)
# Note, this returns before event loop has actually stopped and serial port closed
[docs]
def identify(self, channel=0):
"""
Flash the device's front panel LEDs to identify the unit.
For some single-channel USB controlled devices ``channel=None`` is used,
which sends the identify command to the USB controller :class:`EndPoint`.
On devices which are considered "rack" controllers (including single-channel "rack"
units such as the BBD201), the ``channel`` parameter will actually refer to the card bay.
There are likely other types of units (though currently untested) which have a single "bay"
with multiple channels, and then the ``channel`` parameter would refer to the actual
channel index of the controller card.
:param channel: Index (0-based) of controller bay channel to send the command.
"""
if channel is None:
self._log.debug("Identifying [channel=None].")
self._loop.call_soon_threadsafe(self._write, apt.mod_identify(source=EndPoint.HOST, dest=EndPoint.USB, chan_ident=0))
else:
self._log.debug(f"Identifying [channel={self.channels[channel]}].")
self._loop.call_soon_threadsafe(self._write, apt.mod_identify(source=EndPoint.HOST, dest=EndPoint.RACK, chan_ident=self.channels[channel]))
[docs]
def find_device(vid=None, pid=None, manufacturer=None, product=None, serial_number=None, location=None):
"""
Search attached serial ports for a specific device.
The first device found matching the criteria will be returned.
Because there is no consistent way to identify Thorlabs APT devices, the default parameters do not
specify any selection criteria, and thus the first serial port will be returned.
A specific device should be selected using a unique combination of the parameters.
The USB vendor (``vid``) and product (``pid``) IDs are exact matches to the numerical values,
for example ``vid=0x067b`` or ``vid=1659``.
The remaining parameters are strings specifying a regular expression match to the corresponding field.
For example ``serial_number="83"`` would match devices with serial numbers starting with 83, while
``serial_number=".*83$"`` would match devices ending in 83.
A value of ``None`` means that the parameter should not be considered, however an empty string value
(``""``) is subtly different, requiring the field to be present, but then matching any value.
Note that the APT protocol documentation lists formats for device serial numbers.
For example, a TDC001 "DC Driver T-Cube" should have serial numbers starting with "83".
Be aware that different operating systems may return different data for the various fields,
which can complicate matching.
To see a list of serial ports and the relevant data fields:
.. code-block: python
import serial
for p in list_ports.comports():
print(f"{p.device}, {p.manufacturer}, {p.product}, {p.vid}, {p.pid}, {p.serial_number}, {p.location}")
:param vid: Numerical USB vendor ID to match.
:param pid: Numerical USB product ID to match.
:param manufacturer: Regular expression to match to a device manufacturer string.
:param product: Regular expression to match to a device product string.
:param serial_number: Regular expression to match to a device serial number.
:param location: Regular expression to match to a device physical location (eg. USB port).
:returns: First :class:`~serial.tools.list_ports.ListPortInfo` device which matches given criteria.
"""
for p in serial.tools.list_ports.comports():
if (vid is not None) and not vid == p.vid: continue
if (pid is not None) and not pid == p.pid: continue
if (manufacturer is not None) and ((p.manufacturer is None) or not re.match(manufacturer, p.manufacturer)): continue
if (product is not None) and ((p.product is None) or not re.match(product, p.product)): continue
if (serial_number is not None) and ((p.serial_number is None) or not re.match(serial_number, p.serial_number)): continue
if (location is not None) and ((p.location is None) or not re.match(location, p.location)): continue
return p
[docs]
def list_devices():
"""
Return a string listing all detected serial devices and any associated identifying properties.
The manufacturer, product, vendor ID (vid), product ID (pid), serial number, and physical
device location are provided.
These can be used as parameters to :meth:`find_device` or the constructor of a APTDevice class
to identify and select a specific serial device.
:returns: String listing all serial devices and their details.
"""
result = ""
for p in serial.tools.list_ports.comports():
try:
vid = f"{p.vid:#06x}"
pid = f"{p.pid:#06x}"
except:
vid = p.vid
pid = p.pid
result += f"device={p.device}, manufacturer={p.manufacturer}, product={p.product}, vid={vid}, pid={pid}, serial_number={p.serial_number}, location={p.location}\n"
return result.strip("\n")