Jon Young
01/11/2023, 2:57 PM
Matthew Scanlon
01/11/2023, 3:33 PM
eddy davies
01/11/2023, 3:56 PM
Jason Noxon
01/11/2023, 4:04 PM
Ilya Galperin
01/11/2023, 4:28 PM
BrokenPipeError: [Errno 32] Broken pipe crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of these tasks enters a Crashed state with this error and causes our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
Jeremiah
01/11/2023, 5:29 PM
jack
01/11/2023, 5:41 PM
Yaron Levi
01/11/2023, 5:53 PM
FuETL
01/11/2023, 6:20 PM
case with flow when is true and when is false.
But I noticed that when the case is false, task_b() is always SKIPPED.
After some struggling I figured out that the task was being SKIPPED because inside the case True block I'm reassigning the task_result variable (it goes away if I rename the task run on the False condition).
Is this the desired behaviour?
from prefect import task, Flow, case

@task()
def task_a():
    print("task_a")
    return "task_a"

@task()
def task_condition():
    return False

@task()
def task_b(value: str):
    print("task_b")
    return f"task_b={value}"

@task()
def task_c():
    print("task_c")

with Flow("test-flow") as flow:
    task_result = task_a()
    cond = task_condition()
    with case(cond, True):
        task_result = task_b(task_result)
        task_c()
    with case(cond, False):
        task_result = task_b(task_result)

flow.run()
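A toy model of why reusing the variable name matters, written in plain Python without Prefect. It assumes that a skipped task causes its downstream tasks to be skipped too, which is the default in Prefect 1 as far as I know. `ToyTask` and its fields are made up for illustration and are not Prefect classes.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyTask:
    name: str
    upstream: List["ToyTask"] = field(default_factory=list)
    branch: Optional[bool] = None  # None means "not inside a case block"

    def is_skipped(self, cond: bool) -> bool:
        # Skipped if the task's own branch does not match the condition ...
        if self.branch is not None and self.branch != cond:
            return True
        # ... or if any upstream task was skipped (assumed skip propagation).
        return any(t.is_skipped(cond) for t in self.upstream)


cond = False

# Reusing one name: the False-branch task ends up downstream of the True-branch one.
task_result = ToyTask("task_a")
task_result = ToyTask("task_b_true", upstream=[task_result], branch=True)
shared_name = ToyTask("task_b_false", upstream=[task_result], branch=False)

# Separate names: both branches depend only on task_a.
a_result = ToyTask("task_a")
true_result = ToyTask("task_b_true", upstream=[a_result], branch=True)
separate_names = ToyTask("task_b_false", upstream=[a_result], branch=False)

print(shared_name.is_skipped(cond), separate_names.is_skipped(cond))
```

In this model the False-branch task is skipped only in the shared-name version, because by the time the second `case` block runs, `task_result` already refers to the True-branch task.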
Ilya Galperin
01/11/2023, 6:29 PM
BrokenPipeError: [Errno 32] Broken pipe crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of them enters a Crashed state with this error and causes our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
Jeremiah
01/11/2023, 6:38 PM
Anna Geller
01/11/2023, 6:56 PM
Alexander Kloumann
01/11/2023, 7:10 PM
Jeff Hale
01/11/2023, 7:30 PM
eddy davies
01/11/2023, 9:33 PM
/Users/macbook/.pyenv/versions/3.10.5/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
Aaron Goebel
01/11/2023, 10:27 PM
deployments dynamically at run time. I already have a DAG defined with how the deployments should be chained, and I also determine that some outputs of deployment runs should be used as input to other run_deployment invocations.
I'd like to use Prefect for the scheduling of everything, so my approach has been to wrap the run_deployment in a task, e.g.:
@task
async def run_deployment_task(depl_id: str, parameters: dict):
    client = await get_client()
    run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
    run_state = await run.get_state()
    return run_state.result
So, given a set of deployments, their dependencies, and their parameters, I basically want to run something like this:
@flow
async def run_deployments_flow(graph: dict):
    results = {}
    # create tasks (keyword matches the depl_id parameter of run_deployment_task)
    tasks = {name: run_deployment_task.submit(depl_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
             for name, flow_params in graph.items()}
    # set dependencies
    for flow_name, flow_params in graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].wait_for(tasks[dependency])
    # await all tasks
In this way, the taskrunner deals with scheduling, managing concurrency etc.
HOWEVER. My issue is that the output of some task runs needs piping into downstream tasks. This doesn't work natively with tasks, so with this approach I'd need to deal with the looping, polling of task status, etc. manually.
Is there any way I can use prefect to get this kind of desired result?
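One way to picture the chaining, as a minimal Prefect-free sketch: walk the graph in dependency order and feed each node the results of its upstream nodes. It assumes the `graph` layout from the snippet above (`deployment_id`, `inputs`, `dependencies`); `run_graph`, `fake_run`, and `example_graph` are hypothetical names, and `fake_run` only stands in for a call that triggers a deployment and waits for its result.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Any, Callable, Dict


def run_graph(graph: Dict[str, dict], run_one: Callable[[str, dict], Any]) -> Dict[str, Any]:
    """Run every node after its dependencies, passing upstream results downstream."""
    # Map each node to the set of nodes it depends on.
    sorter = TopologicalSorter(
        {name: set(spec.get("dependencies", [])) for name, spec in graph.items()}
    )
    results: Dict[str, Any] = {}
    for name in sorter.static_order():
        spec = graph[name]
        # Merge the static inputs with the results of upstream nodes.
        params = dict(spec.get("inputs", {}))
        for dep in spec.get("dependencies", []):
            params[dep] = results[dep]
        results[name] = run_one(spec["deployment_id"], params)
    return results


def fake_run(deployment_id: str, params: dict) -> int:
    # Stand-in for "run the deployment and return its result".
    return sum(v for v in params.values() if isinstance(v, int)) + 1


example_graph = {
    "extract": {"deployment_id": "d1", "inputs": {"n": 1}},
    "transform": {"deployment_id": "d2", "inputs": {}, "dependencies": ["extract"]},
    "load": {"deployment_id": "d3", "inputs": {"n": 10}, "dependencies": ["extract", "transform"]},
}
results = run_graph(example_graph, fake_run)
print(results)
```

This runs sequentially, so it does not give the concurrency a task runner would. Within a Prefect 2 flow, my understanding is that passing the future returned by `.submit()` as an argument to a downstream task makes Prefect wait for and resolve it, which may cover the same need without manual polling.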
tldr: want to dynamically chain deployments together where outputs of one may be inputs of another, and I want Prefect to manage the orchestration of it all.
YD
01/11/2023, 10:37 PM
Khyaati Jindal
01/12/2023, 8:10 AM
kent
01/12/2023, 8:13 AM
@task
def my_favorite_function():
    raise ValueError("This flow immediately fails")

@task
def one_return():
    return 1

@task
def tow_return():
    return 2

@flow
def run_flow():
    my_favorite_function()
    one_return()
    tow_return()
I want to PEND or skip an upcoming schedule.
I read the documentation but couldn't figure out how to implement the specifics.
Tim Galvin
01/12/2023, 8:21 AM
if 'PATH' in os.environ.keys()
Is there a way to test the names of blocks that have been stored? Is there a best-practice use case here? Or is it just a case where one has to catch the error if the block does not exist?
PB
01/12/2023, 8:45 AM
Torstein Molland
01/12/2023, 9:59 AM
James Zhang
01/12/2023, 10:03 AM
run_deployment(flow_run_name="xxx") because I want to start the flow run on the UI…
Nimesh Kumar
01/12/2023, 10:58 AM
Jo Tryti
01/12/2023, 12:22 PM
Ankit Choudhary
01/12/2023, 12:25 PM
Denis
01/12/2023, 2:08 PM
Sean Talia
01/12/2023, 2:22 PM
StartFlowRun tasks at runtime? My scenario is that I want a flow (call it orchestratorFlow) to orchestrate the execution of several instances of another flow that I have (call that one ecsFlow). The issue at hand is, I don't know how many instances of my ecsFlow I need to kick off until orchestratorFlow runtime; it could be 1, 8, or 50+ (each flow run of ecsFlow would be run with a different set of parameters), and would be determined based on a parameter passed to orchestratorFlow.
I've tried to follow the Prefect v1 map paradigm by doing StartFlowRun.map([<config_parameters>]), where [<config_parameters>] is a dynamically generated list of flow run kwargs returned by an upstream task, but that's not doing the trick. Thank you for any help!
Paul Lucas
01/12/2023, 3:16 PM
Mrityunjoy Das
01/12/2023, 9:42 AM
Mrityunjoy Das
01/12/2023, 9:42 AM
Christopher Boyd
01/12/2023, 3:25 PM