# ask-community
Nick Coy:
Hello everyone, I have been running into an issue where a task downstream of a mapped task starts once the mapped task enters a Mapped state but before it has actually run. Here is the flow:
with Flow('mapped_flow') as flow:
    aws_list = aws_list_files(AWS_CREDENTIALS)
    file = aws_download.map(
        key=aws_list,
        credentials=unmapped(AWS_CREDENTIALS),
        bucket=unmapped('some_bucket'),
        upstream_tasks=[unmapped(aws_list)])
    format_df = format_data.map(file, aws_list, upstream_tasks=[unmapped(file)])
    move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS), upstream_tasks=[unmapped(format_df)])
    load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_task=[move_to_cloud])
The problem is that `load_files_gcs_bq` starts once all the mapped tasks enter a Mapped state, but before any of them have actually run. I have been reading the mapping docs, but I feel like I am missing something. Any help would be greatly appreciated!
Kevin Kho:
Hey @Nick Coy, I will look at this more, but is it `upstream_task=[move_to_cloud]` in your code? It might have to be `upstream_tasks`? I think you can do away with specifying the upstream tasks entirely; the dependency is set automatically when a task's output is used as an input downstream, like this:
with Flow('mapped_flow') as flow:
    aws_list = aws_list_files(AWS_CREDENTIALS)
    file = aws_download.map(
        key=aws_list,
        credentials=unmapped(AWS_CREDENTIALS),
        bucket=unmapped('some_bucket'))
    format_df = format_data.map(file, aws_list)
    move_to_cloud = move_to_cloud_storage.map(format_df, aws_list, unmapped(GCP_CREDENTIALS))
    load_job = load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[move_to_cloud])
It would error out, though, if it were `upstream_task`, unless `load_files_gcs_bq` takes kwargs?
Nick Coy:
@Kevin Kho oh, I actually just copied that incorrectly; I have `upstream_tasks=[move_to_cloud]` in my code. So I can remove `upstream_tasks` for tasks that take an input from an upstream task. But `load_files_gcs_bq` does not take any inputs, and that is what is causing the issue: that task fires before the other mapped tasks actually run.
Kevin Kho:
It looks right. What executor are you using? Is this when doing `flow.run()` or a run with a backend (Cloud or Server)? Just to be super sure, was this something you changed and re-registered? What storage are you using also?
Nick Coy:
I believe it's the local executor. When I do `flow.run()` locally, it works as expected. When I run from the Cloud backend, all the mapped tasks enter a Mapped state but have not run yet.
Kevin Kho:
Can you try `load_files_gcs_bq(GCP_CREDENTIALS, upstream_tasks=[unmapped(move_to_cloud)])`?
Will try with a backend tomorrow
Nick Coy:
Yes, I will try that. Thank you!
@Kevin Kho that worked after making that change, re-registering the flow, and running from the Cloud UI. Thank you!
Kevin Kho:
I created a flow with the same structure and ran it with a backend, and the dependency gets respected even with `upstream_tasks=[move_to_cloud]`. Well… at least yours is working now. 😅
Nick Coy:
lol, yea! glad it worked