Kevin Weiler
01/04/2023, 10:22 PMadd_task
method from the flow
class. I don’t think the new API has this. Is there a way to iteratively add tasks to a flow?Gabriela Palacios
01/05/2023, 4:27 AMHamza Naanani
01/05/2023, 8:30 AMTorstein Molland
01/05/2023, 9:34 AMfrom prefect import flow, get_run_logger, task
@task
async def async_task():
return "async_task"
@task
def synchronous_task():
return "synchronous_task"
@flow
async def my_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Executing async task:")
async_result = await async_task()
<http://logger.info|logger.info>(f"Result of async task was {async_result}")
<http://logger.info|logger.info>("Executing synchronous task")
synchronous_result = synchronous_task()
<http://logger.info|logger.info>(f"Result of synchronous task was {synchronous_result}")
Marion Sauvage
01/05/2023, 10:17 AMError occured when trying to create new work queue
in a 405 response for a request to
<http://xx.xx.xx.xx:4200/xx.xx.xx.xx:4200/api/work_queues/>
We use this Dockerfile to run the server:
FROM python:3.9.16
RUN pip install prefect prefect[aws]
RUN prefect config set PREFECT_API_URL=http:/xx.xx.xx.xx:4200/api
CMD ["prefect", "orion", "start", "--host", "0.0.0.0"]
Thanks a lot for your help !Aniruddha Bharadwaj
01/05/2023, 10:34 AMDanilo Drobac
01/05/2023, 3:45 PMJean-Michel Provencher
01/05/2023, 4:11 PMTim-Oliver
01/05/2023, 4:36 PMThet Naing
01/05/2023, 5:08 PMThet Naing
01/05/2023, 5:08 PMAnders Segerberg
01/05/2023, 5:46 PMwait_for_flow_run
several times.
I want this parent flow's success to depend on the success of all child flows.
I've tried setting upstream_tasks
to the result of wait_for_flow_run
, but I realize that that's a FlowRunView
object, not a Task
.
From the child flow, I can get_tasks
, and set upstream_tasks
to that. But I have some tasks I allow to fail in the child flows (they are not reference tasks of the child flow.)
What I'd really like to do is to be able to reference <child_flow>.state
, and set the parent flow's reference tasks to be expecting a
<Success: "All reference tasks succeeded.">
Is there a built-in way of going about this, instead of just manually inspecting the child flow results, and raising the appropriate SIGNAL ?Tuoyi Zhao
01/05/2023, 6:33 PMTrevor Kramer
01/05/2023, 7:09 PMElliott Wilson
01/05/2023, 7:42 PMFlow could not be retrieved from deployment.
from boto3 when I try and run the deployment into the ec2 instance. I can deploy to s3 locally and connect to bucket from the ec2 using the AWS cli. Please can anyone help me debug this?Trevor Kramer
01/05/2023, 7:58 PMTuoyi Zhao
01/05/2023, 8:35 PMTuoyi Zhao
01/05/2023, 8:37 PMJean-Michel Provencher
01/05/2023, 9:38 PMalex
01/05/2023, 10:32 PMget_run_context().flow.name
from the flow context but am unable to find an equivalent on the task levelEdmondo Porcu
01/05/2023, 10:46 PMTuoyi Zhao
01/05/2023, 10:45 PMMiller Jiang
01/05/2023, 11:09 PMboto3.exceptions.S3UploadFailedError: Failed to upload. An error occurred (SignatureDoesNotMatch) when calling the PutObject operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.
Marcelo Santoro
01/06/2023, 1:43 AMsafe-to-evict: false
on my template file, but it is not working
<http://cluster-autoscaler.kubernetes.io/safe-to-evict|cluster-autoscaler.kubernetes.io/safe-to-evict>: "false"
Does anyone knows how to prevent k8s scale down while a job is still running ?Santhosh Solomon (Fluffy)
01/06/2023, 2:31 AMfrom prefect import task, flow
from time import sleep
from prefect.task_runners import ConcurrentTaskRunner
@task(tags=['concurrent-task'])
def perf_task(input):
sleep(25)
print(input)
@flow(task_runner=ConcurrentTaskRunner)
def test_flow():
for i in range(100):
perf_task(i)
if __name__ == '__main__':
test_flow()
I have created a deployment with queue concurrent_test
and set the concurrency limit to 3. In my understanding this is responsible for the number of flow runs which can be executed concurrently.
after that I have created a concurrency limit for the tag i have included in the task.
prefect concurrency-limit create concurrent-task 5
My understanding of this setting is for number of tasks in which could be executed concurrently.
Once both the settings are done, I have started an agent with
prefect agent start -q concurrent_test
When I initiate the flow run, below is the execution log, in which it is very clear there is no concurrency (check the time stamp for every task execution)
Starting v2.7.6 agent with ephemeral API...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): concurrent_test...
07:54:51.280 | INFO | prefect.agent - Submitting flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
07:54:51.336 | INFO | prefect.infrastructure.process - Opening process 'strange-starling'...
07:54:51.348 | INFO | prefect.agent - Completed submission of flow run '9034dd75-b32e-4b39-8f48-eb03c55e3fe5'
/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
07:54:55.882 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-0' for task 'perf_task'
07:54:55.883 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-0' immediately...
07:54:55.909 | INFO | Task run 'perf_task-c7bf4036-0' - Finished in state Completed()
07:54:55.921 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-1' for task 'perf_task'
07:54:55.921 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-1' immediately...
07:54:55.945 | INFO | Task run 'perf_task-c7bf4036-1' - Finished in state Completed()
07:54:55.955 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-2' for task 'perf_task'
07:54:55.955 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-2' immediately...
07:54:55.979 | INFO | Task run 'perf_task-c7bf4036-2' - Finished in state Completed()
07:54:55.989 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-3' for task 'perf_task'
07:54:55.990 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-3' immediately...
07:54:56.012 | INFO | Task run 'perf_task-c7bf4036-3' - Finished in state Completed()
07:54:56.022 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-4' for task 'perf_task'
07:54:56.022 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-4' immediately...
07:54:56.045 | INFO | Task run 'perf_task-c7bf4036-4' - Finished in state Completed()
07:54:56.054 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-5' for task 'perf_task'
07:54:56.054 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-5' immediately...
07:54:56.077 | INFO | Task run 'perf_task-c7bf4036-5' - Finished in state Completed()
07:54:56.086 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-6' for task 'perf_task'
07:54:56.087 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-6' immediately...
07:54:56.109 | INFO | Task run 'perf_task-c7bf4036-6' - Finished in state Completed()
07:54:56.120 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-7' for task 'perf_task'
07:54:56.121 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-7' immediately...
07:54:56.144 | INFO | Task run 'perf_task-c7bf4036-7' - Finished in state Completed()
07:54:56.153 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-8' for task 'perf_task'
07:54:56.153 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-8' immediately...
07:54:56.177 | INFO | Task run 'perf_task-c7bf4036-8' - Finished in state Completed()
07:54:56.187 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-9' for task 'perf_task'
07:54:56.188 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-9' immediately...
07:54:56.210 | INFO | Task run 'perf_task-c7bf4036-9' - Finished in state Completed()
07:54:56.221 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-10' for task 'perf_task'
07:54:56.222 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-10' immediately...
07:54:56.244 | INFO | Task run 'perf_task-c7bf4036-10' - Finished in state Completed()
07:54:56.253 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-11' for task 'perf_task'
07:54:56.253 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-11' immediately...
07:54:56.278 | INFO | Task run 'perf_task-c7bf4036-11' - Finished in state Completed()
07:54:56.289 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-12' for task 'perf_task'
07:54:56.289 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-12' immediately...
07:54:56.311 | INFO | Task run 'perf_task-c7bf4036-12' - Finished in state Completed()
07:54:56.323 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-13' for task 'perf_task'
07:54:56.323 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-13' immediately...
07:54:56.345 | INFO | Task run 'perf_task-c7bf4036-13' - Finished in state Completed()
07:54:56.355 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-14' for task 'perf_task'
07:54:56.355 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-14' immediately...
07:54:56.380 | INFO | Task run 'perf_task-c7bf4036-14' - Finished in state Completed()
07:54:56.389 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-15' for task 'perf_task'
07:54:56.389 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-15' immediately...
07:54:56.411 | INFO | Task run 'perf_task-c7bf4036-15' - Finished in state Completed()
07:54:56.424 | INFO | Flow run 'strange-starling' - Created task run 'perf_task-c7bf4036-16' for task 'perf_task'
07:54:56.424 | INFO | Flow run 'strange-starling' - Executing 'perf_task-c7bf4036-16' immediately...
07:54:56.447 | INFO | Task run 'perf_task-c7bf4036-16' - Finished in state Completed()
I am using prefect 2.7.6.
Thanks in advanceSiva Balusu
01/06/2023, 3:29 AMSiva Balusu
01/06/2023, 3:30 AMSiva Balusu
01/06/2023, 3:46 AMKhyaati Jindal
01/06/2023, 10:32 AMTraceback (most recent call last):
Khyaati Jindal
01/06/2023, 10:34 AMTraceback (most recent call last):
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
result = func(*args)
File "/usr/lib/python3.10/ssl.py", line 917, in read
v = self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2548)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
Traceback (most recent call last):
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/anyio/streams/tls.py", line 195, in receive
data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
data = await self.transport_stream.receive()
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
await self._protocol.read_event.wait()
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
with anyio.fail_after(timeout):
File "/home/ubuntu/function-triggers/env/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/functions-triggers/env/lib/python3.10/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ReadTimeout
Any idea what does this error mean or why is it crashing ?