Stephen Lloyd
04/18/2022, 4:22 AMLeanna Morinishi
04/18/2022, 6:07 AMany_successful
for several upstream tasks. When I write it like below, task_5
doesn’t fail, even if task_3
and task_4
fail. Is it because the creation of the list [task_3, task_4]
succeeds? How should I write this flow instead? Many thanks!
task_3 = my_task(
"input1", task_args=dict(name="input1")
).set_upstream(task_1)
task_4 = my_task(
"input2", task_args=dict(name="input2")
).set_upstream(task_2)
task_5 = my_task("all_inputs",
task_args=dict(name="all_inputs", trigger=any_successful),
).set_upstream([task_3, task_4])
Amir Shur
04/18/2022, 11:19 AMMatthew Seligson
04/18/2022, 12:44 PMXavier Babu
04/18/2022, 3:45 PMJack Sundberg
04/18/2022, 4:42 PM@flow
decorator, I'm guessing this will be possible, but don't see any docs on it.Dylan
04/18/2022, 6:32 PMDylan
04/18/2022, 6:32 PM[{
"data": {
"set_schedule_active": {
"success": false,
"__typename": "success_payload"
}
}
}]
Shuchita Tripathi
04/18/2022, 7:12 PMdef task1():
@task
def run_terraform_lt():
tf = Terraform(working_dir="law_tf")
tf.init()
tf.apply()
task2.py
def task2():
@task
def run_terraform_rt():
tf = Terraform(working_dir="rt_tf")
tf.init()
tf.apply()
The input is similar to this:
{
"task1": {
"id": "foo",
"task_name": "task1"
},
"task2": {
"id": "bar",
"task_name": "task2",
}
}
I am getting the task name from this dictionary. Based on the value of "task_name", I have to create a flow combining all tasks.
I am creating a flow where I am trying to add them, but the tasks are not getting added in the flow. Anyone has any idea on how this can be achieved? Here is the snippet of my flow creation code. It is inside a for loop through which I am extracting the task name and other variables.Fina Silva-Santisteban
04/18/2022, 8:54 PMwith Flow('Parent Flow') as flow:
dates = ['2021-01-04', '2021-01-05', '2021-01-06'] // a list of dates
for i in range(len(dates)):
//run child flow with i as parameter
When I check the Prefect UI it seems like the child flows are running in parallel (I’m using threads) which is in general great but in this case I’d like the child flows to be run sequentially. Is there a way to force the parent flow to run them that way?? 🤔RAISS Zineb
04/18/2022, 11:39 PMJacob Blanco
04/19/2022, 4:23 AMOmar Sultan
04/19/2022, 8:13 AMSergey Gerasimov
04/19/2022, 8:24 AMJoshua Greenhalgh
04/19/2022, 12:31 PMMini Khosla
04/19/2022, 1:31 PMJoshua Greenhalgh
04/19/2022, 1:33 PMDekel R
04/19/2022, 2:11 PMPedro Machado
04/19/2022, 2:20 PMwait_for_flow_run
to wait for a subflow run. The subflow is a long-running job that can take more than 12 hours and sometimes it times out.
I noticed that wait_for_flow_run
uses watch_flow_run
which raises an exception if the flow has been running for more than 12 hours. The 12 hours timeout is hardcoded.
See https://github.com/PrefectHQ/prefect/blob/afda99411f91582ad187bf33671268d8d3c3c2c0/src/prefect/backend/flow_run.py#L95
We plan to upgrade to 2.0 shortly after it's released. Is this timeout limitation for subflows also implemented in Orion?Hugo Shi
04/19/2022, 2:42 PMShuchita Tripathi
04/19/2022, 3:24 PMwiretrack
04/19/2022, 3:34 PMPrasanth Kothuri
04/19/2022, 3:51 PMTask 'copy_from_s3_to_sftp': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "flows/k8s/my_flow_name.py", line 46, in copy_from_s3_to_sftp
SystemError: unknown opcode
Adi Gandra
04/19/2022, 3:58 PMprefect agent kubernetes install -k {key} --mem-request 4G --mem-limit 6G --cpu-request 2 --rbac | kubectl apply -f -
Nothing seems to happen
I just get the message:
deployment.apps/prefect-agent configured
<http://role.rbac.authorization.k8s.io/prefect-agent-rbac|role.rbac.authorization.k8s.io/prefect-agent-rbac> unchanged
<http://rolebinding.rbac.authorization.k8s.io/prefect-agent-rbac|rolebinding.rbac.authorization.k8s.io/prefect-agent-rbac> unchanged
Any idea’s on how to successfully upgrade my prefect agent?Chris Reuter
04/19/2022, 4:28 PMPhilip MacMenamin
04/19/2022, 7:41 PMupstream_tasks
on a ShellTask
This works:
brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
brt_commands_logged = log(item=brt_commands, desc="BRT commands")
brts = shell_task.map(
command=brt_commands, upstream_tasks=[tomogram_fps]
)
This fails:
brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
brt_commands_logged = log(item=brt_commands, desc="BRT commands")
brts = shell_task.map(
command=brt_commands, upstream_tasks=[tomogram_fps, brt_commands_logged]
)
Philip MacMenamin
04/19/2022, 8:12 PMexport PREFECT__LOGGING__LEVEL=INFO
I see:
python3 tmp/shell_task.py
[2022-04-19 21:09:30+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
[2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'MyShellTask': Starting task run...
[2022-04-19 21:09:30+0100] INFO - prefect.MyShellTask | lsto echo
[2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'MyShellTask': Finished task run for task with final state: 'Success'
[2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'problem': Starting task run...
[2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Oh no!')
[2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'problem': Finished task run for task with final state: 'Failed'
[2022-04-19 21:09:30+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
If I
export PREFECT__LOGGING__LEVEL=ERROR
I see nothing. Ideally I'd like to only see messages about broken stuff. Ideas?Jason
04/19/2022, 10:53 PMMakefile
would spin up a few Docker containers that one could test flows on. How can I abstract between a local and ECS agent in my flows to allow something like a env
var to swap between?Jai P
04/20/2022, 12:58 AMJosh
04/20/2022, 3:21 AMError during execution of task: ClientError([{'path': ['create_task_run_artifact'], 'message': 'Task run <task_run_id> not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Josh
04/20/2022, 3:21 AMError during execution of task: ClientError([{'path': ['create_task_run_artifact'], 'message': 'Task run <task_run_id> not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Kevin Kho
04/20/2022, 3:23 AMJosh
04/20/2022, 5:09 PMKevin Kho
04/20/2022, 5:11 PMtask.run()
inside a task
inside a Flow
? Or just a task.run not in a Flow?Josh
04/20/2022, 5:16 PMclass Task():
def run():
# do something
create_markdown()
task = Task()
with Flow("my flow") as Flow:
task()
Kevin Kho
04/20/2022, 5:17 PMJosh
04/20/2022, 10:07 PMKevin Kho
04/20/2022, 10:20 PMfrom prefect import Task, Flow
from prefect.backend.artifacts import create_markdown_artifact
class MyTask(Task):
def run(self, x):
create_markdown_artifact("# Heading\n\nText with [link](<https://www.prefect.io/>).")
mytask = MyTask()
with Flow("artifact_test") as flow:
mytask.map(list(range(100)))
flow.register("databricks")
Josh
04/21/2022, 6:58 PMKevin Kho
04/21/2022, 6:59 PMVinny Tunnell
05/15/2022, 10:48 PMcreate_markdown_artifact
. This was working fine for us for the last few weeks, but this error was thrown multiple times yesterday. We are using LocalDaskExecutor with a Kubernetes agent.