I tried: ```def run(self, parameter): result="he...
# ask-community
t
I tried:
def run(self, parameter):
  result="hello"
  raise signals.SUCCESS(
    message=f'{parameter}',
    result=result
  )
But this makes the result None in another task that depends on it.
When run locally with flow.run() it works OK, but on the server with agents it does not. It seems checkpointing does not kick in, and the result is lost.
k
Hey @Tomasz Szuba, did you try turning checkpointing on like this ?
t
Everything works OK if I just return the result, and then I see the result in the Prefect GUI
k
I see. Will test it on my end
t
Looking at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L887 I see that checkpointing happens when:
• task does not raise exception
• task times out
• task raises loop signal
My gut feeling tells me that it should also run in the success case (maybe on all signals?), but I am very new to the codebase
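The branch described above can be pictured with a small toy model. This is a sketch only: `Signal`, `SUCCESS`, `ToyResult` and `run_task` are stand-ins written for illustration and are not Prefect's classes or its actual task runner code. It assumes, as the message above suggests, that only a normally returned value goes through the write step.

```python
class Signal(Exception):
    """Stand-in for a state signal that can carry a result value."""

    def __init__(self, message=None, result=None):
        super().__init__(message)
        self.message = message
        self.result = result


class SUCCESS(Signal):
    """Stand-in for a success signal."""


class ToyResult:
    """Stand-in for a result object; `location` stays None until write() is called."""

    def __init__(self, value=None, location=None):
        self.value = value
        self.location = location

    def write(self, value):
        # A real backend would persist the value; here we only record a fake location.
        return ToyResult(value=value, location=f"stored://{value!r}")


def run_task(fn, result_backend, checkpoint=True):
    """Toy runner: checkpoints returned values, but not values carried by a signal."""
    try:
        value = fn()
    except Signal as sig:
        # Signal path (assumed): the value is kept in memory only, write() is never called.
        return ToyResult(value=sig.result)
    if checkpoint:
        return result_backend.write(value)
    return ToyResult(value=value)


def returns_value():
    return "hello"


def raises_success():
    raise SUCCESS(message="done", result="hello")


returned = run_task(returns_value, ToyResult())
signalled = run_task(raises_success, ToyResult())
print(returned.location)   # a location was recorded
print(signalled.location)  # no location was recorded
```

In this model both paths yield the same in-memory value, which would explain why nothing looks wrong until another process needs to reload it.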
k
Do you have a result configured like `flow.result = PrefectResult()`?
t
yes
k
I have a code snippet that seems to be working for me. Could you tell me if this is similar to what you have?
from prefect import Flow, Task
from prefect import task
from prefect.engine import signals
from prefect.engine.results.prefect_result import PrefectResult
import prefect

class MyTask(Task):
    def run(self, parameter):
        result="hello"
        raise signals.SUCCESS(
            message=f'{parameter}',
            result=result
        )

@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

a = MyTask()

with Flow("test") as flow:
    z = a("test")
    other_task(z)

flow.result = PrefectResult()
flow.register('omlds')
This is working in Cloud for me. The `other_task` prints out “hello”
t
I tried to make the example as simple as possible, and I think I made it too simple
I will be back in a moment with tou
👍 1
On first run parameter is passed fine, the problem happens when we raise PAUSE in other task on second run:
@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        start_time = datetime.utcnow() + timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=x, start_time=start_time)
    return x
k
Ok will try this
t
This should loop indefinitely (in real code it waits on a condition). But it runs only two times: the first time printing hello, the second one printing None
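One plausible reading of the two runs, sketched as a toy model. It assumes that a resumed run starts in a fresh process and reloads upstream values from a stored location rather than from memory; `STORE`, `ToyResult`, `checkpoint` and `rehydrate` are hypothetical names for illustration, not Prefect APIs.

```python
# Toy storage that survives between "runs" (stands in for any result backend).
STORE = {}


class ToyResult:
    """Stand-in for a result: an in-memory value plus an optional stored location."""

    def __init__(self, value=None, location=None):
        self.value = value
        self.location = location


def checkpoint(value, location):
    """Persist the value and remember where it went."""
    STORE[location] = value
    return ToyResult(value=value, location=location)


def rehydrate(result):
    """What a fresh process can recover: only what is reachable through the location."""
    if result.location is None:
        return None
    return STORE[result.location]


in_memory_only = ToyResult(value="hello")         # value passed via a signal, never written
checkpointed = checkpoint("hello", "results/a")   # value written by the runner

first_run_value = in_memory_only.value             # same process: value is still there
second_run_value = rehydrate(in_memory_only)       # new process: nothing to load
second_run_checkpointed = rehydrate(checkpointed)  # new process: loaded from storage

print(first_run_value, second_run_value, second_run_checkpointed)
```

Under that assumption, a local `flow.run()` never leaves the first process, so the missing location would go unnoticed there.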
k
Ok I replicated
Will look into this more
t
Thanks!
If you have any other option to set the state message (which depends on a runtime parameter) and return a value, that would be helpful
k
Could you show me how you did the loop? I think that’s the place we can adjust this
t
It's the same as in the code excerpt I pasted above
What we want to achieve is:
• task `a` creates an entity in an external system, sets the success state message to the URL of the entity (which depends on the passed parameters), and returns the entity id
• task `b` waits for a certain status on the entity passed from `a`, raising a PAUSE signal if the entity state is not the one we expect
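The polling step of task `b` can be sketched without any Prefect machinery. In this minimal example `NotReadyYet` is a hypothetical stand-in for a signal that carries a start time, and `fake_status` replaces the external system; neither name comes from the thread.

```python
from datetime import datetime, timedelta, timezone


class NotReadyYet(Exception):
    """Hypothetical stand-in for a signal asking to be run again at `retry_at`."""

    def __init__(self, retry_at):
        super().__init__(f"retry at {retry_at.isoformat()}")
        self.retry_at = retry_at


def wait_entity_ready(entity_id, get_status, now, interval=timedelta(seconds=30)):
    """Check the entity once; either return its id or ask to be rescheduled."""
    if get_status(entity_id) != "ready":
        raise NotReadyYet(retry_at=now + interval)
    return entity_id


# Fake external system: the entity is pending on the first check, ready on the second.
statuses = iter(["pending", "ready"])


def fake_status(entity_id):
    return next(statuses)


now = datetime(2021, 6, 1, 12, 0, tzinfo=timezone.utc)
try:
    wait_entity_ready("entity-1", fake_status, now)
    first_attempt = "returned"
except NotReadyYet as exc:
    first_attempt = exc.retry_at

second_attempt = wait_entity_ready("entity-1", fake_status, now + timedelta(seconds=30))
print(first_attempt, second_attempt)
```

Each attempt does a single check and exits, so nothing has to stay alive between polls; the entity id must therefore be recoverable on every new attempt.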
I tried to use state handler to set message, but https://github.com/PrefectHQ/prefect/issues/3921 blocks us
k
Can you just make a task with a `while loop` inside that keeps checking and then raise SUCCESS when the condition is met?
t
That would be possible, but the condition may take days to be met
I have a feeling that agent restart during that time will give same result
k
Oh wow I see. Ok will take a look at this
t
Thanks!
k
Can you try
@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        start_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=PrefectResult(x), start_time=start_time)
    return x
Note the `PrefectResult(x)`. This seems to hold the result for me
t
Let's see
I had to change `PrefectResult(x)` to `PrefectResult(value=x)`, otherwise I got an error
But this still does not work...
k
What Prefect version are you on?
t
0.14.19
It works when I run it with flow.run() locally, but not when registered and run from server
k
Ok that’s weird. The script above worked for me on Cloud. What is your server prefect version?
Are you on 0.14.19 for the flow, agent and server?
t
0.14.19 for agent and server. local prefect cli is 0.14.20
k
Ok thanks! Will get back to you
t
We use docker storage (if it's helpful) and docker-compose type of deployment
k
I talked to the team and it seems like PAUSE was not meant for this kind of infinite loop. Do you need to start immediately once the state changes? The current suggestion is very generous retries.
t
The loop is not infinite. It will change eventually. We use PAUSE for polling. And it works OK when we just return from the task, but not when we raise SUCCESS
When Entity state is in desired state, we just return from PAUSING task. It's a way for us to do long polling.
Docs suggest that using `raise SUCCESS(result="something")` should work the same as `return "something"`, and this is not the case
n
Hi @Tomasz Szuba - could you provide a more complete example, including how you're resuming the task and the dependencies? It's difficult to understand where this might be breaking down with just the raise statement, and as @Kevin Kho’s example suggests, raising a success signal with a result works in at least the cases we've tested
t
The flow looks exactly how presented here already 🙂
The only addition is some business logic instead of passing constants
I was also able to reproduce it on cloud with docker agent running on my machine
import os
from datetime import datetime, timedelta

import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker


@task(log_stdout=True)
def create_entity() -> None:
    key = "key"  # entityManager.create()
    raise prefect.engine.signals.SUCCESS(
        result=key,
        message=f'message {key}'
    )


@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    print(key) # key is none here on second execution after pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None: #status.isReady()
        start_time = datetime.utcnow() + timedelta(seconds=30)
        raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
    return key


with Flow("wait-entity") as flow:
    created = create_entity()
    wait_entity_ready(created)

flow.result = PrefectResult()
The wait_entity_ready task just polls the status of the entity every 30 seconds
While experimenting, it seems the same error occurs when I try to change `new_state.result` in a state handler: `write` is not executed on the result and checkpointing does not work
Everything works fine if I pass `PrefectResult().write(key)` to the SUCCESS signal:
import os
from datetime import datetime, timedelta

import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker


@task(log_stdout=True)
def create_entity() -> None:
    key = "key"  # entityManager.create()
    raise prefect.engine.signals.SUCCESS(
        result=PrefectResult().write(key),
        message=f'message {key}'
    )


@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    print(key) # key is none here on second execution after pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None: #status.isReady()
        start_time = datetime.utcnow() + timedelta(seconds=30)
        raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
    return key


with Flow("wait-entity") as flow:
    created = create_entity()
    wait_entity_ready(created)

flow.result = PrefectResult()
n
Hi @Tomasz Szuba - it looks like you've got this working as you'd expect, am i correct?
t
Yes, it works, but not as I would expect it to
Results that are passed to signals are not being checkpointed automatically and I need to do it myself
Documentation does not hint that, and I would still consider it a bug (or lack of feature?) that I need a workaround for
@nicholas @Kevin Kho
k
Hey @Tomasz Szuba, I’ll take care of opening an issue. I just need to do a couple of tests to really pinpoint the issue.
Hey, I talked to our core engineers. The suggestion is to actually use `RETRY` here, like:
@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    import time
    print(key) # key is none here on second execution after pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None: #status.isReady()
        time.sleep(30)
        raise prefect.engine.signals.RETRY(message=f'message {key}')
    return key
`PAUSE` is not intended for this operation, but `RETRY` will keep the value when run again
t
The RETRY signal works (even with start_time passed instead of `time.sleep`)
What I would like to understand is why raising signals.SUCCESS does not invoke checkpointing, when returning a value does
@Kevin Kho When I just
raise prefect.engine.signals.SUCCESS(
    result=key,
    message=f'message {key}'
)
The Result is not PrefectResult, and location is None
And modify add task so it looks like:
@prefect.task(checkpoint=checkpoint, result=PrefectResult())
def add(x, y):
    result = x+y
    raise SUCCESS(result=result)
The test will fail
If it is the case that when raising signals we should do the checkpointing manually with Result.write(), that's kind of OK, but I haven't found such info in the docs
Also, this would complicate things, as the task would need to know which kind of Result class was picked by the flow, which is not ideal if we would like to change it to S3 in the future
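One way to limit that coupling, sketched under assumptions: keep the backend choice in a single helper that both the flow definition and the tasks call, so switching storage touches one place. `InlineResult`, `BucketResult`, `make_result`, `checkpointed` and the `RESULT_BACKEND` variable are all hypothetical stand-ins for illustration; they are not Prefect classes or settings.

```python
import os


class InlineResult:
    """Stand-in for a result type that keeps the value inside its location string."""

    def __init__(self, value=None, location=None):
        self.value = value
        self.location = location

    def write(self, value):
        return InlineResult(value=value, location=f"inline:{value}")


class BucketResult:
    """Stand-in for a result type that would store values in an object store."""

    def __init__(self, bucket="example-bucket", value=None, location=None):
        self.bucket = bucket
        self.value = value
        self.location = location

    def write(self, value):
        # A real backend would upload here; the sketch only builds the location.
        return BucketResult(self.bucket, value, f"bucket://{self.bucket}/{value}")


def make_result():
    """Single place that decides which backend the flow and its tasks use."""
    backend = os.environ.get("RESULT_BACKEND", "inline")  # hypothetical variable
    if backend == "bucket":
        return BucketResult()
    return InlineResult()


def checkpointed(value):
    """Write the value with whichever backend is configured and return the written result."""
    return make_result().write(value)


os.environ["RESULT_BACKEND"] = "inline"
inline_location = checkpointed("key").location

os.environ["RESULT_BACKEND"] = "bucket"
bucket_location = checkpointed("key").location

print(inline_location)
print(bucket_location)
```

A task would then pass `checkpointed(key)` as the signal's result without naming a concrete class. This only moves the workaround into one function; it does not remove the need for it.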
k
Thanks for the very detailed response @Tomasz Szuba, we’re opening an issue now and we’ll post it here
z
Thanks again for the detail here Tomasz! I've opened an issue here and we'll look into it further https://github.com/PrefectHQ/prefect/issues/4633.
🙏 1
t
Glad to be helpful 🙂 We really enjoy working with the project and would like to get rid of as many rough edges as possible
🙏 2
I would even try to write an implementation for it, but the codebase is quite a mystery in a lot of places