Hey, I've got a question that I haven't been able to find a clean answer to in the docs. I'd like to have a flow evaluate the results of one of its tasks and then conditionally spawn a new version of itself. Something like:
@task(name="get next batch of records")
def get_batch(result=PrefectResult()):
    ...
    return len(records)

with Flow("import_flow") as flow:
    num_of_records = get_batch()
    if num_of_records.read() > 0:  # this particular syntax doesn't do it, and I'm asking for how to read this here (if I should be)
        kickoff_task = StartFlowRun(project_name="imports", flow_name="import_flow")  # StartFlowRun also doesn't seem to spawn a new task - is this the right way to call this?
Dylan
12/03/2020, 5:34 PM
Hi @Chris Jordan!
I believe you’re running into an issue with deferred results. Essentially,
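For anyone reading along, here is a rough pure-Python sketch of what "deferred" means in this context. It is a toy model, not Prefect's API: `Deferred` and its `run` method are hypothetical names made up for the illustration, and the records list is stand-in data. The point it tries to show is that while the flow is being built, calling a task hands back a placeholder rather than a number, so a plain `if` at that stage has nothing to compare.

```python
# Toy model of deferred results. NOT Prefect's API: Deferred and run() are
# hypothetical names used only for illustration.
from typing import Callable


class Deferred:
    """Placeholder handed back at build time; holds no value until run."""

    def __init__(self, fn: Callable[[], int]) -> None:
        self.fn = fn

    def run(self) -> int:
        # The wrapped function is only called here, at run time.
        return self.fn()


def get_batch() -> int:
    records = ["a", "b", "c"]  # stand-in data for the illustration
    return len(records)


# Build time: only a placeholder exists, so there is no count to test yet.
num_of_records = Deferred(get_batch)
has_value_at_build_time = isinstance(num_of_records, int)

# Run time: the count exists only after the placeholder is executed,
# so the decision to spawn another run has to be made at this stage.
value = num_of_records.run()
should_spawn = value > 0
print(has_value_at_build_time, value, should_spawn)
```

In a real flow, the branch would therefore need to be expressed as something the orchestrator evaluates at run time (in Prefect versions of this era that is usually done with the `case` context manager, as far as I know) rather than as a Python `if` inside the `with Flow(...)` block.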