Mark McDonald

    2 years ago
    Hi - I've been testing out the S3Result object in my flows and I generally find it amazing - nice work! However, there's one flow where I'm struggling with it. I'm specifying my S3Result at the flow level and using a control-flow "case" task, and downstream tasks are failing because they try to retrieve the case task's result from S3 and nothing is available there. How is this supposed to work? Is the case task supposed to return a result which then gets written to S3? I assume it should.
    The schematic gives a good view of what's happening.
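    For readers following along, a minimal sketch of the pattern being described (assuming Prefect 1.x APIs; the flow and task names here are illustrative, not taken from the real flow):

    from prefect import task, Flow, Parameter
    from prefect.engine.results import S3Result
    from prefect.tasks.control_flow import case

    @task
    def do_work():
        return "done"

    # The flow-level result applies to every task in the flow, which
    # appears to include the comparison task that `case` inserts
    # behind the scenes.
    with Flow("sketch", result=S3Result(bucket="some-bucket")) as flow:
        env = Parameter("environment", default="staging")
        with case(env, "production"):
            do_work()  # runs downstream of the internal case task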
Dylan

    2 years ago
    Hi @Mark McDonald, do you have a reproducible example? Case statements are new-ish and I’d like to make sure this isn’t a bug.
    As far as I know, this should work the way you intend it to.
Mark McDonald

    2 years ago
    Ya, let me simplify my flow and share it.
Dylan

    2 years ago
    Thank you!
Mark McDonald

    2 years ago
    @Dylan
    from datetime import timedelta
    import os
    
    from prefect.engine.results import S3Result
    from prefect import task, Flow, Parameter
    from prefect.tasks.control_flow import case
    from prefect.engine.result import NoResult
    
    TABLES = [
        "table1",
        "table2",
        "table3",
        "table4",
        "table5",
    ]
    
    @task(max_retries=2, retry_delay=timedelta(seconds=60))
    def get_params(env):
        # Set the DB connection env var for the chosen environment;
        # return NoResult so there is nothing to checkpoint.
        if env == "staging":
            os.environ["ENVAR_FOR_DB"] = "staging_envar"
        if env == "production":
            os.environ["ENVAR_FOR_DB"] = "production_envar"
        return NoResult
    
    
    @task(max_retries=2, retry_delay=timedelta(seconds=60))
    def download_from_s3(table):
        print(table)
    
    
    @task(max_retries=2, retry_delay=timedelta(seconds=60))
    def extract_postgres(table):
        print(table)
    
    with Flow("new_flow", result=S3Result(bucket="fake-bucket", location="/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect")) as flow:
        env = Parameter("environment", default="staging", required=True,)
        envars_dependency = get_params(env)
    
        with case(env, "production"):
            download_from_s3.map(TABLES).set_upstream(envars_dependency)
    
        with case(env, "staging"):
            extract_postgres.map(TABLES).set_upstream(envars_dependency)
    
    
    if __name__ == "__main__":
        from prefect.engine.executors import DaskExecutor
    
        executor = DaskExecutor(n_workers=1, threads_per_worker=3)
        flow.run(executor=executor, parameters={"environment": "staging"})
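    A possible workaround, not confirmed in this thread: attach the S3Result to individual tasks instead of the whole flow, so Prefect's internal control-flow tasks keep the default result handling and nothing downstream tries to read a case result from S3. A minimal sketch, assuming Prefect 1.x APIs:

    from datetime import timedelta

    from prefect import task, Flow, Parameter
    from prefect.engine.results import S3Result
    from prefect.tasks.control_flow import case

    # Same templated location as above, but scoped to one task.
    s3_result = S3Result(
        bucket="fake-bucket",
        location="/test/result/new_flow/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect",
    )

    @task(max_retries=2, retry_delay=timedelta(seconds=60), result=s3_result)
    def extract_postgres(table):
        print(table)

    with Flow("new_flow") as flow:  # note: no flow-level result
        env = Parameter("environment", default="staging")
        with case(env, "staging"):
            extract_postgres.map(["table1", "table2"])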
Dylan

    2 years ago
    @Marvin open “S3 Result Fails to Read Downstream from Case Statement”
Marvin

    2 years ago