Flowmetal

An introductory example

# Flowmetal uses package source qualified imports.
# The syntax of a load is `load(<source>, *to_import, **renames)`.
# The prelude is loaded for you, but we're being explicit for this example.
load("pkg.flowmetal.io/v1/prelude",
     "args",                  # Pattern for declaring task & workflow inputs
     "flow",                  # The `flow` constructor
     "FlowContext",           # The context type used for performing actions
     "current_flow",          # Helper for getting the fctx
     "Ok", "Err", "Result",   # Rust-style error handling
     "fail",                  # Err's the current task or workflow
     "unwrap",                # Unwraps a `Result`, `fail`ing if it's `Err`
)
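
# The `**renames` form aliases an import; a small illustration (not used below):
# load("pkg.flowmetal.io/v1/prelude", ok = "Ok")  # binds the prelude's `Ok` as `ok`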

# Implementation functions do ... whatever you need and return results.
# Flowmetal can do internal data processing, but it is designed to coordinate
# any heavy lifting as external operations and track metadata within a flow.
def _analyze_impl(fctx: FlowContext) -> str:
    return "some results"

# Flows are units of error handling.
# If the task succeeds, the return value of the task call is Ok(<return val>).
# If the task fails, the return value is Err(...).
# Tasks are, by default, blocking but can be used as futures (see the sketch
# below).
analyze = flow(
    implementation = _analyze_impl,
)

# Flows perform side-effects by invoking actions from the flow.
# Actions are things like creating or using resources, making requests &c.
# Wrap the `sleep` action so it's a bit nicer to use.
# This is in the prelude, but there's no magic.
def sleep(duration):
    current_flow().actions.sleep(duration)
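
# The same wrapping pattern works for any action. For instance (hypothetical;
# `http_get` is not a documented prelude action):
# def http_get(url):
#     return current_flow().actions.http_get(url)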

# Flows are failable, but we can easily implement retrying.
# This is in the prelude, but there's no magic.
# We construct a helper function with default settings which will retry.
def retry(subflow,
          should_retry = None,
          limit = 3,
          backoff = lambda count: 2 * count * count):
    def _inner(*args,
               limit=limit,
               should_retry=should_retry,
               backoff=backoff,
               **kwargs):
        # Guard the degenerate limit <= 0 case
        result = Err("retry: no attempts made")
        for count in range(limit):
            result = subflow(*args, **kwargs)
            if is_ok(result):
                return result

            # We know we have an error. Should we give up?
            if should_retry and not should_retry(result):
                return result

            # Apply backoff according to the strategy, but don't bother
            # sleeping after the final attempt.
            if count + 1 < limit:
                sleep(backoff(count))

        # We get here only by running out of tries
        return result

    return _inner
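
# A usage sketch with custom settings; the predicate and the linear backoff
# are illustrative choices, not prelude definitions:
retryable_analyze = retry(
    flow(implementation = _analyze_impl),
    should_retry = lambda result: True,  # retry on any error
    limit = 5,
    backoff = lambda count: count + 1,   # linear rather than quadratic
)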

# This flow will fail!
def _check_impl(fctx: FlowContext) -> None:
    fail("Oh no!")

# But we can compose ourselves a check that'll try to make it work.
# We can tell this will try three times and then fail anyway.
check = retry(flow(_check_impl))

# As this is an example, mock out the remaining ETL stage tasks
save = report = cleanup = analyze

def _etl_impl(fctx: FlowContext) -> Result:
    # Call the analyze task, wait for it to be evaluated, and unwrap the result.
    # If the analyze task failed, then unwrap() will propagate the error.
    # Otherwise it'll give us the return value of the task back.
    analyze_value = unwrap(analyze())

    # Here we're taking the result and we'll handle the error ourselves.
    # We happen to know that check will always fail.
    check_results = check(analyze_value)
    if is_ok(check_results):
        # Run these two in parallel
        save_task = fctx.actions.run(save, analyze_value)
        report_task = fctx.actions.run(report, analyze_value)
        
        # Wait for their results
        results = fctx.actions.wait([save_task, report_task])
        
        # Cause a `fail()` if either errored
        [unwrap(it) for it in results]
        
        # Otherwise success!
        return Ok(None)

    else:
        # There may be cleanup to handle. We could implement cleanup as a
        # wrapper using function composition (see the sketch after this
        # function), but keep it simple for now.
        cleanup()
        return check_results  # Or we could fail with the error, either way
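
# The cleanup-as-a-wrapper sketch mentioned above; `with_cleanup` is
# illustrative, not a prelude helper:
def with_cleanup(subflow, cleanup_flow):
    def _inner(*args, **kwargs):
        result = subflow(*args, **kwargs)
        if not is_ok(result):
            cleanup_flow()
        return result
    return _inner

# e.g. checked = with_cleanup(check, cleanup)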

etl_workflow = flow(
    implementation = _etl_impl,
    args = {
        "input_data": args.string(default="input_source"),
    },
)
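
# How an implementation reads declared args isn't shown in this example; one
# plausible shape (an assumption, not documented API) is an accessor on the
# context, e.g. fctx.args["input_data"].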

if __name__ == "__flow__":
    etl_workflow()
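
# Declared args can presumably be overridden at the call site, e.g.:
# etl_workflow(input_data = "other_source")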