Tomoyuki NAKAMURA
09/10/2021, 8:55 AM
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services \"foobar\" already exists","reason":"AlreadyExists","details":{"name":"foobar","kind":"services"},"code":409}
It seems that the error occurs when I try to create a service with the same name. How can I rerun the flow when using DaskExecutor?
Abhishek
09/10/2021, 9:58 AM
prefect.utilities.notifications.notifications.slack_notifier()
) as a state handler for Flow?
Jan Vlčinský
09/10/2021, 10:31 AM
ResourceManager `map`ped to list of days.
We have a task that archives data for one day. The task uses a temporary dir (mapped to shared memory), and we use ResourceManager to clean up the tmp dir once a day is processed (there is one subdir per processed day).
Now we want to run the same for a list of days, and we struggle with mapping days to the resource manager.
Marc
09/10/2021, 12:43 PM
Brad I
09/10/2021, 12:45 PM
default
and was wondering if anyone ran into the same issue. Example executor code is in the thread.
Filip Lindvall
09/10/2021, 1:01 PM
Nikolay
09/10/2021, 1:32 PM
Kyle McChesney
09/10/2021, 2:52 PM
LATEST_FLOW_BY_NAME = gql(
    '''
    query LatestFlowByName($name: String) {
      flow(
        where: {name: {_eq: $name}},
        order_by: {version: desc},
        limit: 1,
      )
      {
        id
      }
    }
    ''',
)
with: variable_values={'name': name}
• trigger a flow run
CREATE_FLOW_RUN = gql(
    '''
    mutation CreateFlowRun($input: create_flow_run_input!) {
      create_flow_run(input: $input) {id}
    }
    ''',
)
with:
variable_values={
    'input': {
        'flow_id': flow_id,
        'parameters': parameters,
    },
},
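A minimal sketch of how the two requests above might be assembled as plain payloads, with no client library involved. The `archived: {_eq: false}` filter is an assumption added here to skip archived versions, the `{"data": {"flow": [...]}}` response shape is the usual GraphQL-over-HTTP envelope rather than something shown in this thread, and the helper names are hypothetical. Flow names may not be unique across projects, so a project filter could also be needed.

```python
from typing import Any, Dict, Optional

# Same query as above, plus an assumed filter that excludes archived versions.
LATEST_FLOW_BY_NAME = """
query LatestFlowByName($name: String) {
  flow(
    where: {name: {_eq: $name}, archived: {_eq: false}},
    order_by: {version: desc},
    limit: 1
  ) {
    id
  }
}
"""

CREATE_FLOW_RUN = """
mutation CreateFlowRun($input: create_flow_run_input!) {
  create_flow_run(input: $input) { id }
}
"""


def latest_flow_request(name: str) -> Dict[str, Any]:
    """Build the payload that looks up the newest version of a flow by name."""
    return {"query": LATEST_FLOW_BY_NAME, "variables": {"name": name}}


def extract_flow_id(response: Dict[str, Any]) -> Optional[str]:
    """Return the flow id from a response, or None if no flow matched."""
    flows = response.get("data", {}).get("flow", [])
    return flows[0]["id"] if flows else None


def create_flow_run_request(
    flow_id: str, parameters: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Build the mutation payload, sending only what was explicitly given."""
    flow_input: Dict[str, Any] = {"flow_id": flow_id}
    if parameters:
        # Omit the key entirely when empty so the flow's own defaults apply.
        flow_input["parameters"] = parameters
    return {"query": CREATE_FLOW_RUN, "variables": {"input": flow_input}}
```

Leaving `parameters` out of the input when none are given keeps the request as small as possible, which matches the goal of running with whatever defaults the flow was registered with.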
My goal here is to specify as little as possible, to ensure the flow runs with the defaults configured for it, i.e. I don’t want to muck with the RunConfig, etc. I am mostly concerned about my logic for getting the latest flow run. I tried to grok the flow groups / versions tables but did not have much luck.
Ben Sack
09/10/2021, 3:00 PM
flow_runs by parameter by using something like: flow_runs(where: {parameters: {_has_key: "process_date"}}), but what I would like to do is filter the query for any flows that ran on a specific process date, such as 8/17/2021, rather than filtering for flows that have the process_date parameter. Thanks!
Martim Lobao
09/10/2021, 4:06 PM
person-build
in the schematic view, but that just takes me to the local (“zoomed-in”) view of the task itself, i can’t see the DAG for the person-build flow run. the only workaround I’ve found is to inspect the logs and copy the link to the flow run from there.
am i missing something obvious or is there no easy way to access a subflow’s details?
this is basically our setup:
person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)
release_flow = StartFlowRun(flow_name="release", project_name=get_stage(), wait=True)
with Flow(
    "build-then-release",
    executor=LocalDaskExecutor(num_workers=8),
    result=PrefectResult(),
) as flow:
    release_flow(upstream_tasks=[person_build_flow])
Pedro Machado
09/10/2021, 4:12 PM
Pedro Machado
09/10/2021, 4:14 PM
Charles Liu
09/10/2021, 5:24 PM
David Jenkins
09/10/2021, 7:08 PM
def handler(task, old_state, new_state):
    if new_state.is_failed():
        # do stuff here
        return Skipped()
    return new_state
The consequence of this is that the flow finishes with a SUCCESS state even though a task actually failed, but I need to show the flow as FAILED. To solve this, I created a new task that uses a different state handler and checks its state. If the state is skipped, I return Failed() from the handler. Something like this:
@task(name="final check", state_handlers=[final_check_handler])
def final_check():
    pass
and the state handler
def final_check_handler(task, old_state, new_state):
    if new_state.is_skipped():
        return Failed()
    return new_state
This accomplishes what I want, but surely there has to be a better way.
Henry
09/10/2021, 8:56 PM
An Hoang
09/10/2021, 8:57 PM
flow_result = flow.run() and visualized it with state. The log says Succeed: all reference tasks succeeded but I can see a failure right there in the stateful DAG. Also it doesn't show the full DAG with the downstream tasks!!!
I have also attached the output of flow_result.result and it doesn't show the get_duplicated_pairs_info task. Any tips on what went wrong and how I can debug from here? (Full log in comment)
Trevor Campbell
09/11/2021, 12:33 AM
task_run is what is causing a huge amount of overhead. I would expect context creation to be very lightweight... more details in the thread below.
Frederick Thomas
09/11/2021, 12:51 AM
Ken Nguyen
09/11/2021, 1:48 AM
Abhishek
09/11/2021, 7:50 AM
Abhishek
09/11/2021, 7:51 AM
Abhishek
09/11/2021, 8:09 AM
Abhishek
09/11/2021, 8:11 AM
flow.run() in code but triggering from UI is not working 🧐
Abhishek
09/11/2021, 8:28 AM
Abhishek
09/11/2021, 9:09 AM
with Flow(
    name=flow_name,
    # schedule=schedule,
    state_handlers=[slack_notifier(only_states=[Success]), flow_state_handler],
) as flow:
    # ---some logic---
    for sync_type, cmd_parameter in sync_commands:
        # ---some logic---
        sync_command = (
            f"{command_prefix} {source_bucket} {destination_bucket} {options}"
        )
        s3_sync_task = ShellTask(
            command=sync_command,
            return_all=True,
            name=task_id,
            slug=task_id,
            timeout=timedelta(minutes=task_execution_timeout),
        )()
        # Create a task to validate sync commands.
        validate_s3_sync_task = FunctionTask(
            fn=validate_s3_sync,
            name="".join(["validate_", task_id]),
            slug="".join(["validate_", task_id]),
        )(
            timeout=timedelta(minutes=task_execution_timeout),
            s3_sync_command=sync_command,
        )
        # Define dependencies between tasks.
        s3_sync_task.set_downstream(validate_s3_sync_task)
# flow.run()
flow.register(project_name="demo-project", labels=tags)
Martim Lobao
09/12/2021, 9:52 AM
Bishwarup B
09/12/2021, 12:20 PM
map to parallelise over the lat-lon pairs but I have a couple of questions here:
1. I know if I use a DaskExecutor or LocalDaskExecutor the flow is distributed, but is there any limit to applying map over such a large collection?
2. Instead of using threads to run the computation, is it possible to make use of async, as the tasks (most of them) are heavily IO bound?
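On the second question, one possible pattern is to batch many IO-bound calls inside a single synchronous function and drive them with `asyncio` there, so that a task handles a chunk of lat-lon pairs rather than one pair each. This is only a sketch under assumptions: `fetch_point` is a hypothetical stand-in for the real IO call, `max_concurrency` is an illustrative knob, and nothing here is Prefect-specific.

```python
import asyncio
from typing import Dict, List, Tuple


async def fetch_point(lat: float, lon: float) -> Dict[str, float]:
    """Hypothetical stand-in for an IO-bound call such as an HTTP request."""
    await asyncio.sleep(0.01)  # simulates waiting on the network
    return {"lat": lat, "lon": lon}


async def fetch_all(
    pairs: List[Tuple[float, float]], max_concurrency: int = 100
) -> List[Dict[str, float]]:
    """Run the calls concurrently, capped so the remote service is not flooded."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(lat: float, lon: float) -> Dict[str, float]:
        async with semaphore:
            return await fetch_point(lat, lon)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(bounded(lat, lon) for lat, lon in pairs))


def fetch_batch(pairs: List[Tuple[float, float]]) -> List[Dict[str, float]]:
    """Synchronous entry point that could serve as the body of one task."""
    return asyncio.run(fetch_all(pairs))
```

Splitting the full collection into batches and mapping over the batches would keep the number of mapped task runs small while still overlapping the IO waits within each batch.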
What are some considerations I should make here? Thanks!
Oliver Mannion
09/12/2021, 12:28 PM
Nadav Nuni
09/12/2021, 2:32 PM
Jacob Blanco
09/13/2021, 2:57 AM