Newskooler

    1 year ago
    Hi prefecters 👋 I stumbled on an interesting case: how can I unit test a task which fails and has retries, but without waiting for all retries to go… ? Currently I do this, but it only works once all retries have run (which in my case takes a long time). Is there a way around this?
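    from prefect import Flow

    def test_smth():
        with Flow("test") as flow:
            outcome = my_funct()  # the retrying task under test
        # flow.run() only returns once every retry has been attempted
        state = flow.run()
        assert state.result[outcome].is_failed()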
    Dylan

    1 year ago
    Hi @Newskooler, in order to test at the level you’re looking at (flow-level outcomes with retries and whatnot) you may need to drop to the FlowRunner or TaskRunner level
    Newskooler

    1 year ago
    Hi @Dylan what does that mean 🤔
    Dylan

    1 year ago
    For this specific question, you could try using environment variables when defining your flow to change max_retries in your Tasks to 0 for testing
    We don’t have a testing utility or a global setting you can flip for your flow to turn retries off, I believe
    This might be a good candidate for an issue if you can give some background on what you’re trying to achieve with your testing and what behavior you’re hoping to see!
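
The environment-variable idea above could be sketched roughly as follows. This is only an illustration, not a Prefect feature: the variable name `MY_FLOW_MAX_RETRIES` and the helper `retry_kwargs` are hypothetical, and the defaults are placeholders. It assumes Prefect 1.x, where `max_retries` and `retry_delay` are keyword arguments of a task.

```python
import os
from datetime import timedelta

# Hypothetical variable name; Prefect itself does not read this.
RETRIES_ENV_VAR = "MY_FLOW_MAX_RETRIES"


def retry_kwargs(default_retries=3, delay=timedelta(minutes=10)):
    """Build retry keyword arguments for a task from the environment."""
    retries = int(os.environ.get(RETRIES_ENV_VAR, default_retries))
    if retries <= 0:
        # Leave retry_delay out entirely: as far as I know, Prefect 1.x
        # rejects a retry_delay when max_retries is 0.
        return {"max_retries": 0}
    return {"max_retries": retries, "retry_delay": delay}


# Intended use (assumes Prefect 1.x is installed):
#
#     from prefect import task
#
#     @task(**retry_kwargs())
#     def my_funct():
#         ...
```

Because the helper runs when the task is defined, the variable would have to be set before the module containing the flow is imported, for example in the test runner's environment or in a `conftest.py`.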
    Newskooler

    1 year ago
    Okay got it. I will inspect more with this info and if I see a case for an issue I will post one : ) Thanks!
    Dylan

    1 year ago
    You got it 👍