Irvin Tang
05/27/2021, 5:28 PMdef test_register_flow_docker(self):
register_flow(
flow=flow_docker,
flow_name="test",
project_name="test-project",
labels=["test-label"],
host_type="docker"
)
config = get_config("docker")
with set_temporary_config(config):
runner = CliRunner()
result = runner.invoke(get, ["flows"])
assert result.exit_code == 0
assert (
"test-docker" in result.output
and "test-project" in result.output
)
I was wondering if there was another way for testing flow registration? I want to be able to register the flow and the test should confirm that the flow successfully reigsteredKevin Kho
Kevin Kho
Irvin Tang
05/27/2021, 6:05 PMKevin Kho
client = Client()
begin_date = date.today() - timedelta(days=days)
query = """
query {
flow_run(where: { end_time: {_gt: \"""" + str(begin_date) + """\"} }) {
id
name
state
end_time
}
}
"""
result = json.loads(client.graphql(query).to_json())
result = result["data"]["flow_run"]
Kevin Kho
flow.register()
returns a flow_id, so the query you need would just be to check for the existence of that flow_idKevin Kho
query {
flow (where: { id: {_eq: "inser_id_here"} }) {
id
name
}
}
Kevin Kho
Irvin Tang
05/27/2021, 7:28 PMIrvin Tang
06/03/2021, 7:40 PMKevin Kho