k
sample.py
Why are the tasks in blue showing up twice? The sample snippet is attached above
n
Hi @Kayvan Shah - I believe that's happening because you're referencing the return values of those tasks as constants (`spath_config['reference_urls_path']` and `dpath_config['dummy_clean_csv']`); when your flow is registered, Prefect has no knowledge of those values and so creates constant tasks as intermediaries to hold those.
k
So how to avoid it?
n
You could modify your code slightly like this:
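from prefect import task, Flow

@task(log_stdout=True)
def read_data_from_bucket(bucket,path):
    # Look up the key on the path config here, inside the task, at run time
    df = read_csv_from_buffer_bucket(bucket,rel_path=path['reference_urls_path'])
    return df

@task(log_stdout=True)
def push_data_to_bucket(bucket,dataframe,path):
    # Same idea: the key lookup happens inside the task, not in the flow
    push_csv_to_buffer_bucket(bucket,dataframe,rel_path=path['dummy_clean_csv'])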

...

with Flow("gcp_demo_dag") as flow:
    spath_config = load_scraper_path_config()
    dpath_config = load_dummy_path_config()
    BUCKET = make_bucket_connection()

    # Passing a task result as an argument already makes it an upstream dependency
    data = read_data_from_bucket(BUCKET,spath_config)
    cleaned = clean_data(data)  # new name, so the clean_data task isn't shadowed
    push_data_to_bucket(BUCKET,cleaned,dpath_config)
Notice that I've moved the key lookups into the tasks themselves and out of the flow context.
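If it helps to see why the extra tasks show up, here's a toy model in plain Python. It is not Prefect's actual implementation; `Node` and both builder functions are made up for this sketch. It only shows the general idea: indexing a deferred result while the graph is being built has to be recorded as another node, whereas a lookup done inside the task body adds nothing to the graph.

```python
# Toy model of a deferred task graph. NOT Prefect's API: Node and the
# two build_* functions are hypothetical names used only for illustration.

class Node:
    """A deferred computation that registers itself in a shared graph list."""

    def __init__(self, graph, name):
        self.graph = graph
        self.name = name
        graph.append(name)

    def __getitem__(self, key):
        # The real value does not exist yet at build time, so the lookup
        # is recorded as an additional node to be evaluated at run time.
        return Node(self.graph, f"{self.name}[{key!r}]")


def build_with_lookup_in_flow():
    """Index the config result while building the graph."""
    graph = []
    config = Node(graph, "load_config")
    config["reference_urls_path"]  # build-time indexing adds a node
    Node(graph, "read_data")
    return graph


def build_with_lookup_in_task():
    """Pass the whole config along; the lookup happens inside the task body."""
    graph = []
    Node(graph, "load_config")
    Node(graph, "read_data")
    return graph


in_flow = build_with_lookup_in_flow()
in_task = build_with_lookup_in_task()
print(len(in_flow), in_flow)
print(len(in_task), in_task)
```

The first graph ends up with one more node than the second, which is the same kind of extra box you were seeing in the schematic.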