Kayvan Shah
04/21/2021, 4:35 PM

Kayvan Shah
04/21/2021, 4:35 PM

Kayvan Shah
04/21/2021, 4:37 PM

nicholas
… (`spath_config['reference_urls_path']` and `dpath_config['dummy_clean_csv']`); when your flow is registered, Prefect has no knowledge of those values, so it creates constant tasks as intermediaries to hold them.

Kayvan Shah
04/21/2021, 4:45 PM

nicholas
```
from prefect import task, Flow

@task(log_stdout=True)
def read_data_from_bucket(bucket, path):
    # Index the config inside the task, so the lookup happens at run time
    df = read_csv_from_buffer_bucket(bucket, rel_path=path['reference_urls_path'])
    return df

@task(log_stdout=True)
def push_data_to_bucket(bucket, dataframe, path):
    push_csv_to_buffer_bucket(bucket, dataframe, rel_path=path['dummy_clean_csv'])

...

with Flow("gcp_demo_dag") as flow:
    spath_config = load_scraper_path_config()
    dpath_config = load_dummy_path_config()
    BUCKET = make_bucket_connection()
    data = read_data_from_bucket(BUCKET, spath_config)
    # Passing a task's result as an argument already makes it an upstream
    # dependency; a distinct name avoids shadowing the clean_data task
    cleaned_data = clean_data(data)
    push_data_to_bucket(BUCKET, cleaned_data, dpath_config)
```
Notice that I've moved the references into the tasks themselves and out of the flow context, so the dictionary lookups now happen when the tasks run and Prefect no longer needs constant tasks to hold those values.
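To see why lookups in the flow context produce extra tasks, here is a toy model of deferred graph building in plain Python. It is a simplified stand-in for illustration only, not Prefect's implementation: `Graph`, `Node`, `build`, the `getitem` and `constant(...)` node names, and the `refs.csv` config value are all hypothetical. Calling a task while the graph is being built only records a node, so indexing its result (`config["reference_urls_path"]`) can't be evaluated yet and has to become a node of its own, with the key held by a constant node. Doing the lookup inside the task body leaves the graph with just the real tasks.

```python
# Toy model of a deferred task graph (hypothetical, NOT Prefect's internals).

class Node:
    """Placeholder for a value that only exists once the graph runs."""

    def __init__(self, graph, name, fn, *inputs):
        self.graph, self.name, self.fn, self.inputs = graph, name, fn, inputs
        graph.nodes.append(self)  # nodes are recorded in dependency order

    def __getitem__(self, key):
        # At build time the dict doesn't exist yet, so the lookup itself must
        # become a node, and the key is held by an extra "constant" node.
        const = Node(self.graph, f"constant({key!r})", lambda: key)
        return Node(self.graph, "getitem", lambda d, k: d[k], self, const)


class Graph:
    def __init__(self):
        self.nodes = []

    def task(self, fn):
        # Calling a decorated function records a node instead of running it.
        return lambda *inputs: Node(self, fn.__name__, fn, *inputs)

    def run(self):
        # Execute nodes in recorded order and return the last node's result.
        results = {}
        for node in self.nodes:
            args = [results[id(i)] for i in node.inputs]
            results[id(node)] = node.fn(*args)
        return results[id(self.nodes[-1])]


def build(lookup_inside_task):
    g = Graph()

    @g.task
    def load_config():
        return {"reference_urls_path": "refs.csv"}  # hypothetical config

    @g.task
    def read_path(path):
        return f"read {path}"

    @g.task
    def read_from_config(config):
        # Fixed pattern: the lookup happens when the task runs.
        return f"read {config['reference_urls_path']}"

    config = load_config()
    if lookup_inside_task:
        read_from_config(config)
    else:
        read_path(config["reference_urls_path"])  # lookup in the build context
    return g


if __name__ == "__main__":
    for inside in (False, True):
        g = build(inside)
        print(inside, [n.name for n in g.nodes], g.run())
```

Both `build(False).run()` and `build(True).run()` return `"read refs.csv"`, but the first graph records four nodes and the second only two, which mirrors the intermediate tasks that go away once the references are moved into the tasks.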