
Reactive event handling

This example demonstrates inter-flow communication using Flowmetal’s inbox model. Every flow has a single inbox — the runtime wakes a flow when its inbox has messages. Flows communicate by sending messages to other flows’ inboxes.

Setup

load("pkg.flowmetal.io/prelude/v1",
     "args",
     "flow",
     "FlowContext",
     "unwrap",
)

Simple event handler

The simplest reactive flow receives an event and does something with it. args.event() declares that this flow is event-driven: on each dispatch, the runtime fills the event argument with a message from the flow’s inbox.

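# Define the implementation first so the name exists when flow() is called.
def _echo_impl(fctx: FlowContext):
    # fctx.args.event holds the message delivered for this dispatch.
    print(fctx.args.event)

echo = flow(
    args = {
        "event": args.event(),
    },
    implementation = _echo_impl,
)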
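
To make the dispatch model concrete, here is a toy model in plain Python, not Flowmetal code. ToyRuntime and its methods are hypothetical names, and the first-in, first-out delivery order is an assumption of the sketch rather than something this page states. It gives each flow one inbox and invokes the handler once per message.

```python
from collections import deque


class ToyRuntime:
    """Plain-Python stand-in for the dispatch loop; not Flowmetal's runtime."""

    def __init__(self):
        self.inboxes = {}   # flow name -> deque of pending messages
        self.handlers = {}  # flow name -> callable invoked once per message

    def spawn(self, name, handler):
        self.inboxes[name] = deque()
        self.handlers[name] = handler
        return name  # in this model the name doubles as the inbox address

    def send(self, inbox, message):
        self.inboxes[inbox].append(message)

    def run_until_idle(self):
        # Wake every flow whose inbox is non-empty, one message per dispatch.
        progressed = True
        while progressed:
            progressed = False
            for name, inbox in list(self.inboxes.items()):
                if inbox:
                    self.handlers[name](inbox.popleft())
                    progressed = True


seen = []
rt = ToyRuntime()
echo_inbox = rt.spawn("echo", seen.append)  # handler just records the event
rt.send(echo_inbox, "hello")
rt.send(echo_inbox, "world")
rt.run_until_idle()
print(seen)  # ['hello', 'world']
```

The handler never polls: it only runs when the loop finds a pending message, which is the behaviour args.event() describes.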

Stateful handler

args.state() declares a mutable register that the runtime persists across invocations. No external store is needed, because the runtime checkpoints the register automatically.

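# Define the implementation first so the name exists when flow() is called.
def _counter_impl(fctx: FlowContext):
    # count is the persisted register; it starts at 0 and survives across events.
    fctx.args.count += 1
    print("count:", fctx.args.count, "event:", fctx.args.event)

counter = flow(
    args = {
        "event": args.event(),
        "count": args.state(initial = 0),
    },
    implementation = _counter_impl,
)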
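
One way to picture a persisted register is as a restore, run, save cycle around each invocation. The sketch below is plain Python under that assumption. ToyStatefulFlow is a hypothetical name, and Flowmetal's actual checkpointing mechanism is not described on this page.

```python
import copy


def counter_handler(state, event):
    # Mirrors _counter_impl: bump the register, then report.
    state["count"] += 1
    return "count: {} event: {}".format(state["count"], event)


class ToyStatefulFlow:
    """Toy model of a flow with one persisted register (assumed semantics)."""

    def __init__(self, handler, initial):
        self.handler = handler
        self.checkpoint = copy.deepcopy(initial)  # what the "runtime" has saved

    def dispatch(self, event):
        state = copy.deepcopy(self.checkpoint)  # restore from the checkpoint
        output = self.handler(state, event)
        self.checkpoint = state                 # save after the invocation
        return output


flow = ToyStatefulFlow(counter_handler, {"count": 0})
lines = [flow.dispatch("tick {}".format(i)) for i in range(5)]
print(lines[-1])  # count: 5 event: tick 4
```

The handler holds no state of its own between calls. Everything it remembers comes from the checkpoint, which is why no external store is needed.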

Request-reply

The reply address is carried in the message itself, in the style of Erlang’s {From, Payload} tuples. Here the message is a dict with "reply_to" and "payload" keys. The caller puts its own inbox in "reply_to" so the worker knows where to send results.

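# Define the implementation first so the name exists when flow() is called.
def _worker_impl(fctx: FlowContext):
    result = "processed: " + str(fctx.args.event["payload"])
    # Reply to whichever inbox the caller named in the message.
    fctx.actions.send(fctx.args.event["reply_to"], result)

worker = flow(
    args = {
        "event": args.event(),
    },
    implementation = _worker_impl,
)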
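
The round trip can be traced with a small plain-Python model. The inboxes dict and the send and worker_step helpers are hypothetical stand-ins for the runtime, and strings stand in for inbox references. The point is only that the worker learns the return address from the message, not from its own configuration.

```python
from collections import deque

# Two named inboxes stand in for the caller's and the worker's inbox.
inboxes = {"main": deque(), "worker": deque()}


def send(inbox, message):
    inboxes[inbox].append(message)


def worker_step():
    # Mirrors _worker_impl: read the return address out of the message.
    event = inboxes["worker"].popleft()
    result = "processed: " + str(event["payload"])
    send(event["reply_to"], result)


# The caller includes its own inbox as the return address.
send("worker", {"reply_to": "main", "payload": 42})
worker_step()
reply = inboxes["main"].popleft()
print(reply)  # processed: 42
```

Because the address travels with each request, the same worker can serve any number of callers without being wired to them in advance.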

Relay (static wiring)

Not all targets need to be in the message. args.inbox() declares a write-only reference to another flow’s inbox, provided at spawn time.

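# Define the implementation first so the name exists when flow() is called.
def _relay_impl(fctx: FlowContext):
    # Forward every event unchanged to the inbox supplied at spawn time.
    fctx.actions.send(fctx.args.downstream, fctx.args.event)

relay = flow(
    args = {
        "event": args.event(),
        "downstream": args.inbox(),
    },
    implementation = _relay_impl,
)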
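
A write-only reference can be pictured as a handle that exposes sending and nothing else. The plain-Python sketch below uses the hypothetical names InboxRef and make_relay. Python enforces the restriction only by convention, whereas a runtime could enforce it for real. The downstream target is fixed when the relay is built, much like an args.inbox() argument supplied at spawn time.

```python
from collections import deque


class InboxRef:
    """Write-only handle: exposes send() but no method for reading the queue."""

    def __init__(self, queue):
        self._send = queue.append  # keep only the append capability

    def send(self, message):
        self._send(message)


def make_relay(downstream):
    # downstream is fixed at construction time (static wiring).
    def relay(event):
        downstream.send(event)
    return relay


echo_queue = deque()
relay = make_relay(InboxRef(echo_queue))
relay("relayed message")
print(list(echo_queue))  # ['relayed message']
```

Static wiring suits pipelines whose shape is known when the flows are spawned. The message-carried address from the request-reply pattern suits targets that vary per message.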

Wiring it all together

run() spawns a flow, and the code below passes its return value through unwrap() to get a flow_ref. The .inbox property narrows that reference to a write-only inbox reference, which is the minimum-authority capability for sending messages.

def _main_impl(fctx: FlowContext):
    # Spawn handlers
    echo_ref = unwrap(fctx.actions.run(echo))
    fctx.actions.send(echo_ref.inbox, "hello")
    fctx.actions.send(echo_ref.inbox, "world")

    # Stateful counter
    counter_ref = unwrap(fctx.actions.run(counter))
    for i in range(5):
        fctx.actions.send(counter_ref.inbox, "tick {}".format(i))

    # Request-reply with our inbox as return address
    worker_ref = unwrap(fctx.actions.run(worker))
    fctx.actions.send(worker_ref.inbox, {
        "reply_to": fctx.inbox,
        "payload": 42,
    })

    # Relay chain: echo <- relay <- us
    relay_ref = unwrap(fctx.actions.run(relay,
        downstream = echo_ref.inbox,
    ))
    fctx.actions.send(relay_ref.inbox, "relayed message")

main = flow(implementation = _main_impl)

if __name__ == "__flow__":
    main()
