Tim Galvin
11/16/2022, 11:59 AM
Encountered exception during execution:
Traceback (most recent call last):
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/bin/process_holography.py", line 431, in holography_flow
return_me = log_and_wait_futures(return_me)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__
return enter_task_run_engine(
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
return await future._result()
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/states.py", line 86, in _get_state_result
raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Thomas Fredriksen
11/16/2022, 12:21 PM
.flow-postfix in Prefect 1. Is it still possible to store flows in this way in Prefect 2? From what I can tell from the source, it may not be possible:
https://github.com/PrefectHQ/prefect/blob/a24c725629e1a6aef31bcb577d4a853a86fc1cbe/src/prefect/utilities/importtools.py#L191
Joshua Grant
11/16/2022, 3:22 PM
DockerRegistry block with AWS ECR?
Jenia Varavva
11/16/2022, 4:21 PM
team1_url and team2_api. I wanted to use the create_flow_run() task, but since it constructs a Client() with no args, I’m having trouble pointing it at the right API_URL. My initial instinct was to do something like:
config_team1 = prefect.context.config.copy()
config_team1.cloud.api = 'team1_url'
with Flow('parent'):
    with prefect.context(config=config_team1):
        create_flow_run(...)
    with prefect.context(config=config_team2):
        create_flow_run(...)
This, unfortunately, doesn’t work, as the context is set at flow definition time, not at task execution (i.e. not when Client() is executed).
An alternative that got me closer was something like:
with prefect.context(config=config_team1):
    flow.run()
which indeed overrides the config value for the Client() call inside the create_flow_run() task, but doesn’t look like it would allow me to use two different URLs in the same flow.
Is there a better way around this than copying/adjusting create_flow_run() to allow overriding api_url itself?
Dev Dabke
11/16/2022, 4:28 PM
logger created as a singleton in a python module. However, sometimes, this module is invoked in a prefect flow. The logs from this logger are not displaying during the flow run, nor are they logging to orion. I am not invoking get_run_logger anywhere. The issue is that the module is only sometimes invoked in the context of a flow; it’s a shared library. Can I register prefect as a handler for this logger that will only log when in a prefect context?
Mac
11/16/2022, 4:29 PM
create_flow_run.map() from prefect v1 in v2? I am trying to run a few subflows at once, then have the flow wait to get successful responses from all of the subflows before moving on to the next task.
Jai P
11/16/2022, 5:27 PM
Thomas Fredriksen
11/16/2022, 7:29 PM
DaskTaskRunner with the KubeCluster-type from dask-kubernetes, however I am encountering the following error:
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 161, in start
await self._start(exit_stack)
File "/usr/local/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 300, in _start
self._client = await exit_stack.enter_async_context(
File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1398, in __aenter__
await self
File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1213, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 1276, in _ensure_connected
comm = await connect(
File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 291, in connect
comm = await asyncio.wait_for(
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
return fut.result()
File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 487, in connect
ip, port = parse_host_port(address)
File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
port = _default()
File "/usr/local/lib/python3.10/site-packages/distributed/comm/addressing.py", line 73, in _default
raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address '<Not Connected>'
The taskrunner is set dynamically:
flow.task_runner = DaskTaskRunner(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "name": f"{flow.name}",
        "namespace": "prefect",
        "image": self._storage.get_name(),
        "n_workers": 1,
        "resources": {
            "limits": {
                "cpu": resources.cpu,
                "memory": resources.mem,
            }
        },
    },
    adapt_kwargs={
        "minimum": self._min_workers,
        "maximum": self._max_workers,
    },
    client_kwargs={
        "set_as_default": True
    }
)
Does anyone know what might be causing this?
Vadym Dytyniak
11/16/2022, 7:32 PM
LocalRun in Prefect 2?
badasstronaut
11/16/2022, 7:33 PM
prefect_aws and here’s an example of my ecs.py infra block:
ecs = ECSTask(image='custom-prefect-dask:latest',
              cpu='512',
              memory='2048',
              cluster='arn:aws:ecs:us-east-1:...Cluster',
              execution_role_arn='arn:aws:iam::...Role',
              task_role_arn='arn:aws:iam:...Role',
              task_start_timeout_seconds=300,
              configure_cloudwatch_logs=True,
              launch_type='FARGATE_SPOT',
              vpc_id='vpc-...',
              task_customizations=[dict(op='replace',
                                        path='/tags',
                                        value=[dict(key='TAG_1', value='Prefect'),
                                               dict(key='TAG_2', value='POC')]),
                                   dict(op='replace',
                                        path='/networkConfiguration/awsvpcConfiguration/assignPublicIp',
                                        value='DISABLED')],
              env={'PREFECT_API_URL': 'http://prefect.internal..../api'})
Expected result: The prefect task reflects the tags provided in task_customizations
Actual result: Only the default prefect.io/ tags are generated on the prefect task
EDIT: This turned out to be due to an outdated version of prefect-aws. The code block above works as expected.
Vishnu Duggirala
11/16/2022, 7:43 PM
The image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a Prefect base image matching your local versions.
It is not overriding the image when I gave the task definition ARN. Does it only override if I give the task definition?
Madison Schott
11/16/2022, 7:54 PM
Ashley Felber
11/16/2022, 8:00 PM
dados grupoa
11/16/2022, 8:07 PM
Patrick Alves
11/16/2022, 8:15 PM
Sander
11/16/2022, 9:32 PM
Andrew P
11/16/2022, 10:07 PM
| ERROR | prefect.agent - Failed to create work queue 'default'.
when trying to connect to my Prefect Cloud work queue from ECS. I've verified that the PREFECT_API_URL looks right (basically, "https://api.prefect.cloud/api/accounts/${prefect_account_id}/workspaces/${prefect_workspace_id}", and the two variables when substituted in the env var are the same as in my Prefect Cloud UI URL). Is there any other failure mode that might result in this? Can I pass a debug flag to see if the failure is e.g. a 403 vs a lack of DNS or something? No Google results and the error seems pretty opaque. Starting with the command ["prefect","agent","start","-q","default"] using the image prefecthq/prefect:2-python3.10
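One way to tell a 403 apart from a DNS problem, independent of Prefect itself, is to probe the API URL from inside the same container. The sketch below is standard-library Python only; `probe` is a hypothetical helper, and sending the key from `PREFECT_API_KEY` as a bearer token is an assumption about how the API expects to be authenticated. Any HTTP status in the answer means name resolution and the connection worked.

```python
import os
import socket
import urllib.error
import urllib.request


def probe(url, api_key=None, opener=urllib.request.urlopen, timeout=10):
    """Describe how a GET request to `url` succeeds or fails.

    `opener` is injectable so the function can be exercised without a network.
    """
    request = urllib.request.Request(url)
    if api_key:
        # Assumption: the API accepts the key as a bearer token.
        request.add_header("Authorization", f"Bearer {api_key}")
    try:
        with opener(request, timeout=timeout) as response:
            return f"reachable: HTTP {response.status}"
    except urllib.error.HTTPError as exc:
        # The server answered, so DNS and the connection are fine;
        # the status code (401, 403, 404, ...) says why it refused.
        return f"server responded: HTTP {exc.code}"
    except urllib.error.URLError as exc:
        # No HTTP response at all.
        if isinstance(exc.reason, socket.gaierror):
            return f"DNS lookup failed: {exc.reason}"
        return f"connection failed: {exc.reason}"
    except OSError as exc:
        # For example a timeout raised while reading the response.
        return f"connection failed: {exc}"


if __name__ == "__main__":
    api_url = os.environ.get("PREFECT_API_URL")
    if api_url:
        print(probe(api_url, os.environ.get("PREFECT_API_KEY")))
    else:
        print("PREFECT_API_URL is not set")
```

Running this as a one-off command in the ECS task, with the same environment variables as the agent, should show which of the two failure modes applies.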
Scott Walsh
11/17/2022, 12:15 AM
import asyncio
from prefect.client import get_client
from prefect import flow, task

@task
async def child_task1():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow1/sub_flow1')
        response = await client.create_flow_run_from_deployment(deployment.id)

@task
async def child_task2():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow2/sub_flow2')
        response = await client.create_flow_run_from_deployment(deployment.id)

@flow
async def parent_flow():
    await child_task1.submit()
    await child_task2.submit()

if __name__ == "__main__":
    asyncio.run(parent_flow())
Carlo
11/17/2022, 12:23 AM
ECSTask with the task_definition arg. Today we tried transitioning to task_definition_arn. However, it's still trying to register a task definition every time the agent picks up a flow to run. Details in thread...
Ben Muller
11/17/2022, 12:57 AM
@flow
def my_flow(
    start: str = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d"),
    end: str = datetime.utcnow().strftime("%Y-%m-%d"),
):
    ...
Will these default parameters be set at deploy time or will they update at run time?
Nick Batchelder
11/17/2022, 1:03 AM
username=nick at runtime.
Deepanshu Aggarwal
11/17/2022, 8:15 AM
httpx.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>
Has anyone faced a similar issue?
Adding the stack trace in the comments.
Ben Muller
11/17/2022, 8:31 AM
Block document has schema checksum sha256:96937dc8f6bd314a1cbeae7623159848872249e742b3bac8d7af39da4e2dfe79 which does not match the schema checksum for class 'ECSTask'. This indicates the schema has changed and this block may not load.
How do I debug this and/or make sure everything is all good?
Mohit Singhal
11/17/2022, 8:41 AM
Manuel Garrido Peña
11/17/2022, 9:32 AM
params= X=y, why did we remove such an awesome feature?
Zinovev Daniil
11/17/2022, 11:26 AM
Hi all. I had a problem right after installing Prefect Orion. The UI is not loading. I am getting a blank page with a title. The JS is not fully loaded; it breaks at 1.1 MB. Has anyone solved such a problem?
Rajeshwar Agrawal
11/17/2022, 1:27 PM
Jared Robbins
11/17/2022, 2:06 PM
Jon
11/17/2022, 2:19 PM
create_flow_run.map. How do I get the results of the mapped flow?
I'm getting this error:
└── 09:11:14 | ERROR | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 220, in get_task_run_result
flow_run = FlowRunView.from_flow_run_id(flow_run_id)
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 585, in from_flow_run_id
flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 627, in _query_for_flow_run
result = client.graphql(flow_run_query)
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/client/client.py", line 464, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'message': 'parsing UUID failed, expected String, but encountered Array', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow_run'], 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
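The error message says the `id._eq` filter received an array where a single UUID string was expected. That would be consistent with the whole list of mapped flow run IDs being handed to one lookup call. A plain-Python sketch of the difference follows; `fetch_result` is a hypothetical stand-in for a lookup that takes exactly one ID, not a Prefect API.

```python
from typing import List
from uuid import UUID, uuid4


def fetch_result(flow_run_id: str) -> str:
    """Hypothetical stand-in for a lookup that accepts exactly one flow run ID."""
    if not isinstance(flow_run_id, str):
        raise TypeError(
            f"expected String, but encountered {type(flow_run_id).__name__}"
        )
    UUID(flow_run_id)  # raises ValueError if the string is not a valid UUID
    return f"result-for-{flow_run_id}"


# A mapped call produces one flow run ID per child run.
flow_run_ids: List[str] = [str(uuid4()) for _ in range(3)]

# Passing the whole list to a single lookup reproduces the shape of the failure.
try:
    fetch_result(flow_run_ids)  # type: ignore[arg-type]
except TypeError as exc:
    print(f"single call with a list fails: {exc}")

# One lookup per ID succeeds.
results = [fetch_result(run_id) for run_id in flow_run_ids]
print(f"{len(results)} results fetched")
```

In Prefect 1 terms, the per-ID version would presumably mean mapping `get_task_run_result` over the flow run IDs too, with constant arguments wrapped in `unmapped`. That is an assumption and has not been checked against this flow.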
Taylor Curran
11/17/2022, 2:47 PM