https://prefect.io logo
Title
y

Yury Cheremushkin

08/08/2022, 12:14 PM
Hello! What is a replacement for prefect.signals from 1.0? I used it a lot, especially for skipping unnecessary tasks. Should I do it in a native pythonic way, like passing None as a result of a task to show that there are no need to do something in downstream tasks? The second question is about manually setting downstream/upstream tasks. I was able to do it with
.set_upstream()
method in 1.0. But it looks like there are no more such method. I.e. there are two tasks: loading data into some BigQuery table and then merging it with another table. Obviously these tasks need to be run in exact order. But there are no need to pass any results from the first task to the second, so it wasn’t obvious for Prefect 1.0 that there is some kind of order. That’s why I used
.set_upstream()
. What should i do now? And the third question is about
.map()
for tasks. So, now i should just use standard pythonic map? Will it be parallelized in the same manner as it was in Prefect 1.0 with Concurrent/Parallel task runner?
copy to answer your question inline: In Orion, you can raise a custom exception directly in the flow, which will result in a
Failed
task run:
from prefect import task

@task
def signal_task(message):
    if message == 'stop_immediately!':
        raise RuntimeError(message='Got a signal to end the task run!')
Alternatively, your flow may return a specific state 3:
from prefect import task
from prefect.orion.schemas.states import Failed

@task
def signal_task(message):
    if message == 'stop_immediately!':
        return Failed(message='Stopping the task run immediately!')
y

Yury Cheremushkin

08/08/2022, 2:52 PM
okay, and what about setting upstream tasks and mapping them?
a

Anna Geller

08/08/2022, 2:58 PM
There are also migration topics for that Answers here: wait_for for dependencies and mapping exists in 2.0 too
👀 1
m

Michael Law

08/29/2022, 4:40 PM
@Anna Geller Do't suppose you could point us in the direction of the upstream task documentation?
a

Anna Geller

08/30/2022, 11:16 AM
It may be missing in the docs, but check out the arguments of a task in your IDE or API reference for a task - I believe the current syntax is task2.submit(wait_for=[task1])
👍 1