Tomasz Szuba
06/02/2021, 2:23 PM
def run(self, parameter):
    result = "hello"
    raise signals.SUCCESS(
        message=f'{parameter}',
        result=result
    )
But this makes the result None in another task that depends on this result
Tomasz Szuba
06/02/2021, 2:24 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 2:30 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 2:33 PM
Tomasz Szuba
06/02/2021, 2:33 PM
Kevin Kho
flow.result = PrefectResult()?
Tomasz Szuba
06/02/2021, 2:45 PM
Kevin Kho
from prefect import Flow, Task, task
from prefect.engine import signals
from prefect.engine.results.prefect_result import PrefectResult
import prefect


class MyTask(Task):
    def run(self, parameter):
        result = "hello"
        raise signals.SUCCESS(
            message=f'{parameter}',
            result=result
        )


@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x


a = MyTask()
with Flow("test") as flow:
    z = a("test")
    other_task(z)

flow.result = PrefectResult()
flow.register('omlds')
Kevin Kho
other_task prints out “hello”
Tomasz Szuba
06/02/2021, 2:51 PM
Tomasz Szuba
06/02/2021, 2:51 PM
Tomasz Szuba
06/02/2021, 2:56 PM
from datetime import datetime, timedelta  # needed for start_time below

@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        # start_time is set 10 seconds from now
        start_time = datetime.utcnow() + timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=x, start_time=start_time)
    return x
Kevin Kho
Tomasz Szuba
06/02/2021, 2:57 PM
Tomasz Szuba
06/02/2021, 2:59 PM
Kevin Kho
Kevin Kho
Tomasz Szuba
06/02/2021, 3:23 PM
Tomasz Szuba
06/02/2021, 3:24 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 3:25 PM
Tomasz Szuba
06/02/2021, 3:27 PM
• task a creates an entity in an external system, sets the state's success message to the URL of the entity (which depends on the passed parameters), and returns the entity id
• task b waits for a certain status on the entity passed from a, raising a PAUSE signal if the entity state is not the one we expect
Tomasz Szuba
06/02/2021, 3:27 PM
Kevin Kho
while loop inside that keeps checking and then raise SUCCESS when the condition is met?
Tomasz Szuba
06/02/2021, 3:29 PM
Tomasz Szuba
06/02/2021, 3:29 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 3:38 PM
Kevin Kho
import datetime  # needed for start_time below

@task
def other_task(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    if x is not None:
        start_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
        raise prefect.engine.signals.PAUSE(result=PrefectResult(x), start_time=start_time)
    return x
Kevin Kho
PrefectResult(x). This seems to hold the result for me
Tomasz Szuba
06/02/2021, 3:56 PM
Tomasz Szuba
06/02/2021, 4:02 PM
PrefectResult(x) to PrefectResult(value=x), otherwise I got an Error
Tomasz Szuba
06/02/2021, 4:02 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 4:04 PM
Tomasz Szuba
06/02/2021, 4:05 PM
Kevin Kho
Kevin Kho
Tomasz Szuba
06/02/2021, 4:11 PM
Kevin Kho
Tomasz Szuba
06/02/2021, 4:12 PM
Kevin Kho
Tomasz Szuba
06/03/2021, 9:09 AM
Tomasz Szuba
06/03/2021, 9:10 AM
Tomasz Szuba
06/03/2021, 9:11 AM
raise SUCCESS(result="something") should work the same as return "something", and this is not the case
nicholas
Tomasz Szuba
06/04/2021, 5:36 AM
Tomasz Szuba
06/04/2021, 5:36 AM
Tomasz Szuba
06/04/2021, 6:01 AM
Tomasz Szuba
06/04/2021, 6:02 AM
import os
from datetime import datetime, timedelta

import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker


@task(log_stdout=True)
def create_entity() -> None:
    key = "key"  # entityManager.create()
    raise prefect.engine.signals.SUCCESS(
        result=key,
        message=f'message {key}'
    )


@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    print(key)  # key is None here on the second execution, after the pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None:  # status.isReady()
        start_time = datetime.utcnow() + timedelta(seconds=30)
        raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
    return key


with Flow("wait-entity") as flow:
    created = create_entity()
    wait_entity_ready(created)

flow.result = PrefectResult()
Tomasz Szuba
06/04/2021, 6:02 AM
Tomasz Szuba
06/04/2021, 6:03 AM
Tomasz Szuba
06/04/2021, 11:35 AM
new_state.result in state handler. write is not executed on result and checkpointing does not work
Tomasz Szuba
06/04/2021, 11:49 AM
PrefectResult().write(key) to SUCCESS signal:
import os
from datetime import datetime, timedelta

import prefect.engine.signals
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.storage import Docker


@task(log_stdout=True)
def create_entity() -> None:
    key = "key"  # entityManager.create()
    raise prefect.engine.signals.SUCCESS(
        result=PrefectResult().write(key),
        message=f'message {key}'
    )


@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    print(key)  # key is None here on the second execution, after the pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None:  # status.isReady()
        start_time = datetime.utcnow() + timedelta(seconds=30)
        raise prefect.engine.signals.PAUSE(result=key, start_time=start_time)
    return key


with Flow("wait-entity") as flow:
    created = create_entity()
    wait_entity_ready(created)

flow.result = PrefectResult()
nicholas
Tomasz Szuba
06/07/2021, 5:56 AM
Tomasz Szuba
06/07/2021, 5:57 AM
Tomasz Szuba
06/07/2021, 5:57 AM
Tomasz Szuba
06/07/2021, 6:25 PM
Kevin Kho
Kevin Kho
RETRY here like:
import time

@task(log_stdout=True)
def wait_entity_ready(key) -> str:
    print(key)  # key is None here on the second execution, after the pause
    status = key  # entityManager.getEntityStatus(key)  # our internal code throws an exception here
    if status is not None:  # status.isReady()
        time.sleep(30)
        raise prefect.engine.signals.RETRY(message=f'message {key}')
    return key
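The polling idea raised earlier in the thread (keep checking the entity until it is ready, then finish) can be sketched in plain Python without any Prefect machinery. This is only an illustration under assumptions: `wait_until_ready`, `get_status`, `ready_status`, `poll_seconds` and `timeout_seconds` are hypothetical names, and the status callable stands in for the internal entityManager call mentioned above.

```python
import time
from typing import Callable


def wait_until_ready(
    get_status: Callable[[], str],
    ready_status: str = "READY",
    poll_seconds: float = 30.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll get_status() until it returns ready_status.

    Returns the number of checks performed. Raises TimeoutError if the
    status is still not ready once timeout_seconds have elapsed.
    """
    deadline = time.monotonic() + timeout_seconds
    checks = 0
    while True:
        checks += 1
        if get_status() == ready_status:
            return checks
        if time.monotonic() >= deadline:
            raise TimeoutError(f"not ready after {checks} checks")
        # Wait before the next check; injectable so tests need not sleep.
        sleep(poll_seconds)
```

Called from inside a task body, the function returns normally once the status is ready, so the task can simply return its key afterwards. The trade-off is that the process running the task stays occupied while it sleeps.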
Kevin Kho
PAUSE is not intended for this operation, but RETRY will keep the value when run again
Tomasz Szuba
06/08/2021, 7:15 AM
time.sleep)
Tomasz Szuba
06/08/2021, 7:15 AM
Tomasz Szuba
06/08/2021, 7:15 AM
Tomasz Szuba
06/08/2021, 7:16 AM
raise prefect.engine.signals.SUCCESS(
    result=key,
    message=f'message {key}'
)
The Result is not PrefectResult, and location is None
Tomasz Szuba
06/08/2021, 7:24 AM
Tomasz Szuba
06/08/2021, 7:24 AM
@prefect.task(checkpoint=checkpoint, result=PrefectResult())
def add(x, y):
    result = x + y
    raise SUCCESS(result=result)
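The expectation voiced in this thread is that raising SUCCESS with a result should hand downstream code the same value as a plain return. The toy model below shows that expectation in plain Python. It is not Prefect's engine: `Success`, `run_task`, `add_with_return` and `add_with_signal` are hypothetical names invented for the illustration.

```python
from typing import Any, Callable


class Success(Exception):
    """Toy stand-in for a success signal that carries a result."""

    def __init__(self, message: str = "", result: Any = None) -> None:
        super().__init__(message)
        self.message = message
        self.result = result


def run_task(fn: Callable[..., Any], *args: Any) -> Any:
    """Run fn and treat a raised Success exactly like a normal return."""
    try:
        return fn(*args)
    except Success as signal:
        return signal.result


def add_with_return(x: int, y: int) -> int:
    return x + y


def add_with_signal(x: int, y: int) -> int:
    # Same value as add_with_return, delivered through the signal instead.
    raise Success(message=f"{x} + {y}", result=x + y)
```

In this model both functions give the caller the same value. The thread reports that the real signal path behaves differently: the result attached to the signal is not a PrefectResult, its location is None, and write is not executed on it.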
Tomasz Szuba
06/08/2021, 7:24 AM
Tomasz Szuba
06/08/2021, 7:25 AM
Tomasz Szuba
06/08/2021, 7:26 AM
Kevin Kho
Zach Angell
Tomasz Szuba
06/08/2021, 2:07 PM
Tomasz Szuba
06/08/2021, 2:07 PM