Grant Cloud
01/10/2023, 6:50 PM
jack
01/10/2023, 8:47 PM
Alla Polisskaya
01/10/2023, 10:10 PM
Finished in state Failed('Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.\n')
at the end. Manually retrying the flow succeeds, but I do have retries=3 in the flow decorator and the flow doesn't appear to be retried.
Slackbot
01/11/2023, 12:32 AM
Aaron Goebel
01/11/2023, 1:40 AMcreate_flow_run_from_deployment
/ run_deployment
APIs to compose my deployments on http request. E.g. I get a payload defining a set of flows to chain together, that triggers a parent container flow, and that container flow does the orchestration of tying things together.
The way I'm thinking of running it, because there are dependencies, is to have the run_deployment code wrapped in a task like this:
@task
async def run_deployment(depl_id: str, parameters: dict):
    async with prefect.context(**prefect.context.run_params):
        async with prefect.Client() as client:
            run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
            run_state = await run.get_state()
            return run_state.result
and then in an orchestration flow I think I want to do something like this where I create these tasks for each deployment.
@flow
async def container_flow(flow_graph: dict):
    results = {}
    ordered_flows = order(flow_graph)
    # create tasks
    tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
             for name, flow_params in flow_graph.items()}
    # set dependencies
    for flow_name, flow_params in flow_graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].set_upstream(tasks[dependency], flow=True)
    # run tasks concurrently
    flow_results = await prefect.engine.run(tasks, return_tasks=True)
    # store results
    for flow_name, flow_result in flow_results.items():
        results[flow_name] = flow_result.result
    return results
Aaron Goebel
01/11/2023, 2:20 AM
I'm using a run_deployment orchestrator pattern to chain together deployments dynamically. Some of these deployments depend on others. I've attempted to finagle a way around this by wrapping run_deployment in a task as such:
@task
async def run_deployment(depl_id: str, parameters: dict):
    async with prefect.context(**prefect.context.run_params):
        async with prefect.Client() as client:
            run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
            run_state = await run.get_state()
            return run_state.result
I then create tasks as such:
tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
         for name, flow_params in graph.items()}
and set their dependencies:
# set dependencies
for flow_name, flow_params in graph.items():
    for dependency in flow_params.get('dependencies', []):
        tasks[flow_name].set_upstream(tasks[dependency], flow=True)
The goal here is to dogfood the task runner to chain these together and kick off the flow with something like await task_runner.submit(tasks).
Two issues I see with this though:
1. Some of the parameters to downstream create_deployment runs are derived from upstream deployment runs. I wonder if anyone knows an elegant way of doing that?
2. Would await task_runner.submit(tasks) actually work as anticipated?
wonsun
01/11/2023, 6:43 AM
wonsun
01/11/2023, 7:22 AM
RuntimeWarning: coroutine 'Block.load' was never awaited
secret_block = Secret.load('3f')
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
• So I made a new secret block, and that also isn't working, with the error message below:
C:\Users\user\AppData\Local\Temp\ipykernel_12716\3323813457.py:3: RuntimeWarning: coroutine 'Block.load' was never awaited
secret_block = Secret.load("new-net")
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In [27], line 6
3 secret_block = Secret.load("new-net")
5 # Access the stored secret
----> 6 secret_block.get()
AttributeError: 'coroutine' object has no attribute 'get'
Could this trouble be caused by the new network?
Thanks for your reply.
Stéphan Taljaard
01/11/2023, 9:18 AM
BBPP
01/11/2023, 9:35 AM
Vadym Dytyniak
01/11/2023, 10:01 AM
State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Will Truong
01/11/2023, 10:02 AM
Microsoft Windows [Version 10.0.22621.963]
(c) Microsoft Corporation. All rights reserved.
D:\Athena\etl>prefect agent start "work_queue_for_test"
Agents now support multiple work queues. Instead of passing a single argument, provide work queue names with the `-q` or`--work-queue` flag: `prefect agent start -q work_queue_for_test`
Starting v2.7.4 agent with ephemeral API...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): work_queue_for_test...
16:18:20.487 | INFO | prefect.agent - Submitting flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22'
16:18:20.604 | INFO | prefect.infrastructure.process - Opening process 'beta736-bajor'...
'C:\Users\Will' is not recognized as an internal or external command,
operable program or batch file.
16:18:20.622 | ERROR | prefect.infrastructure.process - Process 'beta736-bajor' exited with status code: 1
16:18:20.651 | INFO | prefect.agent - Completed submission of flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22'
16:18:20.668 | INFO | prefect.agent - Reported flow run 'fd08f02a-18ca-4538-bdf1-728dc7e88e22' as crashed: Flow run infrastructure exited with non-zero status code 1.
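The "'C:\Users\Will' is not recognized" line suggests the process infrastructure handed cmd.exe a command whose interpreter path likely contains a space (somewhere under that C:\Users\Will… profile) without quoting it, so the shell split the path at the space. A stdlib illustration of that token splitting, using a hypothetical path with forward slashes so it runs anywhere:

```python
import shlex

# Hypothetical path containing a space. Unquoted, the shell splits at the
# space and treats 'C:/Users/Will' as the entire command name.
print(shlex.split('C:/Users/Will T/python.exe script.py'))
# -> ['C:/Users/Will', 'T/python.exe', 'script.py']

# Quoting keeps the full interpreter path as a single token.
print(shlex.split('"C:/Users/Will T/python.exe" script.py'))
# -> ['C:/Users/Will T/python.exe', 'script.py']
```

Moving the environment to a path without spaces, or quoting the path, are the usual workarounds.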
Satyasheel
01/11/2023, 10:31 AM
max
01/11/2023, 10:46 AM
Tim-Oliver
01/11/2023, 12:39 PM
Kirill Egorov
01/11/2023, 1:42 PM
Jason Noxon
01/11/2023, 2:26 PM
John-Craig Borman
01/11/2023, 2:30 PM
A question on the prefect.deployments.run_deployment() function: if I deploy a flow without specifying a deployment name, how can I use run_deployment() to trigger a flow run? Would it just be run_deployment(name='flow_name')?
related docs: https://docs.prefect.io/concepts/deployments/#create-a-flow-run-in-a-python-script
David Elliott
01/11/2023, 2:43 PM
Jon
01/11/2023, 2:57 PM
Matthew Scanlon
01/11/2023, 3:33 PM
eddy davies
01/11/2023, 3:56 PM
prefect-flow.py file that imports my class. Should I create an instance in the flow and pass it to each task? Or create a class instance in each task?
Jason Noxon
01/11/2023, 4:04 PM
Ilya Galperin
01/11/2023, 4:28 PM
BrokenPipeError: [Errno 32] Broken pipe crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of these tasks will enter a Crashed state with this error and cause our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
Jeremiah
jack
01/11/2023, 5:41 PM
Yaron Levi
01/11/2023, 5:53 PM
FuETL
01/11/2023, 6:20 PM
I'm using case with a flow, with a branch for when the condition is true and one for when it is false. But I noticed that when the case is false, task_b() is always SKIPPED.
After some struggling 🥹 I figured out that the task was being SKIPPED because inside the true case I'm reassigning the task_result variable (it works if I rename the task run in the false condition).
Is this the desired behaviour? Why is it like that? Is there a way to bypass this without renaming?
from prefect import task, Flow, case

@task()
def task_a():
    print("task_a")
    return "task_a"

@task()
def task_condition():
    return False

@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"

@task()
def task_c():
    print("task_c")

with Flow("test-flow") as flow:
    task_result = task_a()
    cond = task_condition()
    with case(cond, True):
        task_result = task_b(task_result)
        task_c()
    with case(cond, False):
        task_result = task_b(task_result)

flow.run()
Ilya Galperin
01/11/2023, 6:29 PM
BrokenPipeError: [Errno 32] Broken pipe crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of them will enter a Crashed state with this error and cause our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
Jeremiah