Rahat Zaman

09/25/2020, 11:20 PM
I was searching for a paradigm where a task will create small results and another task will take that data and start instantly. So something like a flow (A -> B) where A, B are tasks and A will
result for B. B will start when it gets first
from A

Chris White

09/25/2020, 11:23 PM
Hi @Rahat Zaman! It sounds like you are describing Prefect data dependencies:
Copy code
def task_A():
    return 3

def task_B(a):
    return a + 1

with Flow("data") as flow:
the moment that A finishes, B will begin with sub-second latency. Is there something else you’re looking for?
👀 1
❤️ 1

Rahat Zaman

09/28/2020, 3:15 AM
Hi Chris, watched almost all of your videos on youtube. I have another problem
With running dask executor, dask-worker do not seem to recognize a module in its working directory.
Here is a minimal reproducable.
Copy code
from test_custom import HelloWorld
helloworld_task = HelloWorld()

with Flow("outside-task") as flow:
    val = helloworld_task()

executor = DaskExecutor(address="<tcp://>")
flow_state =
And here is what it is in
Copy code
from prefect import Task
class HelloWorld(Task):
    def __init__(self, **kwargs):

    def run(self):
        return val
I used
dask-worker <IP>
in local for debugging. All run from the same directory. And, if the flow is run without passing the
from that dir, it runs perfectly fine.
Oh, and sorry for not responding on your previous message @Chris White No, I wasn't looking for data dependency. I want the below code to "taskIZE" in prefect.
Copy code
def task_A():
    for i in range(3):
        yield i

def task_B(a):
    for i in task_A():
        return i + 1

# The flow
And, I want the flow to do something like this: 1. First run task_A 2. Whenever task_A yields something: pass it to task_B 3. Perellelly keep running task_A and task_B (when task_B has a single data from task_A).

Skip Breidbach

09/29/2020, 4:21 PM

Rahat Zaman

09/30/2020, 6:10 AM
Yes, thank you. I was exactly looking for this feature.
💥 1