Felipe Saldana
05/25/2021, 1:47 AM
post_runner.set_upstream(all_pushes_mapped_results)
post_runner.bind(mapped_run_name,
                 mapped_gpudb_user,
                 mapped_gpudb_pass,
                 mapped_gpudb_host,
                 mapped_collection_name)
post_runner is itself a task so that I can actually get access to the parameters. Internally, the runner task loops and creates a dynamic number of tasks (if I run the same algorithm below directly in the flow context, I don't have access to the parameters).
class RenameTaskRunner(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # other constructor logic

    def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
        logger.info("Start Rename Runner")
        all_tasks = []
        iterations = len(run_names)
        all_tasks.append(RenameTask(name="push1_post"))
        all_tasks[0].bind(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)
        for i in range(1, iterations):
            all_tasks.append(RenameTask(name=f"push{i + 1}_post"))
            all_tasks[i].set_upstream(all_tasks[i - 1])
            all_tasks[i].bind(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)
        logger.info("Finish Rename Runner")
        return all_tasks
Below is the error. Is it possible to have RenameTaskRunner register the tasks with the outer flow context? Can I pass a reference into the constructor, or is there some other approach?
[2021-05-25 01:31:50+0000] ERROR - prefect.TaskRunner | Unexpected error: ValueError("Could not infer an active Flow context while creating edge to <Task: push1_post>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `RenameTask(...).run(...)`")
Traceback (most recent call last):
Felipe Saldana
05/25/2021, 2:04 AM
sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)
Kevin Kho
StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)()
Felipe Saldana
05/25/2021, 2:05 AM
sub_flow_task.run(parameters={"one_date": one_date})
Kevin Kho
.run() method.
Felipe Saldana
05/25/2021, 2:10 AM
Unexpected error: ValueError('No Flow was passed, and could not infer an active Flow context.')
Kevin Kho
bind attaches the task to a Flow (in general, we don’t recommend using it), but this is happening inside a Task. Flows run tasks, and this task doesn’t know what Flow it’s part of. I don’t think you can use a Task to modify a Flow like this.
Kevin Kho
RenameTask(name="push1_post").run(), this will work inside a Task. It might be set_upstream causing errors too, because that modifies the Flow, which it doesn’t see.
Kevin Kho
def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
    logger.info("Start Rename Runner")
    iterations = len(run_names)
    # Call each RenameTask's run() directly; nothing is registered with the Flow.
    RenameTask(name="push1_post").run(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)
    for i in range(1, iterations):
        RenameTask(name=f"push{i + 1}_post").run(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)
    logger.info("Finish Rename Runner")
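To make the point about the missing Flow context concrete, here is a toy model in plain Python. It is only a sketch: ToyFlow, ToyTask, BindingRunner and DirectRunner are made-up names, and the code imitates the general idea of an "active flow" that exists only while the flow is being built. It is not Prefect's implementation and does not use Prefect's API.

```python
# Toy model only: these classes imitate the idea of an "active flow" context.
# They are NOT Prefect classes and do not reflect Prefect's real internals.
_active_flows = []  # stack of flows currently inside a `with` block


class ToyFlow:
    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        _active_flows.append(self)
        return self

    def __exit__(self, *exc):
        _active_flows.pop()
        return False

    def execute(self):
        # Runs after the `with` block has closed, so no flow is active here.
        return [task.run() for task in self.tasks]


class ToyTask:
    def __init__(self, name):
        self.name = name

    def bind(self):
        # Registration needs an active flow, which only exists at build time.
        if not _active_flows:
            raise ValueError("could not infer an active flow context")
        _active_flows[-1].tasks.append(self)
        return self

    def run(self):
        return f"ran {self.name}"


class BindingRunner(ToyTask):
    def run(self):
        # Fails: when run() executes, the flow is no longer being built.
        return ToyTask("push1_post").bind()


class DirectRunner(ToyTask):
    def run(self):
        # Works: calls the inner task's logic directly, with no registration.
        return [ToyTask(f"push{i + 1}_post").run() for i in range(2)]


with ToyFlow("outer") as flow:
    DirectRunner("post_runner").bind()
direct_result = flow.execute()

with ToyFlow("outer") as broken_flow:
    BindingRunner("post_runner").bind()
try:
    broken_flow.execute()
    bind_error = None
except ValueError as exc:
    bind_error = str(exc)
```

In this model, registering a task during execution fails for the same kind of reason as the ValueError in the thread, while calling run() directly sidesteps registration entirely.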
Felipe Saldana
05/25/2021, 2:22 AM
set_upstream?
Felipe Saldana
05/25/2021, 2:26 AM
[2021-05-25 02:25:20+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename Runner
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename: push1_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename: push1_post
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename: push2_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename: push2_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename Runner
[2021-05-25 02:25:20+0000] INFO - prefect.TaskRunner | Task 'post_runner': Finished task run for task with final state: 'Success'
[2021-05-25 02:25:20+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
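The log shows the renames running strictly one after another, which is the ordering set_upstream was meant to express. A minimal sketch of why no explicit dependency is needed once the calls happen inside a single run(): the loop order is the execution order. Here rename_one is a hypothetical stand-in for RenameTask(...).run(...), and the host names are invented for illustration.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")
logger = logging.getLogger("rename_sketch")


def rename_one(name, host, events):
    # Hypothetical stand-in for RenameTask(...).run(...); it only records events.
    events.append(f"Start Rename: {name}")
    logger.info("Start Rename: %s on %s", name, host)
    events.append(f"Finish Rename: {name}")
    logger.info("Finish Rename: %s", name)


def run_all(hosts):
    events = []
    # Each call returns before the next one starts, so the loop order
    # already gives the sequencing that set_upstream would have declared.
    for i, host in enumerate(hosts):
        rename_one(f"push{i + 1}_post", host, events)
    return events


events = run_all(["host-a", "host-b"])  # hypothetical host names
```

The trade-off is that the individual renames are no longer separate tasks in the flow, so they share the state and retries of the single runner task.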