Reactive event handling
This example demonstrates inter-flow communication using Flowmetal’s inbox model. Every flow has a single inbox — the runtime wakes a flow when its inbox has messages. Flows communicate by sending messages to other flows’ inboxes.
Setup
load("pkg.flowmetal.io/prelude/v1",
"args",
"flow",
"FlowContext",
"unwrap",
)
Simple event handler
The simplest reactive flow: receive an event, do something with it. args.event() declares that this flow is event-driven — the runtime fills it from the inbox on each dispatch.
echo = flow(
args = {
"event": args.event(),
},
implementation = _echo_impl,
)
def _echo_impl(fctx: FlowContext):
print(fctx.args.event)
Stateful handler
args.state() declares a mutable register that the runtime persists across invocations. No external store needed — the runtime checkpoints it automatically.
counter = flow(
args = {
"event": args.event(),
"count": args.state(initial = 0),
},
implementation = _counter_impl,
)
def _counter_impl(fctx: FlowContext):
fctx.args.count += 1
print("count:", fctx.args.count, "event:", fctx.args.event)
Request-reply
The reply address is carried in the message itself — Erlang-style {from, payload} tuples. The caller includes its own inbox so the worker knows where to send results.
worker = flow(
args = {
"event": args.event(),
},
implementation = _worker_impl,
)
def _worker_impl(fctx: FlowContext):
result = "processed: " + str(fctx.args.event["payload"])
fctx.actions.send(fctx.args.event["reply_to"], result)
Relay (static wiring)
Not all targets need to be in the message. args.inbox() declares a write-only reference to another flow’s inbox, provided at spawn time.
relay = flow(
args = {
"event": args.event(),
"downstream": args.inbox(),
},
implementation = _relay_impl,
)
def _relay_impl(fctx: FlowContext):
fctx.actions.send(fctx.args.downstream, fctx.args.event)
Wiring it all together
run() returns a flow_ref. The .inbox property narrows it to a write-only inbox reference — the minimum-authority capability for sending messages.
def _main_impl(fctx: FlowContext):
# Spawn handlers
echo_ref = unwrap(fctx.actions.run(echo))
fctx.actions.send(echo_ref.inbox, "hello")
fctx.actions.send(echo_ref.inbox, "world")
# Stateful counter
counter_ref = unwrap(fctx.actions.run(counter))
for i in range(5):
fctx.actions.send(counter_ref.inbox, "tick {}".format(i))
# Request-reply with our inbox as return address
worker_ref = unwrap(fctx.actions.run(worker))
fctx.actions.send(worker_ref.inbox, {
"reply_to": fctx.inbox,
"payload": 42,
})
# Relay chain: echo <- relay <- us
relay_ref = unwrap(fctx.actions.run(relay,
downstream = echo_ref.inbox,
))
fctx.actions.send(relay_ref.inbox, "relayed message")
main = flow(implementation = _main_impl)
if __name__ == "__flow__":
main()