Automancer docs

Tutorial: Add support for new devices

Introduction

The devices API lets you add support for new devices with minimal effort, while providing many related features out of the box:

  • When writing to devices
    • Writing to devices using the protocol syntax
    • Manually controlling the device from the UI
    • Locking devices to prevent them from being written elsewhere
    • Handling device disconnection
  • When reading device readouts
    • Recording data over time, even when data events arrive in batches or out-of-order
    • Using a value in an expression
    • Ensuring a condition stays true over time
    • Displaying values in a plot in the UI

Furthermore, the use of a common API makes it possible for plugins to build upon each other. For instance, in the case of a USB device, three plugins could work together:

  1. a low-level plugin communicates with the USB driver and provides a low-level interface in the form of binary I/O;
  2. a mid-level plugin elevates the low-level interface to a higher level in the form of high-level commands;
  3. a high-level plugin provides tools to simplify working with this interface, for example by grouping such commands.

In this context, a device is not necessarily a physical device connected to the computer. It could also be a remotely controlled device, or just a virtual object which is convenient to consider a device.

Understand the device architecture

All devices and their associated values are stored in a tree made of nodes. The root node can be accessed as Host.root_node and the leaves represent values which can be read or written. Intermediate (non-leaf) nodes are all instances of CollectionNode and serve as collections of other nodes.

In most cases, the subtree used when adding a new device is straightforward and can be defined as a DeviceNode with multiple custom child nodes, which are subclasses of ValueNode. The following example represents the tree for a thermostat:

Root (HostRootNode <- CollectionNode)
├── ...
└── MyThermostat (ThermostatDevice <- DeviceNode <- CollectionNode)
    ├── readout (ThermostatReadoutNode <- NumericNode, ValueNode)
    └── setpoint (ThermostatSetpointNode <- NumericNode, ValueNode)

The leftmost name on each line represents the identifier of the node, a value which must be unique among all nodes of a collection node. The identifier of device nodes usually starts with a capital letter. Here, the name MyThermostat is chosen by the user whereas readout and setpoint are fixed nodes of all ThermostatDevice instances. The user can reference the values of the device as MyThermostat.readout and MyThermostat.setpoint. The names inside the parentheses indicate the class of each node, followed by its superclasses.

The full tree is visible in the logs when starting the host. If the user defines two thermostats in the configuration, then two instances of ThermostatDevice will be created, each with their own subtree.

Root
├── ...
├── Thermostat1
│   ├── readout
│   └── setpoint
└── Thermostat2
    ├── readout
    └── setpoint
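
To make the tree structure concrete, here is a minimal standalone sketch of a node tree with unique identifiers per collection and lookup by dotted path. The Node and Collection classes and the resolve() helper are hypothetical stand-ins written for illustration; they are not the real BaseNode and CollectionNode classes, whose interfaces may differ.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Simplified stand-in for a node (not the real BaseNode)."""
    id: str


@dataclass
class Collection(Node):
    """Simplified stand-in for a collection node: maps identifiers to children."""
    nodes: dict[str, Node] = field(default_factory=dict)

    def add(self, node: Node) -> Node:
        # Identifiers must be unique among the children of a collection
        if node.id in self.nodes:
            raise ValueError(f"Duplicate identifier: {node.id}")
        self.nodes[node.id] = node
        return node


def resolve(root: Collection, path: str) -> Node:
    """Walk a dotted path such as 'Thermostat1.readout' down from the root."""
    node: Node = root
    for part in path.split("."):
        if not isinstance(node, Collection):
            raise KeyError(f"{node.id} is a leaf and has no child {part}")
        node = node.nodes[part]
    return node


# Build the two-thermostat tree shown above
root = Collection("Root")
for name in ("Thermostat1", "Thermostat2"):
    device = Collection(name)
    device.add(Node("readout"))
    device.add(Node("setpoint"))
    root.add(device)
```

With this sketch, resolve(root, "Thermostat2.setpoint") returns the setpoint leaf of the second device, and adding a second child named Thermostat1 to the root raises a ValueError.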

Create a device node

Define a device node by creating a subclass of DeviceNode. Start with the following example:

class ThermostatDevice(DeviceNode):
  def __init__(self):
    super().__init__()

    self.connected = True
    self.id = "Thermostat"
    self.label = "Thermostat 2000"

    # Dictionary of child nodes, to be completed later on
    self.nodes = {}

The connected, id and label attributes are defined on BaseNode which is inherited by all nodes, including DeviceNode. For details, see BaseNode and DeviceNode.

Now, register an instance of ThermostatDevice in the Host.devices dictionary (an alias for Host.root_node.nodes) using its identifier. Because devices are created and registered once and only during host initialization, this must happen in the plugin's executor.

class ThermostatExecutor(BaseExecutor):
  def __init__(self, conf, *, host):
    super().__init__()

    self.device = ThermostatDevice()
    host.devices[self.device.id] = self.device

After reloading the setup, the device should now be visible in the Device control view.

This hardcoded example works well for a simple scenario, but it is not suitable for the more complex setups a user might be working with. It is best to let the user choose how many devices to create, as well as the identifier and name of each device. Let's also add a configurable port option which you will need later. A more complete example using the plugin's configuration might look like this:

class ThermostatDevice(DeviceNode):
  def __init__(self, id: str, label: str, port: str):
    super().__init__()

    self.connected = True
    # Use the values provided by the user's configuration
    self.id = id
    self.label = label

    # Custom attribute
    self.port = port

class ThermostatExecutor(BaseExecutor):
  options_type = DictType({
    'devices': ListType(DictType({
      'id': IdentifierType(),
      'label': Attribute(StrType(), optional=True),
      'port': StrType()
    }))
  })

  def __init__(self, conf, *, host):
    super().__init__()
    self.devices = dict()

    for device_conf in conf['devices']:
      device = ThermostatDevice(
        id=device_conf['id'],
        label=device_conf['label'],
        port=device_conf['port']
      )

      self.devices[device.id] = device
      host.devices[device.id] = device

The user can now define devices with the following:

plugins:
  <plugin_name>:
    options:
      devices:
        - id: Thermostat1
          label: Thermostat 1
          port: COM3
        - id: Thermostat2
          port: COM5
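
As a rough illustration of how such options map to devices, the sketch below converts an already-parsed options mapping into typed records. It assumes the options arrive as plain dictionaries and lists, which may not match what the executor actually receives. DeviceOptions and parse_devices() are hypothetical names that are not part of the plugin API, and the real validation is done by options_type.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class DeviceOptions:
    """Hypothetical record holding the options of one device."""
    id: str
    label: Optional[str]
    port: str


def parse_devices(options: dict[str, Any]) -> list[DeviceOptions]:
    """Turn the 'devices' list into records, rejecting duplicate identifiers."""
    result: list[DeviceOptions] = []
    seen: set[str] = set()

    for entry in options["devices"]:
        if entry["id"] in seen:
            raise ValueError(f"Duplicate device identifier: {entry['id']}")
        seen.add(entry["id"])

        result.append(DeviceOptions(
            id=entry["id"],
            label=entry.get("label"),  # Optional: None when omitted
            port=entry["port"],
        ))

    return result


# Equivalent of the YAML configuration above, once parsed
options = {
    "devices": [
        {"id": "Thermostat1", "label": "Thermostat 1", "port": "COM3"},
        {"id": "Thermostat2", "port": "COM5"},
    ]
}
devices = parse_devices(options)
```

Note that the second device has no label, so code consuming these options should be prepared for a missing value.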

Write device connection logic

A node is connected if it is available for normal operation, such as reading and writing. A disconnected node being accessed typically produces an error. In the example above, you set self.connected = True to simulate a device which is always connected. This is usually accurate for virtual devices, such as a node reporting time according to the system's clock, but not for physical devices like the thermostat.

To link the ThermostatDevice class to a physical device, you will need to write the connection logic in a new asynchronous method which you will call start(). As an example, consider the thermostat to be a serial (RS-232) device and use the pySerial package for communication; the API is otherwise agnostic to the underlying technology.

from serial import Serial, SerialException

class ThermostatDevice(DeviceNode):
  def __init__(self):
    ...

    # Set the device as disconnected by default
    self.connected = False

  async def start(self):
    try:
      self.serial = Serial(self.port, baudrate=9600, timeout=1)
      self.serial.write(b"CONNECT\r\n")
      response = self.serial.read_until(b"\n").decode()

      # read_until() keeps the line terminator, so strip it before comparing
      if response.strip() != "OK":
        print("Failed to connect")
        return
    except SerialException:
      print("Failed to connect")
      return

    self.connected = True

You first open the serial port with Serial(...) and make a connection attempt by writing the CONNECT command. The device should respond with OK if the connection was successful. If the device does not respond, or responds with something else, consider the connection to have failed. If the connection was successful, set self.connected = True to indicate that the device is ready for normal operation.

There are two problems with this function. First, the calls to self.serial.write() and self.serial.read_until() are blocking, therefore the host will become unresponsive until the device responds. To prevent this, wrap these calls with asyncio.to_thread() to run them on another thread. This is not required if the library you are relying on already provides an asynchronous API.

  class ThermostatDevice(DeviceNode):
    ...

    async def start(self):
      try:
        self.serial = Serial(self.port, baudrate=9600, timeout=1)
-       self.serial.write(b"CONNECT\r\n")
-       response = self.serial.read_until(b"\n").decode()
+       await asyncio.to_thread(lambda: self.serial.write(b"CONNECT\r\n"))
+       response = (await asyncio.to_thread(lambda: self.serial.read_until(b"\n"))).decode()

        if response.strip() != "OK":
          print("Failed to connect")
          return
      except SerialException:
        print("Failed to connect")
        return

      self.connected = True
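
The effect of asyncio.to_thread() can be checked without any hardware. In the sketch below, blocking_request() is a hypothetical stand-in for a blocking serial exchange, and heartbeat() records timestamps to show that the event loop keeps running while the blocking call executes on another thread.

```python
import asyncio
import time


def blocking_request() -> bytes:
    """Hypothetical stand-in for a blocking serial exchange (no real device)."""
    time.sleep(0.2)
    return b"OK\r\n"


async def heartbeat(ticks: list[float]) -> None:
    """Record a timestamp every 50 ms while the event loop is responsive."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple[str, int]:
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))

    # The parentheses matter: without them, .decode() would be called on the
    # coroutine returned by to_thread() rather than on the awaited bytes.
    response = (await asyncio.to_thread(blocking_request)).decode().strip()

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return response, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_request() were called directly inside main(), the heartbeat would not tick at all during the 200 ms wait.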

Second, the function doesn't handle reconnection: if the device gets disconnected, it will not be able to reconnect without restarting the setup. To fix this, add a loop which attempts to reconnect every second until the connection is successful. To make sure that the connection is still active, you can make a dummy request to the device every second while it is connected.

class ThermostatDevice(DeviceNode):
  ...

  async def start(self):
    # Loop that attempts to reconnect every second
    while True:
      try:
        self.serial = Serial(self.port, baudrate=9600, timeout=1)
        await asyncio.to_thread(lambda: self.serial.write(b"CONNECT\r\n"))
        response = (await asyncio.to_thread(lambda: self.serial.read_until(b"\n"))).decode()

        if response.strip() != "OK":
          print("Failed to connect")
          await asyncio.sleep(1)
          continue

        self.connected = True

        try:
          # Loop that ensures that the connection is still active
          while True:
            await asyncio.to_thread(lambda: self.serial.write(b"REQUEST\r\n"))
            await asyncio.to_thread(lambda: self.serial.read_until(b"\n"))
            await asyncio.sleep(1)
        finally:
          # Runs both when the connection is lost and when start() is cancelled
          self.connected = False
      except SerialException:
        print("Failed to connect")
        await asyncio.sleep(1)

The start() method now runs in an infinite loop for the entire lifecycle of the device. The loop can be stopped by cancelling the call to this method. In practice, this means that one of the asyncio.to_thread() or asyncio.sleep() calls will get cancelled and you must plan any cleanup procedure accordingly. For this example, no cleanup is required, assuming you can reconnect to the device after disconnection. For details, see Task Cancellation.
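
The interaction between the reconnection loop and cancellation can be tried out with a fake device. In this sketch, FlakyLink is a hypothetical in-memory stand-in that fails its first connection attempts, and run_device() mirrors the structure of start() with shorter delays. The finally block runs when the task is cancelled, which is where any cleanup would go.

```python
import asyncio


class FlakyLink:
    """Hypothetical stand-in for a device that fails its first attempts."""

    def __init__(self, failures: int):
        self.failures = failures

    def connect(self) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("device not reachable")

    def ping(self) -> None:
        pass  # Always succeeds once connected


async def run_device(link: FlakyLink, events: list[str],
                     connected: asyncio.Event, retry_delay: float = 0.01) -> None:
    # Loop that attempts to reconnect until cancelled
    while True:
        try:
            await asyncio.to_thread(link.connect)
        except ConnectionError:
            events.append("retry")
            await asyncio.sleep(retry_delay)
            continue

        events.append("connected")
        connected.set()

        try:
            # Keep-alive loop; cancellation surfaces at one of these awaits
            while True:
                await asyncio.to_thread(link.ping)
                await asyncio.sleep(retry_delay)
        except ConnectionError:
            pass  # Connection lost: go back to the outer loop and reconnect
        finally:
            # Runs on disconnection and on cancellation alike
            events.append("disconnected")


async def main() -> list[str]:
    events: list[str] = []
    connected = asyncio.Event()
    task = asyncio.create_task(run_device(FlakyLink(failures=2), events, connected))

    await connected.wait()
    task.cancel()  # Stop the device, as the executor would on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```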

The only missing step is to call start() for each device from the executor. Executors have their own start() method which is called by the host, from which you will start the devices. Use the Pool.open() function to manage tasks created by the executor. For details on this pattern, see [...].

class ThermostatExecutor(BaseExecutor):
  def __init__(self, conf, *, host):
    super().__init__()
    self.devices = dict()

    for device_conf in conf['devices']:
      device = ThermostatDevice(...)

      self.devices[device.id] = device
      host.devices[device.id] = device

  async def start(self):
    async with Pool.open() as pool:
      for node in self.devices.values():
        pool.start_soon(node.start())

      yield

The yield statement at the end of start() is required to indicate that the executor is ready. It is possible to run initialization steps before yielding, if required.
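
The role of the yield statement may be easier to see in plain asyncio. The sketch below does not use Pool and makes no claim about how the host actually drives executors: host() is a hypothetical driver that runs the generator up to its yield, treats that as the "ready" signal, and later closes the generator to trigger cleanup.

```python
import asyncio
from typing import AsyncIterator


async def worker(name: str, log: list[str]) -> None:
    """Stand-in for a device's start() method, running until cancelled."""
    try:
        await asyncio.sleep(3600)
    finally:
        log.append(f"{name} stopped")


async def start(log: list[str]) -> AsyncIterator[None]:
    """Start background tasks, signal readiness, then clean up on close."""
    tasks = [
        asyncio.create_task(worker(name, log))
        for name in ("Thermostat1", "Thermostat2")
    ]

    try:
        log.append("ready")
        yield  # Initialization is done; control returns to the driver
    finally:
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def host(log: list[str]) -> None:
    """Hypothetical driver: run until the yield, do work, then shut down."""
    gen = start(log)
    await gen.__anext__()   # Runs start() up to its yield
    await asyncio.sleep(0)  # Let the background tasks begin running
    log.append("host running")
    await gen.aclose()      # Resumes start() so that its finally block runs


async def main() -> list[str]:
    log: list[str] = []
    await host(log)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```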

Design the value nodes

Now that you have set up the device, you can start designing the value nodes which contain the device's data.

A value node is a class which inherits from ValueNode[T] and represents a value of type T which can be read, written or both. For the thermostat, you will implement two nodes:

  • A readout node which represents the current temperature, as reported by a thermometer embedded in the thermostat. This node is read-only and can only be watched through polling, i.e. it is unable to report changes to its value automatically but rather must be queried. This is not the case for all devices.
  • A setpoint node which represents the desired temperature. This node is readable and writable. It is also stable: there is no risk of the setpoint temperature changing without action from this plugin.

Note that reading the readout node is not the same as reading the setpoint node. The readout node represents the current temperature, while the setpoint node represents the desired temperature. The setpoint node can be read to get the current desired temperature, but it can also be written to change the desired temperature.

Because both of these nodes deal with quantities, use the NumericNode class, which inherits from ValueNode, to handle units as well as minimum and maximum bounds at no extra cost. Use degC for degrees Celsius as the unit for both nodes, and 10 – 30 °C as the acceptable range for the setpoint.

class ThermostatReadoutNode(NumericNode):
  def __init__(self, device):
    super().__init__(
      # ValueNode options (inherited)
      readable=True,

      # NumericNode options
      unit=ureg.degC
    )

    self.id = "readout"

    # Keep a reference to the parent device to reach its serial port and lock
    self.device = device

class ThermostatSetpointNode(NumericNode):
  def __init__(self, device):
    super().__init__(
      # ValueNode options (inherited)
      readable=True,
      writable=True,

      # NumericNode options
      min=(10 * ureg.degC),
      max=(30 * ureg.degC),
      unit=ureg.degC
    )

    self.id = "setpoint"
    self.device = device

All value nodes must be initialized and declared by their parent, as well as started by calling start(). Their connected attribute must also be set to the same value as their parent. You may also want to add a Lock common to the device node to avoid conflicting requests to the serial device.

from asyncio import Lock

class ThermostatDevice(DeviceNode):
  def __init__(self, id: str, label: str, port: str):
    ...

    self.nodes = {
      'readout': ThermostatReadoutNode(self),
      'setpoint': ThermostatSetpointNode(self),
    }

    self.lock = Lock()

  async def start(self):
    async with Pool.open() as pool:
      for node in self.nodes.values():
        pool.start_soon(node.start())

      # Proceed with rest of the start() method
      # ...

      self.connected = True

      # Also mark the child nodes as connected
      for node in self.nodes.values():
        node.connected = True
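
To see why a lock shared by all nodes of a device is useful, consider the following sketch. FakeSerial is a hypothetical in-memory device that replies with the last command it received, so a reply is only correct if no other request slips in between a write and its matching read. The lock keeps each write/read pair together.

```python
import asyncio


class FakeSerial:
    """Hypothetical in-memory device that echoes the last command written."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self._last = b""

    async def write(self, data: bytes) -> None:
        self.log.append(f"write {data.decode()}")
        self._last = data
        await asyncio.sleep(0)  # Give other tasks a chance to interleave

    async def read(self) -> bytes:
        self.log.append(f"read {self._last.decode()}")
        return self._last


async def request(serial: FakeSerial, lock: asyncio.Lock, command: bytes) -> bytes:
    # Hold the lock for the whole exchange, not only for the write
    async with lock:
        await serial.write(command)
        return await serial.read()


async def main() -> tuple[list[bytes], list[str]]:
    serial = FakeSerial()
    lock = asyncio.Lock()

    # Two nodes querying the same device concurrently
    replies = await asyncio.gather(
        request(serial, lock, b"GET_READOUT"),
        request(serial, lock, b"GET_SETPOINT"),
    )
    return list(replies), serial.log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Without the lock, the second write would happen before the first read, and both requests would receive the reply meant for GET_SETPOINT.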

Now implement the abstract _read() and _write() methods. The _read() method shouldn't return anything, but instead it should set the value attribute of the node, a tuple composed of the time at which the value was obtained and the actual value, as a Quantity object.

Both methods can raise a NodeUnavailableError exception if the device is not connected. The exception will be ignored as you are expected to handle the disconnection by setting connected to False, which you did earlier.

class ThermostatReadoutNode(NumericNode):
  async def _read(self):
    async with self.device.lock:
      try:
        await asyncio.to_thread(lambda: self.device.serial.write(b"GET_READOUT\r\n"))
        response = await asyncio.to_thread(lambda: self.device.serial.read_until(b"\n"))

        # Store the time of the measurement along with the value
        self.value = (
          time.time(),
          float(response) * ureg.degC
        )
      except SerialException as e:
        raise NodeUnavailableError from e

class ThermostatSetpointNode(NumericNode):
  async def _read(self):
    # Same as ThermostatReadoutNode._read() but with "GET_SETPOINT" instead of "GET_READOUT"
    ...

  async def _write(self, value, /):
    async with self.device.lock:
      try:
        await asyncio.to_thread(lambda: self.device.serial.write(f"SET_SETPOINT {value.magnitude:.2f}\r\n".encode()))
        response = await asyncio.to_thread(lambda: self.device.serial.read_until(b"\n"))

        # The response is raw bytes and still contains the line terminator
        if response.strip() != b"OK":
          raise NodeUnavailableError
      except SerialException as e:
        raise NodeUnavailableError from e
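
The encoding and decoding steps of these methods can be tested separately from the device. The helpers below are hypothetical and assume the line-based protocol used throughout this tutorial's example (commands and responses terminated by a newline, temperatures sent as decimal numbers); a real thermostat will likely differ.

```python
import time


def format_setpoint(celsius: float) -> bytes:
    """Build the SET_SETPOINT command with two decimals, as in _write()."""
    return f"SET_SETPOINT {celsius:.2f}\r\n".encode()


def parse_readout(response: bytes) -> tuple[float, float]:
    """Return (timestamp, temperature in degrees Celsius) from a raw line."""
    text = response.decode().strip()  # Remove the trailing "\r\n"
    return time.time(), float(text)


def is_ok(response: bytes) -> bool:
    """Check an acknowledgement, ignoring the line terminator."""
    return response.decode().strip() == "OK"


if __name__ == "__main__":
    print(format_setpoint(21.456))
    print(parse_readout(b"21.5\r\n"))
    print(is_ok(b"OK\r\n"))
```

Keeping these steps in small pure functions makes it possible to check details such as line terminators, a common source of comparisons that never match, without a connected device.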

Finally, define a way for the value of each node to be observed. The readout node can be polled every second by inheriting from the PollableReadableNode class with the poll_interval=1 option. The setpoint node is stable, which is the default: its value is only read when the node becomes connected.

class ThermostatReadoutNode(NumericNode, PollableReadableNode):
  def __init__(self, device):
    super().__init__(
      # PollableReadableNode options
      poll_interval=1,

      ...
    )

    ...
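
Conceptually, polling amounts to calling the node's read method at a fixed interval until the watcher goes away. The sketch below illustrates the idea in plain asyncio; poll() is a hypothetical helper and not the actual implementation behind PollableReadableNode, which may handle errors, disconnection and timing differently.

```python
import asyncio
from typing import Awaitable, Callable


async def poll(read: Callable[[], Awaitable[None]], interval: float) -> None:
    """Call read() repeatedly, waiting `interval` seconds between calls."""
    while True:
        await read()
        await asyncio.sleep(interval)


async def main() -> int:
    count = 0
    done = asyncio.Event()

    async def read() -> None:
        # Stand-in for a node's _read() method
        nonlocal count
        count += 1
        if count == 3:
            done.set()

    task = asyncio.create_task(poll(read, interval=0.01))

    # Stop watching after three reads
    await done.wait()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return count


if __name__ == "__main__":
    print(asyncio.run(main()))
```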