Kyle McChesney
10/20/2021, 3:14 PM
Regarding `upstream_tasks`, I have the following example flow:
with Flow(
    'example',
    executor=LocalExecutor(),
) as flow:
    data_csv_url = Parameter('data_csv_url')
    output_url = Parameter('output_url')
    data = generate_data(data_csv_url)
    data_with_stuff = generate_stuff.map(data)
    data_with_stuff_written = write_data(data_with_stuff, output_url)
    data_with_other_stuff = generate_other_stuff.map(
        data_with_stuff,
        upstream_tasks=(data_with_stuff_written,),
    )
I read a CSV file and generate a list of “data” objects for the given flow run. I then map them across another task, which mutates the data items to add things to them. Next I want to write the state of the objects to a file for tracking, and I want to do this BEFORE I make further modifications in `generate_other_stuff`. With this flow definition, `generate_other_stuff` fails without any exception and does not seem to even run the task.