# prefect-community
m
Hi - I've been testing out the S3Result object in my flows and I generally find it amazing - nice work! However, I have one flow where I'm struggling with it. I'm specifying my S3Result at the flow level. I'm using a control-flow "case" task, and downstream tasks are failing because they try to retrieve the case task's result from S3 and there is nothing available. How is this supposed to work? Is the case task supposed to return a result which then gets written to S3? I assume it should.
👀 1
the schematic gives a good view of what's happening
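For reference, the pattern in question boils down to something like this (a minimal sketch assuming Prefect 0.x; the bucket name and branch task are placeholders, not my real flow):

from prefect import task, Flow, Parameter
from prefect.engine.results import S3Result
from prefect.tasks.control_flow import case

@task
def run_branch():
    print("branch ran")

# The flow-level result applies to every task in the graph, including the
# comparison task that `case` inserts to gate the branch.
with Flow("case-result-demo", result=S3Result(bucket="some-bucket")) as flow:
    env = Parameter("environment", default="staging")
    with case(env, "staging"):
        run_branch()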
d
Hi @Mark McDonald, Do you have a reproducible example? Case statements are new-ish and I’d like to make sure this isn’t a bug
As far as I know, this should work the way you intend it to
m
ya, let me simplify my flow and share
d
Thank you!
m
@Dylan
from datetime import timedelta
import os

from prefect.engine.results import S3Result
from prefect import task, Flow, Parameter
from prefect.tasks.control_flow import case
from prefect.engine.result import NoResult

TABLES = [
    "table1",
    "table2",
    "table3",
    "table4",
    "table5",
]

@task(max_retries=2, retry_delay=timedelta(seconds=60))
def get_params(env):
    # Point the DB connection env var at the right environment.
    if env == "staging":
        os.environ["ENVAR_FOR_DB"] = "staging_envar"
    if env == "production":
        os.environ["ENVAR_FOR_DB"] = "production_envar"
    # Sentinel indicating there is no meaningful output to persist.
    return NoResult


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def download_from_s3(table):
    print(table)


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def extract_postgres(table):
    print(table)

# The flow-level S3Result applies to every task; each task's output is
# written to the templated location below.
with Flow("new_flow", result=S3Result(bucket="fake-bucket", location="/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect")) as flow:
    env = Parameter("environment", default="staging", required=False)
    envars_dependency = get_params(env)

    # Only the matching branch runs; tasks in the other case are skipped.
    with case(env, "production"):
        download_from_s3.map(TABLES).set_upstream(envars_dependency)

    with case(env, "staging"):
        extract_postgres.map(TABLES).set_upstream(envars_dependency)


if __name__ == "__main__":
    from prefect.engine.executors import DaskExecutor

    executor = DaskExecutor(n_workers=1, threads_per_worker=3)
    flow.run(executor=executor, parameters={"environment": "staging"})
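FWIW, one thing I may try to isolate it (a sketch, assuming Prefect 0.x behavior, not confirmed for this case): individual tasks can opt out of result persistence with checkpoint=False, so no S3 object is expected for them:

import os

from prefect import task

# checkpoint=False asks the engine not to persist this task's return value
# via the flow-level S3Result, which helps narrow down which task's missing
# result breaks the downstream read.
@task(checkpoint=False)
def get_params(env):
    os.environ["ENVAR_FOR_DB"] = f"{env}_envar"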
d
@Marvin open “S3 Result Fails to Read Downstream from Case Statement”