
Darragh

06/19/2020, 6:21 PM
Hey guys, 2 questions on the 0.12.0 release:
• Has something changed with the FlowRunTask? We were using it without any issues up until we upgraded to 0.12.0, but now our flow fails looking for a project_name. I can see in the code that this is specific to the backend being Cloud, but we set 'prefect backend server' before starting the server and haven't had issues with it before.
• About a week ago I asked about DFE (depth-first execution) for mapped tasks on Fargate and was told it would be in 0.12.0, but it doesn't appear to be working for us: mapped tasks are still running sequentially…

nicholas

06/19/2020, 6:29 PM
Hi @Darragh:
• It's been a few releases since anything has changed with the FlowRunTask; can you provide some more details or a minimal reproducible example?
• I think there was a misunderstanding there. DFE is available with the 0.12.0 release as part of the DaskExecutor; switching to that will give you the parallelism you're looking for.
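As a rough illustration of what DFE changes for mapped tasks, here is a toy, stdlib-only Python model. It is not Prefect's implementation, and the stage names and functions are hypothetical. Breadth-first execution finishes a stage for every mapped child before any child starts the next stage; depth-first execution lets each branch move on as soon as its own upstream step is done, so under a parallel executor the branches can proceed independently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple

# Hypothetical stages of a mapped pipeline; each maps one input to one output.
Stage = Tuple[str, Callable[[int], int]]
STAGES: List[Stage] = [("extract", lambda x: x + 1), ("load", lambda x: x * 10)]


def breadth_first(items: List[int], stages: List[Stage]) -> Tuple[List[int], List[str]]:
    """Finish stage N for every item before any item starts stage N + 1."""
    values, order = list(items), []
    for name, fn in stages:
        for i in range(len(values)):
            values[i] = fn(values[i])
            order.append(f"{name}[{i}]")
    return values, order


def depth_first(items: List[int], stages: List[Stage]) -> Tuple[List[int], List[str]]:
    """Carry each item through every stage before starting the next item."""
    values, order = [], []
    for i, value in enumerate(items):
        for name, fn in stages:
            value = fn(value)
            order.append(f"{name}[{i}]")
        values.append(value)
    return values, order


def run_branch(value: int, stages: List[Stage]) -> int:
    """One whole branch (all stages for a single item) as one unit of work."""
    for _, fn in stages:
        value = fn(value)
    return value


def depth_first_parallel(items: List[int], stages: List[Stage], workers: int = 4) -> List[int]:
    """Branches run concurrently; each advances as soon as its own previous stage is done."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order, even if branches finish out of order
        return list(pool.map(lambda v: run_branch(v, stages), items))
```

For inputs [1, 2], both strategies produce [20, 30]; only the step order differs: extract[0], extract[1], load[0], load[1] for breadth-first versus extract[0], load[0], extract[1], load[1] for depth-first.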

Darragh

06/19/2020, 6:42 PM
Hi @nicholas, thanks for clarifying. That's a bit of a bummer about the DFE, as I was very specific when asking about executing on Fargate. Unless I'm misunderstanding something about the DaskExecutor: does it need Dask infrastructure, or should I just be able to use the executor on Fargate and get DFE?

nicholas

06/19/2020, 7:00 PM
@Darragh sorry for the confusion there; you can still use the Fargate environment while also leveraging the Dask executor. Just define the DaskExecutor instead of whichever executor you're currently using (or define one at all if you're relying on the default).

Darragh

06/19/2020, 7:01 PM
My assumption, probably wrong, was that the DaskExecutor needed to run on Dask 😁
Apologies @nicholas, using the DaskExecutor got me DFE!! As for the FlowRunTask, I'm still a bit stumped on that one. Here's a representative sample:
import time
from datetime import timedelta

import prefect
from prefect import task, triggers
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import FlowRunTask


@task(trigger=triggers.all_successful, max_retries=3, retry_delay=timedelta(seconds=60),
      log_stdout=True, result=LocalResult())
def run_collector(partition, bucket_name, collector_name):
    logger = prefect.context.get("logger")
    logger.info("Run Collector SubFlows")
    # Start the collector flow as a separate flow run; run() returns the new flow run's ID
    flow_run = FlowRunTask()
    sub_flow_id = flow_run.run(flow_name=collector_name,
                               parameters={"bucket_name": bucket_name,
                                           "input_location": partition["input"],
                                           "output_location": partition["output"]})

    if not sub_flow_id:
        # Exception() does not %-format its arguments, so build the message explicitly
        raise Exception("Flow run [%s] failed to start for input: %s" % (sub_flow_id, partition["input"]))
    logger.info("Started Collector Flow ID: %s", sub_flow_id)
    time.sleep(partition["id"] * 2)

    return sub_flow_id
And this fails with:
File "/usr/local/lib/python3.6/site-packages/prefect/tasks/prefect/flow_run.py", line 71, in run
    raise ValueError("Must provide a project name.")
ValueError: Must provide a project name.

Chris White

06/19/2020, 7:42 PM
FYI, Dask has many modes of execution: it can schedule tasks to run in parallel across threads, processes, or a cluster of workers. Using the DaskExecutor without configuration runs on the Dask distributed scheduler but with local processes, so no cluster is required.
ah sorry to jump in, but I think I know what the issue is with the `FlowRunTask`; one sec and I can recommend a fix
when you start your agent, start it with -e PREFECT__BACKEND=server

Darragh

06/19/2020, 7:54 PM
Ok, I didn't have that previously and it was running fine, but I'll give it a go and report back shortly, thanks guys 👍 Does the env var take precedence over a manual prefect backend server or setting backend = "server" in the config.toml? I have both of those, but I'll test the env var now.

Chris White

06/19/2020, 7:55 PM
yeah, the env var always takes precedence; this is actually a small bug in the task, which we'll fix for 0.12.1
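Prefect's configuration docs describe setting any config key with an environment variable of the form PREFECT__SECTION__KEY (double underscores separate nesting levels), and such variables override config.toml. Below is a simplified Python sketch of that precedence rule only, assuming the config has already been parsed into a dict; the function name is hypothetical, and Prefect's real loader also casts and interpolates values, which this skips.

```python
import copy
from typing import Any, Dict, Mapping

ENV_PREFIX = "PREFECT__"


def apply_env_overrides(config: Dict[str, Any], environ: Mapping[str, str]) -> Dict[str, Any]:
    """Return a copy of `config` with PREFECT__-prefixed env vars applied on top.

    Simplified model: values stay strings; no casting or interpolation.
    """
    merged = copy.deepcopy(config)
    for name, value in environ.items():
        if not name.startswith(ENV_PREFIX):
            continue  # unrelated variables such as HOME or PATH are ignored
        # "PREFECT__SERVER__HOST" -> ["server", "host"]
        keys = name[len(ENV_PREFIX):].lower().split("__")
        node = merged
        for key in keys[:-1]:
            child = node.get(key)
            if not isinstance(child, dict):
                child = node[key] = {}  # create the missing nested section
            node = child
        node[keys[-1]] = value  # the env var wins over whatever the file had
    return merged


# config.toml says backend = "cloud"; the env var says server, so server wins.
example = apply_env_overrides({"backend": "cloud"}, {"PREFECT__BACKEND": "server"})
print(example["backend"])  # server
```

Passing os.environ as `environ` would apply the current process's variables; an override only takes effect if the variable is set in the environment of the process doing the lookup.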
d

Darragh

06/19/2020, 9:05 PM
@Chris White If I'm starting the agent programmatically rather than from the command line, what syntax do I need to make sure that var gets passed through? I ran the following:
export PREFECT__BACKEND=server
and started my agent using:
agent = FargateAgent(ALL_THE_CONFIG)
agent.start()
But I still get the same error:
Must provide a project name
If I add a project name to the params, I then get a different error:
400 Client Error: Bad Request for url: http://X.X.X.X:4200/graphql/graphql/alpha

Chris White

06/19/2020, 9:11 PM
agent = FargateAgent(ALL_THE_CONFIG, env_vars={"PREFECT__BACKEND": "server"})
should do the trick

Darragh

06/20/2020, 9:54 AM
Yep that one did it, thanks for the help guys!