Tomasz Szuba

    1 year ago
    I tried:
    def run(self, parameter):
      result="hello"
      raise signals.SUCCESS(
        message=f'{parameter}',
        result=result
      )
    But this makes the result None in another task that depends on it.
    When run locally with flow.run() it works fine, but on the server with agents it does not. It seems checkpointing does not kick in, and the result is lost.
    Kevin Kho

    1 year ago
    Hey @Tomasz Szuba, did you try turning checkpointing on like this?
    Tomasz Szuba

    1 year ago
    Everything works fine if I just return the result; then I see the result in the Prefect GUI
    Kevin Kho

    1 year ago
    I see. Will test it on my end
    Tomasz Szuba

    1 year ago
    Looking at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L887 I see that checkpointing happens when:
    • the task does not raise an exception
    • the task times out
    • the task raises a LOOP signal
    My gut feeling tells me that it should also run in the success case (maybe on all signals?), but I am very new to the codebase
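The behaviour described in this message can be illustrated with a small toy model. This is only a sketch and not Prefect's actual task runner: `Success`, `State`, `ToyStore`, `run_task`, and the `checkpoint_signals` flag are all hypothetical names invented for illustration. It shows how a runner that writes results only on the normal return path ends up with no stored location when the same value arrives through a raised signal.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


class Success(Exception):
    """Toy stand-in for a SUCCESS signal that carries a result."""

    def __init__(self, message: str = "", result: Any = None):
        super().__init__(message)
        self.result = result


@dataclass
class State:
    value: Any
    location: Optional[str] = None  # set only when the value was persisted


class ToyStore:
    """Hypothetical result backend: keeps written values in a dict."""

    def __init__(self) -> None:
        self.saved: dict = {}

    def write(self, value: Any) -> str:
        location = f"result-{len(self.saved)}"
        self.saved[location] = value
        return location


def run_task(fn: Callable[[], Any], store: ToyStore, checkpoint_signals: bool = False) -> State:
    """Run fn and checkpoint its value; signal results are written only if asked."""
    try:
        value = fn()
    except Success as signal:
        # Signal path: by default the value stays in memory only.
        location = store.write(signal.result) if checkpoint_signals else None
        return State(signal.result, location)
    # Normal return path: the value is always written to the store.
    return State(value, store.write(value))


def returns_value() -> str:
    return "hello"


def raises_success() -> str:
    raise Success(message="done", result="hello")
```

In this model both tasks produce "hello" in memory, which would match the thread's observation that a local run works, while only the returning task leaves something behind that a separate process could read back later.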
    Kevin Kho

    1 year ago
    Do you have a result configured, like flow.result = PrefectResult()?
    Tomasz Szuba

    1 year ago
    yes
    Kevin Kho

    1 year ago
    I have a code snippet that seems to be working for me. Could you tell me if this is similar to what you have?
    from prefect import Flow, Task
    from prefect import task
    from prefect.engine import signals
    from prefect.engine.results.prefect_result import PrefectResult
    import prefect
    
    class MyTask(Task):
        def run(self, parameter):
            result="hello"
            raise signals.SUCCESS(
                message=f'{parameter}',
                result=result
            )
    
    @task
    def other_task(x):
        logger = prefect.context.get("logger")
        logger.info(x)
        return x
    
    a = MyTask()
    
    with Flow("test") as flow:
        z = a("test")
        other_task(z)
    
    flow.result = PrefectResult()
    flow.register('omlds')
    This is working in Cloud for me. The other_task prints out “hello”
    Tomasz Szuba

    1 year ago
    I tried to make the example as simple as possible, and I think I made it too simple
    I will be back in a moment with tou
    On the first run the parameter is passed fine; the problem happens on the second run, when we raise PAUSE in the other task:
    from datetime import datetime, timedelta
    
    @task
    def other_task(x):
        logger = prefect.context.get("logger")
        logger.info(x)
        if x is not None:
            # Pause with a start_time 10 seconds from now, passing x as the result
            start_time = datetime.utcnow() + timedelta(seconds=10)
            raise prefect.engine.signals.PAUSE(result=x, start_time=start_time)
        return x
    Kevin Kho

    1 year ago
    Ok will try this
    Tomasz Szuba

    1 year ago
    This should loop indefinitely (in the real code it waits on a condition). But it runs only two times: the first time printing hello, the second time printing None
    Kevin Kho

    1 year ago
    Ok I replicated
    Will look into this more
    Tomasz Szuba

    1 year ago
    Thanks!
    If you know of any other way to set the state message (which depends on a runtime parameter) and still return a value, that would be helpful
    Kevin Kho

    1 year ago
    Could you show me how you did the loop? I think that’s the place we can adjust this
    Tomasz Szuba

    1 year ago
    It's the same as in the code excerpt I pasted above
    What we want to achieve is:
    • task a creates an entity in an external system, sets the success state message to the URL of the entity (which depends on the passed parameters), and returns the entity id
    • task b waits for a certain status on the entity passed from a, raising a PAUSE signal if the entity state is not the one we expect
    I tried to use a state handler to set the message, but https://github.com/PrefectHQ/prefect/issues/3921 blocks us
    Kevin Kho

    1 year ago
    Can you just make a task with a while loop inside that keeps checking and then raises SUCCESS when the condition is met?
    Tomasz Szuba

    1 year ago
    That would be possible, but the condition may take days to be met
    I have a feeling that an agent restart during that time would give the same result
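For reference, the while-loop approach discussed in the last two messages might look roughly like the sketch below. It is plain Python with no Prefect calls; `poll_until` and its `sleep` and `clock` parameters are hypothetical names chosen for illustration (the injectable sleep and clock only make the function easy to exercise without real waiting).

```python
import time
from typing import Callable, Optional


def poll_until(
    check: Callable[[], bool],
    interval: float = 30.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call check() every `interval` seconds until it returns True.

    Returns True when the condition is met, False if `timeout` elapses first.
    """
    deadline = None if timeout is None else clock() + timeout
    while True:
        if check():
            return True
        if deadline is not None and clock() >= deadline:
            return False
        sleep(interval)
```

All of the loop's state lives in the memory of the process running it, so a wait that spans days would have to start over if that process is restarted.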
    Kevin Kho

    1 year ago
    Oh wow I see. Ok will take a look at this
    Tomasz Szuba

    1 year ago
    Thanks!
    Kevin Kho

    1 year ago
    Can you try
    import datetime
    
    @task
    def other_task(x):
        logger = prefect.context.get("logger")
        logger.info(x)
        if x is not None:
            start_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
            raise prefect.engine.signals.PAUSE(result=PrefectResult(x), start_time=start_time)
        return x
    Note the PrefectResult(x). This seems to hold the result for me
    Tomasz Szuba

    1 year ago
    Let's see
    I had to change PrefectResult(x) to PrefectResult(value=x), otherwise I got an error
    But this still does not work...
    Kevin Kho

    1 year ago
    What Prefect version are you on?
    Tomasz Szuba

    1 year ago
    0.14.19
    It works when I run it with flow.run() locally, but not when registered and run from server
    Kevin Kho

    1 year ago
    Ok that’s weird. The script above worked for me on Cloud. What is your server prefect version?
    Are you on 0.14.19 for the flow, agent and server?
    Tomasz Szuba

    1 year ago
    0.14.19 for agent and server. local prefect cli is 0.14.20
    Kevin Kho

    1 year ago
    Ok thanks! Will get back to you
    Tomasz Szuba

    1 year ago
    We use docker storage (if it's helpful) and docker-compose type of deployment
    Kevin Kho

    1 year ago
    I talked to the team and it seems like PAUSE was not meant for this kind of infinite loop. Do you need to start immediately once the state changes? The current suggestion is to use very generous retries.
    Tomasz Szuba

    1 year ago
    The loop is not infinite. The entity state will change eventually. We use PAUSE for polling, and it works OK when we just return from the upstream task, but not when we raise SUCCESS
    When the entity is in the desired state, we just return from the pausing task. It's a way for us to do long polling.
    The docs suggest that raise SUCCESS(result="something") should work the same as return "something", and this is not the case
    nicholas

    1 year ago
    Hi @Tomasz Szuba - could you provide a more complete example, including how you're resuming the task and the dependencies? It's difficult to understand where this might be breaking down with just the raise statement, and as @Kevin Kho’s example suggests, raising a success signal with a result works in at least the cases we've tested
    Tomasz Szuba

    1 year ago
    The flow looks exactly as already presented here 🙂
    The only addition is some business logic instead of passing constants
    I was also able to reproduce it on cloud with docker agent running on my machine
    import os
    from datetime import datetime, timedelta
    
    import prefect.engine.signals
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult
    from prefect.storage import Docker
    
    
    @task(log_stdout=True)
    def create_entity() -> None:
        key = "key"  # entityManager.create()
        raise prefect.engine.signals.SUCCESS(
            result=key,
            message=f'message {key}'
        )
    
    
    @task(log_stdout=True)
    def wait_entity_ready(key) -> str:
        print(key)  # key is None here on the second execution, after the pause
        status = key  # entityManager.getEntityStatus(key); our internal code throws an exception here
        if status is not None: #status.isReady()
            start_time = datetime.utcnow() + timedelta(seconds=30)
            raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
        return key
    
    
    with Flow("wait-entity") as flow:
        created = create_entity()
        wait_entity_ready(created)
    
    flow.result = PrefectResult()
    The wait_entity_ready task just polls every 30 seconds for the status of the entity
    While experimenting, I found that the same error seems to occur when I try to change new_state.result in a state handler: write is not executed on the result, and checkpointing does not work
    Everything works fine if I pass PrefectResult().write(key) to the SUCCESS signal:
    import os
    from datetime import datetime, timedelta
    
    import prefect.engine.signals
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult
    from prefect.storage import Docker
    
    
    @task(log_stdout=True)
    def create_entity() -> None:
        key = "key"  # entityManager.create()
        raise prefect.engine.signals.SUCCESS(
            result=PrefectResult().write(key),
            message=f'message {key}'
        )
    
    
    @task(log_stdout=True)
    def wait_entity_ready(key) -> str:
        print(key)  # key is None here on the second execution, after the pause
        status = key  # entityManager.getEntityStatus(key); our internal code throws an exception here
        if status is not None: #status.isReady()
            start_time = datetime.utcnow() + timedelta(seconds=30)
            raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
        return key
    
    
    with Flow("wait-entity") as flow:
        created = create_entity()
        wait_entity_ready(created)
    
    flow.result = PrefectResult()
    nicholas

    1 year ago
    Hi @Tomasz Szuba - it looks like you've got this working as you'd expect, am I correct?
    Tomasz Szuba

    1 year ago
    Yes, it works, but not as I would expect it to
    Results that are passed to signals are not checkpointed automatically, and I need to do it myself
    The documentation does not hint at that, and I would still consider it a bug (or a missing feature?) that I need a workaround for
    @nicholas @Kevin Kho
    Kevin Kho

    1 year ago
    Hey @Tomasz Szuba, I’ll take care of opening an issue. I just need to do a couple of tests to really pinpoint the issue.
    Hey, I talked to our core engineers. The suggestion is to actually use RETRY here, like:
    @task(log_stdout=True)
    def wait_entity_ready(key) -> str:
        import time
        print(key)  # key is None here on the second execution, after the pause
        status = key  # entityManager.getEntityStatus(key); our internal code throws an exception here
        if status is not None: #status.isReady()
            time.sleep(30)
            raise prefect.engine.signals.RETRY(message=f'message {key}')
        return key
    PAUSE is not intended for this operation, but RETRY will keep the value when run again
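The retry-style polling suggested here can be pictured with a toy driver. This is not Prefect's retry machinery: `Retry`, `run_with_retries`, `seen_keys`, and `statuses` are hypothetical names, and the status sequence is made up. The sketch only illustrates the property that matters in this thread, namely that every attempt is invoked with the same upstream input.

```python
from typing import Any, Callable


class Retry(Exception):
    """Toy stand-in for a RETRY signal."""


def run_with_retries(fn: Callable[..., Any], *args: Any, max_attempts: int = 5) -> Any:
    """Re-invoke fn with the same arguments each time it raises Retry."""
    for _ in range(max_attempts):
        try:
            return fn(*args)
        except Retry:
            continue  # a real scheduler would wait before the next attempt
    raise RuntimeError(f"gave up after {max_attempts} attempts")


seen_keys = []
statuses = iter(["pending", "pending", "ready"])  # made-up status sequence


def wait_entity_ready(key: str) -> str:
    seen_keys.append(key)  # record the input seen on each attempt
    if next(statuses) != "ready":
        raise Retry(f"entity {key} not ready yet")
    return key
```

Calling `run_with_retries(wait_entity_ready, "key")` runs the task three times in this model, and each attempt receives "key" rather than None.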
    Tomasz Szuba

    1 year ago
    The RETRY signal works (even with start_time passed instead of time.sleep)
    What I would like to understand is why raising signals.SUCCESS does not invoke checkpointing, when returning a value does
    @Kevin Kho When I just
    raise prefect.engine.signals.SUCCESS(
        result=key,
        message=f'message {key}'
    )
    the Result is not a PrefectResult, and its location is None
    And if I modify the add task so it looks like:
    @prefect.task(checkpoint=checkpoint, result=PrefectResult())
    def add(x, y):
        result = x+y
        raise SUCCESS(result=result)
    the test will fail
    If it is the case that we should do the checkpointing manually with Result.write() when raising signals, that's kind of OK, but I haven't found such info in the docs
    It would also complicate things, as the task would need to know which kind of Result class was picked by the flow, which is not ideal if we would like to change it to S3 in the future
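The coupling concern raised in this message can be sketched without any Prefect code. Everything below is hypothetical: `ResultBackend`, `MemoryBackend`, `PrefixedBackend`, `FLOW_CONFIG`, and `persist` are illustrative names, and `PrefixedBackend` merely stands in for a remote store such as S3. The idea is that a task doing a manual write could look up whatever backend the flow is configured with instead of naming a concrete class.

```python
from typing import Any, Dict, Protocol


class ResultBackend(Protocol):
    """Anything that can persist a value and report where it went."""

    def write(self, value: Any) -> str: ...


class MemoryBackend:
    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def write(self, value: Any) -> str:
        location = f"memory://{len(self.data)}"
        self.data[location] = value
        return location


class PrefixedBackend(MemoryBackend):
    """Stand-in for a remote store; only the location scheme differs."""

    def write(self, value: Any) -> str:
        location = f"remote://bucket/{len(self.data)}"
        self.data[location] = value
        return location


# Hypothetical flow-level setting; tasks never name a concrete backend.
FLOW_CONFIG: Dict[str, ResultBackend] = {"result": MemoryBackend()}


def persist(value: Any) -> str:
    """Write value with whatever backend the flow is configured to use."""
    return FLOW_CONFIG["result"].write(value)


def create_entity() -> str:
    key = "key"
    return persist(key)  # task code stays the same when the backend changes
```

Swapping `FLOW_CONFIG["result"]` for a `PrefixedBackend()` changes where `create_entity` writes without touching the task body, which is the kind of indirection a manual-write workaround would need in order to survive a later change of result type.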
    Kevin Kho

    1 year ago
    Thanks for the very detailed response @Tomasz Szuba, we’re opening an issue now and we’ll post it here
    Zach Angell

    1 year ago
    Thanks again for the detail here Tomasz! I've opened an issue here and we'll look into it further https://github.com/PrefectHQ/prefect/issues/4633.
    Tomasz Szuba

    1 year ago
    Glad to be helpful 🙂 We really enjoy working with the project and would like to get rid of as many rough edges as possible
    I would even try to write an implementation for it, but the codebase is quite a mystery in a lot of places