Tutorial: Create a timer process
Introduction
This tutorial guides you through creating a simple process: a timer with support for halting. Unlike the built-in timer, the first version has no pause/resume or jumping; the later sections describe how these operations can be added.
This timer plugin will have three main components:
- A parser that understands the sleep attribute and validates its argument.
- A process that executes the sleep command. This is the core of the plugin.
- A client-side component that displays a progress bar.
Create the plugin
The file structure of the plugin will be as follows. For simplicity, the Python project, Python module and Automancer plugin will all be named timer.
.
├── pyproject.toml
└── timer
    ├── __init__.py
    ├── index.js
    └── process.py
Start by creating the pyproject.toml file. This example uses Setuptools, but feel free to use any other build system.
[build-system]
requires = ["setuptools>=61"]
[project]
name = "timer"
version = "1.0.0"
[project.entry-points."automancer.plugins"]
timer = "timer"
[tool.setuptools.package-data]
timer = ["*"]
The most important thing here is the declaration of an automancer.plugins entry point. The entry point is named timer on the left-hand side and points to the timer module on the right-hand side. This is how Automancer will find the plugin.
The tool.setuptools.package-data table is necessary for the client-side file index.js to be included in the project once packaged.
Next, create the __init__.py file, as follows. The client_path assignment references the client-side file timer/index.js. Make sure to create that file, or comment out that line for now.
from importlib.resources import files
namespace = "timer"
client_path = files(__name__) / "index.js"
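If you are unsure whether the client-side file is reachable through importlib.resources, a quick check along these lines may help. It is a sketch only: resource_exists is a hypothetical helper, and the standard-library json package stands in for the plugin so that the example runs anywhere.

```python
from importlib.resources import files


def resource_exists(package: str, name: str) -> bool:
    """Return True if `name` is a file directly inside the importable `package`."""
    return (files(package) / name).is_file()


if __name__ == "__main__":
    # json is used here only as a package that is always available;
    # for the plugin, the call would be resource_exists("timer", "index.js").
    print(resource_exists("json", "__init__.py"))
    print(resource_exists("json", "index.js"))
```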
You can now install the package, and it will be automatically discovered by Automancer. Install it in editable mode to avoid having to reinstall it after each change.
$ pip install -e .
Write the parser
To add functionality to the plugin, create the process.py file, and add the following code:
import automancer as am

async def process(context):
    # To be filled in
    pass

class Parser(am.BaseParser):
    namespace = "timer"

    def __init__(self, fiber):
        super().__init__(fiber)

        self.transformers = [am.ProcessTransformer(process, {
            'sleep': am.Attribute(
                am.QuantityType('second'),
                description="Sleeps for a fixed duration.",
            )
        }, parser=fiber)]
All parsers must inherit from am.BaseParser. In this parser, you are instantiating a single transformer, which defines operations on the attributes currently being processed. In particular, the built-in ProcessTransformer is a lead transformer that will create a process from the provided function process, which you will write in the next section. It is possible to create custom transformers to further customize the parsing procedure.
The transformer only reads a single attribute, sleep, with a description (visible in the editor when hovering or completing) and a quantity type. Expressions are also allowed by default, so all of these are valid:
sleep: 1 sec
sleep: 10 min
sleep: {{ 1 * unit.sec }}
There is no need to set the attribute as optional. If the attribute is missing, the transformer will be ignored.
Before testing the plugin, make sure to import the Parser class inside __init__.py by adding this line. The name Parser will be recognized by Automancer.
from .process import Parser
You can test parsing in the text editor, but running protocols with an empty process will result in an error.
Create a location class
Each process can periodically report a location object, an arbitrary instance which represents the current progress of the process. This object is only used to display information to the user during and after the run of the protocol, and it must be exportable. Dataclasses are a convenient way to implement location objects.
Add the following to the file. You will be sending the current progress (ranging from 0 to 1) to the user interface, as well as the duration which was just obtained. The duration cannot be obtained reliably in any other way by the client, given that it could have been an expression until reaching that step.
from automancer import Exportable
from dataclasses import dataclass

# Quantity, the type of the parsed sleep value, must also be in scope;
# its import is not shown here.
@dataclass
class ProcessLocation(Exportable):
    duration: Quantity
    progress: float

    def export(self):
        return {
            "duration": self.duration.value,
            "progress": self.progress
        }
The process's location must be sent each time the progress changes in a way that cannot be extrapolated from the previous location instance. For the timer, there is no need to send the location periodically, as the current progress can be trivially extrapolated from the previous location's progress and time, and the current time.
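The extrapolation mentioned above is simple arithmetic. The sketch below shows it in Python for illustration, although the real client is written in JavaScript; extrapolate_progress and its parameters are hypothetical names, and all times are assumed to be in seconds on the same clock.

```python
def extrapolate_progress(progress: float, sent_at: float, duration: float, now: float) -> float:
    """Estimate the current progress from the last reported location.

    progress: progress in [0, 1] carried by the last location
    sent_at:  time at which that location was reported
    duration: total duration of the timer
    now:      current time
    """
    if duration <= 0:
        # A timer with no duration is considered complete immediately.
        return 1.0

    estimate = progress + (now - sent_at) / duration
    # Clamp so that clock drift or late rendering never leaves the [0, 1] range.
    return min(1.0, max(0.0, estimate))
```

For example, a 10-second timer that reported a progress of 0 four seconds ago is estimated to be 40% complete.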
Write the process
Start with the following. This code implements the basic functionality of the timer, but does not yet support advanced functionality such as pausing and halting.
import asyncio

async def process(context):
    # The parsed value of the sleep attribute
    data = context.data

    context.send_duration(data.value)
    context.send_location(ProcessLocation(
        duration=data,
        progress=0.0
    ))

    await asyncio.sleep(data.value)

    context.send_location(ProcessLocation(
        duration=data,
        progress=1.0
    ))
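To try the logic of the process without starting Automancer, you could run it against a small recording stub. The sketch below is an assumption-laden illustration: RecordingContext is a hypothetical stand-in that mirrors only the calls used in this tutorial (send_duration, send_location and a data attribute with a value in seconds), locations are plain dictionaries rather than ProcessLocation instances, and the process is assumed to read its input from context.data as in the pausing example.

```python
import asyncio
from dataclasses import dataclass, field
from types import SimpleNamespace


@dataclass
class RecordingContext:
    """Hypothetical stand-in for the process context; it only records calls."""
    data: SimpleNamespace
    durations: list = field(default_factory=list)
    locations: list = field(default_factory=list)

    def send_duration(self, seconds):
        self.durations.append(seconds)

    def send_location(self, location):
        self.locations.append(location)


async def basic_timer(context):
    """Same shape as the basic timer process, with dictionaries as locations."""
    data = context.data
    context.send_duration(data.value)
    context.send_location({"duration": data.value, "progress": 0.0})
    await asyncio.sleep(data.value)
    context.send_location({"duration": data.value, "progress": 1.0})


def run_basic_timer(seconds: float) -> RecordingContext:
    """Run the timer to completion and return everything it reported."""
    context = RecordingContext(data=SimpleNamespace(value=seconds))
    asyncio.run(basic_timer(context))
    return context
```

Calling run_basic_timer(0.01) should record one duration and two locations, the first with a progress of 0.0 and the last with a progress of 1.0.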
Implement the corresponding client-side code
import { createProcessBlockImpl } from 'automancer';

export default {
  name: 'timer',
  blocks: {
    '_': createProcessBlockImpl({
      // Placeholder component that displays the raw location
      Component(props) {
        return (
          <div>{JSON.stringify(props.location)}</div>
        );
      },
      createFeatures(data, location) {
        return [{
          icon: 'hourglass_empty',
          label: 'Wait'
        }];
      }
    })
  }
};
Implement process control operations
Halting
No action is needed to make halting work: a halt request simply cancels the call to context.wait(asyncio.sleep()) (if not paused) or context.checkpoint() (if paused).
However, the code can be improved by sending the correct progress when halting occurs, and then re-raising the exception to complete the halt request. This location will not be visible when running the protocol, as another step will immediately take the current one's place, but it will be displayed in the logs.
try:
    await context.wait(...)
except asyncio.CancelledError:
    current_time = time.time()
    progress += (current_time - start_time) / data.value
    context.send_location(ProcessLocation(data, progress))
    raise
When the process is paused and a halt request is received, the call to context.checkpoint() is cancelled instead. The progress reported when pausing is already correct in this case.
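The pattern of catching the cancellation, reporting the progress and re-raising can be tried with asyncio alone. This is a minimal sketch under stated assumptions: a plain list stands in for context.send_location(), a bare asyncio.sleep() replaces context.wait(), and cancelling the task plays the role of the halt request.

```python
import asyncio
import time


async def timer(duration: float, reports: list) -> None:
    """Sleep for `duration` seconds, appending each reported progress to `reports`."""
    progress = 0.0
    reports.append(progress)
    start_time = time.monotonic()

    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        # Report how far the timer got, then let the cancellation propagate.
        progress += (time.monotonic() - start_time) / duration
        reports.append(progress)
        raise

    reports.append(1.0)


async def halt_after(duration: float, delay: float) -> tuple[list, bool]:
    """Start a timer, cancel it after `delay` seconds and return what it reported."""
    reports: list = []
    task = asyncio.create_task(timer(duration, reports))
    await asyncio.sleep(delay)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass

    return reports, task.cancelled()
```

Running asyncio.run(halt_after(1.0, 0.05)) should produce two reports: the initial 0.0 and a small fraction recorded at the moment of cancellation. Because the exception is re-raised, the task still ends up in the cancelled state.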
Pausing
To implement pausing, use context.wait() to stop the asyncio.sleep() call once a pause request is received. You must also add a loop, since the process can be paused and resumed multiple times.
async def process(context):
    data = context.data
    progress = 0.0
    context.send_location(ProcessLocation(data, progress))

    while True:
        start_time = time.time()
        context.send_duration(data.value * (1.0 - progress))

        try:
            # Sleep for the remaining duration only
            await context.wait(asyncio.sleep(data.value * (1.0 - progress)))
        except am.PauseRequest:
            current_time = time.time()
            progress += (current_time - start_time) / data.value
            context.send_duration(data.value * (1.0 - progress))
            context.send_location(ProcessLocation(data, progress))
            await context.checkpoint()
        else:
            break

    # The sleep completed without interruption, so the timer is done.
    context.send_location(ProcessLocation(data, 1.0))
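The bookkeeping across several pause and resume cycles can be checked in isolation. The sketch below reproduces only the arithmetic of the loop above; after_pause is a hypothetical helper, and the clamp to 1.0 is an added safeguard that the loop itself does not have.

```python
def after_pause(progress: float, elapsed: float, duration: float) -> tuple[float, float]:
    """Return (new_progress, remaining_seconds) after running for `elapsed` seconds."""
    new_progress = min(1.0, progress + elapsed / duration)
    remaining = duration * (1.0 - new_progress)
    return new_progress, remaining


if __name__ == "__main__":
    # A 10-second timer paused after 4 seconds, resumed, then paused again after 3 more.
    progress = 0.0
    for elapsed in (4.0, 3.0):
        progress, remaining = after_pause(progress, elapsed, 10.0)
        print(f"progress={progress:.2f} remaining={remaining:.2f}s")
```

In this scenario the timer is 40% complete with about 6 seconds left after the first pause, and 70% complete with about 3 seconds left after the second.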
Jumping
Implement jumping by intercepting the JumpRequest exception and using the e.point attribute to retrieve the point to apply to the process. As with halting, you may want to update the location before proceeding.
try:
    await context.wait(...)
except am.JumpRequest as e:
    context.send_location(...)
    progress = e.point
Swapping
Implement swapping by intercepting the SwapRequest exception and using the e.data attribute to retrieve the new data of the process. If the new duration is shorter than the time that has already elapsed, you can decide to either raise the exception to reject the swap request and have the process be restarted, or return to terminate the process and skip this step.
try:
    await context.wait(...)
except am.SwapRequest as e:
    progress += ...
    context.send_location(...)
    # Compare durations in the same unit: new duration against elapsed time
    if e.data.value < (progress * context.data.value):
        raise # or return
    data = e.data
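One way to reason about a swap is to convert the progress into elapsed seconds, compare that with the new duration, and express the elapsed time as a fraction of the new duration. The sketch below illustrates that idea only: rescale_progress is a hypothetical helper, and rescaling the progress after a swap is an assumption of this example, not something the tutorial prescribes.

```python
from typing import Optional


def rescale_progress(progress: float, old_duration: float, new_duration: float) -> Optional[float]:
    """Return the progress under `new_duration`, or None if the swap cannot be applied.

    None means that more time has already elapsed than the new duration allows,
    in which case the caller would either re-raise or return.
    """
    elapsed = progress * old_duration

    if new_duration < elapsed:
        return None
    if new_duration == 0:
        # Nothing has elapsed and nothing remains: the timer is complete.
        return 1.0

    return elapsed / new_duration
```

For instance, a timer that is halfway through 10 seconds and is swapped to 20 seconds would be 25% complete, while a swap to 4 seconds could not be applied because 5 seconds have already elapsed.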