Nick Coy
07/28/2021, 1:25 AM
with Flow('mapped_flow') as flow:
    aws_list = aws_list_files(AWS_CREDENTIALS)
    file = aws_download.map(
        key=aws_list,
        credentials=unmapped(AWS_CREDENTIALS),
        bucket=unmapped('some_bucket'),
        upstream_tasks=[unmapped(aws_list)])
    format_df = format_data.map(file, aws_list, upstream_tasks=[unmapped(file)])
    move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS), upstream_tasks=[unmapped(format_df)])
    load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_task=[move_to_cloud])
The problem is that load_files_gcs_bq starts as soon as all the mapped tasks enter a Mapped state, but before any of the mapped children have actually run. I have been reading the mapping docs, but I feel like I am missing something. Any help would be greatly appreciated!
Kevin Kho
upstream_task=[move_to_cloud] in your code? Might have to be upstream_tasks?
I think you can do away with specifying upstream_tasks. The dependency is set automatically when one task's output is passed as an input to a downstream task, like this:
with Flow('mapped_flow') as flow:
    aws_list = aws_list_files(AWS_CREDENTIALS)
    file = aws_download.map(
        key=aws_list,
        credentials=unmapped(AWS_CREDENTIALS),
        bucket=unmapped('some_bucket')
    )
    format_df = format_data.map(file, aws_list)
    move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS))
    load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[move_to_cloud])
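Kevin's point is that the ordering comes from the data: a step that consumes another step's output cannot start until that output exists, and a single step that consumes the whole mapped result acts as a fan-in. The sketch below illustrates that shape in plain Python with concurrent.futures, as an analogy only, not Prefect's API. The function names mirror the thread, but their bodies (and the events log) are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the thread's tasks; they only record
# what happened so the ordering can be inspected afterwards.
events = []

def list_files():
    return ["a.csv", "b.csv", "c.csv"]

def download(key):
    events.append(("download", key))
    return f"raw:{key}"

def format_data(raw, key):
    events.append(("format", key))
    return f"df:{key}"

def move_to_cloud_storage(df, key):
    events.append(("move", key))
    return f"gs://some_bucket/{key}"

def load_files(uris):
    # Fan-in: runs once and receives every branch's result.
    events.append(("load", len(uris)))
    return len(uris)

def process(key):
    # One mapped branch: each step consumes the previous step's output,
    # so download -> format -> move always run in that order per key.
    raw = download(key)
    df = format_data(raw, key)
    return move_to_cloud_storage(df, key)

def run_flow():
    keys = list_files()
    with ThreadPoolExecutor() as pool:
        # list() blocks until every branch has returned its URI.
        uris = list(pool.map(process, keys))
    # load_files needs the full list, so it cannot start early.
    return load_files(uris)
```

Calling run_flow() runs the three branches concurrently and only then calls load_files with all three URIs.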
Kevin Kho
upstream_task unless load_files_gcs_bq takes kwargs?
Nick Coy
07/28/2021, 3:32 AM
It's upstream_tasks=[move_to_cloud] in my code. So I can remove upstream_tasks for tasks that take an input from an upstream task. But load_files_gcs_bq does not take any inputs, and that is what is causing the issue: that task fires before the other mapped tasks actually run.
Kevin Kho
Is this with flow.run() or a run with a backend (Cloud or Server)? Just to be sure, was this something you changed and then re-registered?
Also, what storage are you using?
Nick Coy
07/28/2021, 3:39 AM
Kevin Kho
load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[unmapped(move_to_cloud)])?
Kevin Kho
Nick Coy
07/28/2021, 3:40 AM
Nick Coy
07/28/2021, 3:47 AM
Nick Coy
07/28/2021, 3:47 AM
Kevin Kho
upstream_tasks=[move_to_cloud]
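Because load_files_gcs_bq takes no data inputs, the only thing that can order it after the uploads is a state-only dependency such as upstream_tasks=[move_to_cloud]. As a plain-Python analogy (not Prefect's API), the sketch below gates a no-argument load step on the completion of every mapped upload by waiting on the futures themselves rather than on their values. The bodies of move_to_cloud_storage and load_files_gcs_bq, and the in-memory uploaded list standing in for the bucket, are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

# Hypothetical stand-in for files landing in cloud storage.
uploaded = []

def move_to_cloud_storage(key):
    time.sleep(0.01)  # simulate upload I/O
    uploaded.append(key)

def load_files_gcs_bq():
    # Takes no inputs: it loads whatever is in the "bucket", so it is
    # only correct if every upload has already finished.
    return sorted(uploaded)

def run(keys):
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(move_to_cloud_storage, k) for k in keys]
        # State-only dependency: block until every upload has completed.
        done, _ = wait(futures, return_when=ALL_COMPLETED)
        for f in done:
            f.result()  # re-raise any upload failure before loading
    return load_files_gcs_bq()
```

Calling load_files_gcs_bq() right after the submit() calls, without the wait, could see an empty or partial bucket, which matches the symptom described in the thread.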
Kevin Kho
Nick Coy
07/28/2021, 3:48 AM