Hi everyone, I tried to create Flow-of-Flows with...
# ask-community
r
Hi everyone, I tried to create a Flow-of-Flows with StartFlowRun. I made a simple example, https://www.codepile.net/pile/O0OLQXGb , but it gives a strange result. Each flow returns the Success status, but the flow "process" didn't actually run. How can I fix it?
[2021-06-18 12:33:54+0300] INFO - prefect.FlowRunner | Beginning Flow run for 'download'
[2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'check_file': Starting task run...
Checking file availability plip ...
[2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'check_file': Finished task run for task with final state: 'Success'
[2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
Downloading file plip ...
[2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'download_file': Finished task run for task with final state: 'Success'
[2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'Flow process': Starting task run...
[2021-06-18 12:33:55+0300] INFO - prefect.TaskRunner | Task 'Flow process': Finished task run for task with final state: 'Success'
[2021-06-18 12:33:55+0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
k
Hey @Rustam Iskenderov, if you go to Prefect Cloud, you will see queued runs for the flow "process". flow.run() will trigger StartFlowRun, which actually runs it on Prefect Cloud.
StartFlowRun by default just creates a flow run and considers that a success. If you want to wait for the flow run to complete, you should use StartFlowRun(wait=True).
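The difference between the default behavior and wait=True can be sketched with a toy model. This is not Prefect code: FakeBackend, start_flow_run, and max_polls are hypothetical names invented for illustration, and the state names are only borrowed from the thread. It shows why the parent task can report Success while the child run is still Scheduled.

```python
# Toy model only; none of these names are Prefect's API.
TERMINAL_STATES = {"Success", "Failed"}


class FakeBackend:
    """Hypothetical stand-in for the service that tracks flow runs."""

    def __init__(self, future_states):
        # States the run will report on successive polls.
        self._future_states = iter(future_states)
        self.state = "Scheduled"

    def create_flow_run(self, flow_name):
        # Creating a run only schedules it; nothing executes yet.
        self.state = "Scheduled"
        return f"{flow_name}-run-1"

    def poll_state(self, run_id):
        # Advance to the next reported state, or stay put if none is left.
        self.state = next(self._future_states, self.state)
        return self.state


def start_flow_run(backend, flow_name, wait=False, max_polls=100):
    """Create a run; optionally block until it reaches a terminal state."""
    run_id = backend.create_flow_run(flow_name)
    if not wait:
        # Fire and forget: the caller succeeds as soon as the run exists.
        return run_id, backend.state
    for _ in range(max_polls):
        if backend.poll_state(run_id) in TERMINAL_STATES:
            return run_id, backend.state
    raise TimeoutError(f"{run_id} did not finish within {max_polls} polls")
```

Under this model, calling start_flow_run(backend, "process") returns while the run is still Scheduled, whereas wait=True returns only after the run reports Success or Failed.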
r
@Kevin Kho, thank you for your answer! I see the flow "process" in Prefect Cloud. It was in the "Scheduled" state when I ran it. It seems it's not going to run at all 😕. How can I run it immediately?
k
You need an agent capable of picking it up (this is true for all flows). From your command line, you can spin up an agent with prefect agent local start. You need to supply your own compute to run Prefect flows; the agent is responsible for pulling the flow and running it on your compute. See the agent docs.
It will run immediately if there is an agent to pick it up
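As a rough illustration of why a run stays in "Scheduled" until an agent exists, here is a toy polling model. RunQueue and agent_poll_once are hypothetical names made up for this sketch, not part of Prefect; a real agent talks to the Prefect backend and deploys runs in a more involved way.

```python
from collections import deque


class RunQueue:
    """Hypothetical queue of flow runs waiting for an agent."""

    def __init__(self):
        self.states = {}
        self._scheduled = deque()

    def schedule(self, run_id, flow_fn):
        # A newly created run just sits here until someone claims it.
        self.states[run_id] = "Scheduled"
        self._scheduled.append((run_id, flow_fn))

    def claim_next(self):
        return self._scheduled.popleft() if self._scheduled else None


def agent_poll_once(queue):
    """One polling cycle of a toy agent: claim scheduled runs and execute them."""
    executed = 0
    while (item := queue.claim_next()) is not None:
        run_id, flow_fn = item
        queue.states[run_id] = "Running"
        try:
            flow_fn()  # the flow body runs on the agent's own compute
            queue.states[run_id] = "Success"
        except Exception:
            queue.states[run_id] = "Failed"
        executed += 1
    return executed
```

In this model nothing ever leaves "Scheduled" unless agent_poll_once is called, which mirrors the situation described above.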
r
@Kevin Kho you're right, I forgot to start an agent. I've started one now, and got another issue when my flow ran:
[2021-06-18 14:55:52,538] INFO - agent | Found 1 flow run(s) to submit for execution.
[2021-06-18 14:55:52,864] INFO - agent | Deploying flow run be11f960-23e9-411b-81be-3b0c87ab0545
[2021-06-18 14:55:52,868] ERROR - agent | Logging platform error for flow run be11f960-23e9-411b-81be-3b0c87ab0545
[2021-06-18 14:55:53,359] ERROR - agent | Error while deploying flow: FileNotFoundError(2, 'No such file or directory', None, 2, None)
Could you help with this error?
k
Is it the same flow you are using? Or did you change anything?
r
It's exactly the same flow.
k
Oh, I think it might be related to the flow.run at the end. Can you remove that bit and just leave the two flow.register calls? You're starting this from the UI with the Quick Start, right?
And then of course can you run it again to register again?
r
"You're starting this from the UI with the Quick Start, right?"
Yes, that's right.
"And then of course can you run it again to register again?"
I found that the error appears only on my laptop. I tried running the same flow on a server, and there it runs without the error. It seems to work, but it doesn't: I don't see the last print, "Processing file plip ...", from the "process" task in the output:
[2021-06-18 18:56:01+0300] INFO - prefect.FlowRunner | Beginning Flow run for 'download'
[2021-06-18 18:56:01+0300] INFO - prefect.TaskRunner | Task 'check_file': Starting task run...
Checking file availability plip ...
[2021-06-18 18:56:01+0300] INFO - prefect.TaskRunner | Task 'check_file': Finished task run for task with final state: 'Success'
[2021-06-18 18:56:01+0300] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
Downloading file plip ...
[2021-06-18 18:56:01+0300] INFO - prefect.TaskRunner | Task 'download_file': Finished task run for task with final state: 'Success'
[2021-06-18 18:56:01+0300] INFO - prefect.TaskRunner | Task 'Flow process': Starting task run...
[2021-06-18 18:56:03+0300] INFO - prefect.Flow process | Flow Run: <https://cloud.prefect.io/analytics-bstd-ru/flow-run/fed0ecfe-93c5-4f5d-adc9-837a8f7a6a5a>
[2021-06-18 18:56:13+0300] INFO - prefect.TaskRunner | SUCCESS signal raised: SUCCESS('fed0ecfe-93c5-4f5d-adc9-837a8f7a6a5a finished in state <Success: "All reference tasks succeeded.">')
[2021-06-18 18:56:13+0300] INFO - prefect.TaskRunner | Task 'Flow process': Finished task run for task with final state: 'Success'
[2021-06-18 18:56:13+0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
k
Oh, gotcha. OK, let's walk through what happens when you register. When you register, the Flow gets saved in Storage. When an agent runs the Flow, it retrieves the Flow from Storage and runs it. Storage can be S3, GitHub, or Docker. The default is Local storage, which saves the file locally, so the agent looks for the file on its local filesystem. With the default Local storage, the agent needs to run on the machine that you registered from.
The logs of subflows don't propagate to the main flow.
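The Local storage explanation above can be sketched with a small stdlib-only model of one way a FileNotFoundError like the one in the agent log could arise. The functions register_flow and agent_load_flow and the flows/ directory layout are assumptions made for illustration, not Prefect's implementation; each "machine" is simulated by its own root directory.

```python
import pickle
import tempfile
from pathlib import Path


def register_flow(flow_obj, name, machine_root):
    """Save the flow under this machine's root and return the recorded path."""
    rel_path = Path("flows") / f"{name}.pkl"
    target = Path(machine_root) / rel_path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(pickle.dumps(flow_obj))
    return rel_path


def agent_load_flow(rel_path, machine_root):
    """Load the flow from the agent's own filesystem.

    Raises FileNotFoundError if the flow was registered on another machine.
    """
    return pickle.loads((Path(machine_root) / rel_path).read_bytes())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as laptop, tempfile.TemporaryDirectory() as server:
        recorded = register_flow({"name": "process"}, "process", laptop)
        print(agent_load_flow(recorded, laptop))  # same machine: the file is there
        try:
            agent_load_flow(recorded, server)  # different machine: no such file
        except FileNotFoundError as exc:
            print("agent could not find the flow:", exc)
```

Remote storage such as S3 or GitHub avoids this because the registering machine and the agent both read from a shared location.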
r
@Kevin Kho thank you very much! I checked the logs: everything is OK, it works. 🎉
k
Nice!