Thread
#prefect-community

    Felipe Saldana

    1 year ago
    Hey Prefect. I am trying to wrap my head around what's happening when using .bind() vs. not using it.
    post_runner.set_upstream(all_pushes)
    post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
    post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
    Using bind() works as I would expect: all_pushes completes entirely, and then post_runner starts and completes. Without bind(), post_runner does not wait for all_pushes to complete, mayhem ensues, and I get "run() missing X required positional arguments" messages. Thoughts?
    import os
    import json
    import datetime
    import prefect
    from prefect import Flow, Parameter, Task, unmapped
    
    logger = prefect.context.get("logger")
    
    client_secret = os.getenv("CLIENT_SECRET")
    db_user = os.getenv("DB_USER")
    db_pass = os.getenv("DB_PASS")
    
    
    class PushDataTask(Task):
    
        def __init__(self,client_secret, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.client_secret = client_secret
    
        def run(self, run_name, data_root, collection_name):
            logger.info("Run PushData " + run_name)
    
    
    class WrapperTask(Task):
        def __init__(self, db_user, db_pass, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.db_user = db_user
            self.db_pass = db_pass
    
        def run(self, run_names, data_root, collections_list):
    
            logger.info("Start WrapperTask Runner")
    
            iterations = len(run_names)
            RenameTask(self.db_user, self.db_pass, name="push1").run(data_root, collections_list)
    
            for i in range(1, iterations):
                RenameTask(self.db_user, self.db_pass, name=f"push{i + 1}").run(data_root, collections_list)
    
            logger.info("Finish WrapperTask Runner")
    
    
    class RenameTask(Task):
    
        def __init__(self, db_user, db_pass, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.db_user = db_user
            self.db_pass = db_pass
    
        def run(self, data_root, collections_list, rollback=False):
            logger.info("Start Rename: " + self.name + " : " + data_root)
            # do real work
            logger.info("Finish Rename: " + self.name)
    
    
    push_data = PushDataTask(client_secret, name="push_data")
    post_runner = WrapperTask(db_user, db_pass, name="post_runner")
    
    with Flow("data_flow") as data_flow:
    
        mapped_run_name = Parameter("mapped_run_name", required=True)
        mapped_data_root = Parameter("mapped_data_root", required=True)
        mapped_collection_name = Parameter("mapped_collection_name", required=True)
    
        all_pushes = push_data.map(
            run_name=mapped_run_name,
            data_root=unmapped(mapped_data_root),
            collection_name=unmapped(mapped_collection_name)
        )
    
        post_runner.set_upstream(all_pushes)
        # post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
        post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
    
    demo_params = dict(
        mapped_run_name=["demo_push1", "demo_push2", "demo_push3"],
        mapped_data_root="adl://data_data_data",
        mapped_collection_name="",
    )
    
    data_flow.run(parameters=demo_params)

    Kevin Kho

    1 year ago
    Hey @Felipe Saldana, can you try this syntax instead?
    all_pushes = push_data.map(
        run_name=mapped_run_name,
        data_root=unmapped(mapped_data_root),
        collection_name=unmapped(mapped_collection_name)
    )
    post_runner_task = post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
    post_runner_task.set_upstream(all_pushes)
    We don’t recommend the bind syntax in general.
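
The difference between the two orderings can be illustrated with a toy model in plain Python. This is not Prefect code: `ToyFlow`, `ToyTask`, and `describe` are hypothetical names, and the sketch assumes (as I understand Prefect 1.x) that calling a task inside a flow binds the arguments to a copy of the task, while `bind()` and `set_upstream()` act on the object they are called on.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: tracks args and upstream edges per task object."""

    def __init__(self):
        self.args = {}      # task object -> bound arguments
        self.upstream = {}  # task object -> list of upstream names


class ToyTask:
    """Hypothetical stand-in for a task; hashed by identity, so a copy is a distinct key."""

    def __init__(self, name):
        self.name = name

    def set_upstream(self, other, flow):
        # Registers the dependency on *this* object only.
        flow.upstream.setdefault(self, []).append(other)

    def bind(self, *args, flow):
        # Binds arguments in place, on *this* object.
        flow.args[self] = args
        return self

    def __call__(self, *args, flow):
        # Assumed call semantics: arguments are bound to a copy, and the copy is returned.
        new = copy.copy(self)
        new.bind(*args, flow=flow)
        return new


def describe(flow):
    """Return sorted (name, has_args, has_upstream) tuples for every task in the flow."""
    tasks = set(flow.args) | set(flow.upstream)
    return sorted((t.name, t in flow.args, t in flow.upstream) for t in tasks)


post_runner = ToyTask("post_runner")

# Pattern from the question: set_upstream on the original, then call it.
broken = ToyFlow()
post_runner.set_upstream("all_pushes", flow=broken)
post_runner("run_names", "data_root", "collections", flow=broken)
print(describe(broken))  # [('post_runner', False, True), ('post_runner', True, False)]

# Suggested pattern: call first, then set_upstream on the returned object.
fixed = ToyFlow()
post_runner_task = post_runner("run_names", "data_root", "collections", flow=fixed)
post_runner_task.set_upstream("all_pushes", flow=fixed)
print(describe(fixed))  # [('post_runner', True, True)]

# bind() variant: both calls act on the same object.
bound = ToyFlow()
post_runner.set_upstream("all_pushes", flow=bound)
post_runner.bind("run_names", "data_root", "collections", flow=bound)
print(describe(bound))  # [('post_runner', True, True)]
```

In the first pattern the toy flow ends up with two `post_runner` entries: one that has the upstream edge but no arguments, and one that has arguments but no upstream edge. If Prefect behaves the same way, that would be consistent with both symptoms in the question: a task that fails with missing positional arguments and another that starts without waiting for `all_pushes`.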

    Felipe Saldana

    1 year ago
    You mentioned that to me the other day, and that got me down this road 🙂 Thanks for the heads up on that. Let me give this a try.

    Kevin Kho

    1 year ago
    I did? Was that when you were asking about multiple clocks/envs?

    Felipe Saldana

    1 year ago
    I believe that was the one 😆
    That did the trick ... tasks are running as I would expect:
    [2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Starting task run...
    [2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Finished task run for task with final state: 'Mapped'
    [2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Starting task run...
    [2021-05-27 17:56:24+0000] INFO - prefect | Run PushData demo_push1
    [2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Finished task run for task with final state: 'Success'
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Starting task run...
    [2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push2
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Finished task run for task with final state: 'Success'
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Starting task run...
    [2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push3
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Finished task run for task with final state: 'Success'
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...
    [2021-05-27 17:56:25+0000] INFO - prefect | Start WrapperTask Runner
    [2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push1 : adl://data_data_data
    [2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push1
    [2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push2 : adl://data_data_data
    [2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push2
    [2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push3 : adl://data_data_data
    [2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push3
    [2021-05-27 17:56:25+0000] INFO - prefect | Finish WrapperTask Runner
    [2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Finished task run for task with final state: 'Success'
    [2021-05-27 17:56:25+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

    Kevin Kho

    1 year ago
    That would be weird. I don’t use bind myself xD

    Felipe Saldana

    1 year ago
    I will keep that in mind ... thanks for the help!