Kayvan Shah
04/21/2021, 4:35 PM
Kayvan Shah
04/21/2021, 4:35 PM
Kayvan Shah
04/21/2021, 4:37 PM
nicholas
… spath_config['reference_urls_path'] and dpath_config['dummy_clean_csv']); when your flow is registered, Prefect has no knowledge of those values and so creates constant tasks as intermediaries to hold those.
Kayvan Shah
04/21/2021, 4:45 PM
nicholas
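```python
from prefect import Flow, task

# read_csv_from_buffer_bucket / push_csv_to_buffer_bucket are your own helpers


@task(log_stdout=True)
def read_data_from_bucket(bucket, path):
    # Index the path config inside the task, so the lookup runs when the task runs
    df = read_csv_from_buffer_bucket(bucket, rel_path=path['reference_urls_path'])
    return df


@task(log_stdout=True)
def push_data_to_bucket(bucket, dataframe, path):
    push_csv_to_buffer_bucket(bucket, dataframe, rel_path=path['dummy_clean_csv'])

# ... (other task definitions omitted)

with Flow("gcp_demo_dag") as flow:
    spath_config = load_scraper_path_config()
    dpath_config = load_dummy_path_config()
    BUCKET = make_bucket_connection()
    data = read_data_from_bucket(BUCKET, spath_config)
    # Passing a task's result as an argument already makes it upstream, so
    # upstream_tasks=[...] is redundant; a new name avoids shadowing clean_data
    cleaned = clean_data(data)
    push_data_to_bucket(BUCKET, cleaned, dpath_config)
```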
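Notice that I've moved the references into the tasks themselves and out of the flow context, so the key lookups happen on the real config values when the tasks run, not while the flow is being built.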
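Why does moving the lookup into the task help? Below is a toy model of deferred execution, a minimal sketch and not how Prefect is implemented. The Node class and the sample config values are hypothetical. Calling a "task" in the flow body only records a placeholder, so at build time there is no real dict to index. A lookup written inside the task body runs later, on the actual value. In this toy the build-time lookup simply fails, whereas Prefect, as described above, has to stand in for such values with intermediary tasks.

```python
from typing import Any, Callable


class Node:
    """Toy stand-in for a task result: a call recorded now, executed later."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        # Non-Node arguments are stored as plain constants
        self.fn = fn
        self.args = args

    def run(self) -> Any:
        # Execute upstream nodes first, then this node's function
        values = [a.run() if isinstance(a, Node) else a for a in self.args]
        return self.fn(*values)


def load_scraper_path_config() -> dict:
    # Hypothetical config contents, for illustration only
    return {"reference_urls_path": "refs/urls.csv"}


def read_data_from_bucket(bucket: str, path: dict) -> str:
    # The key lookup runs here, at execution time, on the real dict
    return f"read {bucket}/{path['reference_urls_path']}"


# "Flow context": nothing has executed yet, config is only a placeholder
config = Node(load_scraper_path_config)
data = Node(read_data_from_bucket, "my-bucket", config)

try:
    config["reference_urls_path"]  # build-time lookup on the placeholder
except TypeError as err:
    print("build-time lookup fails:", err)

print(data.run())  # read my-bucket/refs/urls.csv
```

The design point is the same as in the fix above: pass the whole config object into the task and index it there, so the graph only needs to know about the config task itself.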