Thread
#prefect-community
    Irvin Tang

    1 year ago
    Hi there, I’m currently testing a function that registers a flow to a specified environment like so (got inspiration from this: https://github.com/PrefectHQ/prefect/blob/master/tests/cli/test_get.py)
    def test_register_flow_docker(self):
        register_flow(
            flow=flow_docker,
            flow_name="test",
            project_name="test-project",
            labels=["test-label"],
            host_type="docker"
        )

        config = get_config("docker")
        with set_temporary_config(config):
            runner = CliRunner()
            result = runner.invoke(get, ["flows"])
            assert result.exit_code == 0
            assert (
                "test-docker" in result.output
                and "test-project" in result.output
            )
    I was wondering if there is another way to test flow registration? I want to register the flow and have the test confirm that it registered successfully.
    Kevin Kho

    1 year ago
    Hi @Irvin Tang! I feel like in order to actually test a successful registration, we might need to use the GraphQL API and check for it. Would that work for you?
    I’ll make a query for you in a bit. Just have a meeting first.
    Irvin Tang

    1 year ago
    Would I be able to execute the query from a Python script, or would it have to go through the GraphQL client?
    Kevin Kho

    1 year ago
    You can do something like this:
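    import json
    from datetime import date, timedelta

    from prefect import Client

    client = Client()

    days = 7  # look-back window in days (example value)
    begin_date = date.today() - timedelta(days=days)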
    query = """
                query { 
                    flow_run(where: { end_time: {_gt: \"""" + str(begin_date) + """\"} }) {
                    id
                    name
                    state
                    end_time
                    }
                }
                """
    result = json.loads(client.graphql(query).to_json())
    result = result["data"]["flow_run"]
    Ok, so flow.register() returns a flow_id, so the query you need would just check for the existence of that flow_id:
    query {
        flow(where: { id: {_eq: "insert_id_here"} }) {
            id
            name
        }
    }
    Then check whether you get anything back.
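    Putting the two pieces together, a registration check could look roughly like the sketch below. The helper names (build_flow_query, flow_is_registered) and the FakeClient stand-in are made up for illustration and are not part of Prefect. The sketch only assumes that the client has a graphql(query) method whose result can be indexed as result["data"]["flow"], as in the flow_run example above.

```python
def build_flow_query(flow_id: str) -> str:
    """Build a GraphQL query that looks up a single flow by its ID."""
    return (
        "query { "
        'flow(where: { id: { _eq: "%s" } }) { id name } '
        "}" % flow_id
    )


def flow_is_registered(client, flow_id: str) -> bool:
    """Return True if the backend returns at least one flow with this ID.

    `client` is anything with a `graphql(query)` method, e.g. a Prefect
    `Client()` (assumption: its result supports ["data"]["flow"] indexing).
    """
    result = client.graphql(build_flow_query(flow_id))
    return len(result["data"]["flow"]) > 0


class FakeClient:
    """Hypothetical stand-in for the real client, for offline unit tests."""

    def __init__(self, known_ids):
        self.known_ids = set(known_ids)

    def graphql(self, query):
        # Pretend the backend knows only the IDs given at construction time.
        flows = [
            {"id": flow_id, "name": "test"}
            for flow_id in sorted(self.known_ids)
            if flow_id in query
        ]
        return {"data": {"flow": flows}}


# In a real test, the ID would come from registration, for example:
#   flow_id = flow_docker.register(project_name="test-project")
#   assert flow_is_registered(Client(), flow_id)
```

    Passing the client in as an argument keeps the check testable without a running backend; the real Client() only needs to be used in the integration test.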
    Irvin Tang

    1 year ago
    Awesome, thanks so much Kevin. Will rework the tests and get back to you on the results.
    Hey Kevin, the client works perfectly for what we’re trying to achieve. Thanks again!
    Kevin Kho

    1 year ago
    Thanks for circling back!