John Shearer
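09/02/2021, 4:55 PM
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult, S3Result
from prefect.engine.serializers import PandasSerializer, PickleSerializer

condition = True
# Pick the result class once; note that S3Result also expects a bucket argument
resultType = LocalResult if condition else S3Result

@task()
def create_list():
    return [1, 2, 3]

@task(result=resultType(serializer=PandasSerializer(file_type='parquet')))
def to_df(a_list):
    return pd.DataFrame({'col1': a_list})

with Flow(
    name="my-flow",
    result=resultType(serializer=PickleSerializer())
) as my_flow:
    my_list = create_list()
    my_df = to_df(my_list)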
Dustin Ngo
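09/02/2021, 5:28 PM
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult, S3Result
from prefect.engine.serializers import PandasSerializer, PickleSerializer

environment = 'DEV'
# One fully configured result object per environment
task_config = {
    'DEV': LocalResult(serializer=PandasSerializer(file_type='parquet')),
    'PROD': S3Result(serializer=PandasSerializer(file_type='parquet')),
}

@task()
def create_list():
    return [1, 2, 3]

@task(result=task_config[environment])
def to_df(a_list):
    return pd.DataFrame({'col1': a_list})

with Flow(
    name="my-flow",
    # resultType is defined in the previous snippet (LocalResult or S3Result)
    result=resultType(serializer=PickleSerializer())
) as my_flow:
    my_list = create_list()
    my_df = to_df(my_list)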
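One possible refinement of the dictionary lookup above, as a rough sketch only: store factories instead of ready-made objects, so that only the selected environment's result is ever constructed, and read the environment name from a variable. The helper `pick_for_environment`, the variable name `APP_ENV`, and the string stand-ins are all hypothetical and not part of Prefect.

```python
import os
from typing import Callable, Dict, Optional, TypeVar

T = TypeVar("T")


def pick_for_environment(
    factories: Dict[str, Callable[[], T]],
    env: Optional[str] = None,
) -> T:
    """Build and return only the entry for the selected environment."""
    # APP_ENV is a hypothetical variable name; 'DEV' is used when it is unset.
    name = env if env is not None else os.environ.get("APP_ENV", "DEV")
    if name not in factories:
        raise ValueError(
            f"unknown environment {name!r}, expected one of {sorted(factories)}"
        )
    # Call the factory here, so the other environments are never constructed.
    return factories[name]()


# Stand-in factories so the sketch runs without Prefect installed. With Prefect,
# an entry could be: lambda: LocalResult(serializer=PandasSerializer(file_type='parquet'))
factories = {"DEV": lambda: "local-result", "PROD": lambda: "s3-result"}
selected = pick_for_environment(factories, env="PROD")
```

The object returned by the helper would then be passed as `result=` to `@task`, in place of `task_config[environment]`. An unknown environment name fails early with a `ValueError` instead of a bare `KeyError`.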
Of course, by handling multiple result types you risk issues in your flows that arise outside of Prefect. If you can unify them in the future and keep as much of the logic as possible inside flows and tasks, you might have a better experience!
John Shearer
09/02/2021, 5:51 PM