# ask-community
k
Hello again, all! I'm probably missing something, but when I try to run a simple mapping example through an agent, it won't finish. This is the flow definition:
from prefect import Flow, task

@task
def add_ten(x):
    return x + 10

with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2])

flow.register()
flow.run_agent()
When I run the flow from the UI, the task "add_ten (Parent)" is stuck in state "Mapped" (with description "Preparing to submit 2 mapped tasks"). What's wrong?
April 3rd 2020,8:57:18am 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'simple map'
April 3rd 2020,8:57:18am 	prefect.CloudFlowRunner	INFO	Starting flow run.
April 3rd 2020,8:57:18am 	prefect.CloudFlowRunner	DEBUG	Flow 'simple map': Handling state change from Scheduled to Running
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task '[1, 2]': Handling state change from Pending to Running
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task '[1, 2]': Starting task run...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Starting to upload result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-645921-00-00...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Finished uploading result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-645921-00-00...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task '[1, 2]': Calling task.run() method...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task '[1, 2]': Handling state change from Running to Success
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task '[1, 2]': finished task run for task with final state: 'Success'
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten': Handling state change from Pending to Mapped
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task 'add_ten': Starting task run...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task 'add_ten[0]': Starting task run...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[0]': Handling state change from Pending to Running
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Finished uploading result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-808925-00-00...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Starting to upload result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-808925-00-00...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[0]': Calling task.run() method...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Starting to upload result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-864501-00-00...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Finished uploading result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-864501-00-00...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[0]': Handling state change from Running to Success
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task 'add_ten[0]': finished task run for task with final state: 'Success'
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	INFO	Task 'add_ten[1]': Starting task run...
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Starting to upload result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-949016-00-00...
April 3rd 2020,8:57:18am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[1]': Handling state change from Pending to Running
April 3rd 2020,8:57:18am 	prefect.LocalResultHandler	DEBUG	Finished uploading result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-18-949016-00-00...
April 3rd 2020,8:57:19am 	prefect.LocalResultHandler	DEBUG	Starting to upload result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-19-004015-00-00...
April 3rd 2020,8:57:19am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[1]': Calling task.run() method...
April 3rd 2020,8:57:19am 	prefect.LocalResultHandler	DEBUG	Finished uploading result to C:\Users\kamil\.prefect\results\prefect-result-2020-04-03t06-57-19-004015-00-00...
April 3rd 2020,8:57:19am 	prefect.CloudTaskRunner	DEBUG	Task 'add_ten[1]': Handling state change from Running to Success
April 3rd 2020,8:57:19am 	prefect.CloudTaskRunner	INFO	Task 'add_ten[1]': finished task run for task with final state: 'Success'
April 3rd 2020,8:57:20am 	agent	INFO	Submitted for execution: PID: 19872
s
It may be (my guess) that the mapped results are never consumed by a downstream task. If you think about it similar to how Python's `map()` function works, it's simply an iterable that isn't executed until something else needs the results.
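For reference, the lazy behaviour of Python's built-in `map()` mentioned in the analogy can be seen in a minimal sketch. It only demonstrates the standard library function; whether Prefect's `.map` behaves the same way is the guess being made above, not something this snippet establishes. The `calls` list is a hypothetical helper added purely to record when the function runs.

```python
calls = []  # hypothetical helper: records every actual invocation


def add_ten(x):
    calls.append(x)
    return x + 10


lazy = map(add_ten, [1, 2])  # builds an iterator; add_ten has not run yet
calls_before = list(calls)   # snapshot taken before anything consumes the iterator

results = list(lazy)         # consuming the iterator is what triggers the calls
calls_after = list(calls)

print(calls_before, results, calls_after)
```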
k
I thought the same, but even if there's a consuming task, the result is the same (upstream task "mapped", downstream task "pending").
j
The `add_ten (Parent)` task stays in a mapped state because all of its inputs were mapped over. The `Mapped` state is a type of Success state: https://docs.prefect.io/api/latest/engine/state.html#state
If you look, your Flow’s final state should be `Success`.
k
I modified the flow to include another step:
from prefect import Flow, task


@task
def add_ten(x):
    return x + 10

@task
def reduce(x):
    return sum(x)


with Flow('simple map') as flow:
    mapped_result = add_ten.map([1, 2])
    final_result = reduce(mapped_result)

flow.register()
flow.run_agent()
The flow did not finish.
j
Hmm that’s weird because your code works for me
One of those classic “works on my machine” cases haha
k
I believe there may be something wrong with my setup; I've also run into some other unusual errors.
j
On the run agent command could you also set `show_flow_logs=True` and see if there are any weird logs?
k
The flow finished when `show_flow_logs=True` was used 🙂
I'll try again with some clean environment...
After some more investigation: the problem is not related to mapping (as originally thought), but to invoking tasks in general. For example, this flow won't finish either:
from prefect import Flow, task

@task
def simple_task():
    return 1

with Flow("flow") as flow:
    a1 = simple_task()
    a2 = simple_task()
    a3 = simple_task()
    a4 = simple_task()
    a5 = simple_task()
    a6 = simple_task()
    a7 = simple_task()
    a8 = simple_task()

flow.register()
flow.run_agent()
I'm seeing the problem only in my Windows environment. When I run the agent in WSL, it runs correctly.
The problem is present even when the flow is run through Prefect Cloud.
j
Hey @Kamil Okáč could you open up an issue for this and detail your Windows environment? I don’t have a Windows machine to troubleshoot with, but I know of a few other users who do
k
Just looking at it, the symptoms are the same
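For the environment details requested above, one possible way to collect them for the issue is the sketch below. It uses only the Python standard library (`importlib.metadata` needs Python 3.8 or newer); the `environment_report` helper and the choice of fields are assumptions made for illustration, not anything Prefect prescribes.

```python
import platform
import sys
from importlib import metadata


def environment_report(package="prefect"):
    """Collect basic details worth pasting into a bug report (hypothetical helper)."""
    try:
        version = metadata.version(package)
    except metadata.PackageNotFoundError:
        version = "not installed"
    return {
        "os": platform.platform(),          # OS name, release and build
        "python": sys.version.split()[0],   # interpreter version
        "architecture": platform.machine(),
        package: version,                   # installed package version, if any
    }


report = environment_report()
for key, value in report.items():
    print(f"{key}: {value}")
```

Running it once on the Windows side and once inside WSL gives two reports that can be compared side by side.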