# prefect-community
Hey guys, 2 questions on the `0.12.0` release:
• Has something changed with the FlowRunTask? We were using it without any issues up until we upgraded to 0.12.0, but now our flow fails looking for a project_name. I can see in the code that this is specific to the backend being cloud, but we set `prefect backend server` before starting the server and haven't had issues with it before.
• I had asked about a week ago about DFE for mapped tasks on Fargate, and was told it would be in 0.12.0, but it doesn't appear to be working for us: mapped tasks are still running sequentially.
Hi @Darragh:
• It's been a few releases since anything has changed with the `FlowRunTask`; can you provide some more details or a minimal reproducible example?
• I think there was a misunderstanding there: DFE is available with the `0.12.0` release as part of the `DaskExecutor`; switching to that will give you the parallelism you're looking for.
Hi @nicholas, thanks for clarifying. That's a bit of a bummer about the DFE, as I was very specific when asking about executing on Fargate. Unless I'm misunderstanding something about DaskExecutor: does it need Dask infrastructure, or should I just be able to use the executor on Fargate and achieve DFE?
@Darragh sorry for the confusion there; you're still able to use the Fargate environment while also leveraging the Dask executor; just define the DaskExecutor instead of whichever you're currently using (or define it period if you're using the default)
My assumption, probably wrong, was that DaskExecutor needed to run on Dask 😁
Apologies @nicholas, using DaskExecutor got me DFE!! As for the FlowRunTask, I'm still a bit stumped on that one. Here's a representative sample:
import time
from datetime import timedelta

import prefect
from prefect import task, triggers
from prefect.engine.results import LocalResult
from prefect.tasks.prefect.flow_run import FlowRunTask


@task(trigger=triggers.all_successful, max_retries=3, retry_delay=timedelta(seconds=60), log_stdout=True, result=LocalResult())
def run_collector(partition, bucket_name, collector_name):
    logger = prefect.context.get("logger")
    logger.info("Run Collector SubFlows")
    FlowRun = FlowRunTask()
    sub_flow_id = FlowRun.run(flow_name=collector_name,
                              parameters={"bucket_name": bucket_name, "input_location": partition["input"],
                                          "output_location": partition["output"]})

    if not sub_flow_id:
        # Exception does not apply %-style formatting itself, so format the message explicitly
        raise Exception("Flow run [%s] failed to start for input: %s" % (sub_flow_id, partition["input"]))
    # Only reached when the sub-flow run was created successfully
    logger.info("Started Collector Flow ID: %s", sub_flow_id)
    time.sleep(partition["id"] * 2)

    return sub_flow_id
And this fails with:
File "/usr/local/lib/python3.6/site-packages/prefect/tasks/prefect/flow_run.py", line 71, in run
    raise ValueError("Must provide a project name.")
ValueError: Must provide a project name.
FYI Dask has many modes of execution: it can schedule tasks to run in parallel across threads, processes, or a cluster of workers. Using the `DaskExecutor` without configuration runs on the Dask distributed scheduler but using multiprocessing, so no cluster is required.
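To make the "no cluster required" point concrete, here is a rough standard-library analogy. It is not Dask or Prefect code: a local thread pool stands in for a local parallel scheduler, and `slow_double`, `run_sequential`, `run_parallel`, and `timed` are hypothetical names made up for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    """Hypothetical stand-in for one mapped task."""
    time.sleep(0.2)  # simulate I/O-bound work
    return x * 2


def run_sequential(items):
    """Run each task one after another, the way a non-parallel executor would."""
    return [slow_double(x) for x in items]


def run_parallel(items, workers=4):
    """Run tasks concurrently on a local worker pool; no cluster involved."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order
        return list(pool.map(slow_double, items))


def timed(fn, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, t_seq = timed(run_sequential, [1, 2, 3, 4])
    _, t_par = timed(run_parallel, [1, 2, 3, 4])
    print(f"sequential: {t_seq:.2f}s, parallel: {t_par:.2f}s")
```

Both functions return the same results; only the wall-clock time differs, which is the behaviour change reported above after switching executors.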
ah sorry to jump in but I think I know what the issue is w/ the `FlowRunTask`; one sec and I can recommend a fix
when you start your agent, start it with
Ok, I didn't have that previously and it was running fine, but I'll give it a go and report back shortly, thanks guys 👍 Does the env var take precedence over a manual `prefect backend server` or setting `backend = "server"` in the config.toml? Because I have both of those, but I'll test the env var now.
yea env var always takes precedence; this actually is a small bug with the task, will fix for 0.12.1
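The precedence rule and the earlier error can be pictured together with a simplified sketch. This is not Prefect's actual source: `resolve_backend` and `check_flow_run_args` are hypothetical helpers, plain dicts stand in for the process environment and a parsed config.toml, and the fallback to `cloud` when nothing is set is an assumption. It only illustrates why a flow run whose environment lacks the backend setting could end up in the project-name check.

```python
def resolve_backend(env, config, default="cloud"):
    """Pick the backend: environment variable first, then config file, then default.

    `env` and `config` are plain dicts standing in for os.environ and a
    parsed config.toml. The "cloud" default is an assumption of this sketch.
    """
    if "PREFECT__BACKEND" in env:
        return env["PREFECT__BACKEND"]
    return config.get("backend", default)


def check_flow_run_args(backend, project_name=None):
    """Mimic the reported behaviour: a project name is only demanded for cloud."""
    if backend == "cloud" and project_name is None:
        raise ValueError("Must provide a project name.")
    return True


# The env var wins even when the config file says otherwise.
backend = resolve_backend({"PREFECT__BACKEND": "server"}, {"backend": "cloud"})
check_flow_run_args(backend)  # passes: no project name needed for "server"
```

With an empty environment and no config entry, the same sketch falls back to `cloud` and raises the `ValueError` seen in the traceback.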
@Chris White If I'm starting the agent programmatically rather than from the command line, what syntax do I need to use to make sure that var gets passed through? I ran the following:
export PREFECT__BACKEND=server
started my agent using:
agent = FargateAgent(ALL_THE_CONFIG)
But I still have the same error:
Must provide a project name
If I add a project name to the params I then get a different error:
400 Client Error: Bad Request for url: http://X.X.X.X:4200/graphql/graphql/alpha
agent = FargateAgent(ALL_THE_CONFIG, env_vars={"PREFECT__BACKEND": "server"})
should do the trick
Yep that one did it, thanks for the help guys!