"""TireFire Heavy Industries: incident dispatch demo flow.
Pulls a fresh lot from tire-registry, decides a priority from its
weight + condition, opens an incident on the (flaky) fire-dispatch
API, and polls until the incident reaches a terminal state.
fire-dispatch is deliberately unreliable (~75% failure, mostly
retryable transients like `crews_unavailable` / `dispatch_service_down`).
The flow loops up to MAX_ATTEMPTS times, reopening a fresh incident
on each retryable failure. It gives up as soon as fire-dispatch
reports a non-retryable failure (e.g. the fire terminally escalates)
or the attempt budget runs out; an incident still `pending` after
MAX_POLLS polls fails the run. Each http() call is a checkpoint, so
replay-after-suspend resumes mid-retry.
`fctx.actions.http(method, url, …)` returns the decoded response on
success (with `body` holding the parsed JSON for JSON responses) and
raises on connector-side failures (5xx, timeout, network).
The two upstream services are in-cluster ClusterIP services deployed
alongside the platform:
http://tire-registry.flowmetal.svc.cluster.local:8080
http://fire-dispatch.flowmetal.svc.cluster.local:8080
"""
# Base URLs of the two in-cluster upstream services.
TIRE_REGISTRY = "http://tire-registry.flowmetal.svc.cluster.local:8080"
FIRE_DISPATCH = "http://fire-dispatch.flowmetal.svc.cluster.local:8080"
# Number of incidents the flow will open for one lot before abandoning it.
MAX_ATTEMPTS = 4
# Status polls per incident; each poll is preceded by a POLL_SLEEP_SECONDS
# sleep, and exhausting the budget fails the run.
MAX_POLLS = 15
POLL_SLEEP_SECONDS = 2
def _fib(n):
    """Return the n-th Fibonacci number, 1-indexed (1, 1, 2, 3, 5, ...).

    Calls fail() when n is below 1.
    """
    if n < 1:
        fail("fib: n must be a positive integer")
    prev, cur = 0, 1
    # cur already holds fib(1); advance n-1 more steps.
    for _step in range(n - 1):
        prev, cur = cur, prev + cur
    return cur
def _priority_for(lot):
    """Classify a lot as "urgent" (engulfed, or over 1000 kg) or "normal"."""
    # Condition is checked first, so weight_kg is only read when needed.
    is_urgent = lot["condition"] == "engulfed" or lot["weight_kg"] > 1000
    return "urgent" if is_urgent else "normal"
def _open_and_poll(fctx, lot, priority, attempt):
    """Open one fire-dispatch incident for `lot` and wait for it to settle.

    Sleeps POLL_SLEEP_SECONDS before each of at most MAX_POLLS status
    polls and returns the first response body whose `status` is not
    "pending". Whether to retry is left to the caller. Calls fail() if
    the incident is still pending once the poll budget is spent.
    """
    print("[attempt %d] calling fire-dispatch to roll a containment crew…" % attempt)
    request = {"lot_id": lot["lot_id"], "priority": priority}
    created = fctx.actions.http(
        "POST",
        FIRE_DISPATCH + "/incidents",
        body_json = request,
    )
    incident_id = created["body"]["incident_id"]
    print("[attempt %d] incident %s open, waiting on containment…" % (attempt, incident_id))

    for _poll in range(MAX_POLLS):
        # Back off before every poll, including the first one.
        fctx.actions.sleep(POLL_SLEEP_SECONDS)
        reply = fctx.actions.http("GET", FIRE_DISPATCH + "/incidents/" + incident_id)
        snapshot = reply["body"]
        if snapshot["status"] == "pending":
            continue
        return snapshot

    fail("incident %s did not terminate within poll budget" % incident_id)
def _dispatch_impl(fctx):
    """Dispatch containment for the next tire lot, retrying transient failures.

    Fetches the next lot from tire-registry, derives its priority, then
    opens and polls fire-dispatch incidents up to MAX_ATTEMPTS times.

    Args:
      fctx: flow context; only `fctx.actions.http` and `fctx.actions.sleep`
        are used here (directly and via `_open_and_poll`).

    Returns:
      A dict with `lot_id`, `status`, `attempts` and `detail` (the last
      incident body). `status` is "contained" on success, the incident's
      own status on a non-retryable failure, or "retries_exhausted" when
      every attempt failed with a retryable error.
    """
    # 1. Pull a fresh tire-fire report from the registry.
    print("[dispatch] phoning tire-registry for the next incoming report…")
    lot = fctx.actions.http("GET", TIRE_REGISTRY + "/lots/next")["body"]
    priority = _priority_for(lot)
    print("[dispatch] lot %s at %d kg, condition '%s' — priority %s." % (
        lot["lot_id"], lot["weight_kg"], lot["condition"], priority,
    ))

    # 2. Dispatch with retry. Transient failures (crews_unavailable,
    # dispatch_service_down) re-roll the dispatch on a fresh
    # incident id; a terminal `fire_escalated` aborts with partial
    # success (the lot will show up in another run).
    last_body = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        body = _open_and_poll(fctx, lot, priority, attempt)
        last_body = body
        status = body["status"]
        if status == "contained":
            print("[dispatch] lot %s contained on attempt %d." % (lot["lot_id"], attempt))
            return {"lot_id": lot["lot_id"], "status": "contained", "attempts": attempt, "detail": body}

        # fire-dispatch puts the specific failure mode in `error_kind`
        # (crews_unavailable, fire_escalated, dispatch_service_down)
        # and marks retryable transients with `retryable: true`.
        reason = body.get("error_kind", status)
        if not body.get("retryable", False):
            print("[dispatch] lot %s terminally lost: %s." % (lot["lot_id"], reason))
            return {"lot_id": lot["lot_id"], "status": status, "attempts": attempt, "detail": body}

        # Only back off when another attempt will actually follow. Sleeping
        # (a checkpointed action) after the final attempt just delays the
        # abandonment result below, and "retrying" would be a false log line.
        if attempt < MAX_ATTEMPTS:
            print("[dispatch] attempt %d failed (%s); retrying." % (attempt, reason))
            # Fibonacci backoff between attempts: 1, 1, 2, ... (same unit as
            # POLL_SLEEP_SECONDS).
            fctx.actions.sleep(_fib(attempt))

    # Retry budget exhausted. This is an expected business outcome
    # (upstream is deliberately flaky), not a flow bug, so return DONE
    # with an explicit abandonment status instead of fail()-ing —
    # FAILED + a Starlark traceback reads as an internal error.
    print("[dispatch] lot %s abandoned after %d attempts; last attempt: %s." % (
        lot["lot_id"], MAX_ATTEMPTS,
        (last_body or {}).get("error_kind", "unknown"),
    ))
    return {
        "lot_id": lot["lot_id"],
        "status": "retries_exhausted",
        "attempts": MAX_ATTEMPTS,
        "detail": last_body,
    }
# Module-level flow object whose implementation is `_dispatch_impl`.
dispatch = flow(implementation = _dispatch_impl)