https://prefect.io logo
#prefect-community
Title
# prefect-community
j

Jean-Michel Provencher

03/18/2022, 7:05 PM
Hi, do you guys know if it’s possible to pass
upstream_tasks
with methods that actually requires parameter? The documentation is not really clear regarding how to chain multiple upstream_tasks to create dependencies between them and I was wondering if some of you had some more complex examples. For example, I don’t think I can do this
Copy code
with Flow(f"{environment_prefix}-test", storage=S3(bucket=storage_location_bucket_name)) as flow:


    dbt_run(organization_id_param, data_processing_start_date_param, data_processing_end_date_param, should_process_last_period, period, period_value,
            upstream_tasks=[pull_snowflake_secret(a,b), pull_repo(b,c)])
e

emre

03/18/2022, 7:35 PM
This should be working. Both
pull_snowflake_secret
and
pull_repo
receive parameters, and possibly return some values. By putting them inside
upstream_tasks
you are simply stating that
dbt_run
shouldn't use the outputs of tasks
pull_snowflake_secret
or
pull_repo
for anything. It should simply wait for its upstream tasks to finish before it starts itself. Honestly, I don't think I fully understand your issue 😅. Could you explain further?
k

Kevin Kho

03/18/2022, 8:00 PM
I think this should worth, but the preferred would be:
Copy code
with Flow(...) as flow:
    snowflake = pull_snowflake_secrete(a,b)
    dbt_run(, upstream_tasks=[snowflake])
🙌 1
3 Views