Christian Sicari
06/04/2024, 3:56 PM
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
For anyone who is interested, this is the code I am testing:
from random import randint
from prefect import task, flow
class DummyError(Exception):
    """Custom exception raised by the ``t3`` task in this example."""
@task
def say_hello() -> None:
    """Print a fixed greeting to stdout."""
    print("Hello, world!")
@task
def t1() -> None:
    """Print the marker string ``"t1"`` to stdout."""
    print("t1")
@task
def t2() -> None:
    """Print the marker string ``"t2"`` to stdout."""
    print("t2")
@task
def t3():
    """Always raise ``DummyError("t3")``; this function never returns normally.

    Raises:
        DummyError: unconditionally.
    """
    raise DummyError("t3")
@task
def mapper():
    """Submit one randomly chosen task (``t1`` or ``t2``) together with ``t3``.

    Returns:
        A two-element list holding whatever ``.submit()`` returned for the
        chosen task and for ``t3``, in that order.
    """
    task_id = randint(1, 2)  # randint is inclusive on both ends: 1 or 2
    # Dynamically select a task by looking up the module-level name
    # "t1" or "t2".
    task_instance = globals()[f"t{task_id}"]
    return [
        task_instance.submit(),
        t3.submit(),
    ]
@task
def reducer(futures):
    """Call ``.result()`` on every item of ``futures``, printing any error.

    Exceptions raised by ``.result()`` are caught and printed, so one failing
    item does not stop the remaining items from being processed.

    Args:
        futures: Iterable of objects exposing ``.result()``; in this example
            it is the value returned by ``mapper``.
    """
    for future in futures:
        try:
            future.result()
        except Exception as e:
            # Deliberate best-effort: report the failure and keep going.
            print(f"Error: {e}")
@flow
def myflow() -> str:
    """Run the example: greet, call ``mapper``, feed ``reducer``, greet again.

    Returns:
        The string ``"OK"``.
    """
    say_hello()
    futures = mapper()
    reducer(futures)
    say_hello()
    return "OK"
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    result = myflow()
Nate
06/04/2024, 6:41 PM
`.map` method on tasks? Where did it come short for you here?
Christian Sicari
06/04/2024, 8:57 PM
Nate
06/04/2024, 9:14 PM
from random import randint
from prefect import Task, flow, task
from prefect.futures import resolve_futures_to_states
from prefect.states import State
class DummyError(Exception):
    """Custom exception raised by the ``t3`` task in this example."""
@task
def say_hello() -> None:
    """Print a fixed greeting to stdout."""
    print("Hello, world!")
@task
def t1(params: dict | None = None) -> str:
    """Print ``"t1"`` and return a fixed result string.

    Args:
        params: Optional dict; accepted but not used by this task.

    Returns:
        The string ``"Result from t1"``.
    """
    print("t1")
    return "Result from t1"
@task
def t2(params: dict | None = None) -> str:
    """Print ``"t2"`` and return a fixed result string.

    Args:
        params: Optional dict; accepted but not used by this task.

    Returns:
        The string ``"Result from t2"``.
    """
    print("t2")
    return "Result from t2"
@task
def t3(params: dict | None = None):
    """Always raise ``DummyError("t3")``; this function never returns normally.

    Args:
        params: Optional dict; accepted but not used by this task.

    Raises:
        DummyError: unconditionally.
    """
    raise DummyError("t3")
@task
def mapper(tasks: list[Task], params: dict | None = None):
    """Submit one randomly chosen task from ``tasks`` together with ``t3``.

    Args:
        tasks: Candidate tasks to pick from. Must be non-empty, because a
            random index into the list is drawn.
        params: Optional dict passed as the single positional argument to
            both submitted tasks.

    Returns:
        A two-element list holding whatever ``.submit()`` returned for the
        chosen task and for ``t3``, in that order.
    """
    # randint is inclusive on both ends, hence ``len(tasks) - 1``.
    selected_task: Task = tasks[randint(0, len(tasks) - 1)]
    return [selected_task.submit(params), t3.submit(params)]
@flow
def myflow() -> str:
    """Run the example flow and print the states and results of the submitted tasks.

    Steps: greet, call ``mapper`` with ``[t1, t2, t3]`` and a sample params
    dict, convert what it returns to states, print the states and their
    results, then greet again.

    Returns:
        The string ``"OK"``.
    """
    say_hello()
    futures = mapper([t1, t2, t3], params=dict(foo="bar"))
    # NOTE(review): presumably this waits for each future and swaps it for its
    # final State -- confirm against the Prefect API reference.
    states: list[State] = resolve_futures_to_states(futures)
    print(f"Resulting states: {states}")
    # NOTE(review): raise_on_failure=False presumably makes a failed state hand
    # back its exception instead of raising it here -- confirm.
    results = [state.result(raise_on_failure=False) for state in states]
    print(f"Results: {results}")
    say_hello()
    return "OK"
# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    result = myflow()
Christian Sicari
06/05/2024, 6:15 AM