Jean-Michel Provencher

    6 months ago
    Hi, do you guys know if it’s possible to pass upstream_tasks with methods that actually require parameters? The documentation is not really clear about how to chain multiple upstream_tasks to create dependencies between them, and I was wondering if some of you had more complex examples. For example, I don’t think I can do this:
    with Flow(f"{environment_prefix}-test", storage=S3(bucket=storage_location_bucket_name)) as flow:
        dbt_run(organization_id_param, data_processing_start_date_param, data_processing_end_date_param,
                should_process_last_period, period, period_value,
                upstream_tasks=[pull_snowflake_secret(a, b), pull_repo(b, c)])
emre

    6 months ago
    This should be working. Both pull_snowflake_secret and pull_repo receive parameters, and possibly return some values. By putting them inside upstream_tasks you are simply stating that dbt_run shouldn't use the outputs of pull_snowflake_secret or pull_repo for anything; it should simply wait for its upstream tasks to finish before it starts itself. Honestly, I don't think I fully understand your issue 😅. Could you explain further?
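    To illustrate the point above, here is a minimal, library-free sketch of what upstream_tasks means in Prefect 1.x: a *state* dependency (run order) with no data passed along. The Task class, names, and callables below are purely illustrative stand-ins, not Prefect's actual API.

    ```python
    # Hypothetical sketch: an upstream task must finish before the
    # downstream one runs, but its result is never handed to it.
    class Task:
        def __init__(self, name, fn):
            self.name = name
            self.fn = fn
            self.upstream = []   # tasks that must finish first
            self.done = False
            self.result = None

        def run(self, *args):
            # Ordering only: check upstream state, ignore upstream results.
            for t in self.upstream:
                assert t.done, f"{t.name} must finish before {self.name}"
            self.result = self.fn(*args)
            self.done = True
            return self.result

    order = []
    pull_secret = Task("pull_snowflake_secret", lambda a, b: order.append("secret"))
    pull_repo = Task("pull_repo", lambda b, c: order.append("repo"))
    dbt = Task("dbt_run", lambda: order.append("dbt_run"))

    # Rough equivalent of upstream_tasks=[pull_snowflake_secret(a, b), pull_repo(b, c)]:
    dbt.upstream = [pull_secret, pull_repo]

    pull_secret.run(1, 2)
    pull_repo.run(2, 3)
    dbt.run()
    print(order)  # dbt_run comes last: ['secret', 'repo', 'dbt_run']
    ```

    Note that dbt.run() takes no arguments here: the upstream tasks gate *when* it runs, not *what* it receives.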
Kevin Kho

    6 months ago
    I think this should work, but the preferred way would be:
    with Flow(...) as flow:
        snowflake = pull_snowflake_secret(a, b)
        dbt_run(..., upstream_tasks=[snowflake])