Thread
#prefect-community
    Rahat Zaman
    2 years ago
    I was searching for a paradigm where one task produces small results and another task takes that data and starts instantly. So something like a flow (A -> B) where A and B are tasks: A will yield results for B, and B will start as soon as it gets the first yield from A.
    Chris White
    2 years ago
    Hi @Rahat Zaman! It sounds like you are describing Prefect data dependencies:
    from prefect import task, Flow

    @task
    def task_A():
        return 3
    
    @task
    def task_B(a):
        return a + 1
    
    with Flow("data") as flow:
        task_B(task_A)
    the moment that A finishes, B will begin with sub-second latency. Is there something else you’re looking for?
    Rahat Zaman
    1 year ago
    Hi Chris, I've watched almost all of your videos on YouTube. I have another problem:
    when running with the Dask executor, dask-worker does not seem to recognize a module in its working directory.
    Here is a minimal reproducible example:
    from prefect import Flow
    from prefect.executors import DaskExecutor
    from test_custom import HelloWorld

    helloworld_task = HelloWorld()

    with Flow("outside-task") as flow:
        val = helloworld_task()

    executor = DaskExecutor(address="tcp://192.168.0.7:8786")
    flow_state = flow.run(executor=executor)
    And here is test_custom.py:
    from prefect import Task

    class HelloWorld(Task):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def run(self):
            return "hello world"
    I used dask-scheduler and dask-worker <IP> locally for debugging, all run from the same directory. And if the flow is run from that directory without passing the executor, it runs perfectly fine.
    Oh, and sorry for not responding to your previous message @Chris White. No, I wasn't looking for data dependencies. I want to "taskize" the code below in Prefect:
    def task_A():
        for i in range(3):
            yield i

    def task_B(a):
        for i in a:
            yield i + 1

    # The flow
    task_B(task_A())
    And I want the flow to do something like this:
    1. First run task_A.
    2. Whenever task_A yields something, pass it to task_B.
    3. In parallel, keep running task_A and task_B (task_B can start as soon as it has a single item from task_A).
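Prefect 1.x tasks return a single result rather than a generator, but the producer/consumer overlap described in those steps can be sketched in plain Python with a thread and a queue. The queue-and-thread wiring below is purely illustrative, not a Prefect feature:

```python
import queue
import threading

def task_A(out):
    # Producer: emit results one at a time as they become available.
    for i in range(3):
        out.put(i)
    out.put(None)  # sentinel: nothing more is coming

def task_B(inp):
    # Consumer: starts as soon as the first item arrives,
    # while task_A keeps producing in parallel.
    results = []
    while True:
        item = inp.get()
        if item is None:
            break
        results.append(item + 1)
    return results

q = queue.Queue()
producer = threading.Thread(target=task_A, args=(q,))
producer.start()
print(task_B(q))  # [1, 2, 3]
producer.join()
```

The sentinel `None` marks the end of the stream so the consumer knows when to stop; task_B begins processing the first item while task_A is still running.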
    Skip Breidbach
    1 year ago
    Rahat Zaman
    1 year ago
    Yes, thank you. That's exactly the feature I was looking for.