Automancer docs

Developing processes

Creating a simple process

A process is an object responsible for the execution of a step. The process attached to a step determines when the step terminates or fails, and whether it can be paused.

In practice, a process is a Python callable that returns an awaitable; usually it is simply an asynchronous function. It takes two arguments:

  • data – The data provided by the user, already evaluated. Here, data is a quantops.Quantity object, as specified earlier by setting the attribute's type to am.QuantityType. Accessing data.value provides the duration in SI units, i.e. in seconds. In other applications, data could be a more complex object, such as a list of records.
  • context – A ProcessContext object used to interact with the process' owner.

The function is wrapped in a task and is cancelled when a halt request is received. You can intercept the cancellation to take appropriate action.

Raising an exception causes the process to fail, but the user will be able to recover from the error.
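
As a minimal sketch of the description above, the following process waits for the duration given by the user and fails on invalid input. The run() driver and the use of SimpleNamespace in place of a quantops.Quantity object are assumptions made so that the example runs on its own; in a real plugin the process is called by its owner.

```python
import asyncio
from types import SimpleNamespace

async def process(data, context):
  # data.value is assumed to hold the duration in seconds
  if data.value < 0:
    # Raising an exception makes the process fail
    raise ValueError("The duration must not be negative")

  # This await is where a halt request would cancel the process
  await asyncio.sleep(data.value)

def run(duration):
  # Hypothetical driver standing in for the owner of the process
  try:
    asyncio.run(process(SimpleNamespace(value=duration), context=None))
  except ValueError:
    return "failed"

  return "completed"
```

Calling run(0.01) completes normally, whereas run(-1) reports a failure.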

Let's look at the different methods provided by ProcessContext:

  • context.send_location(<location>) – Send a location object to the client. Calling this method more than once in the same event loop iteration causes all locations but the last to be ignored.
  • context.send_analysis(<analysis>) – Send a runtime analysis object containing errors, warnings and effects.
  • context.send_duration(<duration>) – Send an update for the estimated remaining duration.
  • await context.checkpoint() – Wait for the process to resume, if it was paused.
  • await context.receive() – Wait for a message from the client to be received.
  • await context.wait(<awaitable>) – Wait for an arbitrary awaitable to complete and return its return value. If a process control request is received, cancels the awaitable and raises a corresponding exception, one of JumpRequest, PauseRequest or SwapRequest.
  • context.data – The starting data object.
  • context.point – The starting point object.
  • context.pausable – Indicates whether the process is pausable. Set this attribute to true to enable pausing. Defaults to false.
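
To make the behavior of context.wait() more concrete, here is a rough sketch of how such a method could be built on asyncio by racing the awaitable against the next control request. It is not the actual ProcessContext implementation: SketchContext, send_request() and the PauseRequest class defined here are hypothetical stand-ins.

```python
import asyncio

class PauseRequest(Exception):
  """Hypothetical stand-in for the exception raised on a pause request."""

class SketchContext:
  """Illustrative only; not the actual ProcessContext implementation."""

  def __init__(self):
    self._request = None

  def _pending_request(self):
    # Create the future lazily so that it belongs to the running event loop
    if self._request is None:
      self._request = asyncio.get_running_loop().create_future()
    return self._request

  def send_request(self, exception):
    # Hypothetical entry point used by the owner to deliver a control request
    request = self._pending_request()
    if not request.done():
      request.set_result(exception)

  async def wait(self, awaitable):
    task = asyncio.ensure_future(awaitable)
    request = self._pending_request()

    # Race the awaitable against the next control request
    await asyncio.wait({task, request}, return_when=asyncio.FIRST_COMPLETED)

    if request.done():
      # A request arrived: cancel the awaitable, then raise the request
      self._request = None
      task.cancel()
      await asyncio.gather(task, return_exceptions=True)
      raise request.result()

    return task.result()

async def demo():
  context = SketchContext()

  # Without any request, wait() simply returns the result of the awaitable
  value = await context.wait(asyncio.sleep(0, result="done"))

  # Deliver a pause request while a long awaitable is running
  asyncio.get_running_loop().call_later(0.01, context.send_request, PauseRequest())

  try:
    await context.wait(asyncio.sleep(60))
  except PauseRequest:
    return value, "paused"
```

Running asyncio.run(demo()) returns after roughly 10 ms instead of a minute, because the second awaitable is cancelled as soon as the request arrives.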

Implementing process control operations

Halting

Halting is triggered by user action, during error handling, jumping and swapping, depending on the context.

Halt requests can be intercepted by catching the asyncio.CancelledError exception raised from any awaited coroutine. As with regular coroutines, it is best to re-raise the exception as soon as possible. A finally clause can be used to clean up resources.

import asyncio

async def process(data, context):
  try:
    await run_step1()
    await run_step2()
  except asyncio.CancelledError:
    # ...
    raise
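
The following self-contained sketch shows the order in which the except and finally clauses run when a process is halted. The halt request is emulated by cancelling the task directly; the events list and the demo() driver are hypothetical and exist only to make the sequence observable.

```python
import asyncio

events = []

async def process(data, context):
  try:
    await asyncio.sleep(60)  # Stands in for a long-running step
  except asyncio.CancelledError:
    events.append("halting")
    raise  # Re-raise so that the cancellation completes
  finally:
    events.append("cleaned up")

async def demo():
  events.clear()
  task = asyncio.create_task(process(None, None))

  await asyncio.sleep(0)  # Let the process start
  task.cancel()           # Emulates a halt request

  try:
    await task
  except asyncio.CancelledError:
    events.append("halted")

  return list(events)
```

The except clause runs first, then the finally clause, and only then does the owner observe the cancellation.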

Pausing

Pausing is triggered by user action or during error handling. If pausing is not implemented, no ‘Pause’ button is displayed to the user and error handling relies on halting instead.

Before implementing pausing, the context.pausable attribute must be set to true. The “pausability” of a process may change over time as it moves from one phase to another, so it is important to declare it to the user interface.

Pausing can be implemented in two different ways.

  • Calling context.checkpoint() periodically causes the coroutine to be suspended if a pause request has already been received. This is useful for processes made of multiple small steps that cannot be paused individually but where a pause can occur in between two steps.

    async def process(data, context):
      context.pausable = True
    
      for index in range(10):
        await run_step(index)
        context.send_location(...)
    
        await context.checkpoint()
    
  • Calling context.wait() with an awaitable causes the awaitable to be cancelled after a pause request is received, and a PauseRequest exception to be raised. The exception must then be caught and context.checkpoint() called to wait for the process to resume. Letting the exception propagate out of the coroutine will result in an error. For example, to stop a long step when a pause request is received, and restart it after resuming:

    async def process(data, context):
      context.pausable = True
    
      while True:
        try:
          await context.wait(long_step())
        except PauseRequest:
          context.send_location(...)
          await context.checkpoint()
        else:
          break
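
To illustrate the first approach, here is a rough sketch of how a checkpoint could suspend a process between two steps. SketchContext, with its pause() and resume() methods, is a hypothetical stand-in built on asyncio.Event rather than the real ProcessContext, and data is a plain list used to record progress.

```python
import asyncio

class SketchContext:
  """Illustrative only; not the actual ProcessContext implementation."""

  def __init__(self):
    self.pausable = False
    self._running = asyncio.Event()
    self._running.set()

  def pause(self):
    self._running.clear()

  def resume(self):
    self._running.set()

  async def checkpoint(self):
    # Returns immediately unless a pause request is pending
    await self._running.wait()

async def process(data, context):
  context.pausable = True

  for index in range(3):
    data.append(index)  # Stands in for `await run_step(index)`
    await context.checkpoint()

async def demo():
  context = SketchContext()
  completed = []

  context.pause()  # Pause request received before the first checkpoint
  task = asyncio.create_task(process(completed, context))

  await asyncio.sleep(0.01)
  while_paused = list(completed)  # Only the first step has run

  context.resume()
  await task

  return while_paused, completed
```

The first step runs to completion despite the pending pause request, and the remaining steps only run after resume() is called.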
    

Jumping

Jumping refers to moving the process from one internal state to another. It is only triggered by user action.

Objects used to describe a jump request are known as points.

Jump requests can be detected by calling context.checkpoint() or context.wait(). If a jump request is received, a JumpRequest exception is raised. If the exception propagates out of the coroutine, which is the default behavior, the jump fails and the process is recreated with the new point.

This example shows how to jump between steps in a five-step process:

async def process(context):
  index = context.point.index if context.point else 0

  # A while loop is used because the index can change on a jump
  while index < 5:
    try:
      await context.wait(run_step(index))
    except JumpRequest as e:
      # e.old_point
      index = e.point.index
    else:
      index += 1
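
The jump handling above can be tried out without Automancer by using a small stand-in for the context. In this sketch, Point, JumpRequest and SketchContext (including its jump() method) are hypothetical and only mimic the behavior described in this section; the process jumps from step 1 to step 3 and therefore never runs step 2.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Point:
  index: int  # Hypothetical point layout

class JumpRequest(Exception):
  """Hypothetical stand-in carrying the target point."""

  def __init__(self, point):
    self.point = point

class SketchContext:
  """Illustrative only; not the actual ProcessContext implementation."""

  def __init__(self, point=None):
    self.point = point
    self._task = None
    self._target = None

  def jump(self, point):
    # Hypothetical entry point: interrupt the awaitable passed to wait()
    self._target = point
    if self._task is not None:
      self._task.cancel()

  async def wait(self, awaitable):
    self._task = asyncio.ensure_future(awaitable)
    try:
      return await self._task
    except asyncio.CancelledError:
      if self._target is None:
        raise  # A genuine halt request
      target, self._target = self._target, None
      raise JumpRequest(target) from None
    finally:
      self._task = None

async def demo():
  visited = []
  reached_step1 = asyncio.Event()
  context = SketchContext()

  async def run_step(index):
    visited.append(index)
    if index == 1:
      reached_step1.set()
    await asyncio.sleep(0.01)

  async def process(context):
    index = context.point.index if context.point else 0
    while index < 5:
      try:
        await context.wait(run_step(index))
      except JumpRequest as e:
        index = e.point.index
      else:
        index += 1

  task = asyncio.create_task(process(context))
  await reached_step1.wait()
  context.jump(Point(3))  # Jump from step 1 to step 3
  await task
  return visited
```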

To avoid needlessly restarting a step when jumping to the step that is already running, the solution is a little more complicated:

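import asyncio

async def process(context):
  index = context.point.index if context.point else 0

  while index < 5:
    # Run the step as a task so that it survives the cancellation caused by context.wait()
    task = asyncio.create_task(run_step(index))

    while True:
      try:
        await context.wait(asyncio.shield(task))
      except JumpRequest as e:
        # Jumping to the current step: keep the running task and wait again
        if e.point.index == index:
          continue

        # Jumping to another step: cancel the running step before moving on
        task.cancel()

        try:
          await task
        except asyncio.CancelledError:
          pass

        index = e.point.index
        break
      else:
        index += 1
        break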

Swapping

Swapping is the most complex process control operation and refers to the ability to swap the full input data of a process. Unlike jumping, swapping acts on the input data and is used upon user action, when editing protocols at runtime.

Swapping behaves similarly to jumping, using a SwapRequest exception. If the exception propagates out of the coroutine, the swap fails and the process is recreated with the new data.

This example shows how to restart an HTTP request when a swap request is received.

async def process(context):
  data = context.data

  while True:
    try:
      await context.wait(fetch_http(data.address))
    except SwapRequest as e:
      # e.old_data
      data = e.data
    else:
      break

Unless implementing swapping brings significant benefits, which is not the case in this example, the default behavior, i.e. recreating the process, should suffice.

Recipes

Execute a single synchronous action

import asyncio

async def process(context):
  context.send_duration(4.5) # Estimated 4.5 seconds

  await am.shield(asyncio.to_thread(run_synchronous_action))
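
The reason for combining asyncio.to_thread() with a shield can be seen in the following sketch, which runs without Automancer. It uses asyncio.shield() as a stand-in for am.shield(), on the assumption that the two behave similarly, and a hypothetical ticker coroutine to show that the event loop stays responsive while the blocking function runs in a worker thread.

```python
import asyncio
import time

def run_synchronous_action():
  time.sleep(0.05)  # Blocking call, e.g. a driver or file operation
  return "done"

async def demo():
  ticks = 0

  async def ticker():
    # Keeps counting only if the event loop is not blocked
    nonlocal ticks
    while True:
      await asyncio.sleep(0.005)
      ticks += 1

  ticker_task = asyncio.create_task(ticker())

  # The thread cannot be interrupted, so the call is shielded from cancellation
  result = await asyncio.shield(asyncio.to_thread(run_synchronous_action))

  ticker_task.cancel()
  return result, ticks
```

A synchronous action cannot be stopped halfway, which is why this recipe neither declares the process as pausable nor reacts to requests while the action runs.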

Execute multiple synchronous actions

import asyncio
from dataclasses import dataclass
from typing import Literal

@dataclass
class Location:
  phase: Literal[0, 1, 2]

  def export(self):
    return { "phase": self.phase }

async def process(context):
  context.send_location(Location(0))
  context.send_duration(2.0)

  await am.shield(asyncio.to_thread(run_synchronous_action1))

  context.send_location(Location(1))
  context.send_duration(1.0)

  await context.checkpoint()
  await am.shield(asyncio.to_thread(run_synchronous_action2))

  context.send_location(Location(2))
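
The export() method of the Location class returns a plain dictionary. Assuming that locations are serialized to JSON before being sent to the client, which is an assumption rather than something stated here, the exported value can be checked as follows:

```python
import json
from dataclasses import dataclass
from typing import Literal

@dataclass
class Location:
  phase: Literal[0, 1, 2]

  def export(self):
    return { "phase": self.phase }

# Export the location of the second phase and serialize it
payload = json.dumps(Location(1).export())
```

Keeping export() limited to built-in types such as numbers, strings, lists and dictionaries avoids serialization errors.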

Report an error or warning

async def process(context):
  try:
    await run_asynchronous_action()
  except Exception as e:
    context.send_error(am.GenericDiagnostic(e))
    context.send_warning(am.GenericDiagnostic(e))

Report a critical error

async def process(context):
  try:
    await run_asynchronous_action()
  except Exception as e:
    raise am.ProcessFailureError from e

Report an effect

async def process(context):
  context.send_effect(am.FileCreatedEffect('file.txt'))

Send an email

import asyncio
import smtplib
from email.mime.text import MIMEText

async def process(context):
  message = MIMEText(context.data.contents)

  message['Subject'] = context.data.subject
  message['From'] = context.data.sender
  message['To'] = context.data.recipient

  session = smtplib.SMTP()

  try:
    # Each blocking SMTP call runs in a thread, with a checkpoint in between
    await am.shield(asyncio.to_thread(lambda: session.connect(context.data.address)))
    await context.checkpoint()

    try:
      await am.shield(asyncio.to_thread(lambda: session.login(context.data.username, context.data.password)))
    except smtplib.SMTPAuthenticationError as e:
      raise am.ProcessFailureError('Invalid credentials') from e

    await context.checkpoint()

    await am.shield(asyncio.to_thread(lambda: session.sendmail(context.data.sender, [context.data.recipient], message.as_string())))
    await context.checkpoint()
  except smtplib.SMTPException as e:
    raise am.ProcessFailureError from e
  finally:
    await am.shield(asyncio.to_thread(lambda: session.quit()))
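
The message construction in this recipe can be checked without an SMTP server. In the sketch below, the data object and its field values are hypothetical and stand in for context.data.

```python
from email.mime.text import MIMEText
from types import SimpleNamespace

# Hypothetical input standing in for context.data
data = SimpleNamespace(
  contents="The run has finished.",
  subject="Run finished",
  sender="lab@example.com",
  recipient="user@example.com"
)

message = MIMEText(data.contents)

message['Subject'] = data.subject
message['From'] = data.sender
message['To'] = data.recipient

# The string passed to session.sendmail() in the recipe above
raw = message.as_string()
```

Printing raw shows the headers followed by the message body, which is useful for debugging before sending anything.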

Make an HTTP request

import asyncio
import urllib3

async def process(context):
  try:
    # urllib3 is synchronous, so the request runs in a worker thread
    resp = await asyncio.to_thread(lambda: urllib3.request("GET", context.data))
  except urllib3.exceptions.HTTPError as e:
    raise am.ProcessFailureError from e