# Flowmetal uses package-source-qualified imports.
# The syntax of a load is `load(<source>, *to_import, **renames)`.
# The prelude is loaded for you, but we're being explicit for this example.
load("pkg.flowmetal.io/v1/prelude",
"args", # Pattern for declaring task & workflow inputs
"flow", # The `flow` constructor
"FlowContext", # The context type used for performing actions
"current_flow", # Helper for getting the fctx
"Ok", "Err", "Result", # Rust-style error handling
"fail", # Err's the current task or workflow
"unwrap", # Unwraps a `Result`, `fail`ing if it's `Err`
)
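# The `**renames` form binds an import under a local alias, Starlark-style.
# A sketch (this extra load is purely illustrative and not needed here):
#
#   load("pkg.flowmetal.io/v1/prelude", result_t = "Result")
#
# would make the prelude's `Result` available locally as `result_t`.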
# Flows are units of error handling.
# If a task succeeds, the return value of the task call is Ok(<return val>).
# If the task fails, then the return value is Err(...).
# Tasks are, by default, blocking but can also be used as futures.
# Implementation functions do ... whatever you need and return results.
# Flowmetal can do internal data processing, but it is designed to coordinate
# any heavy lifting as external operations and track metadata within a flow.
def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"
analyze = flow(
    implementation = _analyze_impl,
)
# Flows perform side-effects by invoking actions from the flow.
# Actions are things like creating or using resources, making requests, &c.
# Wrap the `sleep` action so it's a bit nicer to use.
# This is in the prelude, but there's no magic.
def sleep(duration):
    current_flow().actions.sleep(duration)
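# Any other action wraps the same way. A hypothetical example (assumes an
# `http_get` action exists on the flow's action set; illustrative only):
def http_get(url):
    return current_flow().actions.http_get(url)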
# Flows are failable, but we can easily implement retrying.
# This is in the prelude, but there's no magic.
# We construct a helper function with default settings which will retry.
def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit=limit,
               should_retry=should_retry,
               backoff=backoff,
               **kwargs):
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result
            # We know we have an error. Should we give up?
            if should_retry and not should_retry(result):
                return result
            # Apply backoff according to the strategy, but don't bother
            # sleeping after the final attempt.
            if count + 1 < limit:
                sleep(backoff(count))
        # We get here only by running out of tries
        return result
    return _inner
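# The `should_retry` predicate lets a caller retry selectively. A sketch
# (`is_timeout` is hypothetical; substitute your own test on the Err value):
#
#   flaky_analyze = retry(flow(_analyze_impl),
#                         should_retry = lambda result: is_timeout(result),
#                         limit = 5)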
# This flow will fail!
def _check_impl(fctx: FlowContext) -> None:
    fail("Oh no!")
# But we can compose ourselves a flow that'll try to make it work.
# We can tell this will try three times and still fail.
check = retry(flow(_check_impl))
# As this is an example, mock out the remaining ETL stage tasks
save = report = cleanup = analyze
def _etl_impl(fctx: FlowContext) -> Result:
    # Call the analyze task, wait for it to be evaluated & unwrap the result.
    # If the analyze task failed, then unwrap() will propagate the error.
    # Otherwise it'll give us the return value of the task back.
    analyze_value = unwrap(analyze())
    # Here we're taking the result and we'll handle the error ourselves.
    # We happen to know that check will always fail.
    check_results = check(analyze_value)
    if is_ok(check_results):
        # Run these two in parallel
        save_task = fctx.actions.run(save, analyze_value)
        report_task = fctx.actions.run(report, analyze_value)
        # Wait for their results
        results = fctx.actions.wait([save_task, report_task])
        # Cause a `fail()` if either errored
        [unwrap(it) for it in results]
        # Otherwise success!
        return Ok(None)
    else:
        # There may be cleanup to handle.
        # We could implement cleanup as a wrapper using function composition
        # (see the `with_cleanup` sketch after this function), but keep it
        # simple for now.
        cleanup()
        return check_results  # Or we could fail with the error, either way
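# The composition approach mentioned above might look like this sketch
# (`with_cleanup` is illustrative, not part of the prelude):
def with_cleanup(subflow, cleanup_fn):
    def _inner(*args, **kwargs):
        result = subflow(*args, **kwargs)
        if not is_ok(result):
            cleanup_fn()
        return result
    return _inner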
etl_workflow = flow(
    implementation = _etl_impl,
    args = {
        "input_data": args.string(default="input_source"),
    },
)
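# Note that `input_data` is declared but unused above; presumably an
# implementation would read declared args off the context (assumption:
# something like `fctx.args.input_data`), and a caller could override the
# default, e.g. `etl_workflow(input_data = "other_source")`.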
if __name__ == "__flow__":
    etl_workflow()