Abhishek
09/11/2021, 9:09 AM
with Flow(
    name=flow_name,
    # schedule=schedule,
    state_handlers=[slack_notifier(only_states=[Success]), flow_state_handler],
) as flow:
    # ---some logic---
    for sync_type, cmd_parameter in sync_commands:
        # ---some logic---
        sync_command = (
            f"{command_prefix} {source_bucket} {destination_bucket} {options}"
        )
        s3_sync_task = ShellTask(
            command=sync_command,
            return_all=True,
            name=task_id,
            slug=task_id,
            timeout=timedelta(minutes=task_execution_timeout),
        )()
        # Create a task to validate sync commands.
        validate_s3_sync_task = FunctionTask(
            fn=validate_s3_sync,
            name="".join(["validate_", task_id]),
            slug="".join(["validate_", task_id]),
        )(
            timeout=timedelta(minutes=task_execution_timeout),
            s3_sync_command=sync_command,
        )
        # Define dependencies between tasks.
        s3_sync_task.set_downstream(validate_s3_sync_task)
# flow.run()
flow.register(project_name="demo-project", labels=tags)
Jenny
09/11/2021, 1:01 PM
Abhishek
09/11/2021, 1:03 PM
--no-hostname-label
so the agent label was empty whereas the flow had many tags.
so this was the issue.
Abhishek
09/11/2021, 1:03 PM
Jenny
09/11/2021, 1:05 PM
09/11/2021, 1:05 PM
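For anyone hitting the same thing: as far as I understand Prefect 1.x, an agent only picks up a flow run when every label on the flow is also present on the agent, so an agent with an empty label set will not pick up a flow registered with tags. Below is a rough sketch of that matching rule in plain Python. The function name `agent_can_run` and the label values are hypothetical, for illustration only, and are not part of the Prefect API.

```python
def agent_can_run(flow_labels, agent_labels):
    """Return True when every flow label is also present on the agent.

    Hypothetical helper that mirrors the subset rule described above;
    it does not call Prefect.
    """
    return set(flow_labels) <= set(agent_labels)


# Hypothetical label values, not taken from the thread.
flow_labels = {"s3-sync", "prod"}
agent_without_labels = set()  # e.g. agent started with no labels at all
agent_with_labels = {"s3-sync", "prod", "my-host"}

print(agent_can_run(flow_labels, agent_without_labels))
print(agent_can_run(flow_labels, agent_with_labels))
```

Under this rule, the fix is either to start the agent with labels that cover the flow's labels or to register the flow with fewer labels.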