#prefect-community

John Song

09/10/2020, 8:08 PM
I am new to Prefect but experienced with Airflow. I wrote a simple script, but found that my flow runs multiple times automatically: sometimes 3 times, other times 6. I can't find anything about this in the community; please help.

```python
from prefect import Parameter, Flow, task

print("***************")

@task
def get_bbp_data():
    return "bbp_data"

@task
def get_sem_data():
    return "sem_data"

@task
def align_bbp_sem_data(bbp, sem):
    return "align_data"

@task
def generate_train_data(align):
    return "train_data"

with Flow("I5 ETL") as flow:
    bbp_data = get_bbp_data()
    sem_data = get_sem_data()
    align_data = align_bbp_sem_data(bbp_data, sem_data)
    train_data = generate_train_data(align_data)

flow.visualize()
flow.run()
```
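Every module-level statement in a script like this one (the `print(...)`, `flow.visualize()` and `flow.run()`) executes each time the module body executes, whether the file is run directly or brought in with `import`. A common Python idiom, not something proposed in this thread, is to keep those side effects behind an `if __name__ == "__main__":` guard so that a plain import only defines the tasks and the flow. The stdlib-only sketch below illustrates the effect; the stand-in module `pipeline_demo.py` and its `RUNS` list are hypothetical, and a list append plays the role of `flow.run()` so no Prefect installation is needed.

```python
import importlib
import runpy
import sys
import tempfile
from pathlib import Path
from typing import Tuple

# Hypothetical stand-in for the pipeline module: "running the flow" is just a
# list append, so the demo does not depend on Prefect.
MODULE_SOURCE = '''
RUNS = []

def run_flow():
    RUNS.append("flow ran")

if __name__ == "__main__":
    # Only executed when the file is run as a script, never on import.
    run_flow()
'''


def demo() -> Tuple[int, int]:
    """Return (runs after a plain import, runs after executing as a script)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "pipeline_demo.py"
        path.write_text(MODULE_SOURCE)

        # Importing executes the module body, but the guarded block is skipped
        # because __name__ is "pipeline_demo", not "__main__".
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("pipeline_demo")
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("pipeline_demo", None)
        runs_on_import = len(module.RUNS)

        # run_path with run_name="__main__" mimics `python pipeline_demo.py`,
        # so the guarded block runs; it returns the module's globals dict.
        namespace = runpy.run_path(str(path), run_name="__main__")
        runs_as_script = len(namespace["RUNS"])
    return runs_on_import, runs_as_script


if __name__ == "__main__":
    print(demo())  # prints (0, 1)
```

Applied to the original script, the sketch's approach would mean leaving the task and `Flow` definitions at module level and moving `flow.visualize()` and `flow.run()` under the guard.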

Dylan

09/10/2020, 8:17 PM
Hi @John Song! We’re here to help 😄
- Can you send some of the logs that show your flow running multiple times?
- Is this flow running with Prefect Cloud/Server by any chance, or just locally?

John Song

09/10/2020, 8:46 PM
```
itWorkspace>python
Python 3.7.5 (tags/v3.7.5:5c02a39a0b, Oct 15 2019, 00:11:34) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from i5_data.workflow import pipeline
***********
[2020-09-10 20:45:23] INFO - prefect.FlowRunner | Beginning Flow run for 'I5 ETL'
[2020-09-10 20:45:23] INFO - prefect.FlowRunner | Starting flow run.
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_bbp_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_bbp_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_sem_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'generate_train_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'generate_train_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:23] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
***********
[2020-09-10 20:45:23] INFO - prefect.FlowRunner | Beginning Flow run for 'I5 ETL'
[2020-09-10 20:45:23] INFO - prefect.FlowRunner | Starting flow run.
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_bbp_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_bbp_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_sem_data': Starting task run...
[2020-09-10 20:45:23] INFO - prefect.TaskRunner | Task 'get_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'generate_train_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'generate_train_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
***********
[2020-09-10 20:45:24] INFO - prefect.FlowRunner | Beginning Flow run for 'I5 ETL'
[2020-09-10 20:45:24] INFO - prefect.FlowRunner | Starting flow run.
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'get_bbp_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'get_bbp_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'get_sem_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'get_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'align_bbp_sem_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'generate_train_data': Starting task run...
[2020-09-10 20:45:24] INFO - prefect.TaskRunner | Task 'generate_train_data': finished task run for task with final state: 'Success'
[2020-09-10 20:45:24] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
Here is the log; it ran 3 times.

Dylan

09/10/2020, 8:49 PM
Are multiple visualizations produced as well?
Can you run `prefect diagnostics`, please?
I am unable to replicate the multiple runs problem

John Song

09/10/2020, 9:06 PM
Yes, there are 3 plots as well.
Let me run this on a Linux server to see if it's the same.
Same on the Linux server.

Dylan

09/10/2020, 9:09 PM
Would you mind running `prefect diagnostics`?
This outputs some info that may help me assist you further.

John Song

09/10/2020, 9:17 PM
```
INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
{
    "config_overrides": {},
    "env_vars": [],
    "flow_information": {
        "environment": {
            "executor": true,
            "labels": false,
            "logger": true,
            "metadata": false,
            "on_exit": false,
            "on_start": false,
            "type": "LocalEnvironment"
        },
        "result": {},
        "schedule": {},
        "storage": {},
        "task_count": 4
    },
    "system_information": {
        "platform": "Windows-10-10.0.17134-SP0",
        "prefect_version": "0.12.6",
        "python_version": "3.7.5"
    }
}
```
```python
print(diagnostic_info(flow))
```
I added the line above, and it printed three times.
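```python
from prefect import Flow
from prefect.utilities.diagnostics import diagnostic_info

# get_bbp_data, get_sem_data, align_bbp_sem_data and generate_train_data
# are the @task functions defined earlier in the same file.

def run_flow():
    with Flow("I5 ETL") as flow:
        bbp_data = get_bbp_data()
        sem_data = get_sem_data()
        align_data = align_bbp_sem_data(bbp_data, sem_data)
        train_data = generate_train_data(align_data)
    # flow.visualize()
    flow.run()
    print(diagnostic_info(flow))
```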
Now I have put the flow-run logic inside one function and call it from outside the module:
```python
from i5_data.workflow import pipeline
pipeline.run_flow()
```
The flow only runs once now, but the `***` line is still printed 3 times.
Somehow this file is being parsed 3 times.
For now I am good. Thanks for jumping in!
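Python caches imported modules in `sys.modules` keyed by module name, so importing the same name twice executes the file only once. One way a single file can still execute more than once (offered only as a possible explanation; the cause was never confirmed here) is when it is reachable under two different names, for example as `pipeline` via one `sys.path` entry and as `i5_data.workflow.pipeline` via another. The stdlib-only sketch below builds a hypothetical package `demo_pkg` containing `demo_mod.py`, whose body logs its own `__name__` on every execution, and then imports it both ways. Adding a similar `print(__name__)` (or `traceback.print_stack()`) at the top of the real module is a quick way to see which names, and which importers, trigger each execution.

```python
import importlib
import sys
import tempfile
from pathlib import Path
from typing import List

# Hypothetical module body: append the name it is running under to a log file
# that sits next to the module, so every execution leaves a trace.
MODULE_SOURCE = '''
from pathlib import Path

with open(Path(__file__).with_name("executions.log"), "a") as log:
    print(__name__, file=log)
'''


def executed_names() -> List[str]:
    """Import one file under two module names and report each execution."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = Path(tmp) / "demo_pkg"
        pkg_dir.mkdir()
        (pkg_dir / "__init__.py").write_text("")
        (pkg_dir / "demo_mod.py").write_text(MODULE_SOURCE)

        # Two sys.path entries make the same file importable as
        # "demo_pkg.demo_mod" (via tmp) and as "demo_mod" (via pkg_dir).
        extra_paths = [tmp, str(pkg_dir)]
        sys.path[:0] = extra_paths
        importlib.invalidate_caches()
        try:
            importlib.import_module("demo_pkg.demo_mod")  # executes the file
            importlib.import_module("demo_pkg.demo_mod")  # cached: no re-execution
            importlib.import_module("demo_mod")  # new name: executes the file again
        finally:
            for entry in extra_paths:
                sys.path.remove(entry)
            for name in ("demo_pkg.demo_mod", "demo_pkg", "demo_mod"):
                sys.modules.pop(name, None)

        log_text = (pkg_dir / "executions.log").read_text()
    return log_text.split()


if __name__ == "__main__":
    print(executed_names())  # prints ['demo_pkg.demo_mod', 'demo_mod']
```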

Dylan

09/10/2020, 9:34 PM
Anytime!