Joshua Greenhalgh
11/25/2022, 11:30 AMJames Zhang
11/25/2022, 1:43 PMThuy Tran
11/25/2022, 4:00 PM@task(cache_result_in_memory=False)
But I'm getting the error below that it's an unexpected keyword. Not sure what I'm doing wrong. It's running on prem using version 2.6.9.
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 883, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/opt/prefect/processor.py", line 3, in <module>
from data_import import data_import_process
File "/opt/prefect/data_import.py", line 8, in <module>
from data_cleaning import cleaning_process
File "/opt/prefect/data_cleaning.py", line 42, in <module>
@task(cache_result_in_memory=False)
TypeError: task() got an unexpected keyword argument 'cache_result_in_memory'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/engine.py", line 256, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/client.py", line 103, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 69, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(import_object, str(import_path))
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'processor.py' encountered an exception
Tibs
11/25/2022, 5:03 PMDeepak Pilligundla
11/25/2022, 5:25 PMTraceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/cli/build_register.py", line 134, in load_flows_from_script
namespace = runpy.run_path(abs_path, run_name="<flow>")
File "/usr/local/lib/python3.7/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/usr/local/lib/python3.7/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/repo/src/flows/4i_ssp_bene_data_shrng/4i_ssp_bene_data_shrng.py", line 14, in <module>
import snowflake.connector as sf
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/__init__.py", line 16, in <module>
from .connection import SnowflakeConnection
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/connection.py", line 25, in <module>
from . import errors, proxy
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/errors.py", line 18, in <module>
from .telemetry_oob import TelemetryService
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/telemetry_oob.py", line 20, in <module>
from .vendored import requests
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/requests/__init__.py", line 119, in <module>
from ..urllib3.contrib import pyopenssl
File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 50, in <module>
import OpenSSL.SSL
File "/usr/local/lib/python3.7/site-packages/OpenSSL/__init__.py", line 8, in <module>
from OpenSSL import SSL, crypto
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 19, in <module>
from OpenSSL.crypto import (
File "/usr/local/lib/python3.7/site-packages/OpenSSL/crypto.py", line 3232, in <module>
name="load_pkcs7_data",
TypeError: deprecated() got an unexpected keyword argument 'name'
eddy davies
11/25/2022, 6:10 PMThuy Tran
11/25/2022, 7:23 PM--memory="[memory_limit]"
argument to enable this?Trevor Campbell
11/25/2022, 8:11 PMA -> B -> C -> D
. Any of them can raise a SKIP signal, and if that happens, I want to skip all downstream tasks. It isn't a failure or a success, it's more of a "I'm not ready to run this yet, so don't run anything that depends on my output yet"
Is that possible to do in Orion? I saw one earlier thread here about it, but the outcome was inconclusive...
• one option is just to return a cancelled state, but that seems to suggest failure (which in my case would prompt a message to the admin, which I definitely don't want to happen for SKIPs. SKIPs happen very often in my particular case -- far more common than any other outcome)
• another is to return a completed state, but then I need annoying if
statements everywhere checking the outcome of previous tasks (skip vs. was actually run successfully). Actually, the whole reason I started using Prefect in the first place was for its ability to easily control flows where things get skipped 😉Anqi Lu
11/28/2022, 4:03 AMMahesh
11/28/2022, 8:27 AMSylvain Hazard
11/28/2022, 8:43 AMprefect orion start --port 5000
as well as the very simple flow copied below. Running the flow with python log_flow.py
randomly ends up crashing with this error : RuntimeError: The connection pool was closed while 2 HTTP requests/responses were still in-flight
. Is this an issue related to the server ? Am I forgetting to await something ? Anything I could do to fix it ?Malavika S Menon
11/28/2022, 10:35 AMPuneetjindal 11
11/28/2022, 10:58 AMAndreas Nord
11/28/2022, 11:09 AMSecret.load("secret")
this was working on Prefect 1 but on 2 I get this error:
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
Any ideas? ThanksDitlev Stjerne
11/28/2022, 11:13 AMEsdras Lopes Nani
11/28/2022, 1:15 PMThe above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/agent.py", line 154, in get_and_submit_f>
queue_runs = await self.client.get_runs_in_work_queue(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/orion.py", line 763, in get_runs_>
response = await <http://self._client.post|self._client.post>(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/base.py", line 160, in send
await super().send(*args, **kwargs)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1642, in _send_handling_>
response = await self._send_handling_redirects(
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1679, in _send_handling_>
response = await self._send_single_request(request)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1716, in _send_single_re>
response = await transport.handle_async_request(request)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in hand>
resp = await self._pool.handle_async_request(req)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_h>
raise mapped_exc(message) from exc
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
Currently Prefect is deployed on EC2 with agent running as process with systemd.
Prefect version is 2.6.9
Tnks!Bigya Man Pradhan
11/28/2022, 1:47 PMNic
11/28/2022, 2:56 PMWojciech Kieliszek
11/28/2022, 2:59 PMserialized_hash()
value as an idempotency_key
and as a docker storage tag. That way we don’t bump the versions when there is no change in a flow “schema”. But this mechanism allows us also to redeploy a docker image for the same flow version (no change in idempotency_key
) to change logic of particular tasks when there is no change of the “contract”. So for long running flows runs we can change their behaviours to some extent during their execution or resume them after a failure with new bug-free logic. We are now in transition to Prefect v2. Is there any kind of similar mechanism available in Prefect v2?Joshua Grant
11/28/2022, 3:24 PMThomas Opsomer
11/28/2022, 3:41 PM...
some logs from the running task
...
Downloading flow-name/2022-11-25t14-47-58-727718-00-00 from bucket
Beginning Flow run for 'flow-name'
Task 'accounts.link': Starting task run...
...
Flow run RUNNING: terminal tasks are incomplete.
Sean Conroy
11/28/2022, 3:52 PMsqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table flow_run has no column named state_timestamp"
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
EVENT_LOOP_GC_REFS.pop(key) KeyError: 140157491380864
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow
Has anyone see this kind of things before?Denis
11/28/2022, 4:10 PM# len(param_dict) => 100
for params in param_dict:
copy_files.submit(params)
will this create 100 threads or is it using async to run it concurrently in one thread. The reason I am asking is because we're facing issues of hanging flown runs when we submit a larger number of tasks, (in last attempt we had cca 48 tasks and flow just hanged)
Any clarification on how it works would be appreciated.Jon Young
11/28/2022, 5:30 PMJon Young
11/28/2022, 6:19 PMget_task_run_result
to get the results of a task called within a resource_manager
. Seems the resource manager cleans up the task's result so it is unavailable to another flow?
└── 12:43:06 | ERROR | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 239, in get_task_run_result
return task_run.get_result()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 79, in get_result
self._result = self._load_result()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 85, in _load_result
self._load_child_results()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 139, in _load_child_results
task_run._assert_result_type_is_okay()
File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 162, in _assert_result_type_is_okay
raise ValueError(
ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
Joshua Grant
11/28/2022, 6:36 PMcontext
key for the payload in https://orion-docs.prefect.io/api-ref/rest-api/#/Deployments/create_flow_run_from_deployment_deployments__id__create_flow_run_post? Specifically interested in use cases for configuring this.Mihai H
11/28/2022, 9:11 PMPhilip MacMenamin
11/28/2022, 10:01 PMPREFECT__USER_CONFIG_PATH
Is the expected behavior once this is exported, eg
export PREFECT__USER_CONFIG_PATH=/a/b/c
the .prefect
dir should be at /a/b/c/.prefect
?
By default, Prefect will look for a user configuration file at, but you can change that location by setting the environment variable$HOME/.prefect/config.toml
appropriately. Please note the double-underscore (PREFECT__USER_CONFIG_PATH
) in the variable name__
An Ninh Vũ
11/29/2022, 2:07 AMfrom prefect import Parameter
). Let's say I have the parameter "`account_id`", with an int value type. This parameter is parsed when I run the flow with account_id = Parameter("account_id", default=None)
. After that I have a task that is running with a timeout. And in that task's state handler, I reuse the parameter account_id
and parse that parameter into a dictionary (code is below). Here is where the error comes up, as `account_id`'s type is Parameter, but it doesn't accept a parameter in a dictionary if you want to json.dumps
that dict.
My question is (also TL;DR): How can I turn this parameter's type into an int type (originally prefect.Parameter type) to use in json.dumps
?
P/s: Is there another way to do what I'm trying to achieve? Thank you so much!
P/ss: My Prefect version: 0.14.22
def timeout_state_handler(task, old_state, new_state):
if isinstance(new_state, state.TimedOut):
logger.error("rerun this flow")
flow = StartFlowRun(flow_name='rerun_flow', project_name='test')
run_config = DockerRun(
image="mydocker/test-project:latest",
env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
)
flow.run(
parameters=dict(account_id=account_id, dates=dates),
run_config=run_config, run_name="RERUN_FLOW",
idempotency_key=gen_idempotency_key(account_id)
)
raise signals.SUCCESS("This task timed out with status SUCCESS. Re-run this flow.")
Kishan
11/29/2022, 3:21 AMStarting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
10:18:46 PM
Crash detected! Execution was interrupted by an unexpected exception: TypeError: object NoneType can't be used in 'await' expression
10:18:46 PM
Crash details:
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 1332, in report_flow_run_crashes
yield
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 357, in begin_flow_run
flow_run_context.result_factory = await ResultFactory.from_flow(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 161, in from_flow
return await cls.default_factory(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 123, in default_factory
return await cls.from_settings(**kwargs, client=client)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 229, in from_settings
storage_block_id, storage_block = await cls.resolve_storage_block(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 257, in resolve_storage_block
or await storage_block._save(is_anonymous=True, overwrite=True)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/blocks/core.py", line 751, in _save
await self.register_type_and_schema(client=client)
TypeError: object NoneType can't be used in 'await' expression
Kishan
11/29/2022, 3:21 AMStarting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
10:18:46 PM
Crash detected! Execution was interrupted by an unexpected exception: TypeError: object NoneType can't be used in 'await' expression
10:18:46 PM
Crash details:
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 1332, in report_flow_run_crashes
yield
File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 357, in begin_flow_run
flow_run_context.result_factory = await ResultFactory.from_flow(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 161, in from_flow
return await cls.default_factory(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 123, in default_factory
return await cls.from_settings(**kwargs, client=client)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 229, in from_settings
storage_block_id, storage_block = await cls.resolve_storage_block(
File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 257, in resolve_storage_block
or await storage_block._save(is_anonymous=True, overwrite=True)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/homebrew/lib/python3.10/site-packages/prefect/blocks/core.py", line 751, in _save
await self.register_type_and_schema(client=client)
TypeError: object NoneType can't be used in 'await' expression
Khuyen Tran
11/29/2022, 4:43 PM