Lee Mendelowitz
05/25/2022, 12:47 PMRobin Weiß
05/25/2022, 12:53 PMNaga Sravika Bodapati
05/25/2022, 1:13 PMFlorian Guily
05/25/2022, 1:14 PMlocal dask executor
in this cluster or if there is an aditionnal configuration to use dask as an executor in a k8's cluster ?Andrew Lawlor
05/25/2022, 1:47 PMConstantino Schillebeeckx
05/25/2022, 1:58 PMsuccess
though. why wouldn't this flow's overall state be set to failed
?Anna Geller
05/25/2022, 3:11 PMXavier Babu
05/25/2022, 4:34 PMwill milner
05/25/2022, 5:18 PMJohn Kang
05/25/2022, 6:04 PMhelper_script
argument. Does anyone have any idea how I would write that script up?Kyle Pierce
05/25/2022, 6:53 PMFlorian Kühnlenz
05/25/2022, 7:06 PMJohn O'Farrell
05/25/2022, 7:31 PMprefect.Docker:Building the flow's Docker storage...
after creating a new temp directory with the dockerfile and healthcheck.py
Jared Noynaert
05/25/2022, 7:34 PMNikhil Jain
05/26/2022, 12:08 AMECSRun
as follows:
ecs_run = ECSRun(
run_task_kwargs={'cluster': 'dev-prefect-cluster'},
task_role_arn='arn:aws:iam::<>:role/dev-task-runner-ecs-task-role',
execution_role_arn='arn:aws:iam::<>:role/dev-task-runner-ecs-task-execution-role',
image='<>.<http://dkr.ecr.us-west-1.amazonaws.com/dev-automation-scripts-ecr:latest|dkr.ecr.us-west-1.amazonaws.com/dev-automation-scripts-ecr:latest>',
labels=['ecs', 'dev']
)
flow.run_config = ecs_run
...
And then in my terraform config for the prefect-agent
, I am setting a task_definition
. And this task_definition_file
is stored in s3 and contains a list of env
parameters and secrets
that are pulled from SSM parameter store.
What’s happening is that when the flow-task is created, it is not getting any of the env
and secrets
that I supply in the task_definition_file. I can kinda see why this is happening: I am not supplying any task_definition in the ECSRun
config during flow registration. The reason is 2-fold: I don’t want to hard-code these env/SSM parameters in python code, they are supposed to live in terraform configs. And also, we want to share these env and ssm parameters across many tasks, so don’t want to have to define them everytime.
Is there a way to accomplish this?Alex de Geofroy
05/26/2022, 1:13 AMThomas Henn
05/26/2022, 5:25 AMDharit Sura
05/26/2022, 5:55 AMDharit Sura
05/26/2022, 5:56 AMSumant Agnihotri
05/26/2022, 6:16 AMAPI Tokens
option is disabled. How do I connect to my server, to register Flows, etc?Samuel Hinton
05/26/2022, 6:52 AMKayvan Shah
05/26/2022, 7:36 AMTraceback (most recent call last):
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 55, in spire_current_weather
sample = random.choice(conf)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/random.py", line 346, in choice
return seq[self._randbelow(len(seq))]
TypeError: object of type 'PrefectFuture' has no len()
using random.choice() to select randomly from a listVadym Dytyniak
05/26/2022, 8:47 AMAbokor Ahmed
05/26/2022, 8:50 AMShrikkanth
05/26/2022, 9:04 AM<http://logger.info|logger.info>(lines)
contains the error message. Any suggestions will be helpful, thanks.Sanjay Patel
05/26/2022, 10:32 AMoutput = flow.run(executor=DaskExecutor(address=XXX, client_kwargs=XXX))
however the flow contains a task that actually has a prefect flow embedded in it. An unknown number of parallel tasks that the primary task prepares (lets say 15 of them) and then tries to execute. I would like the 15 to be done in parallel managed by dask. I don't want to create a new dask cluster from within the primary task (not even sure if that would work) but instead want to execute the flow in the same cluster.
The following line works but i'm not sure it's going to utilize all the available workers and may only execute on the worker that initiates the flow. Any guidance on how I should be running the flow2 which is prepared and called the by the primary task in 'flow' above?
output2 = flow2.run()
I believe i'll need to use something like this if i was using pure dask - https://distributed.dask.org/en/stable/task-launch.html but i'm not sure the prefect method which will achieve this and would like to keep the additional features that prefect offers.
Thanks so much in advance!Stéphan Taljaard
05/26/2022, 10:45 AMimport os
from prefect.client import Secret
secret_name = 'JSON_SECRET'
# os.environ[
# f"PREFECT__CONTEXT__SECRETS__{secret_name}"
# ] = '{"db1": {"password": "password", "user": "userA"}, "db2": {"password": "P@$$w0rd", "user": "userB"}}'
secret_value = Secret(secret_name).get()
print(secret_value['db2']['password'])
# This prints 'P@$w0rd', not 'P@$$w0rd'
Kayvan Shah
05/26/2022, 11:06 AM2022-05-26 11:04:12 | [INFO]:numexpr.utils - NumExpr defaulting to 2 threads.
Traceback (most recent call last):
File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 65, in <module>
spire_current_weather()
File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 62, in spire_current_weather
spire_pipe(obj, sample)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/flows.py", line 319, in __call__
return enter_flow_run_engine_from_flow_call(self, parameters)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 110, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 95, in with_injected_client
return await fn(*args, **kwargs)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 158, in create_then_begin_flow_run
flow_run = await client.create_flow_run(
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 538, in create_flow_run
response = await <http://self._client.post|self._client.post>("/flow_runs/", json=flow_run_create_json)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_client.py", line 1820, in post
return await self.request(
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_client.py", line 1506, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 233, in send
response.raise_for_status()
File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_models.py", line 1510, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '500 Internal Server Error' for url '<https://api-beta.prefect.io/api/accounts/2693bb0e-7537-4327-83f2-06354b5d2b4d/workspaces/d8a21cf4-12f7-45f8-9323-5fe529e45148/flow_runs/>'
For more information check: <https://httpstatuses.com/500>
While running flow in flowSULEMAN KHAN
05/26/2022, 12:10 PMScheduled Flow
fails because task is unable to return value when Prefect Server
runs the flow with CloudFlowRunner
. Same Flow runs successfully when I use flow.run()
. I have attached the code, Prefect UI screenshot, and terminal output of flow.run().
if-else block at lineno 920 in task_runner.py
is causing issue. If I return a value from the function/task
it runs the if-block otherwise else block. line self.result.write(value, **formatting_kwargs)
is calling the function with format self.
and dictionary formatting_kwargs
also contains the self
variable, this self variable contains the object of Demo
class.Rei Mendel
05/26/2022, 12:29 PMparameters = ['~/folder1', '~/folder2', '~/folder3']
create_flow_run.map(flow_name="List dir", parameters={"folder_path": ???})
Rei Mendel
05/26/2022, 12:29 PMparameters = ['~/folder1', '~/folder2', '~/folder3']
create_flow_run.map(flow_name="List dir", parameters={"folder_path": ???})
Kevin Kho
05/26/2022, 2:40 PM[{"folder_path": '~/folder1'}, {"folder_path": '~/folder2'}]