Developing processes
Creating a simple process
A process is an object responsible for the execution of a step. The process attached to a step determines when the step terminates or fails, and whether it can be paused.
In practice, a process is a Python callable object which returns an awaitable; in other words, it is usually an asynchronous function. It takes two arguments:
data – The data provided by the user, already evaluated. Here, data is a quantops.Quantity object, as specified earlier by setting the attribute's type to am.QuantityType. Accessing data.value provides the duration in SI units, i.e. in seconds. In other applications, data could be a more complex object, such as a list of records.
context – A ProcessContext object used to interact with the process' owner.
The function is wrapped in a task and is cancelled when a halt request is received. You can intercept the cancellation to take appropriate action.
Raising an exception causes the process to fail, but the user is still able to recover from the error.
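As a minimal sketch of the description above, the following process waits for the duration carried by data. The FakeQuantity and FakeContext classes are hypothetical stand-ins introduced only so that the snippet runs on its own; in a real setup the host is assumed to supply a quantops.Quantity and a ProcessContext instead.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeQuantity:
    """Hypothetical stand-in for quantops.Quantity; value is in seconds."""
    value: float


@dataclass
class FakeContext:
    """Hypothetical stand-in for ProcessContext that records durations."""
    durations: list = field(default_factory=list)

    def send_duration(self, duration):
        self.durations.append(duration)


async def process(data, context):
    # Report the estimated remaining duration, then wait for it to elapse.
    context.send_duration(data.value)
    await asyncio.sleep(data.value)


context = FakeContext()
asyncio.run(process(FakeQuantity(0.01), context))
```

The process terminates when the coroutine returns, which here happens once the sleep completes.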
Let's look at the different methods and attributes provided by ProcessContext:
context.send_location(<location>) – Send a location object to the client. Calling this method more than once in the same event loop iteration causes all locations but the last to be ignored.
context.send_analysis(<analysis>) – Send a runtime analysis object containing errors, warnings and effects.
context.send_duration(<duration>) – Send an update for the estimated remaining duration.
await context.checkpoint() – Wait for the process to resume, if it was paused.
await context.receive() – Wait for a message from the client to be received.
await context.wait(<awaitable>) – Wait for an arbitrary awaitable to complete and return its result. If a process control request is received, cancel the awaitable and raise the corresponding exception, one of JumpRequest, PauseRequest or SwapRequest.
context.data – The starting data object.
context.point – The starting point object.
context.pausable – Indicates whether the process is pausable. Set this attribute to True to enable pausing. Defaults to False.
Implementing process control operations
Halting
Halting is triggered by user action, during error handling, jumping and swapping, depending on the context.
Halt requests can be intercepted by catching the asyncio.CancelledError exception from any awaited coroutine. As with regular coroutines, it is best to re-raise the exception as soon as possible. The finally clause can be used to clean up resources.
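import asyncio

async def process(data, context):
    try:
        await run_step1()
        await run_step2()
    except asyncio.CancelledError:
        # A halt request was received: take appropriate action here,
        # then re-raise as soon as possible.
        raise
    finally:
        # Clean up resources, whether or not the process was halted.
        ...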
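The order of events during a halt can be checked with plain asyncio, without the host application. This is only a sketch: cancelling the task by hand stands in for the halt request, and the events list is a hypothetical helper used to record what happens.

```python
import asyncio

events = []


async def process(data, context):
    try:
        await asyncio.sleep(10)  # Stands in for a long-running step
    except asyncio.CancelledError:
        events.append("halt intercepted")
        raise  # Re-raise so that the task is actually cancelled
    finally:
        events.append("cleaned up")


async def main():
    task = asyncio.create_task(process(None, None))
    await asyncio.sleep(0)  # Let the process start
    task.cancel()  # Stands in for a halt request
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")


asyncio.run(main())
```

The except block runs first, then the finally block, and only then does the cancellation reach whoever awaits the task.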
Pausing
Pausing is triggered by user action or during error handling. If pausing is not implemented, no ‘Pause’ button is displayed to the user and error handling relies on halting instead.
Before implementing pausing, the context.pausable attribute must be set to True. The “pausability” of a process may change over time as it moves from one phase to another, so it is important to declare it to the user interface.
Pausing can be implemented in two different ways.
- Calling context.checkpoint() periodically causes the coroutine to be suspended if a pause request has already been received. This is useful for processes made of multiple small steps that cannot be paused individually but where a pause can occur between two steps.

  async def process(data, context):
      context.pausable = True

      for index in range(10):
          await run_step(index)
          context.send_location(...)
          await context.checkpoint()

- Calling context.wait() with an awaitable causes the awaitable to be cancelled after a pause request is received, and a PauseRequest exception to be raised. The exception must then be caught and context.checkpoint() called to wait for the process to resume. Letting the exception propagate out of the coroutine will result in an error. For example, to stop a long step when a pause request is received, and restart it after resuming:

  async def process(data, context):
      context.pausable = True

      while True:
          try:
              await context.wait(long_step())
          except PauseRequest:
              context.send_location(...)
              await context.checkpoint()
          else:
              break
Jumping
Jumping refers to moving the process from one internal state to another. Jumping is only triggered by user action.
Objects used to describe a jump request are known as points.
Jump requests can be detected by calling context.checkpoint() and context.wait(). If a jump request is received, a JumpRequest exception is raised. If the exception propagates out of the coroutine, which is the default behavior, the jump fails and the process is recreated with the new point.
This example shows how to jump between steps in a five-step process:
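async def process(data, context):
    index = context.point.index if context.point else 0

    # A while loop is used because reassigning the variable of a for loop
    # does not change which step runs next.
    while index < 5:
        try:
            await context.wait(run_step(index))
        except JumpRequest as e:
            # e.old_point holds the previous point
            index = e.point.index
        else:
            index += 1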
To avoid needlessly restarting execution when jumping to the current step, the solution is a little more complicated:
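import asyncio

async def process(data, context):
    index = context.point.index if context.point else 0

    while index < 5:
        # Run the step as a task so that it survives the cancellation
        # of the shielded wait below.
        task = asyncio.create_task(run_step(index))

        while True:
            try:
                await context.wait(asyncio.shield(task))
            except JumpRequest as e:
                if (new_index := e.point.index) != index:
                    # Jump to another step: stop the current one first.
                    task.cancel()

                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

                    index = new_index
                    break
                # Jump to the current step: keep waiting for the same task.
            else:
                index += 1
                break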
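The example above relies on a property of asyncio.shield(): cancelling the shielded wait does not cancel the work it wraps. The sketch below checks this with the standard library alone; the step() coroutine and the log list are hypothetical helpers.

```python
import asyncio


async def step(log):
    await asyncio.sleep(0.02)  # Stands in for a step that should keep running
    log.append("step finished")


async def main():
    log = []
    task = asyncio.create_task(step(log))
    waiter = asyncio.shield(task)
    await asyncio.sleep(0)  # Let the step start
    waiter.cancel()  # Cancels only the outer wait, not the task
    try:
        await waiter
    except asyncio.CancelledError:
        log.append("wait cancelled")
    await task  # The same task can be awaited again
    return log, task.cancelled()


log, cancelled = asyncio.run(main())
```

Because the task is still running after the wait is cancelled, it can be awaited again, which is what allows a jump to the current step to resume waiting instead of restarting.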
Swapping
Swapping is the most complex process control operation and refers to the ability to swap the full input data of a process. Unlike jumping, swapping acts on the input data and is used upon user action, when editing protocols at runtime.
Swapping behaves similarly to jumping, using a SwapRequest exception. If the exception propagates out of the coroutine, the swap fails and the process is recreated with the new data.
This example shows how to restart an HTTP request when a swap request is received.
async def process(data, context):
    while True:
        try:
            await context.wait(fetch_http(data.address))
        except SwapRequest as e:
            # e.old_data holds the previous data
            data = e.data
        else:
            break
Unless implementing swapping brings significant benefits, which is not the case in this example, the default behavior, i.e. recreating the process, should suffice.
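For readers who want to see the restart loop run, here is a self-contained sketch. Everything except the loop itself is an assumption: ToyContext, its request_swap() method and the local SwapRequest class are hypothetical stand-ins that imitate the documented behavior of context.wait(), and fetch() replaces a real HTTP request.

```python
import asyncio


class SwapRequest(Exception):
    """Hypothetical stand-in for the library's SwapRequest."""

    def __init__(self, data):
        self.data = data


class ToyContext:
    """Hypothetical stand-in for ProcessContext, for illustration only."""

    def __init__(self):
        self._request = asyncio.get_running_loop().create_future()

    def request_swap(self, data):
        self._request.set_result(SwapRequest(data))

    async def wait(self, awaitable):
        # Race the awaitable against the next control request.
        task = asyncio.ensure_future(awaitable)
        await asyncio.wait({task, self._request},
                           return_when=asyncio.FIRST_COMPLETED)
        if self._request.done():
            task.cancel()
            request = self._request.result()
            self._request = asyncio.get_running_loop().create_future()
            raise request
        return task.result()


async def fetch(address):
    await asyncio.sleep(0.02)  # Stands in for an HTTP request
    return f"response from {address}"


async def process(data, context):
    while True:
        try:
            return await context.wait(fetch(data))
        except SwapRequest as e:
            data = e.data  # Restart the request with the new data


async def main():
    context = ToyContext()
    task = asyncio.create_task(process("a.example", context))
    await asyncio.sleep(0.005)
    context.request_swap("b.example")  # Arrives while the first fetch runs
    return await task


result = asyncio.run(main())
```

The first request is cancelled when the swap arrives, so only the response for the new data is returned.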
Recipes
Execute a single synchronous action
import asyncio

async def process(data, context):
    context.send_duration(4.5) # Estimated 4.5 seconds
    await am.shield(asyncio.to_thread(run_synchronous_action))
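The reason for wrapping the action in asyncio.to_thread() is that the event loop stays free while the blocking call runs. The sketch below shows only this standard-library part; am.shield is left out because its behavior is not described here, and the body of run_synchronous_action() is a hypothetical placeholder.

```python
import asyncio
import time


def run_synchronous_action():
    time.sleep(0.05)  # Blocking call, stands in for real work
    return "done"


async def main():
    ticks = 0

    async def ticker():
        # Counts how often the event loop gets to run in the meantime.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    result = await asyncio.to_thread(run_synchronous_action)
    ticker_task.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
```

Had run_synchronous_action() been called directly from the coroutine, the ticker would not have run at all until it returned.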
Execute multiple synchronous actions
import asyncio
from dataclasses import dataclass
from typing import Literal

@dataclass
class Location:
    phase: Literal[0, 1, 2]

    def export(self):
        return { "phase": self.phase }

async def process(data, context):
    context.send_location(Location(0))
    context.send_duration(2.0)

    await am.shield(asyncio.to_thread(run_synchronous_action1))

    context.send_location(Location(1))
    context.send_duration(1.0)

    # Pausing is only possible between the two actions
    await context.checkpoint()
    await am.shield(asyncio.to_thread(run_synchronous_action2))

    context.send_location(Location(2))
Report an error or warning
async def process(data, context):
    try:
        await run_asynchronous_action()
    except Exception as e:
        # Report the problem as an error...
        context.send_error(am.GenericDiagnostic(e))
        # ...or as a warning
        context.send_warning(am.GenericDiagnostic(e))
Report a critical error
async def process(data, context):
    try:
        await run_asynchronous_action()
    except Exception as e:
        raise am.ProcessFailureError from e
Report an effect
async def process(data, context):
    context.send_effect(am.FileCreatedEffect('file.txt'))
Send an email
import asyncio
import smtplib
from email.mime.text import MIMEText

async def process(data, context):
    message = MIMEText(context.data.contents)
    message['Subject'] = context.data.subject
    message['From'] = context.data.sender
    message['To'] = context.data.recipient

    session = smtplib.SMTP()

    try:
        await am.shield(asyncio.to_thread(lambda: session.connect(context.data.address)))
        await context.checkpoint()

        try:
            await am.shield(asyncio.to_thread(lambda: session.login(context.data.username, context.data.password)))
        except smtplib.SMTPAuthenticationError as e:
            raise am.ProcessFailureError('Invalid credentials') from e

        await context.checkpoint()
        await am.shield(asyncio.to_thread(lambda: session.sendmail(context.data.sender, [context.data.recipient], message.as_string())))
        await context.checkpoint()
    except smtplib.SMTPException as e:
        raise am.ProcessFailureError from e
    finally:
        await am.shield(asyncio.to_thread(lambda: session.quit()))
Make an HTTP request
import asyncio
import urllib3

async def process(data, context):
    try:
        resp = await asyncio.to_thread(lambda: urllib3.request("GET", context.data))
    except urllib3.exceptions.HTTPError as e:
        raise am.ProcessFailureError from e