Darragh
09/10/2020, 8:38 AM
Marvin
09/10/2020, 8:38 AM
Darragh
09/10/2020, 8:41 AM
Marvin
09/10/2020, 8:42 AM
Darragh
09/10/2020, 8:42 AM
Robin
09/10/2020, 8:44 AM
Marvin
09/10/2020, 8:44 AM
Matt Wong-Kemp
09/10/2020, 9:01 AM
Access to fetch at '<https://api.prefect.io/graphql>' from origin '<https://cloud.prefect.io>' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource. If an opaque response serves your needs, set the request's mode to 'no-cors' to fetch the resource with CORS disabled.
Arsenii
09/10/2020, 9:38 AM
import time
from random import randrange

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor

@task
def random_num(stop):
    logger = prefect.context.get("logger")
    number = randrange(stop)
    time.sleep(5)  # just so that we can tell by looking at logs
    logger.info(f"Your number is {number}")
    return number

@task
def sum_numbers(numbers):
    print(sum(numbers))

with Flow("parallel-execution") as flow:
    stop = Parameter("stop")
    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)
    total = sum_numbers(numbers=[number_1, number_2, number_3])
Now, it does run the tasks concurrently if I pass the executor to flow.run:
if __name__ == "__main__":
    state = flow.run(parameters={"stop": 5}, executor=DaskExecutor())
But it runs the tasks sequentially if I set the executor before running, either with
flow.environment.executor = DaskExecutor(cluster_kwargs={"n_workers": 3})
or
from prefect.environments import LocalEnvironment

with Flow(
    "parallel-execution", environment=LocalEnvironment(executor=DaskExecutor())
) as flow:
    ...  # same tasks as above
The interesting thing is that if I print(flow.environment.executor) in the last two cases, it does show DaskExecutor, but the tasks are still executed in order.
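One framework-independent way to check whether the three random_num calls really overlap, rather than reading the order of log lines, is to compare wall-clock time: run one after another, the sleeps add up; run concurrently, they overlap. The sketch below is an illustration under stated assumptions: it uses plain functions and Python's concurrent.futures.ThreadPoolExecutor as a stand-in for an executor (not Prefect's task or executor API) and shortens the 5-second sleep to 0.2 s.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from random import randrange
from typing import List, Tuple

# Assumption: 0.2 s stands in for the time.sleep(5) in the flow, to keep the check fast.
SLEEP_SECONDS = 0.2


def random_num(stop: int) -> int:
    """Plain-function stand-in for the random_num task; no Prefect involved."""
    time.sleep(SLEEP_SECONDS)
    return randrange(stop)


def run_sequentially(stop: int, n: int = 3) -> Tuple[List[int], float]:
    """Call random_num n times, one after another, and time the whole batch."""
    start = time.perf_counter()
    results = [random_num(stop) for _ in range(n)]
    return results, time.perf_counter() - start


def run_concurrently(stop: int, n: int = 3) -> Tuple[List[int], float]:
    """Submit n random_num calls to a thread pool and time the whole batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(random_num, stop) for _ in range(n)]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    seq_results, seq_elapsed = run_sequentially(5)
    par_results, par_elapsed = run_concurrently(5)
    # Sequential takes roughly n * SLEEP_SECONDS; concurrent roughly SLEEP_SECONDS.
    print(f"sequential: sum={sum(seq_results)} in {seq_elapsed:.2f}s")
    print(f"concurrent: sum={sum(par_results)} in {par_elapsed:.2f}s")
```

Applied to the flow itself, the same arithmetic says that three 5-second tasks finishing in about 5 seconds, rather than about 15, indicates they actually ran in parallel.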
All of the above was done locally, but the problem persists even when I deploy it to Cloud. Any advice is much appreciated; I'm not very experienced with Dask.
sark
09/10/2020, 10:33 AM
Unexpected error: ImageNotFound(HTTPError('404 Client Error: Not Found for url: <http+docker://localhost/v1.40/containers/create>'))
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/docker/api/client.py", line 261, in _raise_for_status
    response.raise_for_status()
  File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: <http+docker://localhost/v1.40/containers/create>
Nelson
09/10/2020, 11:00 AM
“LocalExecutor is not capable of parallelism”, so I presume that is the issue (https://docs.prefect.io/api/latest/engine/executors.html#executor), but the other executors require a separate Dask environment. Someone in this Slack mentioned a “Fargate Executor”, but I assume that was a mistake, since from what I can find it doesn’t exist.
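The docs line quoted above says LocalExecutor is not capable of parallelism. In general terms, the difference comes down to whether an executor's submit step runs the work immediately or hands it off and returns a future to collect later. The sketch below illustrates that with two hypothetical classes, SequentialExecutor and ThreadedExecutor, built on Python's concurrent.futures; they are not Prefect's executor classes, and the submit/wait method names are chosen for illustration only.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


class SequentialExecutor:
    """Hypothetical executor: submit() runs the call right away, so calls never overlap."""

    def submit(self, fn: Callable[..., Any], *args: Any) -> Any:
        return fn(*args)  # blocks until fn returns

    def wait(self, results: List[Any]) -> List[Any]:
        return list(results)  # already computed at submit time


class ThreadedExecutor:
    """Hypothetical executor: submit() hands the call to a thread pool and returns a Future."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn: Callable[..., Any], *args: Any) -> "Future[Any]":
        return self._pool.submit(fn, *args)  # returns immediately

    def wait(self, futures: List["Future[Any]"]) -> List[Any]:
        return [f.result() for f in futures]  # block only when results are needed

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


def make_recording_task(
    sleep_seconds: float = 0.1,
) -> Tuple[Callable[[int], int], List[Tuple[float, float]]]:
    """Build a toy task that records its (start, end) times so overlap can be checked."""
    spans: List[Tuple[float, float]] = []
    lock = threading.Lock()

    def task(x: int) -> int:
        start = time.perf_counter()
        time.sleep(sleep_seconds)
        end = time.perf_counter()
        with lock:
            spans.append((start, end))
        return x * 2

    return task, spans


def run_three(executor: Any, fn: Callable[[int], int], arg: int) -> List[int]:
    """Submit three independent calls, then gather their results."""
    handles = [executor.submit(fn, arg) for _ in range(3)]
    return executor.wait(handles)


def overlapped(spans: List[Tuple[float, float]]) -> bool:
    """True if every call started before any call finished."""
    return max(s for s, _ in spans) < min(e for _, e in spans)


if __name__ == "__main__":
    task, spans = make_recording_task()
    print(run_three(SequentialExecutor(), task, 1), overlapped(spans))
    task, spans = make_recording_task()
    threaded = ThreadedExecutor(max_workers=3)
    print(run_three(threaded, task, 1), overlapped(spans))
    threaded.shutdown()
```

Threads suit sleep- or I/O-bound work like this toy task; CPU-bound pure-Python code generally needs separate processes instead, because of the GIL.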
I see others using the Fargate Agent and the FargateTaskEnvironment, so I wonder if that’s related. The Fargate Agent “deploy[s] flows as Tasks using AWS Fargate”, and the “FargateTaskEnvironment is an environment which deploys your flow as a Fargate task”, so I don’t understand why you would use it together with the Fargate Agent. Your docs also say: “we recommend all users deploy their Flow using the LocalEnvironment configured with the appropriate choice of executor”.
How do we get tasks to run in parallel inside a Flow run in Fargate without Dask?
Also, @Marvin 🙂
Marvin
09/10/2020, 11:00 AM
Jeff Brainerd
09/10/2020, 3:19 PM
Marvin
09/10/2020, 3:19 PM
irchan bani
09/10/2020, 3:58 PM
Robin
09/10/2020, 4:02 PM
irchan bani
09/10/2020, 4:02 PM
Marvin
09/10/2020, 4:02 PM
David Jin
09/10/2020, 4:06 PM
David Jin
09/10/2020, 4:07 PM
fabian wolfmann
09/10/2020, 4:08 PM
Michael Reeves
09/10/2020, 4:16 PM
Marvin
09/10/2020, 4:16 PM
Ju Lee
09/10/2020, 6:11 PM
Charles Leung
09/10/2020, 6:14 PM
Mitchell Bregman
09/10/2020, 6:59 PM
Marvin
09/10/2020, 6:59 PM
Chris Goddard
09/10/2020, 7:05 PM
flow.run()). The problem is that after a while I start getting a persistent error: prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
I'm a little confused, because I thought that if I was just calling flow.run() I wouldn't be interacting with the Cloud API at all. Clearly, because it's in the same environment, it's detecting the connection, but then perhaps a token is becoming invalid or something? Any ideas?
kevin
09/10/2020, 7:36 PM
John Song
09/10/2020, 8:07 PM