# Flowmetal
#
# Fire dispatch

"""TireFire Heavy Industries: incident dispatch demo flow.

Pulls a fresh lot from tire-registry, decides a priority from its
weight + condition, opens an incident on the (flaky) fire-dispatch
API, and polls until the incident reaches a terminal state.

fire-dispatch is deliberately unreliable (~75% failure, mostly
retryable transients like `crews_unavailable` / `dispatch_service_down`).
The flow loops up to MAX_ATTEMPTS times, reopening a fresh incident
on each retryable failure, and gives up with a result when the fire
terminally escalates or the attempt budget runs out. (An incident that
is still `pending` after MAX_POLLS polls instead fail()s the flow.)
Each http() call is a checkpoint so replay-after-suspend resumes
mid-retry.

`fctx.actions.http(method, url, …)` returns the decoded response on
success (with `body` holding the parsed JSON for JSON responses) and
raises on connector-side failures (5xx, timeout, network).

The two upstream services are in-cluster ClusterIP services deployed
alongside the platform:
  http://tire-registry.flowmetal.svc.cluster.local:8080
  http://fire-dispatch.flowmetal.svc.cluster.local:8080
"""

# Base URLs of the two upstream services. They carry no trailing slash;
# request paths such as "/lots/next" and "/incidents" are appended with "+".
TIRE_REGISTRY = "http://tire-registry.flowmetal.svc.cluster.local:8080"
FIRE_DISPATCH = "http://fire-dispatch.flowmetal.svc.cluster.local:8080"

# Upper bound on the number of incidents opened for one lot before the
# flow returns a "retries_exhausted" result.
MAX_ATTEMPTS = 4
# Upper bound on the number of status polls made for a single incident.
MAX_POLLS = 15
# Value passed to fctx.actions.sleep() before every status poll
# (seconds, going by the name).
POLL_SLEEP_SECONDS = 2


def _fib(n):
    """Return the n-th Fibonacci number, 1-indexed (1, 1, 2, 3, 5, ...).

    Aborts via fail() when n is smaller than 1.
    """
    if n < 1:
        fail("fib: n must be a positive integer")

    # Seed with the first two terms and slide the window forward n - 1
    # times, so `current` lands on the n-th term.
    current, upcoming = 1, 1
    for _ in range(n - 1):
        current, upcoming = upcoming, current + upcoming
    return current


def _priority_for(lot):
    """Classify a lot as "urgent" or "normal".

    A lot is urgent when its `condition` is "engulfed" or its
    `weight_kg` exceeds 1000; anything else is normal. `weight_kg` is
    only read when the condition check did not already decide.
    """
    if lot["condition"] == "engulfed":
        return "urgent"
    if lot["weight_kg"] > 1000:
        return "urgent"
    return "normal"


def _open_and_poll(fctx, lot, priority, attempt):
    """Open one incident for `lot` and wait for it to leave `pending`.

    POSTs a new incident to fire-dispatch, then alternates a sleep of
    POLL_SLEEP_SECONDS with a GET of the incident, at most MAX_POLLS
    times. `attempt` is used for log prefixes only.

    Returns the body of the first poll whose `status` is not "pending";
    the caller inspects that status to decide whether to retry. Aborts
    via fail() if every poll still reports "pending".
    """
    print("[attempt %d] calling fire-dispatch to roll a containment crew…" % attempt)
    request = {"lot_id": lot["lot_id"], "priority": priority}
    opened = fctx.actions.http("POST", FIRE_DISPATCH + "/incidents", body_json = request)
    incident_id = opened["body"]["incident_id"]
    print("[attempt %d] incident %s open, waiting on containment…" % (attempt, incident_id))

    for _ in range(MAX_POLLS):
        # Sleep first: the incident was only just opened.
        fctx.actions.sleep(POLL_SLEEP_SECONDS)
        polled = fctx.actions.http("GET", FIRE_DISPATCH + "/incidents/" + incident_id)
        body = polled["body"]
        if body["status"] == "pending":
            continue
        return body

    fail("incident %s did not terminate within poll budget" % incident_id)


def _dispatch_impl(fctx):
    """Dispatch a containment crew for the next lot from tire-registry.

    Fetches one lot, derives its priority, then opens incidents on
    fire-dispatch until one is contained, one fails non-retryably, or
    MAX_ATTEMPTS incidents have been tried.

    Args:
      fctx: flow context; `fctx.actions.http` and `fctx.actions.sleep`
        are the only members used.

    Returns:
      A dict with keys `lot_id`, `status`, `attempts` and `detail`.
      `status` is "contained", the incident's own status for a
      non-retryable failure, or "retries_exhausted". `detail` is the
      body of the last incident polled (None if no attempt ran).
    """
    # 1. Pull a fresh tire-fire report from the registry.
    print("[dispatch] phoning tire-registry for the next incoming report…")
    lot = fctx.actions.http("GET", TIRE_REGISTRY + "/lots/next")["body"]
    priority = _priority_for(lot)
    lot_id = lot["lot_id"]
    print("[dispatch] lot %s at %d kg, condition '%s' — priority %s." % (
        lot_id, lot["weight_kg"], lot["condition"], priority,
    ))

    # 2. Dispatch with retry. Transient failures (crews_unavailable,
    #    dispatch_service_down) re-roll the dispatch on a fresh
    #    incident id; a terminal `fire_escalated` aborts with partial
    #    success (the lot will show up in another run).
    last_body = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        body = _open_and_poll(fctx, lot, priority, attempt)
        last_body = body
        status = body["status"]
        if status == "contained":
            print("[dispatch] lot %s contained on attempt %d." % (lot_id, attempt))
            return {"lot_id": lot_id, "status": "contained", "attempts": attempt, "detail": body}

        # fire-dispatch puts the specific failure mode in `error_kind`
        # (crews_unavailable, fire_escalated, dispatch_service_down)
        # and marks retryable transients with `retryable: true`.
        reason = body.get("error_kind", status)
        if not body.get("retryable", False):
            print("[dispatch] lot %s terminally lost: %s." % (lot_id, reason))
            return {"lot_id": lot_id, "status": status, "attempts": attempt, "detail": body}

        # Back off only when another attempt will actually follow.
        # Sleeping after the final attempt would merely delay the
        # abandonment result and log a retry that never happens; the
        # final failure is reported by the abandonment message below.
        if attempt < MAX_ATTEMPTS:
            print("[dispatch] attempt %d failed (%s); retrying." % (attempt, reason))
            fctx.actions.sleep(_fib(attempt))

    # Retry budget exhausted. This is an expected business outcome
    # (upstream is deliberately flaky), not a flow bug, so return DONE
    # with an explicit abandonment status instead of fail()-ing —
    # FAILED + a Starlark traceback reads as an internal error.
    print("[dispatch] lot %s abandoned after %d attempts; last attempt: %s." % (
        lot_id, MAX_ATTEMPTS,
        (last_body or {}).get("error_kind", "unknown"),
    ))
    return {
        "lot_id": lot_id,
        "status": "retries_exhausted",
        "attempts": MAX_ATTEMPTS,
        "detail": last_body,
    }


# Module-level flow object built from _dispatch_impl. `flow` is not
# defined in this file — presumably a Flowmetal-provided builtin.
dispatch = flow(implementation = _dispatch_impl)