def test_register_flow_docker(self):
    register_flow(
        flow=flow_docker,
        flow_name="test",
        project_name="test-project",
        labels=["test-label"],
        host_type="docker",
    )
    config = get_config("docker")
    with set_temporary_config(config):
        runner = CliRunner()
        result = runner.invoke(get, ["flows"])
        assert result.exit_code == 0
        assert (
            "test-docker" in result.output
            and "test-project" in result.output
        )
I was wondering if there is another way to test flow registration? I want to register the flow and have the test confirm that the flow was successfully registered.
Kevin Kho
05/27/2021, 6:01 PM
Hi @Irvin Tang! I feel like in order to actually test a successful registration, we might need to use the GraphQL API and check for it. Would that work for you?
Kevin Kho
05/27/2021, 6:02 PM
I’ll make a query for you in a bit. Just have a meeting first.
Irvin Tang
05/27/2021, 6:05 PM
would I be able to execute the query via python script or would it have to be through the graphql client?
Kevin Kho
05/27/2021, 6:13 PM
You can do something like this:
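import json
from datetime import date, timedelta

from prefect import Client

days = 7  # look-back window in days (example value; not given in the original)
client = Client()
begin_date = date.today() - timedelta(days=days)
# Select flow runs that ended after begin_date
query = """
query {
  flow_run(where: { end_time: {_gt: \"""" + str(begin_date) + """\"} }) {
    id
    name
    state
    end_time
  }
}
"""
result = json.loads(client.graphql(query).to_json())
result = result["data"]["flow_run"]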
Kevin Kho
05/27/2021, 7:25 PM
Ok, so flow.register() returns a flow_id, so the query you need would just check for the existence of that flow_id
Kevin Kho
05/27/2021, 7:25 PM
query {
  flow(where: { id: {_eq: "insert_id_here"} }) {
    id
    name
  }
}
Kevin Kho
05/27/2021, 7:25 PM
And check if you get something
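A rough sketch of that existence check in Python, in case it helps. This is not the exact code from the thread: the helper names build_flow_lookup_query and flow_is_registered are hypothetical, the client is assumed to be a Prefect Client whose graphql() result can be indexed like a dict, and flow ids are assumed to be UUID strings.

```python
import uuid


def build_flow_lookup_query(flow_id: str) -> str:
    """Build a GraphQL query that selects the flow with the given id."""
    # Assumption: flow ids are UUID strings. Parsing the id first rejects
    # malformed values (ValueError) before they are placed in the query text.
    flow_id = str(uuid.UUID(flow_id))
    return 'query { flow(where: { id: {_eq: "%s"} }) { id name } }' % flow_id


def flow_is_registered(client, flow_id: str) -> bool:
    """Return True if the backend returns at least one flow with this id.

    `client` is anything with a graphql(query) method whose result supports
    result["data"]["flow"] (assumed here to hold for prefect.Client).
    """
    result = client.graphql(build_flow_lookup_query(flow_id))
    return len(result["data"]["flow"]) > 0
```

In a test, this could be called with the id returned by flow.register(), for example: assert flow_is_registered(Client(), flow_id). Taking the client as a parameter keeps the check easy to exercise with a stub when no backend is available.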
Irvin Tang
05/27/2021, 7:28 PM
awesome, thanks so much Kevin. will rework the tests and get back to you on results
Irvin Tang
06/03/2021, 7:40 PM
hey Kevin, the client works perfectly for what we’re trying to achieve. thanks again!