# prefect-community
m
Hi there! We are seeing that when we register flows on Prefect 1, the code inside the task gets executed. This causes trouble, for example with long-running queries, because the registration more or less times out. Is this the intended behavior?
b
Hello Marcelo, are you seeing a flow run after registering the flow? Where do you see the tasks being executed?
m
Thanks for the answer, Bianca. I have put together the following example:
test_py.py
There, I have defined a task, and I run that task for each name in name_list.
In the real case, name_list is not a static list but the result of a database query, which takes a long time.
So if I have to create tasks based on a database response that can be slow, how can I avoid running the database query when registering the flow?
b
Hello Marcelo! I reconfigured your flow a bit and got it to register without running the task. I added a '@task' decorator, which prevented the task from running at registration.
import prefect
from prefect import task, Flow, Parameter

@task
def hello_task(name):
    print(f'Hi {name}!!')

with Flow(name = "TestFlow") as flow:
    name_list = ['Ricardo', 'Sebastian', 'Caldo Knorr', 'Pop']
    print("name_list ready")
    for name in name_list: 
        hello_task(name)
        
        

#Register the flow first
#flow.register(project_name="InsertProjectNameHere")

#Uncomment the following line after registration to run the flow
#flow.run()
m
Thanks for the answer Bianca!
We define tasks as custom classes instead of using the @task decorator because we want each task to have a different name. Instead of having many tasks with the same name "Hi", we want each task to have a name like "Say hi to Ricardo", "Say hi to Marcelo", etc.
Tasks are created from a list of names, one per name. But this list is retrieved from a server, which takes a long time, and we don't want to have that wait when registering the flow.
I made some changes to the code to better explain the case:
import os
import sys
import time
sys.path.append(os.getcwd()) ##### for testing
from flows.config_flows import get_flow_schedule, get_flow_storage, RUN_CONFIG
from prefect import Flow, Task, context
from prefect.executors import LocalDaskExecutor
from tqdm import tqdm


# This is the function that loads the tasks to be executed
# Names are not previously known, they are returned by server and could change
def get_names():
    name_list = ['Ricardo', 'Sebastian', 'Caldo Knorr', 'Pop']
    time.sleep(120) # some heavy process at the server
    return name_list

# Each name is processed by its own task
# Tasks are defined this way so that we can set the name attribute of each task
# and see tasks in the Prefect Cloud UI with names like "Say hi to Ricardo"
class ExtractArticlesInfo(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def run(self, name):
        print(f'Hi {name}!!')

FLOW_NAME = "test"
SCHEDULE = get_flow_schedule(start_hour=14, hours=24)
STORAGE = get_flow_storage(flow_file="test.py")

with Flow(FLOW_NAME, schedule=SCHEDULE, storage=STORAGE, run_config=RUN_CONFIG, executor=LocalDaskExecutor(num_workers=3)) as flow:
    name_list = get_names()
    print("name_list ready")
    for name in name_list:
        print(name)
        time.sleep(100)
        print(f'new task for {name}')
        t = ExtractArticlesInfo(name=f"Say hi to {name}")
        t(name)
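
A note on why the wait happens at registration: the body of a `with` block is ordinary Python, so anything called directly inside it (such as get_names() above) runs while the flow is being built, and only work handed to the flow as a task is postponed until the flow runs. The sketch below illustrates this with a toy model in plain Python. `ToyFlow`, `add`, and `say_hi_to_all` are hypothetical names made up for this illustration and are not part of the Prefect API.

```python
class ToyFlow:
    """Toy stand-in for a flow (NOT Prefect): records steps now, runs them later."""

    def __init__(self, name):
        self.name = name
        self.steps = []  # (function, args) pairs recorded at build time

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions

    def add(self, func, *args):
        # Only record the step; nothing is executed here.
        self.steps.append((func, args))

    def run(self):
        # Execute the recorded steps in order.
        return [func(*args) for func, args in self.steps]


calls = []  # records every time the "slow query" actually executes


def get_names():
    # Stand-in for the slow server query from the example above.
    calls.append("query")
    return ["Ricardo", "Sebastian"]


def say_hi(name):
    return f"Hi {name}!!"


# Eager: get_names() is called directly in the with-block,
# so the query runs while the flow is being built.
with ToyFlow("eager") as eager_flow:
    for name in get_names():
        eager_flow.add(say_hi, name)

calls_after_eager_build = list(calls)


# Deferred: the query lives inside a step, so building the flow
# does not trigger it; it only runs when the flow runs.
def say_hi_to_all():
    return [say_hi(n) for n in get_names()]


with ToyFlow("deferred") as deferred_flow:
    deferred_flow.add(say_hi_to_all)

calls_after_deferred_build = list(calls)
deferred_results = deferred_flow.run()
calls_after_deferred_run = list(calls)
```

In the eager variant the query has already run once by the time the flow is built; in the deferred variant building adds no query call, and the query runs only on `run()`. In Prefect 1 terms this would presumably mean moving get_names() into a task of its own and mapping over its result. How to keep per-name display names in that setup (for example through a templated task run name) is worth checking against the Prefect 1 documentation.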