# ask-community
f
Hello, I am hoping for some guidance here. After a mapped task runs in parallel I need to run a different task sequentially a dynamic number of times with Parameters. Is this possible? The below is in the flow context ... with Flow
post_runner.set_upstream(all_pushes_mapped_results)
post_runner.bind(mapped_run_name,
                 mapped_gpudb_user,
                 mapped_gpudb_pass,
                 mapped_gpudb_host,
                 mapped_collection_name)
post_runner is a task itself so that I can actually get access to the parameters. Internally the runner task loops and creates a dynamic number of tasks (if I run the same algorithm below directly in the flow context I don't have access to the parameters)
class RenameTaskRunner(Task):
    def __init__(self, *args, **kwargs):

        super().__init__(*args, **kwargs)

        # other constructor logic

    def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):

        logger.info("Start Rename Runner")

        all_tasks = []
        iterations = len(run_names)

        all_tasks.append(RenameTask(name="push1_post"))
        all_tasks[0].bind(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)

        for i in range(1, iterations):
            all_tasks.append(RenameTask(name=f"push{i + 1}_post"))
            all_tasks[i].set_upstream(all_tasks[i - 1])
            all_tasks[i].bind(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)

        logger.info("Finish Rename Runner")

        return all_tasks
Below is the error. Is it possible to have RenameTaskRunner register the task with the given outer flow context? Can I pass a reference in to the constructor or some other idea?
[2021-05-25 01:31:50+0000] ERROR - prefect.TaskRunner | Unexpected error: ValueError("Could not infer an active Flow context while creating edge to <Task: push1_post>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `RenameTask(...).run(...)`")
Traceback (most recent call last):
k
Hey @Felipe Saldana, have you seen Task Looping?
I went over the code. Any reason why you’re using the imperative API instead of the functional one? I think in general you want to use Task Looping to handle this and take in a parameter to determine the number of iterations.
Just a friendly reminder to move the large code block to the thread next time to not crowd the main channel 🙂
👍 1
f
I saw you and another member in the community discussing it. I have not tried it yet though. The conversation I saw used Task Looping with a sub flow. Can I use Task Looping without a subflow?
k
By subflow do you mean StartFlowRun or you just mean looping within a task that is inside a Flow?
f
StartFlowRun
k
I think the thread you linked is exactly looping over StartFlowRun with a code example.
His use case is he has a Parameter that takes in a list of dates. For each date, he wants to run StartFlowRun (looping in order)
Note that you’ll need the idempotency key to create the new flow runs. Otherwise, only one will be created. My latest example in that thread is the fully working, complete example
f
I want to do the _loop_over_dates_ for sure but I would like to know if this would still work without
sub_flow_task = StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)
k
I believe it will, just note that if you call it within a Flow, you use `StartFlowRun(project_name="aws", flow_name="sub-flow", wait=True)()`
f
and use/call DoSomething directly instead of
sub_flow_task.run(parameters={"one_date": one_date})
k
If you want to use a task inside another task, you just need to call the `.run()` method.
f
if I change my code to use run() instead of bind() I then get
Unexpected error: ValueError('No Flow was passed, and could not infer an active Flow context.')
k
I think the issue with the code is that `bind` attaches the task to a Flow (in general, we don’t recommend using it) but this is happening inside a Task. Flows run tasks and this task doesn’t know what Flow it’s part of. I don’t think you can use a Task to modify a Flow like this.
But if you just do `RenameTask(name="push1_post").run()`, this will work inside a Task. It might be `set_upstream` causing errors too because that modifies the Flow, which it doesn’t see
Can you try something like this? Not 100% sure if it’ll work
def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
    logger.info("Start Rename Runner")

    iterations = len(run_names)
    # Calling .run() directly executes the task's Python code without touching the Flow.
    results = [RenameTask(name="push1_post").run(gpudb_user, gpudb_pass, gpudb_host[0], collections_list)]

    for i in range(1, iterations):
        results.append(RenameTask(name=f"push{i + 1}_post").run(gpudb_user, gpudb_pass, gpudb_host[i], collections_list))

    logger.info("Finish Rename Runner")
    return results
f
let me give it a try
wow ...no errors and I see my log messages
will this run sequentially though ... without using `set_upstream`?
k
I believe it will because it’s just running the Python code underneath now for the original function, which waits by default. It’s just normal Python code being executed there.
Attach a logger to the loop and I guess we’ll find out?
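To illustrate why direct `.run()` calls execute in order, here is a minimal sketch in plain Python. The two classes are stand-ins written for this example only; they are not Prefect's `Task` classes, and the shared `log` list is an assumption used in place of a real logger. The point is only that a direct method call blocks until it returns, so no `set_upstream` is needed to get ordering inside a single task.

```python
# Stand-in classes for illustration only; these are NOT Prefect's Task classes.
class RenameTask:
    def __init__(self, name, log):
        self.name = name
        self.log = log  # shared list used here instead of a real logger

    def run(self, gpudb_user, gpudb_pass, gpudb_host, collections_list):
        self.log.append(f"Start Rename: {self.name}")
        # ... the real rename work would happen here ...
        self.log.append(f"Finish Rename: {self.name}")
        return self.name


class RenameTaskRunner:
    def __init__(self, log):
        self.log = log

    def run(self, run_names, gpudb_user, gpudb_pass, gpudb_host, collections_list):
        self.log.append("Start Rename Runner")
        results = []
        for i in range(len(run_names)):
            child = RenameTask(name=f"push{i + 1}_post", log=self.log)
            # A direct .run() call blocks until it returns, so iteration i
            # always finishes before iteration i + 1 starts.
            results.append(
                child.run(gpudb_user, gpudb_pass, gpudb_host[i], collections_list)
            )
        self.log.append("Finish Rename Runner")
        return results


# Hypothetical inputs, chosen only to exercise the loop twice.
log = []
results = RenameTaskRunner(log).run(
    run_names=["run_a", "run_b"],
    gpudb_user="user",
    gpudb_pass="secret",
    gpudb_host=["host1", "host2"],
    collections_list=["c1"],
)
```

The trade-off is that the orchestrator sees only the outer runner task; the inner renames are ordinary function calls and do not appear as separate task runs.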
f
yup ... will do
k
This is way easier than task looping 😆
😆 1
f
[2021-05-25 02:25:20+0000] INFO - prefect.TaskRunner | Task 'post_runner': Starting task run...
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename Runner
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename: push1_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename: push1_post
[2021-05-25 02:25:20+0000] INFO - prefect | Start Rename: push2_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename: push2_post
[2021-05-25 02:25:20+0000] INFO - prefect | Finish Rename Runner
[2021-05-25 02:25:20+0000] INFO - prefect.TaskRunner | Task 'post_runner': Finished task run for task with final state: 'Success'
[2021-05-25 02:25:20+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
👍 1
On that note what are your thoughts on Task Looping vs what we just did?
k
I don’t know what to think at the moment lol, but the earlier thread we looked at initially tried to do it this way I believe but it didn’t work. It might be specifically because of StartFlowRun and waiting but I’m not sure. Task looping also does provide observability and retries for each individual iteration in the loop.
And failure handling and stuff so I guess it depends if you need those features of Prefect
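If the plain loop is kept, per-iteration retries and failure reporting have to be written by hand. The sketch below is one possible way to do that in plain Python; `run_with_retries`, `run_all`, and `flaky_rename` are hypothetical helpers invented for this example and are not part of Prefect.

```python
import time


def run_with_retries(fn, max_retries=2, delay_seconds=0.0):
    """Call fn(); if it raises, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: let the caller see the failure
            attempt += 1
            time.sleep(delay_seconds)


def run_all(hosts, rename):
    """Run rename(host) for each host in order, recording each outcome."""
    outcomes = {}
    for host in hosts:
        try:
            outcomes[host] = ("ok", run_with_retries(lambda: rename(host)))
        except Exception as exc:
            outcomes[host] = ("failed", str(exc))
    return outcomes


# Hypothetical rename function: host2 fails once, host3 always fails.
calls = {}


def flaky_rename(host):
    calls[host] = calls.get(host, 0) + 1
    if host == "host2" and calls[host] == 1:
        raise RuntimeError("transient error")
    if host == "host3":
        raise RuntimeError("permanent error")
    return f"renamed on {host}"


outcomes = run_all(["host1", "host2", "host3"], flaky_rename)
```

This version keeps going after a failed iteration; breaking out of the loop on the first failure would be closer to a chain of upstream dependencies. Either way, the bookkeeping is manual, which is the part Task Looping handles for each iteration.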
f
That makes sense. I may try Task looping too (without the StartFlowRun - subtask).
Thanks for the quick help!
👍 1
FYI I got the Task Loop to process as well, although Task Looping requires more work to keep track of the parameters
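For readers wondering what "keeping track of the parameters" involves, here is a rough plain-Python emulation of the looping pattern. In Prefect 1.x the real pieces are the `LOOP` signal from `prefect.engine.signals` and the previous payload read via `prefect.context.get("task_loop_result")`; the `LoopSignal` class, `rename_next`, and `drive` below are stand-ins invented for this sketch and do not reproduce Prefect's engine.

```python
class LoopSignal(Exception):
    """Stand-in for a 'run me again' signal; carries state for the next iteration."""

    def __init__(self, result):
        super().__init__("loop")
        self.result = result


def rename_next(hosts, loop_state):
    """One iteration: process hosts[index], then ask to be called again."""
    index = loop_state.get("index", 0)
    done = loop_state.get("done", [])
    if index >= len(hosts):
        return done  # nothing left: finish normally with the accumulated results
    done = done + [f"renamed on {hosts[index]}"]
    # The position in the list must be passed along explicitly, because each
    # iteration starts from scratch and only sees the previous payload.
    raise LoopSignal(result={"index": index + 1, "done": done})


def drive(task_fn, hosts):
    """Stand-in for the engine: re-invoke task_fn until it returns normally."""
    state = {}
    while True:
        try:
            return task_fn(hosts, state)
        except LoopSignal as signal:
            state = signal.result


final = drive(rename_next, ["host1", "host2", "host3"])
```

Compared with a plain `for` loop, the index and any accumulated results travel in the payload rather than in local variables, which is the extra bookkeeping mentioned above.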