Mark McDonald

08/06/2020, 8:14 PM
Hi - I've been testing out the S3Result object in my flows and I generally find it amazing - nice work! However, I have one flow where I'm struggling with it. I'm specifying my S3Result at the flow level. I'm using a control flow "case" task, and downstream tasks are failing because each downstream task tries to retrieve the case task's result from S3 and there is nothing available. How is this supposed to work? Is the case task supposed to return a result which then gets written to S3? I assume it should.
the schematic gives a good view of what's happening

Dylan

08/06/2020, 8:39 PM
Hi @Mark McDonald, Do you have a reproducible example? Case statements are new-ish and I’d like to make sure this isn’t a bug
As far as I know, this should work the way you intend it to

Mark McDonald

08/06/2020, 8:40 PM
ya, let me simplify my flow and share

Dylan

08/06/2020, 8:40 PM
Thank you!

Mark McDonald

08/06/2020, 9:29 PM
@Dylan
from datetime import timedelta
import os

from prefect.engine.results import S3Result
from prefect import task, Flow, Parameter
from prefect.tasks.control_flow import case
from prefect.engine.result import NoResult

TABLES = [
    "table1",
    "table2",
    "table3",
    "table4",
    "table5",
]

@task(max_retries=2, retry_delay=timedelta(seconds=60))
def get_params(env):
    if env == "staging":
        os.environ["ENVAR_FOR_DB"] = "staging_envar"
    if env == "production":
        os.environ["ENVAR_FOR_DB"] = "production_envar"
    return NoResult


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def download_from_s3(table):
    print(table)


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def extract_postgres(table):
    print(table)

with Flow(
    "new_flow",
    result=S3Result(
        bucket="fake-bucket",
        location="/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect",
    ),
) as flow:
    env = Parameter("environment", default="staging", required=True)
    envars_dependency = get_params(env)

    with case(env, "production"):
        download_from_s3.map(TABLES).set_upstream(envars_dependency)

    with case(env, "staging"):
        extract_postgres.map(TABLES).set_upstream(envars_dependency)


if __name__ == "__main__":
    from prefect.engine.executors import DaskExecutor

    executor = DaskExecutor(n_workers=1, threads_per_worker=3)
    flow.run(executor=executor, parameters={"environment": "staging"})
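
For readers following the `location` template in the flow above, here is a minimal sketch of how such a template could expand into an S3 key. It assumes the placeholders are filled with ordinary `str.format` substitution from the run context; it does not import or call Prefect. The helper `render_location` and the context values are hypothetical and exist only for illustration.

```python
from datetime import datetime

# Template copied from the flow above.
LOCATION_TEMPLATE = (
    "/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect"
)


def render_location(template: str, **context) -> str:
    """Fill a location template with plain str.format (hypothetical helper)."""
    return template.format(**context)


# Hypothetical context values; a real run would supply its own.
example = render_location(
    LOCATION_TEMPLATE,
    task_name="extract_postgres",
    date=datetime(2020, 8, 6),
    task_run_id="abc123",
)
print(example)
```

Under that assumption, every task run gets its own key, so a downstream task looking up an upstream result would look under the upstream task's name and run ID.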

Dylan

08/06/2020, 9:45 PM
@Marvin open “S3 Result Fails to Read Downstream from Case Statement”