Rikimaru Yamaguchi
02/28/2023, 7:05 AM
flapili
02/28/2023, 10:28 AM
Ton Steijvers
02/28/2023, 10:41 AM
@task
def mytask(obj):
    get_run_logger().info(f"in mytask: {id(obj)}")  # task shows different id than flow
    with obj.get_connection() as connection:
        connection.execute_string("SELECT 1;")

@flow
def myflow():
    obj = SnowflakeConnector.load(block_name)
    get_run_logger().info(f"in flow: {id(obj)}")
    mytask(obj)
    mytask.submit(obj)
Is this behaviour by design, so that I can rely on it? Are there cases where the task would get the actual reference instead of a copy?
_After the (SnowflakeConnector) object has been used in a task, its underlying connection might have been closed. If subsequent tasks received the same instance, this would result in a "Connection already closed" exception. Getting a copy ensures that I can safely call get_connection in every task._
jpuris
02/28/2023, 2:09 PM
task_1 >> task_2
task_2 >> task_3
task_2 >> task_4
...
Now, Prefect determines this implicitly from the direction in which results are passed as parameters from task to task, i.e.
result_1 = task_1()
result_2 = task_2(result_1)
result_3 = task_3(result_2)
result_4 = task_4(result_2)
That is fine, no problems there. But what do we do when there is a task 5 that does not require any result from the tasks it depends on? For example, task 5 must run once task 4 is done, but has no other relationship with it, like
result_1 = task_1()
result_2 = task_2(result_1)
result_3 = task_3(result_2)
result_4 = task_4(result_2)
result_5 = task_5()
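A dependency that carries no data, like task 5 on task 4 above, can be expressed by waiting on the upstream future before submitting the downstream work. The sketch below is a plain-Python analogue using the standard library's `concurrent.futures`, not Prefect code; the task bodies and the `record` helper are hypothetical and exist only to make the ordering visible. In Prefect 2 itself, the `wait_for` keyword argument on a task call is, as far as I know, the intended way to declare this kind of upstream dependency (for example `task_5.submit(wait_for=[future_4])`, assuming `future_4` came from `task_4.submit(...)`).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

events = []                    # order in which the interesting steps happen
events_lock = threading.Lock()


def record(event: str) -> None:
    """Append an event name in a thread-safe way (illustration only)."""
    with events_lock:
        events.append(event)


def task_1() -> int:
    return 1


def task_2(x: int) -> int:
    return x + 1


def task_3(x: int) -> int:
    return x * 10


def task_4(x: int) -> int:
    record("task_4 done")
    return x * 100


def task_5() -> str:
    # Takes no input from task_4, but must not start before it has finished.
    record("task_5 started")
    return "cleanup finished"


def run_pipeline():
    result_2 = task_2(task_1())
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Tasks 3 and 4 are submitted together and may run at the same time.
        future_3 = pool.submit(task_3, result_2)
        future_4 = pool.submit(task_4, result_2)
        # Block on task 4 only; its return value is not passed to task 5.
        future_4.result()
        future_5 = pool.submit(task_5)
        return future_3.result(), future_4.result(), future_5.result()
```

Task 3 keeps running while the pipeline waits for task 4, so only the ordering between tasks 4 and 5 is constrained.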
With the sequential task runner (default), this is OK. But what if I were to use the concurrent one? I want tasks 3 and 4 to run at the same time, but task 5 must not run before task 4 is done.
Thet Naing
02/28/2023, 2:53 PM
Dave D
02/28/2023, 4:48 PM
jpuris
02/28/2023, 6:39 PM
if not is_success:
    logger.error(error_msg)
    return Failed(message="I've failed because ...!")
19:31:34.931 | ERROR | Flow run 'sociable-mantis' - Traceback (most recent call last):
... actually relevant stack trace from "error_msg" ...
19:31:35.161 | ERROR | Flow run 'sociable-mantis' - Finished in state Failed('I've failed because ...!')
Traceback (most recent call last):
<... a massive stack trace on raising a failure from prefect :(>
raise await get_state_exception(state)
prefect.exceptions.FailedRun: I've failed because ...!
The last stack trace is really unnecessary. I'd only want to output the information that the flow has ended in a failure state 😞
Tony Alfonse
02/28/2023, 8:37 PM
Anand Soleti
02/28/2023, 10:17 PM
Maikel Penz
03/01/2023, 2:01 AM
export PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT-ID>/workspaces/<WORKSPACE-ID>"
export PREFECT_API_KEY="<API-KEY>"
prefect kubernetes manifest agent -q <MY-QUEUE> | kubectl apply --namespace=default -f -
And the error..
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): <MY-QUEUE>...
An exception occurred.
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 230, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 181, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/usr/local/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.8/site-packages/prefect/cli/agent.py", line 201, in start
tg.start_soon(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/services.py", line 46, in critical_service_loop
await workload()
File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 265, in check_for_cancelled_flow_runs
named_cancelling_flow_runs = await self.client.read_flow_runs(
File "/usr/local/lib/python3.8/site-packages/prefect/client/orchestration.py", line 1715, in read_flow_runs
response = await self._client.post(f"/flow_runs/filter", json=body)
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1533, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 253, in send
response.raise_for_status()
File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 130, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/<ACCOUNT-ID>/workspaces/<WORKSPACE-ID>/flow_runs/filter'
Response: {'detail': 'Not Found'}
For more information check: https://httpstatuses.com/404
Why do I get Not Found on https://api.prefect.cloud/api/accounts/<ACCOUNT-ID>/workspaces/<WORKSPACE-ID>/flow_runs/filter?
Samuel Hinton
03/01/2023, 2:22 AM
task that fetches data returns it in the same pd.DataFrame format, so our save_to_db task is very general.
What I wanted to do was something like the following:
for data_task in list_of_data_tasks:
    @flow(name=f"{data_task.__name__} flow", task_runner=dask_task_runner)
    def anonymous_flow():
        data = data_task.submit()
        save_to_db_task.submit(data)

    Deployment.build_from_flow(...).apply()
I thought “yeah, this is good, it saves tons and tons of duplicated @flow declarations and duplicated save_to_db lines.” However, this won’t work because the flow is dynamic, so an agent won’t be able to actually find it.
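One possible way around the dynamic-flow problem is to keep a single, statically defined entry point that takes the data source name as a parameter and looks the fetch function up in a registry. The sketch below shows only that pattern in plain Python, without Prefect; `DATA_TASKS`, `data_task`, `ingest`, the two fetch functions and the in-memory `SAVED_ROWS` store are all hypothetical names, and lists of dicts stand in for the DataFrames. The assumption is that `ingest` would be the one `@flow`-decorated function, with one deployment per source supplying `source_name` as a deployment parameter.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]

# Registry mapping a source name to the function that fetches its data.
DATA_TASKS: Dict[str, Callable[[], List[Row]]] = {}


def data_task(fn: Callable[[], List[Row]]) -> Callable[[], List[Row]]:
    """Register a fetch function under its own name."""
    DATA_TASKS[fn.__name__] = fn
    return fn


@data_task
def fetch_prices() -> List[Row]:
    return [{"price": 1.5}, {"price": 2.5}]


@data_task
def fetch_volumes() -> List[Row]:
    return [{"volume": 100.0}]


SAVED_ROWS: List[Row] = []  # stand-in for the database


def save_to_db(rows: List[Row]) -> int:
    """Store the rows and report how many were written."""
    SAVED_ROWS.extend(rows)
    return len(rows)


def ingest(source_name: str) -> int:
    """Single static entry point: fetch one named source and save it."""
    fetch = DATA_TASKS[source_name]
    return save_to_db(fetch())
```

Because `ingest` lives at module level under a fixed name, it can be imported by name, which is what the loop-generated `anonymous_flow` functions lack.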
What would be the best-practice solution for the above?
Jacob Bedard
03/01/2023, 2:26 AM
Jacob Bedard
03/01/2023, 2:30 AM
Denys Volokh
03/01/2023, 8:58 AM
Deceivious
03/01/2023, 9:37 AM
Bharat Nedungadi
03/01/2023, 10:04 AM
Tolga Karahan
03/01/2023, 11:55 AM
Lee Mendelowitz
03/01/2023, 1:41 PM
from prefect.states import Completed, Failed
from prefect import task, flow

@flow
def flow_that_fails():
    failed_values = ['e', 'f', 'g']
    return Failed(message = 'some flows failed', data = failed_values)

@flow
def my_flow():
    result = flow_that_fails(return_state = True)
    data = result.result(raise_on_failure = False)
    print(data)

my_flow()
gives:
TypeError: Unexpected result for failed state: ['e', 'f', 'g'] —— list cannot be resolved into an exception
I’m on Prefect 2.8.3
Slackbot
03/01/2023, 3:33 PM
James Gatter
03/01/2023, 4:02 PM
prefect deployment build ... --apply
how does my flow code get accessed?
Should it be getting pulled from the code that is uploaded to S3, or does the flow code have to be included in the docker image? I know the latter is true for the dataflowops tutorial, but I also just ran a deployment that did not have the flow code included in the docker image.
Furthermore, is there any further documentation for the deployment.yaml fields?
Deepak Pilligundla
03/01/2023, 4:21 PM
Whenever we update the DB passwords, flows stop working until all the flows are registered again together. Is there an alternative that avoids re-registering all the flows when passwords are updated? Please let me know if anyone has come across a similar issue. Thanks in advance for your time and consideration.
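If the cause is that the password gets captured when the flow is registered (an assumption, since the message does not show how the flows read it), one common remedy is to look the password up each time the flow runs. A minimal sketch in plain Python, where the `DB_PASSWORD` environment variable, the function names and the `postgresql://` URL shape are all hypothetical:

```python
import os
from urllib.parse import quote

PASSWORD_ENV_VAR = "DB_PASSWORD"  # hypothetical name, not from the original message


def get_db_password() -> str:
    """Read the password at call time, so a rotated value is seen on the next run."""
    password = os.environ.get(PASSWORD_ENV_VAR)
    if password is None:
        raise RuntimeError(f"{PASSWORD_ENV_VAR} is not set")
    return password


def build_connection_url(user: str, host: str, database: str) -> str:
    """Assemble a connection URL from the current password, not a cached copy."""
    # Percent-encode the password so characters like '/' or '@' stay valid in a URL.
    password = quote(get_db_password(), safe="")
    return f"postgresql://{user}:{password}@{host}/{database}"
```

Calling `build_connection_url` inside the task body, instead of at module import, means a password change takes effect without touching the flow definition.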
Ilya Galperin
03/01/2023, 5:30 PM
Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
We are on Prefect version 2.8.2, but this has been occurring for a few versions now. Other users seem to have experienced this behavior as well (see thread here). Is this being worked on and, if so, where can we track it?
Zack
03/01/2023, 5:56 PM
Ofir
03/01/2023, 6:38 PM
pandas or numpy or any other 3rd party dependency from my workflow, what is the best practice to do that?
The agent running my deployment/workflow might not have these packages in place, right?
Should I build a custom agent Dockerfile with all of the dependencies, or is there a better approach? What are the tradeoffs between the different solutions?
Tomás Emilio Silva Ebensperger
03/01/2023, 6:53 PM
Xinchi He
03/01/2023, 8:57 PM
Josh Paulin
03/01/2023, 8:59 PM
chicago-joe
03/01/2023, 8:59 PM
Jai P
03/01/2023, 9:18 PM
jack
03/01/2023, 10:08 PM
task_args=dict(name='Name of Task')?
In this case there is a single task that is called multiple times, with different arguments each time. Hoping to make it easier to visualize by having descriptive names for each task run.