Felipe Saldana
05/27/2021, 5:40 PM
post_runner.set_upstream(all_pushes)
post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
Using bind() works as I would expect: all_pushes completes entirely, and then post_runner starts and completes.
Without bind(), post_runner does not wait for all_pushes to complete; mayhem ensues and I get "run() missing X required positional arguments" messages.
Thoughts?
Felipe Saldana
05/27/2021, 5:42 PM
import os
import prefect
from prefect import Flow, Parameter, Task, unmapped

logger = prefect.context.get("logger")
client_secret = os.getenv("CLIENT_SECRET")
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASS")

class PushDataTask(Task):
    def __init__(self, client_secret, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client_secret = client_secret

    def run(self, run_name, data_root, collection_name):
        logger.info("Run PushData " + run_name)

class WrapperTask(Task):
    def __init__(self, db_user, db_pass, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db_user = db_user
        self.db_pass = db_pass

    def run(self, run_names, data_root, collections_list):
        logger.info("Start WrapperTask Runner")
        iterations = len(run_names)
        RenameTask(self.db_user, self.db_pass, name="push1").run(data_root, collections_list)
        for i in range(1, iterations):
            RenameTask(self.db_user, self.db_pass, name=f"push{i + 1}").run(data_root, collections_list)
        logger.info("Finish WrapperTask Runner")

class RenameTask(Task):
    def __init__(self, db_user, db_pass, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db_user = db_user
        self.db_pass = db_pass

    def run(self, data_root, collections_list, rollback=False):
        logger.info("Start Rename: " + self.name + " : " + data_root)
        # do real work
        logger.info("Finish Rename: " + self.name)

push_data = PushDataTask(client_secret, name="push_data")
post_runner = WrapperTask(db_user, db_pass, name="post_runner")

with Flow("data_flow") as data_flow:
    mapped_run_name = Parameter("mapped_run_name", required=True)
    mapped_data_root = Parameter("mapped_data_root", required=True)
    mapped_collection_name = Parameter("mapped_collection_name", required=True)
    all_pushes = push_data.map(
        run_name=mapped_run_name,
        data_root=unmapped(mapped_data_root),
        collection_name=unmapped(mapped_collection_name)
    )
    post_runner.set_upstream(all_pushes)
    # post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
    post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)

demo_params = dict(
    mapped_run_name=["demo_push1", "demo_push2", "demo_push3"],
    mapped_data_root="adl://data_data_data",
    mapped_collection_name="",
)
data_flow.run(parameters=demo_params)
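A toy model of why the two orderings behave differently, assuming Prefect 1.x semantics as I understand them: calling a task inside a flow binds the arguments to a copy of the task, while bind() binds them to the task object itself. ToyTask and ToyFlow are hypothetical stand-ins that only record nodes, arguments and edges; they are not Prefect classes.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a Flow: records task objects and edges only."""

    def __init__(self):
        self.tasks = []  # task objects, in the order they were added
        self.edges = []  # (upstream, downstream) pairs

    def add_task(self, task):
        # Compare by identity: a copy is a different node from its original.
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)


class ToyTask:
    """Hypothetical stand-in for a Task with copy-on-call semantics."""

    def __init__(self, name, flow):
        self.name = name
        self.flow = flow
        self.args = None

    def bind(self, *args):
        # In place: the object bind() is called on receives the arguments.
        self.args = args
        self.flow.add_task(self)
        return self

    def __call__(self, *args):
        # Copy, then bind: the original object is left without arguments.
        return copy.copy(self).bind(*args)

    def set_upstream(self, other):
        self.flow.add_task(self)
        self.flow.add_task(other)
        self.flow.edges.append((other, self))


def build(use_bind):
    """Wire the graph the way the question does, with or without bind()."""
    flow = ToyFlow()
    all_pushes = ToyTask("all_pushes", flow)
    post_runner = ToyTask("post_runner", flow)
    post_runner.set_upstream(all_pushes)  # the edge lands on the original object
    if use_bind:
        post_runner.bind("runs", "root", "collections")  # same object gets the args
    else:
        post_runner("runs", "root", "collections")  # a copy gets the args
    return flow


def describe(flow):
    """(name, has_args, has_upstream) for every post_runner node in the flow."""
    return [
        (t.name, t.args is not None, any(down is t for _, down in flow.edges))
        for t in flow.tasks
        if t.name == "post_runner"
    ]


print(describe(build(use_bind=True)))   # [('post_runner', True, True)]
print(describe(build(use_bind=False)))  # [('post_runner', False, True), ('post_runner', True, False)]
```

In the model, skipping bind() leaves two post_runner nodes: the original carries the upstream edge but no arguments (matching the "run() missing ... positional arguments" error), and the copy carries the arguments but no edge (matching "does not wait for all_pushes").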
Kevin Kho
all_pushes = push_data.map(
    run_name=mapped_run_name,
    data_root=unmapped(mapped_data_root),
    collection_name=unmapped(mapped_collection_name)
)
# Call the task first, then set the dependency on the task object the call returns
post_runner_task = post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
post_runner_task.set_upstream(all_pushes)
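Kevin's ordering works because set_upstream() is called on the object the call returned, i.e. the copy that actually holds the arguments. Below is a toy model of that, using hypothetical ToyTask/ToyFlow stand-ins (not Prefect classes). It also models an upstream_tasks= keyword, which, as far as I know, Prefect 1.x's Task.__call__ accepts for declaring the dependency in the same call; treat that as an assumption to check against the Prefect docs.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a Flow: records task objects and edges only."""

    def __init__(self):
        self.tasks = []
        self.edges = []  # (upstream, downstream) pairs

    def add(self, task):
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)


class ToyTask:
    """Hypothetical stand-in for a Task: calling it returns a bound copy."""

    def __init__(self, name, flow):
        self.name, self.flow, self.args = name, flow, None

    def __call__(self, *args, upstream_tasks=()):
        new = copy.copy(self)  # the original object is never modified
        new.args = args
        self.flow.add(new)
        for up in upstream_tasks:  # models an upstream_tasks= keyword
            new.set_upstream(up)
        return new

    def set_upstream(self, other):
        self.flow.add(other)
        self.flow.add(self)
        self.flow.edges.append((other, self))


def wire(style):
    flow = ToyFlow()
    push_data = ToyTask("push_data", flow)
    post_runner = ToyTask("post_runner", flow)
    all_pushes = push_data("runs")
    if style == "kevin":
        # Call first, then attach the dependency to the returned copy.
        post_runner_task = post_runner("runs", "root", "collections")
        post_runner_task.set_upstream(all_pushes)
    else:
        # Arguments and dependency declared in a single call.
        post_runner_task = post_runner(
            "runs", "root", "collections", upstream_tasks=[all_pushes]
        )
    return flow, all_pushes, post_runner_task


for style in ("kevin", "keyword"):
    flow, all_pushes, runner = wire(style)
    runners = [t for t in flow.tasks if t.name == "post_runner"]
    # Exactly one post_runner node, holding both the arguments and the edge.
    print(style, len(runners), runner.args is not None, (all_pushes, runner) in flow.edges)
    # prints "kevin 1 True True", then "keyword 1 True True"
```

Assuming the keyword behaves as modelled, the real flow could use post_runner(mapped_run_name, mapped_data_root, mapped_collection_name, upstream_tasks=[all_pushes]) instead of the separate set_upstream() call.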
Kevin Kho
bind syntax in general
Felipe Saldana
05/27/2021, 5:52 PM
Kevin Kho
Felipe Saldana
05/27/2021, 5:55 PM
Felipe Saldana
05/27/2021, 5:56 PM
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Starting task run...
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data': Finished task run for task with final state: 'Mapped'
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Starting task run...
[2021-05-27 17:56:24+0000] INFO - prefect | Run PushData demo_push1
[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push2
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Run PushData demo_push3
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...
[2021-05-27 17:56:25+0000] INFO - prefect | Start WrapperTask Runner
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push1 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push1
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push2 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push2
[2021-05-27 17:56:25+0000] INFO - prefect | Start Rename: push3 : adl://data_data_data
[2021-05-27 17:56:25+0000] INFO - prefect | Finish Rename: push3
[2021-05-27 17:56:25+0000] INFO - prefect | Finish WrapperTask Runner
[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Finished task run for task with final state: 'Success'
[2021-05-27 17:56:25+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
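The log shows the reordered flow behaving as intended: every push_data[i] child finishes before post_runner starts. A small checker makes that ordering explicit. downstream_waited is a hypothetical helper that only understands the "Task '...': Starting/Finished task run" lines in this log format; the sample lines are copied from the log above.

```python
import re

# Matches TaskRunner lines such as
# "... | Task 'push_data[0]': Finished task run for task with final state: 'Success'"
TASK_LINE = re.compile(r"Task '([^']+)': (Starting|Finished) task run")


def downstream_waited(log_lines, mapped="push_data", downstream="post_runner"):
    """True if every mapped child (mapped[i]) finished before `downstream` started."""
    last_child_finish = None
    downstream_start = None
    for pos, line in enumerate(log_lines):
        match = TASK_LINE.search(line)
        if match is None:
            continue
        name, event = match.groups()
        if name.startswith(mapped + "[") and event == "Finished":
            last_child_finish = pos
        elif name == downstream and event == "Starting" and downstream_start is None:
            downstream_start = pos
    if last_child_finish is None or downstream_start is None:
        return False
    return last_child_finish < downstream_start


sample = [
    "[2021-05-27 17:56:24+0000] INFO - prefect.TaskRunner | Task 'push_data[0]': Finished task run for task with final state: 'Success'",
    "[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[1]': Finished task run for task with final state: 'Success'",
    "[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'push_data[2]': Finished task run for task with final state: 'Success'",
    "[2021-05-27 17:56:25+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...",
]
print(downstream_waited(sample))  # True
```

The parent line "Task 'push_data': Finished ... 'Mapped'" is deliberately ignored, since only the indexed children do the actual work.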
Kevin Kho
bind myself xD
Felipe Saldana
05/27/2021, 5:58 PM