Fabio Scarpellini

04/21/2022, 7:41 PM
Hello there. I'm trying to use two concepts, HERE (Reuse single docker storage) and HERE (One flow with multiple parameters), to reuse tasks and create flows with different initial parameters. I'm asking because when I use "stored_as_script=True", an error is raised saying the flow specified in registration was not found. Is it possible to create flows without explicitly declaring each one (i.e., like the "one flow with multiple parameters" approach)?

Kevin Kho

04/21/2022, 8:01 PM
I've been thinking about this, and Anna may correct me, but I don't think this is easily possible. The short answer is that you should use S3/GitHub storage (or similar) as the Flow storage to hold the script, then put all of the dependencies into a Docker image and use that image with DockerRun. The reason is that the "One flow with multiple parameters" approach creates a new file in storage, which is normally fine, but with Docker storage you have to rebuild your image each time, which seems very hard to manage. You also have to store all the Flow files somewhere anyway before you can copy them into the container. Does that make sense?

Nate

04/21/2022, 9:26 PM
@Fabio Scarpellini one idea that may work for you on the second point is to deploy a single flow that has multiple clocks, where you can associate specific Parameter values with each clock, like:
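import datetime

from prefect import Flow, Parameter, task
from prefect.schedules import clocks, Schedule


@task(log_stdout=True)
def show(p):
    print(p)


# a minimal example flow with one Parameter "p" (stand-in for your real flow)
with Flow("multi-clock-flow") as flow:
    p = Parameter("p", default="default")
    show(p)

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 2"},
)

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])

flow.schedule = schedule  # set the schedule on the Flow
flow.run()  # run locally on the schedule (keeps running until interrupted)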
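To make the effect of the two clocks concrete, here is a small pure-Python simulation of the runs such a schedule would produce. It is not Prefect's scheduler, and the helper names clock_events and upcoming_runs are hypothetical: each clock emits events at its start date plus multiples of its interval, each event carries that clock's parameter defaults, and the schedule interleaves the events in time order.

```python
import datetime
import heapq
from typing import Dict, Iterator, List, Tuple

# (start_date, interval, parameter_defaults), mirroring the IntervalClock arguments above
ClockSpec = Tuple[datetime.datetime, datetime.timedelta, Dict[str, str]]
Event = Tuple[datetime.datetime, Dict[str, str]]


def clock_events(start: datetime.datetime, interval: datetime.timedelta,
                 params: Dict[str, str]) -> Iterator[Event]:
    """Yield (run time, parameters) forever for one simulated interval clock."""
    t = start
    while True:
        yield t, params
        t += interval


def upcoming_runs(clock_specs: List[ClockSpec], n: int) -> List[Event]:
    """Merge several simulated clocks into one time-ordered stream; take the first n runs."""
    streams = [clock_events(*spec) for spec in clock_specs]
    merged = heapq.merge(*streams, key=lambda event: event[0])  # lazy, works on infinite streams
    return [next(merged) for _ in range(n)]


now = datetime.datetime(2022, 4, 21, 12, 0, 0)
minute = datetime.timedelta(minutes=1)
runs = upcoming_runs(
    [(now, minute, {"p": "CLOCK 1"}),
     (now + datetime.timedelta(seconds=30), minute, {"p": "CLOCK 2"})],
    n=4,
)
for when, params in runs:
    print(when.time(), params["p"])
# 12:00:00 CLOCK 1
# 12:00:30 CLOCK 2
# 12:01:00 CLOCK 1
# 12:01:30 CLOCK 2
```

In other words, one registered flow ends up with alternating runs, each using its own clock's value for "p".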

Anna Geller

04/22/2022, 1:01 AM
The error you mentioned implies that you didn't include this line, which adds the flow to the storage before registration:
docker_storage.add_flow(flow)

Fabio Scarpellini

04/22/2022, 1:28 PM
Thank you everyone for the replies. Well, as Kevin wrote, maybe this solution is not so simple. Nate, with your suggestion I think I would lose the independence to manage the flows separately. Anna, I did this but it didn't solve the problem. See the code below:
def registration_func(company):
    with Flow(name=company, storage=docker_storage, run_config=kubernetes_run) as flow:
        ...  # tasks (not shown in the original message)
    docker_storage.add_flow(flow)
    flow.register(project_name="Dev", build=False)

Anna Geller

04/22/2022, 1:44 PM
Having this as a separate function, registration_func, is the problem: your flow object won't be found by Prefect. Can you try following my example from GitHub?
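Why the function gets in the way: with script-based storage, Prefect 1.x loads a flow by executing the stored script and then looking for a Flow object with the requested name among the script's top-level variables (roughly the approach of extract_flow_from_file in prefect.utilities.storage). A flow built inside registration_func only lives in that function's local scope, so the lookup cannot see it. The sketch below imitates that lookup in plain Python with a stand-in Flow class; it is a simplified model, not Prefect's code, and find_flow is a hypothetical name.

```python
from typing import Any, Dict


class Flow:
    """Stand-in for prefect.Flow: only the name matters for this lookup."""

    def __init__(self, name: str) -> None:
        self.name = name


def find_flow(script: str, flow_name: str) -> Flow:
    """Execute a script and return the top-level Flow named `flow_name` (simplified model)."""
    namespace: Dict[str, Any] = {"Flow": Flow}
    exec(script, namespace)
    # only module-level names are inspected; function locals are gone after the call
    flows = {o.name: o for o in namespace.values() if isinstance(o, Flow)}
    if flow_name not in flows:
        raise ValueError(f"Flow {flow_name!r} not found in script")
    return flows[flow_name]


# A flow assigned at module level is visible to the lookup...
module_level = 'flow = Flow("brand1")\n'

# ...but one created inside a function is local to that call and disappears.
inside_function = (
    "def registration_func(company):\n"
    "    flow = Flow(company)\n"
    "\n"
    'registration_func("brand1")\n'
)

print(find_flow(module_level, "brand1").name)  # brand1
try:
    find_flow(inside_function, "brand1")
except ValueError as err:
    print(err)  # Flow 'brand1' not found in script
```

This is why keeping the flow at module level, as in the GitHub example, lets the stored script be loaded again at run time.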

Fabio Scarpellini

04/22/2022, 1:58 PM
Anna, with your example it works, but I need an explicit declaration of the flow. As you said, Prefect doesn't find the object when I register it through registration_func.

Anna Geller

04/22/2022, 3:44 PM
Can you explain the problem you're trying to solve here? Why do you believe you need this extra function?

Fabio Scarpellini

04/22/2022, 5:34 PM
Sure. I have to run an ETL process against the databases of several brands. Since the process is the same for almost all brands, I want to reuse the tasks, with a flow that has only one parameter: the brand's name. But to do this, I currently need to copy/paste the whole Flow block for each new brand and register it. I'm wondering whether, instead of copy/pasting the Flow, I can use the solution Kevin posted in "One flow with multiple parameters" so I don't have to replicate the Flow block in the code. Let me know if I was clear.

Anna Geller

04/22/2022, 6:05 PM
Thanks for explaining your use case. I see two solutions that make sense here. One is what Nate mentioned: attaching several clocks with different parameters to the flow's schedule, one for each brand.
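import datetime

from prefect import Flow, Parameter, task
from prefect.schedules import clocks, Schedule


@task
def run_etl(brand):
    ...  # your shared ETL logic for one brand (placeholder)


# one flow definition, parametrized by brand (names are illustrative)
with Flow("brand-etl") as flow:
    brand = Parameter("brand", default="brand1")
    run_etl(brand)

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"brand": "brand1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"brand": "brand2"},
)

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])

flow.schedule = schedule  # set the schedule on the Flow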
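Since the goal is to avoid copy/pasting per brand, the clocks can also be generated from a list of brands instead of being written out by hand. The sketch below only builds the keyword arguments; passing each dict to clocks.IntervalClock(**kwargs) is an assumption based on the IntervalClock arguments used above, and both the helper name clock_kwargs_for_brands and the choice to spread start times evenly across one interval are illustrative.

```python
import datetime
from typing import Any, Dict, List


def clock_kwargs_for_brands(
    brands: List[str],
    start: datetime.datetime,
    interval: datetime.timedelta,
) -> List[Dict[str, Any]]:
    """Build IntervalClock-style keyword arguments, one clock per brand (hypothetical helper).

    Start times are staggered evenly within one interval so the brands
    don't all kick off at the same moment.
    """
    step = interval / max(len(brands), 1)
    return [
        {
            "start_date": start + i * step,
            "interval": interval,
            "parameter_defaults": {"brand": brand},
        }
        for i, brand in enumerate(brands)
    ]


now = datetime.datetime(2022, 4, 22, 0, 0, 0)
kwargs = clock_kwargs_for_brands(["brand1", "brand2", "brand3"], now, datetime.timedelta(minutes=3))
for kw in kwargs:
    print(kw["start_date"].time(), kw["parameter_defaults"])

# With Prefect 1.x installed, the schedule could then be built like this (untested here):
# schedule = Schedule(clocks=[clocks.IntervalClock(**kw) for kw in kwargs])
```

Adding a brand then means adding one string to the list rather than another Flow block.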
Another possibility is to use a parametrized flow of flows, as shown in this Discourse topic. It could look like this: your child flow is your main (brand) flow, and your parent flow triggers multiple child flow runs, one for each brand:
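from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as parent_flow:  # parent flow name is illustrative
    # each dict is the `parameters` of one child flow run
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("your_brand_flow"),
        project_name=unmapped("your_project_name"),
        parameters=[{"brand": "brand1"}, {"brand": "brand2"}, {"brand": "brand3"}],
    )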
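One detail worth spelling out about the mapped call: create_flow_run's parameters argument is the parameter dict for a single child run, so when the task is mapped, each element of the list has to be a dict such as {"brand": "brand1"} rather than a bare string. The stand-ins below (hypothetical fake_create_flow_run and fake_map, not Prefect's task or its mapping machinery) mimic how a map with unmapped() names pairs the fixed arguments with each element of the mapped list.

```python
from typing import Any, Dict, List


def fake_create_flow_run(flow_name: str, project_name: str,
                         parameters: Dict[str, Any]) -> str:
    """Hypothetical stand-in: pretend to start one child run and return a fake id."""
    if not isinstance(parameters, dict):
        raise TypeError(f"parameters must be a dict, got {type(parameters).__name__}")
    return f"{project_name}/{flow_name}?brand={parameters['brand']}"


def fake_map(flow_name: str, project_name: str,
             parameters: List[Dict[str, Any]]) -> List[str]:
    """Mimic mapping with unmapped names: one child run per element of `parameters`."""
    return [fake_create_flow_run(flow_name, project_name, p) for p in parameters]


brands = ["brand1", "brand2", "brand3"]
run_ids = fake_map("your_brand_flow", "your_project_name",
                   [{"brand": b} for b in brands])
print(run_ids)

# Passing bare strings fails, because each mapped element becomes a whole `parameters` value.
try:
    fake_map("your_brand_flow", "your_project_name", ["brand1"])  # type: ignore[list-item]
except TypeError as err:
    print(err)
```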

Fabio Scarpellini

04/22/2022, 6:23 PM
Thanks, Anna. I will try it and report back. 😃