Marcelo Ortega
08/30/2022, 2:35 PM
Bianca Hoch
08/30/2022, 3:10 PM
Marcelo Ortega
08/31/2022, 6:33 PM
Bianca Hoch
08/31/2022, 10:06 PM
import prefect
from prefect import task, Flow, Parameter
@task
def hello_task(name):
    """Print a greeting for ``name`` to standard output."""
    greeting = f'Hi {name}!!'
    print(greeting)
# Build the flow: hello_task is called once for every entry in name_list.
with Flow("TestFlow") as flow:
    name_list = ['Ricardo', 'Sebastian', 'Caldo Knorr', 'Pop']
    print("name_list ready")
    for name in name_list:
        hello_task(name)

# Register the flow first:
# flow.register(project_name="InsertProjectNameHere")
# After registration, uncomment the following line to run the flow:
# flow.run()
Marcelo Ortega
09/01/2022, 1:57 PM
import os
import sys
import time
# Add the current working directory to the module search path. This must run
# before the imports below; presumably it is what lets `flows.config_flows`
# resolve when the script is started from the project root (TODO confirm).
sys.path.append(os.getcwd()) ##### for testing
from flows.config_flows import get_flow_schedule, get_flow_storage, RUN_CONFIG
from prefect import Flow, Task, context
from prefect.executors import LocalDaskExecutor
from tqdm import tqdm
# This is the function that loads the tasks to be executed.
# Names are not known in advance: they are returned by the server and could change.
def get_names(delay_seconds: float = 120):
    """Return the list of names to process, after an artificial delay.

    A fixed list stands in for the names returned by the server, and a
    blocking ``time.sleep`` stands in for the heavy server-side processing.

    Args:
        delay_seconds: How long to block before returning, in seconds.
            Defaults to 120, the delay that used to be hard-coded. Pass 0
            (or a negative value) to skip the wait, e.g. when testing locally.

    Returns:
        A new list of name strings.
    """
    name_list = ['Ricardo', 'Sebastian', 'Caldo Knorr', 'Pop']
    # Only sleep for a positive delay; time.sleep() rejects negative values.
    if delay_seconds > 0:
        time.sleep(delay_seconds)  # some heavy process at the server
    return name_list
# Each name is processed by its own task.
# Tasks are defined this way so that we can change the name attribute of each task
# and see tasks in the Prefect Cloud UI like "Say hi to Ricardo".
class ExtractArticlesInfo(Task):
    """Task that prints a greeting for a single name.

    Written as a ``Task`` subclass so that every instance can be created
    with its own ``name`` keyword (for example "Say hi to Ricardo").
    """

    def __init__(self, **kwargs):
        # Keyword arguments only; all of them are handed to the base Task.
        super().__init__(**kwargs)

    def run(self, name):
        """Print the greeting for ``name`` to standard output."""
        message = f'Hi {name}!!'
        print(message)
# Flow-level settings, built with the helpers from flows.config_flows.
FLOW_NAME = "test"
SCHEDULE = get_flow_schedule(start_hour=14, hours=24)
STORAGE = get_flow_storage(flow_file="test.py")

# NOTE: the body of this `with` block is plain Python that runs while the
# module itself is executed. get_names() is an ordinary function, so its
# delay and the time.sleep(100) in every loop iteration happen right here,
# while the flow is being built.
with Flow(
    FLOW_NAME,
    schedule=SCHEDULE,
    storage=STORAGE,
    run_config=RUN_CONFIG,
    executor=LocalDaskExecutor(num_workers=3),
) as flow:
    name_list = get_names()
    print("name_list ready")
    for name in name_list:
        print(name)
        time.sleep(100)
        print(f'new task for {name}')
        # One task instance per name, labelled with that name, then called
        # with the name as its input.
        t = ExtractArticlesInfo(name=f"Say hi to {name}")
        t(name)