Mark McDonald
08/06/2020, 8:14 PM
Dylan
08/06/2020, 8:39 PM
Mark McDonald
08/06/2020, 8:40 PM
Dylan
08/06/2020, 8:40 PM
Mark McDonald
08/06/2020, 9:29 PM
from datetime import timedelta
import os
from prefect.engine.results import S3Result
from prefect import task, Flow, Parameter
from prefect.tasks.control_flow import case
from prefect.engine.result import NoResult
# Names of the tables processed by the flow; each entry becomes one mapped
# task run (download_from_s3 in production, extract_postgres in staging).
TABLES = [
"table1",
"table2",
"table3",
"table4",
"table5",
]
@task(max_retries=2, retry_delay=timedelta(seconds=60))
def get_params(env):
    """Set the DB connection environment variable for the given environment.

    Parameters
    ----------
    env : str
        "staging" or "production"; any other value leaves the
        environment untouched (matching the original silent behavior,
        since the flow's `case` branches only cover these two values).

    Returns
    -------
    NoResult, so this task can serve purely as an upstream dependency
    without persisting a result to the flow's S3Result location.
    """
    # NOTE(review): mutating os.environ inside a task only affects the
    # process that executed it — under DaskExecutor with multiple workers
    # this may NOT be visible to mapped downstream tasks running in other
    # workers. Confirm, or pass configuration to tasks explicitly.
    envar_by_env = {
        "staging": "staging_envar",
        "production": "production_envar",
    }
    value = envar_by_env.get(env)
    if value is not None:
        os.environ["ENVAR_FOR_DB"] = value
    return NoResult
@task(max_retries=2, retry_delay=timedelta(seconds=60))
def download_from_s3(table):
    """Process one table in the production branch.

    Currently a placeholder: it only echoes the table name to stdout.
    """
    table_name = table
    print(table_name)
@task(max_retries=2, retry_delay=timedelta(seconds=60))
def extract_postgres(table):
    """Process one table in the staging branch.

    Currently a placeholder: it only echoes the table name to stdout.
    """
    table_name = table
    print(table_name)
# Build the flow graph. Results are persisted to S3, templated per task,
# date, and task-run id. Statement order inside this block matters: the
# Flow context manager registers each task call as it executes.
with Flow("new_flow", result=S3Result(bucket="fake-bucket", location="/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect")) as flow:
    # Which environment to target; drives both branches below.
    env = Parameter("environment", default="staging", required=True,)
    # Runs first in either branch so ENVAR_FOR_DB is set before table work.
    envars_dependency = get_params(env)
    with case(env, "production"):
        # One mapped task run per table; gated behind get_params.
        download_from_s3.map(TABLES).set_upstream(envars_dependency)
    with case(env, "staging"):
        # One mapped task run per table; gated behind get_params.
        extract_postgres.map(TABLES).set_upstream(envars_dependency)
if __name__ == "__main__":
    # Execute the flow locally on a small Dask cluster (staging branch).
    from prefect.engine.executors import DaskExecutor

    dask_executor = DaskExecutor(n_workers=1, threads_per_worker=3)
    flow.run(executor=dask_executor, parameters={"environment": "staging"})
Dylan
08/06/2020, 9:45 PM
Marvin
08/06/2020, 9:45 PM