# ask-community
f
Hey Prefect. I am trying to wrap my head around what's happening when using `.bind()` vs. not using it.
```python
post_runner.set_upstream(all_pushes)
# with bind():
post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
# without bind():
post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
```
Using `bind()` works as I would expect: `all_pushes` completes entirely, and then `post_runner` starts and completes. Without `bind()`, `post_runner` does not wait for `all_pushes` to complete, mayhem ensues, and I get `run() missing X required positional arguments` messages. Thoughts?
```python
import os

import prefect
from prefect import Flow, Parameter, Task, unmapped

logger = prefect.context.get("logger")

client_secret = os.getenv("CLIENT_SECRET")
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASS")


class PushDataTask(Task):

    def __init__(self, client_secret, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client_secret = client_secret

    def run(self, run_name, data_root, collection_name):
        logger.info("Run PushData " + run_name)


class WrapperTask(Task):
    def __init__(self, db_user, db_pass, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db_user = db_user
        self.db_pass = db_pass

    def run(self, run_names, data_root, collections_list):

        logger.info("Start WrapperTask Runner")

        iterations = len(run_names)
        RenameTask(self.db_user, self.db_pass, name="push1").run(data_root, collections_list)

        for i in range(1, iterations):
            RenameTask(self.db_user, self.db_pass, name=f"push{i + 1}").run(data_root, collections_list)

        logger.info("Finish WrapperTask Runner")


class RenameTask(Task):

    def __init__(self, db_user, db_pass, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db_user = db_user
        self.db_pass = db_pass

    def run(self, data_root, collections_list, rollback=False):
        logger.info("Start Rename: " + self.name + " : " + data_root)
        # do real work
        logger.info("Finish Rename: " + self.name)


push_data = PushDataTask(client_secret, name="push_data")
post_runner = WrapperTask(db_user, db_pass, name="post_runner")

with Flow("data_flow") as data_flow:

    mapped_run_name = Parameter("mapped_run_name", required=True)
    mapped_data_root = Parameter("mapped_data_root", required=True)
    mapped_collection_name = Parameter("mapped_collection_name", required=True)

    all_pushes = push_data.map(
        run_name=mapped_run_name,
        data_root=unmapped(mapped_data_root),
        collection_name=unmapped(mapped_collection_name)
    )

    post_runner.set_upstream(all_pushes)
    # Using this line instead of the call below makes post_runner wait for all_pushes
    # post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
    post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)

demo_params = dict(
    mapped_run_name=["demo_push1", "demo_push2", "demo_push3"],
    mapped_data_root="adl://data_data_data",
    mapped_collection_name="",
)

data_flow.run(parameters=demo_params)
```
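Some background on why the two versions behave differently, assuming Prefect 1.x semantics: calling a task instance inside a flow (`post_runner(...)`) does not bind arguments to that instance. `Task.__call__` first copies the task and binds the arguments to the copy, whereas `bind()` binds them to the instance itself. So in the code above, `set_upstream` most likely wires `all_pushes` to the original `post_runner`, which never receives arguments (hence the `run() missing ... required positional arguments` error), while the call adds a second, argument-carrying copy with no dependency on `all_pushes` (hence it does not wait). The sketch below models that copy-then-bind behaviour in plain Python; `ToyTask`, `ToyFlow`, `wire` and `describe` are hypothetical stand-ins, not Prefect classes.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: records tasks and dependency edges."""

    def __init__(self):
        self.tasks = []  # task objects added to the flow, compared by identity
        self.edges = []  # (upstream, downstream) pairs

    def add_task(self, task):
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)


class ToyTask:
    """Hypothetical stand-in for a Prefect 1.x Task (not the real class)."""

    def __init__(self, name, flow):
        self.name = name
        self.flow = flow
        self.args = None  # arguments bound to this object's run()

    def bind(self, *args):
        # bind(): attach the arguments to *this* object
        self.args = args
        self.flow.add_task(self)
        return self

    def __call__(self, *args):
        # calling the task: copy it first, then bind the arguments to the copy
        return copy.copy(self).bind(*args)

    def set_upstream(self, upstream):
        # adds *this* object to the flow with an edge from `upstream`
        self.flow.add_task(upstream)
        self.flow.add_task(self)
        self.flow.edges.append((upstream, self))


def wire(style):
    """Reproduce the question's wiring with either bind() or a call."""
    flow = ToyFlow()
    all_pushes = ToyTask("all_pushes", flow).bind("run_names")
    post_runner = ToyTask("post_runner", flow)
    post_runner.set_upstream(all_pushes)
    if style == "bind":
        post_runner.bind("run_name", "data_root", "collection_name")
    else:
        post_runner("run_name", "data_root", "collection_name")
    return flow


def describe(flow):
    """(name, has bound args, upstream names) for every task in the flow."""
    return [
        (t.name, t.args is not None, [u.name for u, d in flow.edges if d is t])
        for t in flow.tasks
    ]


print(describe(wire("bind")))
# [('all_pushes', True, []), ('post_runner', True, ['all_pushes'])]
print(describe(wire("call")))
# [('all_pushes', True, []), ('post_runner', False, ['all_pushes']), ('post_runner', True, [])]
```

With `bind()` the toy flow holds a single `post_runner` that has both its arguments and its dependency; with the call it holds two, one per symptom described in the question.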
k
Hey @Felipe Saldana, can you try this syntax instead?
```python
all_pushes = push_data.map(
    run_name=mapped_run_name,
    data_root=unmapped(mapped_data_root),
    collection_name=unmapped(mapped_collection_name)
)
# Calling the task returns a new task object with the arguments bound;
# set the dependency on that returned object.
post_runner_task = post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
post_runner_task.set_upstream(all_pushes)
```
We don’t recommend the `bind` syntax in general.
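The reordered version avoids the problem because `set_upstream` is called on the object returned by `post_runner(...)`, i.e. the copy that actually carries the arguments, so the flow ends up with a single `post_runner` node that has both its inputs and the dependency. The scheduling consequence can be sketched with the standard library's `graphlib` (Python 3.9+), treating a task as ready once all of its upstream tasks are done. The graphs are written by hand and the node labels are illustrative only; they are not extracted from a Prefect flow.

```python
from graphlib import TopologicalSorter

# Hand-written dependency graphs: each key maps a task label to the set of
# labels it must wait for. The labels are illustrative, not Prefect objects.
PUSHES = ["push_data[0]", "push_data[1]", "push_data[2]"]

# Original wiring: the dependency sits on the argument-less original task,
# while the argument-carrying copy has no upstream tasks at all.
broken = {p: set() for p in PUSHES}
broken["post_runner (original, no args)"] = set(PUSHES)
broken["post_runner (copy, with args)"] = set()

# Suggested wiring: set_upstream() is applied to the copy returned by the call.
fixed = {p: set() for p in PUSHES}
fixed["post_runner"] = set(PUSHES)


def first_wave(graph):
    """Labels of the tasks that may start before any task has finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    return sorted(sorter.get_ready())


def last_task(graph):
    """Final label in one valid sequential order that respects dependencies."""
    return list(TopologicalSorter(graph).static_order())[-1]


print(first_wave(broken))
# ['post_runner (copy, with args)', 'push_data[0]', 'push_data[1]', 'push_data[2]']
print(first_wave(fixed))
# ['push_data[0]', 'push_data[1]', 'push_data[2]']
print(last_task(fixed))
# post_runner
```

In the original wiring the argument-carrying copy is in the first wave, free to start alongside the pushes, which matches the "does not wait" behaviour from the question; in the suggested wiring `post_runner` only becomes ready after every push.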
f
You mentioned that to me the other day, and that got me down this road 🙂 Thanks for the heads-up on that. Let me give this a try.
k
I did? Was that when you were asking about multiple clocks/envs?
f
I believe that was the one 😆
That seems to have done the trick; tasks are running as I would expect:
```
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Starting task run...
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Finished task run for task with final state: 'Mapped'
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Starting task run...
[2021-05-27 17:56:24+0000] INFO - prefect | Run PushData demo_push1
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push2
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push3
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Start WrapperTask Runner
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push1 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push1
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push2 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push2
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push3 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push3
[2021-05-27 17:56:25+0000] INFO - prefect | Finish WrapperTask Runner
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
k
That would be weird. I don’t use `bind` myself xD
f
I will keep that in mind ... thanks for the help!