Tomasz Szuba
06/02/2021, 2:23 PMdef run(self, parameter):
  result="hello"
  raise signals.SUCCESS(
    message=f'{parameter}',
    result=result
  )
But this makes the result None in another task that depends on this resultTomasz Szuba
06/02/2021, 2:24 PMKevin Kho
Tomasz Szuba
06/02/2021, 2:30 PMKevin Kho
Tomasz Szuba
06/02/2021, 2:33 PMTomasz Szuba
06/02/2021, 2:33 PMKevin Kho
flow.result = PrefectResult()?Tomasz Szuba
06/02/2021, 2:45 PMKevin Kho
from prefect import Flow, Task
from prefect import task
from prefect.engine import signals
from prefect.engine.results.prefect_result import PrefectResult
import prefect
class MyTask(Task):
    """Task that ends by raising a SUCCESS signal rather than returning."""

    def run(self, parameter):
        """Raise ``signals.SUCCESS`` with a fixed result.

        The signal's message is ``parameter`` rendered through an f-string,
        and the result attached to the signal is the string "hello".
        """
        payload = "hello"
        raise signals.SUCCESS(message=f'{parameter}', result=payload)
@task
def other_task(x):
    """Log the received value at INFO level and return it unchanged.

    Args:
        x: Value handed in by the upstream task.

    Returns:
        The same ``x`` that was passed in.
    """
    logger = prefect.context.get("logger")
    # The chat export had turned this call into a Slack auto-link
    # ("<http://...|logger.info>"), which is not valid Python.
    logger.info(x)
    return x
# One MyTask instance, called below inside the flow context.
a = MyTask()
with Flow("test") as flow:
    # z stands for whatever MyTask produces; it is passed straight to
    # other_task, which logs and returns it.
    z = a("test")
    other_task(z)
# Set the flow-level result to a PrefectResult instance.
flow.result = PrefectResult()
flow.register('omlds')Kevin Kho
other_task prints out “hello”Tomasz Szuba
06/02/2021, 2:51 PMTomasz Szuba
06/02/2021, 2:51 PMTomasz Szuba
06/02/2021, 2:56 PM@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        start_time = datetime.utcnow() + timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=x, start_time=start_time)
    return xKevin Kho
Tomasz Szuba
06/02/2021, 2:57 PMTomasz Szuba
06/02/2021, 2:59 PMKevin Kho
Kevin Kho
Tomasz Szuba
06/02/2021, 3:23 PMTomasz Szuba
06/02/2021, 3:24 PMKevin Kho
Tomasz Szuba
06/02/2021, 3:25 PMTomasz Szuba
06/02/2021, 3:27 PMa creates entity in external system, sets state success message to url to entity (that depends on passed parameters). return entity id
• task b waits for a certain status on the entity passed from a, raising a PAUSE signal if the entity state is not the one we expectTomasz Szuba
06/02/2021, 3:27 PMKevin Kho
while loop inside that keeps checking and then raise SUCCESS when the condition is met?Tomasz Szuba
06/02/2021, 3:29 PMTomasz Szuba
06/02/2021, 3:29 PMKevin Kho
Tomasz Szuba
06/02/2021, 3:38 PMKevin Kho
@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        start_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=PrefectResult(x), start_time=start_time)
    return xKevin Kho
PrefectResult(x). This seems to hold the result for meTomasz Szuba
06/02/2021, 3:56 PMTomasz Szuba
06/02/2021, 4:02 PMPrefectResult(x) to PrefectResult(value=x) otherwise I got an ErrorTomasz Szuba
06/02/2021, 4:02 PMKevin Kho
Tomasz Szuba
06/02/2021, 4:04 PMTomasz Szuba
06/02/2021, 4:05 PMKevin Kho
Kevin Kho
Tomasz Szuba
06/02/2021, 4:11 PMKevin Kho
Tomasz Szuba
06/02/2021, 4:12 PMKevin Kho
Tomasz Szuba
06/03/2021, 9:09 AMTomasz Szuba
06/03/2021, 9:10 AMTomasz Szuba
06/03/2021, 9:11 AMraise SUCCESS(result="something") should work the same as return "something" and this is not the casenicholas
Tomasz Szuba
06/04/2021, 5:36 AMTomasz Szuba
06/04/2021, 5:36 AMTomasz Szuba
06/04/2021, 6:01 AMTomasz Szuba
06/04/2021, 6:02 AMimport os
from datetime import datetime, timedelta
import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker
@task(log_stdout=True)
def create_entity() -> None:
    """Finish with a SUCCESS signal that carries the entity key.

    The key is a hard-coded placeholder here. The signal's result is the
    key itself and its message is "message <key>".
    """
    key = "key"  # stand-in for a real entityManager.create() call
    success_signal = prefect.engine.signals.SUCCESS(
        message=f'message {key}',
        result=key,
    )
    raise success_signal
@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    """Return ``key`` when it is None, otherwise raise a PAUSE signal.

    The PAUSE signal carries ``key`` as its result and a start time 30
    seconds after the current UTC time.
    """
    # Author's note from the thread: key shows up as None on the second
    # execution after the pause.
    print(key)
    # Stand-in for a real status lookup (entityManager.getEntityStatus(key));
    # the author reports that the real call raises at this point.
    status = key
    if status is None:  # stand-in for a real status.isReady() check
        return key
    resume_at = datetime.utcnow() + timedelta(seconds=30)
    raise prefect.engine.signals.PAUSE(start_time=resume_at, result=key)
with Flow("wait-entity") as flow:
    # The output of create_entity becomes the `key` argument of
    # wait_entity_ready.
    created = create_entity()
    wait_entity_ready(created)
flow.result = PrefectResult()Tomasz Szuba
06/04/2021, 6:02 AMTomasz Szuba
06/04/2021, 6:03 AMTomasz Szuba
06/04/2021, 11:35 AMnew_state.result in state handler. write is not executed on result and checkpointing does not workTomasz Szuba
06/04/2021, 11:49 AMPrefectResult().write(key) to SUCCESS signal:
import os
from datetime import datetime, timedelta
import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker
@task(log_stdout=True)
def create_entity() -> None:
    """Finish with a SUCCESS signal whose result is a written PrefectResult.

    The key is a hard-coded placeholder here. The signal's result is
    whatever ``PrefectResult().write(key)`` returns, and its message is
    "message <key>".
    """
    key = "key"  # stand-in for a real entityManager.create() call
    written = PrefectResult().write(key)
    raise prefect.engine.signals.SUCCESS(
        message=f'message {key}',
        result=written,
    )
@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    """Return ``key`` when it is None, otherwise raise a PAUSE signal.

    The PAUSE signal carries ``key`` as its result and a start time 30
    seconds after the current UTC time.
    """
    # Author's note from the thread: key shows up as None on the second
    # execution after the pause.
    print(key)
    # Stand-in for a real status lookup (entityManager.getEntityStatus(key));
    # the author reports that the real call raises at this point.
    status = key
    if status is None:  # stand-in for a real status.isReady() check
        return key
    resume_at = datetime.utcnow() + timedelta(seconds=30)
    raise prefect.engine.signals.PAUSE(start_time=resume_at, result=key)
with Flow("wait-entity") as flow:
    # The output of create_entity becomes the `key` argument of
    # wait_entity_ready.
    created = create_entity()
    wait_entity_ready(created)
flow.result = PrefectResult()nicholas
Tomasz Szuba
06/07/2021, 5:56 AMTomasz Szuba
06/07/2021, 5:57 AMTomasz Szuba
06/07/2021, 5:57 AMTomasz Szuba
06/07/2021, 6:25 PMKevin Kho
Kevin Kho
RETRY here like:
@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    import time
    print(key) # key is none here on second execution after pause
    status = key # entityManeger.getEntityStatus(key) # our internal code throws here exception
    if status is not None: #status.isReady()
        time.sleep(30)
        raise prefect.engine.signals.RETRY(message=f'message {key}')
    return keyKevin Kho
PAUSE is not intended for this operation but RETRY will keep the value when run againTomasz Szuba
06/08/2021, 7:15 AMtime.sleep)Tomasz Szuba
06/08/2021, 7:15 AMTomasz Szuba
06/08/2021, 7:15 AMTomasz Szuba
06/08/2021, 7:16 AMraise prefect.engine.signals.SUCCESS(
    result=key,
    message=f'message {key}'
)
The Result is not PrefectResult, and location is NoneTomasz Szuba
06/08/2021, 7:24 AMTomasz Szuba
06/08/2021, 7:24 AM@prefect.task(checkpoint=checkpoint, result=PrefectResult())
def add(x, y):
    result = x+y
    raise SUCCESS(result=result)Tomasz Szuba
06/08/2021, 7:24 AMTomasz Szuba
06/08/2021, 7:25 AMTomasz Szuba
06/08/2021, 7:26 AMKevin Kho
Zach Angell
Tomasz Szuba
06/08/2021, 2:07 PMTomasz Szuba
06/08/2021, 2:07 PM