Malavika S Menon
11/28/2022, 10:35 AMpuneet jindal
11/28/2022, 10:58 AMAndreas Nord
11/28/2022, 11:09 AMSecret.load("secret")
this was working on Prefect 1 but on 2 I get this error:
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
Any ideas? ThanksDitlev Stjerne
11/28/2022, 11:13 AMEsdras Lopes Nani
11/28/2022, 1:15 PMThe above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/agent.py", line 154, in get_and_submit_f>
queue_runs = await self.client.get_runs_in_work_queue(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/orion.py", line 763, in get_runs_>
response = await <http://self._client.post|self._client.post>(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/base.py", line 160, in send
await super().send(*args, **kwargs)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1642, in _send_handling_>
response = await self._send_handling_redirects(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1679, in _send_handling_>
response = await self._send_single_request(request)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1716, in _send_single_re>
response = await transport.handle_async_request(request)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in hand>
resp = await self._pool.handle_async_request(req)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_h>
raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
Currently Prefect is deployed on EC2 with agent running as process with systemd.
Prefect version is 2.6.9
Tnks!Deceivious
11/28/2022, 1:47 PMNic
11/28/2022, 2:56 PMWojciech Kieliszek
11/28/2022, 2:59 PMserialized_hash()
value as an idempotency_key
and as a docker storage tag. That way we don’t bump the versions when there is no change in a flow “schema”. But this mechanism allows us also to redeploy a docker image for the same flow version (no change in idempotency_key
) to change logic of particular tasks when there is no change of the “contract”. So for long running flows runs we can change their behaviours to some extent during their execution or resume them after a failure with new bug-free logic. We are now in transition to Prefect v2. Is there any kind of similar mechanism available in Prefect v2?Joshua Grant
11/28/2022, 3:24 PMThomas Opsomer
11/28/2022, 3:41 PM...
some logs from the running task
...
Downloading flow-name/2022-11-25t14-47-58-727718-00-00 from bucket
Beginning Flow run for 'flow-name'
Task 'accounts.link': Starting task run...
...
Flow run RUNNING: terminal tasks are incomplete.
Sean Conroy
11/28/2022, 3:52 PMsqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table flow_run has no column named state_timestamp"
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
EVENT_LOOP_GC_REFS.pop(key) KeyError: 140157491380864
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow
Has anyone see this kind of things before?Denis
11/28/2022, 4:10 PM# len(param_dict) => 100
for params in param_dict:
copy_files.submit(params)
will this create 100 threads or is it using async to run it concurrently in one thread. The reason I am asking is because we're facing issues of hanging flown runs when we submit a larger number of tasks, (in last attempt we had cca 48 tasks and flow just hanged)
Any clarification on how it works would be appreciated.Jon
11/28/2022, 5:30 PMJon
11/28/2022, 6:19 PMget_task_run_result
to get the results of a task called within a resource_manager
. Seems the resource manager cleans up the task's result so it is unavailable to another flow?
└── 12:43:06 | ERROR | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 239, in get_task_run_result
return task_run.get_result()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 79, in get_result
self._result = self._load_result()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 85, in _load_result
self._load_child_results()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 139, in _load_child_results
task_run._assert_result_type_is_okay()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 162, in _assert_result_type_is_okay
raise ValueError(
ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
Joshua Grant
11/28/2022, 6:36 PMcontext
key for the payload in https://orion-docs.prefect.io/api-ref/rest-api/#/Deployments/create_flow_run_from_deployment_deployments__id__create_flow_run_post? Specifically interested in use cases for configuring this.Mihai H
11/28/2022, 9:11 PMPhilip MacMenamin
11/28/2022, 10:01 PMPREFECT__USER_CONFIG_PATH
Is the expected behavior once this is exported, eg
export PREFECT__USER_CONFIG_PATH=/a/b/c
the .prefect
dir should be at /a/b/c/.prefect
?
By default, Prefect will look for a user configuration file at, but you can change that location by setting the environment variable$HOME/.prefect/config.toml
appropriately. Please note the double-underscore (PREFECT__USER_CONFIG_PATH
) in the variable name__
An Ninh Vũ
11/29/2022, 2:07 AMfrom prefect import Parameter
). Let's say I have the parameter "`account_id`", with an int value type. This parameter is parsed when I run the flow with account_id = Parameter("account_id", default=None)
. After that I have a task that is running with a timeout. And in that task's state handler, I reuse the parameter account_id
and parse that parameter into a dictionary (code is below). Here is where the error comes up, as `account_id`'s type is Parameter, but it doesn't accept a parameter in a dictionary if you want to json.dumps
that dict.
My question is (also TL;DR): How can I turn this parameter's type into an int type (originally prefect.Parameter type) to use in json.dumps
?
P/s: Is there another way to do what I'm trying to achieve? Thank you so much!
P/ss: My Prefect version: 0.14.22
def timeout_state_handler(task, old_state, new_state):
if isinstance(new_state, state.TimedOut):
logger.error("rerun this flow")
flow = StartFlowRun(flow_name='rerun_flow', project_name='test')
run_config = DockerRun(
image="mydocker/test-project:latest",
env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
)
flow.run(
parameters=dict(account_id=account_id, dates=dates),
run_config=run_config, run_name="RERUN_FLOW",
idempotency_key=gen_idempotency_key(account_id)
)
raise signals.SUCCESS("This task timed out with status SUCCESS. Re-run this flow.")
Kishan
11/29/2022, 3:21 AMStarting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
10:18:46 PM
Crash detected! Execution was interrupted by an unexpected exception: TypeError: object NoneType can't be used in 'await' expression
10:18:46 PM
Crash details:
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 1332, in report_flow_run_crashes
yield
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 357, in begin_flow_run
flow_run_context.result_factory = await ResultFactory.from_flow(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 161, in from_flow
return await cls.default_factory(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 123, in default_factory
return await cls.from_settings(**kwargs, client=client)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 229, in from_settings
storage_block_id, storage_block = await cls.resolve_storage_block(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 257, in resolve_storage_block
or await storage_block._save(is_anonymous=True, overwrite=True)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/blocks/core.py", line 751, in _save
await self.register_type_and_schema(client=client)
TypeError: object NoneType can't be used in 'await' expression
Mohit Singhal
11/29/2022, 7:07 AMMohit Singhal
11/29/2022, 7:25 AMAndreas Nigg
11/29/2022, 8:19 AMJames Zhang
11/29/2022, 9:04 AMJavier Ruere
11/29/2022, 9:26 AMSeif Harrathi
11/29/2022, 9:38 AM@flow(name="Run Check", description="Flow that controls the data integrity")
def run_check(layers_data):
"""
1- Extract data from Arcgis databse
2- Transform data
3- Load result into Excel output
@param layers_data:
@return:
"""
# 1- Extract data from databse
state = extract_data(layers_data, return_state=True)
raw = state.result()
# 2- Transform data
state = DataTransformer.transform_results(raw, return_state=True)
data = state.result()
# 3- Load result into Excel output
state = DataWriter.load_results(data, return_state=True)
output = state.result()
return {"Result": {
"nb_errors": 12,
"nb_lines_checked": 456
},
"output_path": "path_to_s3"
}
My question is how to get the results returned by my flow run like I want to get
{
"Result": {
"nb_errors": 12,
"nb_lines_checked": 456
},
"output_path": "path_to_s3"
}
I used the Endpoint :/api/flow_runs/{id}
But I I dont the my result in the response 😕
Any help ? Any idea . I thought about saving the result in the S3 bucket than retrieve it but not sure if this is the best practice
Thaaaaaaaaaaaanks in advancealvin goh
11/29/2022, 10:23 AMMihai H
11/29/2022, 1:30 PMMihai H
11/29/2022, 1:30 PMMihai H
11/29/2022, 1:30 PMMihai H
11/29/2022, 1:31 PMEncountered exception during execution:
Traceback (most recent call last):
File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 612, in orchestrate_flow_run
waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 1325, in wait_for_task_runs_and_report_crashes
if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
03:25:35 PM
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'