Eric
10/02/2023, 8:11 PM
Eric
10/02/2023, 8:18 PM
Brian Newman
10/02/2023, 9:34 PM
from typing import Optional

from prefect import flow, task
from pydantic import BaseModel


class User(BaseModel):
    """Pydantic model for the User."""

    email: Optional[str]
    firstName: Optional[str]
    lastName: Optional[str]


@task()
def user_json(user: User):
    """Should return a JSON representation of only the set fields in the User model."""
    print(user)
    # Outputs: email=None firstName='John' lastName=None
    print(user.json(exclude_unset=True))
    # Outputs: {"email": null, "firstName": "John", "lastName": null}
    return user.json(exclude_unset=True)


@flow()
def fl_create_user():
    """Creates a new user with only firstName set and calls the user_json task."""
    user = User(firstName="John")
    print(user)
    # Outputs: email=None firstName='John' lastName=None
    print(user.json(exclude_unset=True))
    # Outputs: {"firstName": "John"}
    json_user = user_json(user)
    print(json_user)
    # Outputs: {"email": null, "firstName": "John", "lastName": null}
Matt Alhonte
10/03/2023, 1:42 AM
run_deployment (run from a task). I notice that if one fails, then the downstream ones don't fail. How do I make subflow failures propagate to the parent flow? I'm using the ConcurrentTaskRunner. Some need to run concurrently with each other.
Matt Alhonte
10/03/2023, 2:07 AM
wait_for argument to all run at the same time, while any Failure kills the whole thing.
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time


def fail_hook(flow, flow_run, state):
    print("Failed")
    raise Exception(state.message)


@task(log_prints=True, on_failure=[fail_hook])
def my_task():
    time.sleep(15)
    # some code here
    raise Exception("This task has failed.")  # this will set the task to Failed state


@task(log_prints=True, on_failure=[fail_hook])
def my_task2():
    time.sleep(25)
    return "sdfsdf"


@flow(
    task_runner=ConcurrentTaskRunner(),
)
def my_flow():
    result1 = my_task.submit(return_state=True)
    result2 = my_task2.submit(return_state=True)
    result3 = my_task.submit(return_state=True)
    result4 = my_task2.submit(return_state=True, wait_for=[result1])
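The behaviour asked for above (everything starts at once, and the first failure fails the whole run) can be sketched in plain Python with concurrent.futures. This is only an analogue for illustration, not Prefect API: failing_job, slow_job and run_all_fail_fast are hypothetical names, and Prefect's own futures and states may behave differently.

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def failing_job():
    """Hypothetical stand-in for a task that fails."""
    time.sleep(0.1)
    raise RuntimeError("This task has failed.")


def slow_job():
    """Hypothetical stand-in for a task that succeeds."""
    time.sleep(0.3)
    return "done"


def run_all_fail_fast(jobs):
    """Start every job at once and re-raise the first failure in the caller."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(job) for job in jobs]
        # Returns as soon as any job raises, or when all jobs have finished.
        done, pending = wait(futures, return_when=FIRST_EXCEPTION)
        for future in pending:
            future.cancel()  # only stops jobs that have not started running yet
        for future in done:
            future.result()  # re-raises the job's exception, if it had one
        return [future.result() for future in futures]
```

Calling run_all_fail_fast([failing_job, slow_job]) raises the RuntimeError in the caller, while a list of only slow_job entries returns their results in order. Jobs that are already running are not interrupted; the executor still waits for them before the error leaves the with block.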
Matt Fysh
10/03/2023, 3:13 AM
Laszlo [Gimlet.io]
10/03/2023, 11:36 AM
Tarun Reddy
10/03/2023, 12:23 PM
Tarun Reddy
10/03/2023, 12:24 PM
Jason Motley
10/03/2023, 2:39 PM
Jeffrey Lam
10/03/2023, 2:58 PM
Failed to submit flow run '8093ef5c-64f8-42eb-9f4b-045ed7e7d6ad' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
conn = connection.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
raise err
File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
sock.connect(sa)
OSError: [Errno 99] Cannot assign requested address
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 714, in urlopen
httplib_response = self._make_request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 403, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1053, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
self.sock = conn = self._new_conn()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7f60db485950>: Failed to establish a new connection: [Errno 99] Cannot assign requested address
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 834, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 506, in run
job = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 628, in _create_job
job = batch_client.create_namespaced_job(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
return self.create_namespaced_job_with_http_info(namespace, body, **kwargs) # noqa: E501
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
return self.api_client.call_api(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
return self.rest_client.POST(url,
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 276, in POST
return self.request("POST", url,
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 169, in request
r = self.pool_manager.request(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 78, in request
return self.request_encode_body(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 170, in request_encode_body
return self.urlopen(method, url, **extra_kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
response = conn.urlopen(method, u.request_uri, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 826, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 826, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 826, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 798, in urlopen
retries = retries.increment(
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='172.30.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect-2x/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f60db485950>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))
Dominic Tarro
10/03/2023, 5:57 PM
run_deployment, are only the parameters I submit overridden, or are all parameters overridden and any not explicitly passed fall back on the flow function's defaults?
Theis Ferré Hjortkjær
10/03/2023, 6:27 PM
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options


@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner("ray://<my-ray-service>:10001"))
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4)
    with remote_options(num_cpus=4):
        process.submit(42)
It fails with a runtime error:
RuntimeError: There is no current event loop in thread 'ray_client_server_1'.
If I do not specify resources for my task, everything works as intended.
I have submitted an issue, but it has not been picked up yet, so I thought maybe I could get some help here 🙂
https://github.com/PrefectHQ/prefect/issues/10542
Erik Amundson
10/03/2023, 8:29 PM
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Client error '408 Request Timeout' for url 'https://api.prefect.cloud/api/accounts/<xxx>/workspaces/<xxx>/task_runs/'
Samuel Hinton
10/03/2023, 10:55 PM
prefect-aws to launch ECS tasks for longer running compute. I'm unsure how a "worker" would change or improve these two use cases.
Matt Alhonte
10/04/2023, 12:50 AM
def make_fail(flow, flow_run, state):
    print("Failed")
    flow_run_ctx = FlowRunContext.get()
    print(flow_run_ctx)
    print("Flow")
    print(flow)
    print("Flow run")


@flow(
    log_prints=True,
    on_completion=[make_fail],
    on_failure=[make_fail],
    task_runner=ConcurrentTaskRunner(),
    result_serializer=JSONSerializer(),
    result_storage=S3(bucket_path=bucket_path),
    persist_result=True,
)
def my_flow():
    result1 = my_task.submit()
    result2 = my_task2.submit()
    result3 = my_task.submit()
    result4 = my_task2.submit(wait_for=[result1])
None of the print statements show up.
Matt Fysh
10/04/2023, 12:52 AM
tig
10/04/2023, 1:37 AM
Matt Fysh
10/04/2023, 3:53 AM
/task-runs/task-run/:uuid route?
• the task inputs tab is showing {"id": []} when it should be (for example) {"id": "123"}
• the cache key in the sidebar is not populated, but I provided a cache_key_fn and the caching feature itself seems to be working (re-running task with same ID uses cached result)
Deepanshu Aggarwal
10/04/2023, 12:32 PM
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: activities-split-sync
prefect-version: 2.13.3

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_aws.deployments.steps.push_to_s3:
    id: push_code
    requires: prefect-aws>=0.3.4
    bucket: prod-prefect-s3-block
    folder: prefect
    credentials: "{{ prefect.blocks.aws-credentials.s3cred }}"

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect_aws.deployments.steps.pull_from_s3:
    id: pull_code
    requires: prefect-aws>=0.3.4
    bucket: '{{ push_code.bucket }}'
    folder: '{{ push_code.folder }}'
    credentials: "{{ prefect.blocks.aws-credentials.s3cred }}"

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: activities-mode-split-sync-flow
  version: '1.0'
  tags: [
    activities_mode_split_sync_flow
  ]
  description: null
  schedule: {}
  flow_name: activities-mode-split-sync-flow
  entrypoint: flows/process_sso_integrations_flows/activities_mode_split_sync_flow.py:activities_mode_split_sync_flow
  parameters: {}
  work_pool:
    name: sso
    work_queue_name: activities-sso-queue
    job_variables: {}
Sunjay
10/04/2023, 12:58 PM
Evan
10/04/2023, 1:31 PM
Lior Barak
10/04/2023, 2:29 PM
pull stage
1. cloning from git
2. installing requirements
one of my flow requirements is boto3 (and obviously it is in the requirements.txt file)
every first flow run after agent's startup fails with this error:
"Flow could not be retrieved from deployment."
"exc_info":{"type":"ScriptError","message":"Script at 'flows/full_scan.py' encountered an exception: ModuleNotFoundError(\"No module named 'boto3'\")"
and the second flow onwards succeeds
if I install boto3 manually on the agent's terminal it works fine
but if I add this step to pull:
- prefect.deployments.steps.run_shell_script:
    id: force-install-boto3
    script: python3 -m pip install boto3==1.28.0
    stream_output: true
the aforementioned behaviour continues
help?
Alex S.
10/04/2023, 2:51 PM
triggers:
  - enabled: true
    match:
      prefect.resource.id: prefect.flow-run.*
    expect:
      - prefect.flow-run.Completed
    match_related:
      # This doesn't work prefect.resource.name: prefect.flow.hello_world_flow1
      prefect.resource.id: prefect.flow.24b5f5df-145c-4ce7-bca2-b0159159d4f1
      prefect.resource.role: flow
Deceivious
10/04/2023, 3:24 PM
Henri
10/04/2023, 6:19 PM
RuntimeError: Failed to run ECS task, cluster 'default' not found. Confirm that the cluster is configured in your region.
How can I tell the deployment/prefect the name of the cluster? I really don't know where else to look.
Thx
Erik Amundson
10/04/2023, 6:24 PM
from datetime import timedelta

from prefect.client.schemas import schedules
from prefect.client.schemas.schedules import DateTimeTZ

pst = "America/Los_Angeles"
daily_midnight_offset_30 = schedules.IntervalSchedule(
    anchor_date=DateTimeTZ(year=2020, month=1, day=1, hour=0, minute=30),
    timezone=pst,
    interval=timedelta(hours=24),
)
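An anchor of 2020-01-01 00:30 that carries no UTC offset can be read either as 00:30 UTC or as 00:30 in America/Los_Angeles, and a 24-hour interval then lands at very different local times. The standard-library sketch below compares the two readings for an example date in October 2023. It is an assumption that one of these readings explains the schedule's behaviour; which one Prefect applies is not confirmed here, and the variable names are illustrative only.

```python
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo

PACIFIC = ZoneInfo("America/Los_Angeles")

# Two readings of the same wall-clock anchor (hypothetical names).
anchor_read_as_utc = datetime(2020, 1, 1, 0, 30, tzinfo=timezone.utc)
anchor_read_as_pacific = datetime(2020, 1, 1, 0, 30, tzinfo=PACIFIC)

# Whole days from the anchor date to an example run date.
days = (date(2023, 10, 4) - date(2020, 1, 1)).days

# Reading 1: runs repeat at 00:30 UTC, shown here in Pacific time.
run_utc_reading = (anchor_read_as_utc + timedelta(days=days)).astimezone(PACIFIC)
# Reading 2: runs repeat at 00:30 Pacific wall-clock time.
run_pacific_reading = anchor_read_as_pacific + timedelta(days=days)

print(run_utc_reading.strftime("%Y-%m-%d %H:%M"))      # 2023-10-03 17:30
print(run_pacific_reading.strftime("%Y-%m-%d %H:%M"))  # 2023-10-04 00:30
```

If the first reading is what happens, an anchor that already carries the Pacific zone would be the thing to try. That is an untested suggestion, not a confirmed fix.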
But the schedule runs at 17:30 Pacific (00:30 UTC). Do python schedules not respect the time zone?
Henri
10/04/2023, 6:42 PM
Sam McAlilly
10/04/2023, 8:30 PM
prefect.yaml to places that aren't it
Sam McAlilly
10/04/2023, 8:33 PM