Automancer docs

Tutorial: Create a timer process


This tutorial guides you through creating a simple process: a timer that supports halting but, unlike the built-in timer, starts without pause/resume or jumping. The final section shows how to add these process control operations.

This timer plugin will have three main components:

  • A parser that understands the sleep attribute and validates its argument.
  • A process that executes the sleep command. This is the core of the plugin.
  • A client-side component that displays a progress bar.

Create the plugin

The file structure of the plugin will be as follows. For simplicity, the Python project, Python module and Automancer plugin will all be named timer.

├── pyproject.toml
└── timer
    ├── __init__.py
    ├── index.js
    └── process.py

Start by creating the pyproject.toml file. This example uses Setuptools but feel free to use any other build system.

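[build-system]
requires = ["setuptools>=61"]

[project]
name = "timer"
version = "1.0.0"

[project.entry-points."automancer.plugins"]
timer = "timer"

[tool.setuptools.package-data]
timer = ["*"]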

The most important thing here is the declaration of an automancer.plugins entry point. The entry point is named timer on the left-hand side and points to the timer module on the right-hand side. This is how Automancer will find the plugin.

The tool.setuptools.package-data table is necessary for the client-side file index.js to be included in the project once packaged.

Next, create the file timer/__init__.py, as follows. The client_path assignment references the client-side file timer/index.js. Make sure to create that file, or comment out that line for now.

from importlib.resources import files

namespace = "timer"
client_path = files(__name__) / "index.js"
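
To see what the files() expression resolves to, the following sketch builds a throwaway package containing a data file and reads the file back through importlib.resources. The helper read_packaged_file and the package name demo_timer_pkg are hypothetical and exist only for this illustration.

```python
import importlib
import sys
import tempfile
from importlib.resources import files
from pathlib import Path


def read_packaged_file(package_name: str, file_name: str, content: str) -> str:
    """Create a temporary package with one data file, then read that file back."""
    with tempfile.TemporaryDirectory() as root:
        package_dir = Path(root) / package_name
        package_dir.mkdir()
        (package_dir / "__init__.py").write_text("")
        (package_dir / file_name).write_text(content)

        sys.path.insert(0, root)
        try:
            importlib.invalidate_caches()
            # Same expression as in the plugin, with the package named explicitly
            client_path = files(package_name) / file_name
            return client_path.read_text()
        finally:
            # Clean up so that the temporary package does not linger
            sys.path.remove(root)
            sys.modules.pop(package_name, None)


print(read_packaged_file("demo_timer_pkg", "index.js", "export default {};\n"))
```

Inside a package's own module, files(__name__) refers to that package, which is why the plugin does not need to spell out its name.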

You can now install the package, and it will be automatically discovered by Automancer. Install it in editable mode to avoid having to reinstall it after each change.

$ pip install -e .

Write the parser

To add functionality to the plugin, create the file timer/process.py, and add the following code:

import automancer as am

async def process(context):
  # To be filled in
  pass

class Parser(am.BaseParser):
  namespace = "timer"

  def __init__(self, fiber):
    # A single transformer that creates a process from the sleep attribute
    self.transformers = [am.ProcessTransformer(process, {
      'sleep': am.Attribute(
        description="Sleeps for a fixed duration.",
        # The attribute's quantity type is also specified here.
      )
    }, parser=fiber)]

All parsers must inherit from am.BaseParser. This parser instantiates a single transformer, which defines operations on the attributes currently being processed. In particular, the built-in ProcessTransformer is a lead transformer that creates a process from the provided function process, which you will write in the next section. It is possible to create custom transformers to further customize the parsing procedure.

The transformer only reads a single attribute, sleep, with a description (visible in the editor when hovering or completing) and a quantity type. Expressions are also allowed by default, so all of these are valid:

sleep: 1 sec
sleep: 10 min
sleep: {{ 1 * unit.sec }}
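
To make the idea of a quantity value concrete, here is a minimal standalone sketch that converts literals such as 1 sec or 10 min into seconds. It is not Automancer's parser: the function parse_duration and the table UNIT_SECONDS are hypothetical, cover only the two units used above, and do not handle expressions.

```python
import re

# Hypothetical unit table for this illustration, limited to the units shown above
UNIT_SECONDS = {"sec": 1.0, "min": 60.0}


def parse_duration(text: str) -> float:
    """Convert a literal such as '10 min' into a number of seconds."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([a-z]+)\s*", text)
    if match is None:
        raise ValueError(f"Invalid duration: {text!r}")

    magnitude, unit = match.groups()
    if unit not in UNIT_SECONDS:
        raise ValueError(f"Unknown unit: {unit!r}")

    return float(magnitude) * UNIT_SECONDS[unit]


print(parse_duration("1 sec"))   # 1.0
print(parse_duration("10 min"))  # 600.0
```

Rejecting malformed input at this stage mirrors the role of the parser in the plugin, which validates the argument before the process ever runs.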

There is no need to set the attribute as optional. If the attribute is missing, the transformer will be ignored.

Before testing the plugin, make sure to import the Parser inside timer/__init__.py by adding this line. The name Parser will be recognized by Automancer.

from .process import Parser

You can test parsing in the text editor, but running protocols with an empty process will result in an error.

Create a location class

Each process can periodically report a location object, an arbitrary instance which represents the current progress of the process. This object is only used to display information to the user while and after running the protocol, and it must be exportable. Dataclasses are very convenient to implement location objects.

Add the following to timer/process.py. You will be sending the current progress (ranging from 0 to 1) to the user interface, as well as the duration which was just obtained. The client cannot reliably obtain the duration in any other way, given that it could have been an expression until reaching that step.

from automancer import Exportable
from dataclasses import dataclass

@dataclass
class ProcessLocation(Exportable):
  duration: Quantity
  progress: float

  # Produce the data sent to the user interface
  def export(self):
    return {
      "duration": self.duration.value,
      "progress": self.progress
    }

The process's location must be sent each time the progress changes in a way that cannot be extrapolated from the previous location instance. For the timer, there is no need to send the location periodically, as the current progress can be trivially extrapolated from the previous location's progress and time, together with the current time.
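
The extrapolation described above can be sketched as a small function. The name extrapolate_progress and its parameters are hypothetical and only illustrate the arithmetic; the actual client-side code may differ.

```python
def extrapolate_progress(progress: float, sent_at: float, duration: float, now: float) -> float:
    """Estimate the current progress of a running timer.

    progress: progress (0 to 1) stored in the last location
    sent_at:  time at which that location was sent, in seconds
    duration: total duration of the timer, in seconds (assumed to be positive)
    now:      current time, in seconds
    """
    elapsed_fraction = (now - sent_at) / duration
    # Clamp to 1 in case the completion has not been reported yet
    return min(1.0, progress + elapsed_fraction)


# A 20-second timer that was a quarter done 5 seconds ago is now half done
print(extrapolate_progress(progress=0.25, sent_at=100.0, duration=20.0, now=105.0))  # 0.5
```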

Write the process

Start with the following. This code implements the basic functionality of the timer, but does not yet handle advanced functionality such as pausing and halting.

import asyncio

async def process(data, context):
  # Sleep for the requested duration
  await asyncio.sleep(data.value)


Implement the corresponding client-side code

import { createProcessBlockImpl } from 'automancer';

export default {
  name: 'timer',
  blocks: {
    '_': createProcessBlockImpl({
      Component(props) {
        return (
          ...
        );
      },
      createFeatures(data, location) {
        return [{
          icon: 'hourglass_empty',
          label: 'Wait'
        }];
      }
    })
  }
};

Implement process control operations


No action is needed for halting to work: a halt request simply cancels the pending call to context.wait(asyncio.sleep()) (if the process is not paused) or context.checkpoint() (if it is paused).

However, the code can be improved by sending the correct progress when halting occurs, and then re-raising the exception to complete the halt request. This location will not be visible when running the protocol, as another step will immediately take the current one's place, but it will be displayed in the logs.

try:
  await context.wait(...)
except asyncio.CancelledError:
  current_time = time.time()
  progress += (current_time - start_time) / data.value

  context.send_location(ProcessLocation(data, progress))

  # Re-raise to complete the halt request
  raise
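
The cancellation pattern can be tried in isolation with plain asyncio. In this sketch, the functions timer and run_and_halt are hypothetical, the halt request is simulated with task.cancel(), and a list stands in for the location reports; none of this is Automancer's API.

```python
import asyncio
import time


async def timer(duration: float, report: list) -> None:
    start_time = time.monotonic()

    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        # Record how far the timer got, then re-raise so that the cancellation completes
        report.append((time.monotonic() - start_time) / duration)
        raise

    report.append(1.0)


async def run_and_halt(duration: float, halt_after: float) -> tuple:
    report = []
    task = asyncio.create_task(timer(duration, report))

    await asyncio.sleep(halt_after)
    task.cancel()  # Simulates a halt request

    try:
        await task
    except asyncio.CancelledError:
        pass

    return task.cancelled(), report[-1]


cancelled, progress = asyncio.run(run_and_halt(duration=1.0, halt_after=0.05))
print(cancelled, round(progress, 2))
```

If the except block swallowed the exception instead of re-raising it, the task would finish normally and task.cancelled() would be False.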


When the process is paused and a halt request is received, the call to context.checkpoint() is cancelled instead. The progress reported when pausing is already correct in this case.


To implement pausing, use context.wait() to stop the asyncio.sleep() call once a pause request is received. You must also add a loop, because the process can be paused and resumed multiple times.

import asyncio
import time

async def process(context):
  data =
  progress = 0.0

  context.send_location(ProcessLocation(data, progress))

  while True:
    start_time = time.time()

    context.send_duration(data.value * (1.0 - progress))

    try:
      await context.wait(asyncio.sleep(data.value * (1.0 - progress)))
    except am.PauseRequest:
      # Record the progress made before the pause
      current_time = time.time()
      progress += (current_time - start_time) / data.value

      context.send_duration(data.value * (1.0 - progress))
      context.send_location(ProcessLocation(data, progress))

      # Wait until the process is resumed
      await context.checkpoint()
    else:
      # The sleep completed without interruption
      progress = 1.0
      break

  context.send_location(ProcessLocation(data, progress))
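
The pause and resume loop can be exercised without Automancer by using a small stand-in for the context. Everything in this sketch is hypothetical: DemoContext, PauseRequest, timer and main only imitate the behavior described above (wait() raises on a pause request, checkpoint() blocks until resumed) and are not Automancer's implementation.

```python
import asyncio
import time


class PauseRequest(Exception):
    """Hypothetical stand-in for am.PauseRequest."""


class DemoContext:
    """Hypothetical stand-in for the process context."""

    def __init__(self):
        self._pause = asyncio.Event()
        self._resume = asyncio.Event()

    def pause(self):
        self._pause.set()

    def resume(self):
        self._resume.set()

    async def wait(self, awaitable):
        # Race the awaitable against a pause request
        work = asyncio.ensure_future(awaitable)
        pause = asyncio.ensure_future(self._pause.wait())
        done, _ = await asyncio.wait({work, pause}, return_when=asyncio.FIRST_COMPLETED)

        if work in done:
            pause.cancel()
            return work.result()

        work.cancel()
        self._pause.clear()
        raise PauseRequest

    async def checkpoint(self):
        # Block until the process is resumed
        await self._resume.wait()
        self._resume.clear()


async def timer(context, duration):
    progress = 0.0
    pauses = 0

    while True:
        start_time = time.monotonic()

        try:
            # Only sleep for the part of the duration that remains
            await context.wait(asyncio.sleep(duration * (1.0 - progress)))
        except PauseRequest:
            progress += (time.monotonic() - start_time) / duration
            pauses += 1
            await context.checkpoint()
        else:
            return pauses


async def main():
    context = DemoContext()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, context.pause)   # Pause shortly after starting
    loop.call_later(0.15, context.resume)  # Resume a little later

    started = time.monotonic()
    pauses = await timer(context, 0.2)
    return pauses, time.monotonic() - started


pauses, elapsed = asyncio.run(main())
print(pauses, round(elapsed, 2))
```

The total running time is roughly the timer's duration plus the time spent paused, because each iteration of the loop only sleeps for the remaining fraction.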


Implement jumping by intercepting the JumpRequest exception and using the e.point attribute to retrieve the point to apply to the process. As with halting, you may want to update the location before proceeding.

try:
  await context.wait(...)
except am.JumpRequest as e:
  progress = e.point


Implement swapping by intercepting the SwapRequest exception and using its attribute to retrieve the new data of the process. If the new duration is shorter than the time already elapsed, you can either re-raise the exception to reject the swap request and have the process be restarted, or return to terminate the process and skip this step.

try:
  await context.wait(...)
except am.SwapRequest as e:
  progress += ...

  if < (progress *
    raise # or return

  data =
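
The decision made when a swap request arrives can be sketched as a pure function. The name rescale_progress is hypothetical, and rescaling the progress to the new duration is an assumption made for this illustration rather than something the tutorial prescribes.

```python
from typing import Optional


def rescale_progress(progress: float, old_duration: float, new_duration: float) -> Optional[float]:
    """Return the progress relative to the new duration, or None if the swap cannot apply.

    None means that the time already elapsed exceeds the new duration, in which case
    the process can either reject the swap (re-raise) or terminate (return).
    """
    elapsed = progress * old_duration

    if new_duration <= 0 or new_duration < elapsed:
        return None

    return elapsed / new_duration


# 5 s elapsed out of 10 s, swapped to a 20 s timer: a quarter done
print(rescale_progress(0.5, 10.0, 20.0))  # 0.25

# 5 s elapsed, swapped to a 4 s timer: the new duration has already passed
print(rescale_progress(0.5, 10.0, 4.0))   # None
```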