Michael
09/10/2021, 8:08 AM
Thomas Fredriksen
09/10/2021, 8:10 AM
Terawat T
09/10/2021, 8:50 AM
DEBUG:agent:Querying for ready flow runs...
DEBUG:agent:Creating namespaced job prefect-job-c4a440b5
DEBUG:agent:Job prefect-job-c4a440b5 created
INFO:agent:Completed deployment of flow run 549ced42-7840-43d6-89da-6f1a1f9cb378
Nacho Rodriguez
09/10/2021, 8:53 AM
Tomoyuki NAKAMURA
09/10/2021, 8:55 AMHTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services \"foobar\" already exists","reason":"AlreadyExists","details":{"name":"foobar","kind":"services"},"code":409}
It seems that the error occurs when I try to create a service with the same name. How can I rerun the flow when using DaskExecutor?
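A sketch of one way around the 409 (not a confirmed fix; dask_kubernetes.KubeCluster and its name kwarg are assumptions here): give each run's Dask cluster a unique name so Kubernetes never sees two services called "foobar".

import uuid
from prefect.executors import DaskExecutor

def make_cluster(**kwargs):
    # KubeCluster is assumed; "name" controls the scheduler/service name,
    # so a per-run suffix avoids the 409 AlreadyExists on reruns
    from dask_kubernetes import KubeCluster
    return KubeCluster(name=f"foobar-{uuid.uuid4().hex[:8]}", **kwargs)

# attach to an existing flow object
flow.executor = DaskExecutor(cluster_class=make_cluster)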
Abhishek
09/10/2021, 9:58 AM
Can prefect.utilities.notifications.notifications.slack_notifier() be used as a state handler for Flow?
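It can: slack_notifier is a curried state handler, so calling it with options returns a handler you can attach to a Flow (or a task). A minimal sketch (it assumes the SLACK_WEBHOOK_URL secret is configured; the flow name is illustrative):

from prefect import Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier

with Flow(
    "my-flow",
    state_handlers=[slack_notifier(only_states=[Failed])],
) as flow:
    ...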
Jan Vlčinský
09/10/2021, 10:31 AM
We are trying to get a ResourceManager `map`ped to a list of days. We have a task archiving data for one day. The task uses a temporary dir (mapped to shared memory), and we use a ResourceManager to clean up the tmp dir when a day is processed (there is a day subdir per processed day). Now we want to run the same for a list of days, and we are struggling with mapping days to the resource manager.
Marc
09/10/2021, 12:43 PM
Brad I
09/10/2021, 12:45 PM
default and was wondering if anyone ran into the same issue. Example executor code is in the thread.
Filip Lindvall
09/10/2021, 1:01 PM
Nikolay
09/10/2021, 1:32 PM
Kyle McChesney
09/10/2021, 2:52 PM
• get the latest version of a flow by name
LATEST_FLOW_BY_NAME = gql(
'''
query LatestFlowByName($name: String) {
flow(
where: {name: {_eq: $name}},
order_by: {version: desc},
limit: 1,
)
{
id
}
}
''',
)
with: variable_values={'name': name}
• trigger a flow run
CREATE_FLOW_RUN = gql(
'''
mutation CreateFlowRun($input: create_flow_run_input!) {
create_flow_run(input: $input) {id}
}
''',
)
with:
variable_values={
'input': {
'flow_id': flow_id,
'parameters': parameters,
},
},
My goal here is to specify as little as possible, to ensure the flow runs with the defaults configured for it, i.e. I don't want to muck with the RunConfig, etc. I am mostly concerned about my logic for getting the latest version of the flow. I tried to grok the flow groups / versions tables but did not have much luck.
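Putting the two documents above together, a minimal sketch with the gql client (the endpoint URL and auth header are assumptions; point them at your own Server/Cloud API):

from gql import Client
from gql.transport.requests import RequestsHTTPTransport

transport = RequestsHTTPTransport(
    url="https://api.prefect.io/graphql",  # assumed Cloud endpoint
    headers={"Authorization": "Bearer <API_KEY>"},
)
client = Client(transport=transport)

# latest version of the flow with this name
flow_id = client.execute(
    LATEST_FLOW_BY_NAME, variable_values={"name": name}
)["flow"][0]["id"]

# trigger a run with only flow_id and parameters set
run_id = client.execute(
    CREATE_FLOW_RUN,
    variable_values={"input": {"flow_id": flow_id, "parameters": parameters}},
)["create_flow_run"]["id"]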
Ben Sack
09/10/2021, 3:00 PM
I know I can filter flow_runs by parameter by using something like: flow_runs(where: {parameters: {_has_key: "process_date"}}), but what I would like to do is filter the query for any flows that ran on a specific process date, such as 8/17/2021, rather than filtering for flows that have the process_date parameter. Thanks!
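One option, sketched in the same style as the snippets above (it assumes the Hasura-backed GraphQL API, where jsonb columns support _contains, and that process_date is stored as a string parameter): match on the parameter's value rather than just its key.

FLOW_RUNS_BY_PROCESS_DATE = gql(
    '''
    query FlowRunsByProcessDate($pd: jsonb) {
      flow_run(where: {parameters: {_contains: $pd}}) {
        id
        name
      }
    }
    ''',
)
with: variable_values={'pd': {'process_date': '8/17/2021'}}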
Martim Lobao
09/10/2021, 4:06 PM
I tried clicking on person-build in the schematic view, but that just takes me to the local ("zoomed-in") view of the task itself; I can't see the DAG for the person-build flow run. The only workaround I've found is to inspect the logs and copy the link to the flow run from there.
Am I missing something obvious, or is there no easy way to access a subflow's details?
This is basically our setup:
person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)
release_flow = StartFlowRun(flow_name="release", project_name=get_stage(), wait=True)

with Flow(
    "build-then-release",
    executor=LocalDaskExecutor(num_workers=8),
    result=PrefectResult(),
) as flow:
    release_flow(upstream_tasks=[person_build_flow])
Pedro Machado
09/10/2021, 4:12 PM
Pedro Machado
09/10/2021, 4:14 PM
Charles Liu
09/10/2021, 5:24 PM
David Jenkins
09/10/2021, 7:08 PM
def handler(task, old_state, new_state):
    if new_state.is_failed():
        # do stuff here
        return Skipped()
    return new_state
The consequence of this is that the flow finishes with a SUCCESS state even though a task actually failed, but I need to show the flow as FAILED. To solve this, I added a final task with a different state handler that checks its state: if the state is Skipped, I return Failed() from the handler. Something like this:
@task(name="final check", state_handlers=[final_check_handler])
def final_check():
    pass
and the state handler
def final_check_handler(task, old_state, new_state):
    if new_state.is_skipped():
        return Failed()
    return new_state
This accomplishes what I want, but surely there has to be a better way.
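One possibly cleaner approach (a sketch using Prefect 1.x triggers and reference tasks; the task names are made up): leave the failing task Failed, give downstream tasks an all_finished trigger so they still run, and pin the flow's final state to the task that matters.

from prefect import Flow, task
from prefect.triggers import all_finished

@task
def might_fail():
    ...

@task(trigger=all_finished)  # runs whether upstream failed or not
def downstream(upstream_result):
    # with all_finished, a failed upstream passes its exception as the input
    ...

with Flow("fail-but-continue") as flow:
    upstream = might_fail()
    downstream(upstream)

flow.set_reference_tasks([upstream])  # flow's final state follows this task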
Henry
09/10/2021, 8:56 PM
An Hoang
09/10/2021, 8:57 PM
I ran flow_result = flow.run() and visualized it with the state. The log says Succeed: all reference tasks succeeded, but I can see a failure right there in the stateful DAG. Also, it doesn't show the full DAG with the downstream tasks! I have also attached the output of flow_result.result, and it doesn't show the get_duplicated_pairs_info task. Any tips on what went wrong and how I can debug from here? (Full log in comment)
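For anyone following along, this is roughly the run-and-visualize pattern being described (a sketch; flow is assumed to be an existing Prefect 1.x flow):

flow_result = flow.run()                # returns the flow's final State
flow.visualize(flow_state=flow_result)  # colors each task node by its state
print(flow_result.result)               # dict mapping task -> State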
Trevor Campbell
09/11/2021, 12:33 AM
task_run is what is causing a huge amount of overhead. I would expect context creation to be very lightweight... more details in the thread below.
Frederick Thomas
09/11/2021, 12:51 AM
Ken Nguyen
09/11/2021, 1:48 AM
Abhishek
09/11/2021, 7:50 AM
Abhishek
09/11/2021, 7:51 AM
Abhishek
09/11/2021, 8:09 AM
Abhishek
09/11/2021, 8:11 AM
flow.run() in code works, but triggering from the UI is not working 🧐
Abhishek
09/11/2021, 8:28 AM
Abhishek
09/11/2021, 9:09 AM
with Flow(
    name=flow_name,
    # schedule=schedule,
    state_handlers=[slack_notifier(only_states=[Success]), flow_state_handler],
) as flow:
    # --- some logic ---
    for sync_type, cmd_parameter in sync_commands:
        # --- some logic ---
        sync_command = (
            f"{command_prefix} {source_bucket} {destination_bucket} {options}"
        )
        s3_sync_task = ShellTask(
            command=sync_command,
            return_all=True,
            name=task_id,
            slug=task_id,
            timeout=timedelta(minutes=task_execution_timeout),
        )()
        # Create a task to validate sync commands.
        validate_s3_sync_task = FunctionTask(
            fn=validate_s3_sync,
            name="".join(["validate_", task_id]),
            slug="".join(["validate_", task_id]),
            # timeout is a Task kwarg, so it belongs in the constructor,
            # not in the call (where it would be passed to validate_s3_sync)
            timeout=timedelta(minutes=task_execution_timeout),
        )(
            s3_sync_command=sync_command,
        )
        # Define dependencies between tasks.
        s3_sync_task.set_downstream(validate_s3_sync_task)

# flow.run()
flow.register(project_name="demo-project", labels=tags)
Martim Lobao
09/12/2021, 9:52 AM
Martim Lobao
09/12/2021, 9:52 AM
Kevin Kho
09/13/2021, 3:03 PM
Martim Lobao
09/13/2021, 3:05 PM
INFO | martim_peopledatalabs_com restarted this flow run
INFO | martim_peopledatalabs_com restarted this flow run
INFO | agent | Submitted for execution: Task arn:aws:ecs:us-west-2:5567...
INFO | GitHub | Downloading flow from GitHub storage - repo: 'peopledatalabs/prefect', path: 'src/pdlapps/orchestration/flows/build_then_release.py', ref: 'prefect-testing'
INFO | GitHub | Flow successfully downloaded. Using commit: bac24...
INFO | CloudFlowRunner | Beginning Flow run for 'build-then-release'
INFO | CloudTaskRunner | Task 'Flow person-build': Starting task run...
INFO | Flow person-build | Flow Run: <https://cloud.prefect.io/pdl/flow-run/5133d6be>...
INFO | CloudTaskRunner | FAIL signal raised: FAIL('5133d6be... finished in state <Failed: "Some reference tasks failed.">')
flow setup:
person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)
release_flow = StartFlowRun(flow_name="release", project_name=get_stage(), wait=True)

with Flow(
    "build-then-release",
    executor=LocalDaskExecutor(num_workers=8),
    result=PrefectResult(),
    state_handlers=[slack_notifier, terminate_on_cancel],
) as flow:
    release_flow(upstream_tasks=[person_build_flow])
Kevin Kho
09/13/2021, 3:09 PM
Martim Lobao
09/13/2021, 3:11 PM
Kevin Kho
09/13/2021, 3:16 PM
StartFlowRun has an idempotency key by default. Restarting the parent will try to start a new flow run, but because there is already an existing run with the same idempotency key, it won't restart that subflow. The subflow needs to be individually restarted.
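For what it's worth, a sketch of overriding that key (it assumes a Prefect 1.x release where StartFlowRun.run accepts an idempotency_key; get_stage() comes from the setup above):

import pendulum
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)

@task
def fresh_key():
    # computed at runtime, so every new parent run gets its own key and
    # launches its own child run; a restart reuses succeeded task results,
    # so the subflow still needs an individual restart in that case
    return pendulum.now("UTC").isoformat()

with Flow("build-then-release") as flow:
    person_build_flow(idempotency_key=fresh_key())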
Martim Lobao
09/13/2021, 3:19 PM