Rahat Zaman
09/25/2020, 11:20 PM
yield result for B. B will start when it gets the first yield from A
Chris White
from prefect import Flow, task

@task
def task_A():
    return 3

@task
def task_B(a):
    return a + 1

with Flow("data") as flow:
    task_B(task_A)
the moment that A finishes, B will begin with sub-second latency. Is there something else you’re looking for?
Rahat Zaman
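09/28/2020, 3:15 AM
from prefect import Flow
# Import path as of Prefect 0.13.x; later releases expose it under prefect.executors
from prefect.engine.executors import DaskExecutor
from test_custom import HelloWorld

helloworld_task = HelloWorld()
with Flow("outside-task") as flow:
    val = helloworld_task()
executor = DaskExecutor(address="tcp://192.168.0.7:8786")
flow_state = flow.run(executor=executor)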
And here is what is in test_custom.py:
from prefect import Task

class HelloWorld(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def run(self):
        return val  # note: val is not defined anywhere in the snippet as posted
I used dask-scheduler and dask-worker <IP> locally for debugging. Everything is run from the same directory, and if the flow is run from that directory without passing the executor, it runs perfectly fine.
def task_A():
    for i in range(3):
        yield i

def task_B(a):
    for i in task_A():
        return i + 1

# The flow
task_B(task_A)
And, I want the flow to do something like this:
1. First run task_A
2. Whenever task_A yields something: pass it to task_B
3. In parallel, keep running task_A and task_B (task_B runs as soon as it has a single item from task_A).
Skip Breidbach
09/29/2020, 4:21 PM
Rahat Zaman
09/30/2020, 6:10 AM
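For reference, the three steps listed above (run task_A, pass each yielded value to task_B, keep both running in parallel) can be sketched in plain Python with a queue between a producer thread and a consumer thread. This is only an illustration of the requested behaviour and does not use Prefect at all; whether Prefect can do this natively is not settled in this thread. The names `produce`, `consume`, `run_pipeline`, and `_DONE` are hypothetical helpers introduced for the sketch, and task_B is assumed to do per-item work (`a + 1`) as in the earlier example.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel marking the end of the stream


def task_A():
    """Generator from the question: yields 0, 1, 2."""
    for i in range(3):
        yield i


def task_B(a):
    """Per-item work, assumed to be a + 1 as in the earlier example."""
    return a + 1


def produce(q):
    # Steps 1 and 2: run task_A and hand over each value as soon as it is yielded.
    for item in task_A():
        q.put(item)
    q.put(_DONE)


def consume(q, results):
    # Step 3: process items while the producer may still be running.
    while True:
        item = q.get()
        if item is _DONE:
            break
        results.append(task_B(item))


def run_pipeline():
    q = queue.Queue()
    results = []
    producer = threading.Thread(target=produce, args=(q,))
    consumer = threading.Thread(target=consume, args=(q, results))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return results


if __name__ == "__main__":
    print(run_pipeline())  # [1, 2, 3]
```

The queue is what lets task_B start on the first value before task_A has finished; a single consumer keeps the results in the order they were yielded.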