Chris Jordan

12/03/2020, 3:58 PM
Hey, I've got a question that I haven't been able to find a clean answer to in the docs. I'd like to have a flow evaluate the results of one of its tasks and then conditionally spawn a new version of itself. Something like:
@task(name="get next batch of records")
def get_batch(result=PrefectResult()):
    ...
    return len(records)

with Flow("import_flow") as flow:
    num_of_records = get_batch()
    if num_of_records.read() > 0: # this particular syntax doesn't do it, and I'm asking for how to read this here (if I should be)
        kickoff_task = StartFlowRun(project_name="imports", flow_name="import_flow") # StartFlowRun also doesn't seem to spawn a new task - is this the right way to call this?

Dylan

12/03/2020, 5:34 PM
Hi @Chris Jordan! I believe you’re running into an issue with deferred results. Essentially, num_of_records is a Result at Flow definition time. At execution time, it returns an object that’s meant to be unwrapped in another Prefect Task. If you want to take conditional actions based on Task Results, check out: https://docs.prefect.io/api/latest/tasks/control_flow.html#control-flow-tasks
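
To illustrate the deferred-result point, here is a toy model in plain Python. It is not Prefect's API: the Deferred class and maybe_kickoff are hypothetical names, and the records are stand-in data. It only shows why a plain if inside the flow definition cannot see the value, while a downstream step that receives the value at run time can.

```python
class Deferred:
    """Hypothetical placeholder: built at definition time, holds no value yet."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def run(self):
        # Resolve upstream placeholders first, then call the wrapped function.
        args = [u.run() for u in self.upstream]
        return self.fn(*args)


def get_batch():
    records = ["a", "b", "c"]  # stand-in data for illustration only
    return len(records)


def maybe_kickoff(num_of_records):
    # The comparison happens here, at run time, on a real integer.
    return "start another run" if num_of_records > 0 else "stop"


# Definition time: nothing executes, we only wire the steps together.
num_of_records = Deferred(get_batch)
decision = Deferred(maybe_kickoff, num_of_records)

# Run time: the count flows from get_batch into maybe_kickoff.
outcome = decision.run()
```

In the toy model, num_of_records is a placeholder rather than an integer until run() is called, so the condition has to live in a step that runs downstream of it. The control flow tasks in the linked docs play that role in a real flow.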

Chris Jordan

12/03/2020, 6:12 PM
thanks a bunch, I'll take a look