
Alon Barad

05/07/2023, 3:00 PM
Hey all! Just getting started with Prefect and I have a question: is there a way to use Prefect to create a base flow class that can execute tasks based on configuration? I couldn't find a way to create such a flow class, and I'm currently only using the flow decorator on a flow function.

Nate

05/07/2023, 11:29 PM
hi @Alon Barad - can you explain more what you mean by this?
create a base flow class that can execute tasks based on configuration
it sounds like maybe you want to conditionally execute tasks in a flow based on some logic related to configuration, in which case I would think you should be able to do this with a standard @flow-decorated function, but perhaps I'm not understanding your use case.

Alon Barad

05/08/2023, 7:21 AM
Hey @Nate, thanks for helping me. I'll explain with an example: I want to create a website crawler that has two generic steps, crawl and parse. When I execute my flow, I want to decide which steps are enabled and execute only those. The crawling step should take as arguments the website to scrape and a Spider (Scrapy); it should crawl the website and save the raw data in an S3 bucket. The parsing step has to be implemented separately for each website, which is why I want to inherit from a base flow class; it should fetch the raw data from the S3 bucket, parse it, and save the parsed data in a database.
from prefect import flow, task


class BaseFlow:
    def __init__(self, actions, config):
        self.actions = actions
        self.config = config

    @task(name="Crawl")
    def crawl(self, *args, **kwargs):
        raise NotImplementedError

    @task(name="Parse")
    def parse(self, *args, **kwargs):
        raise NotImplementedError
    
    @flow(name="Base Flow")
    def run(self):
        if 'crawl' in self.actions:
            self.crawl(**self.actions['crawl'])
        if 'parse' in self.actions:
            self.parse(**self.actions['parse'])
I want to be able to run each task, or the whole flow, with a custom JSON config from the UI. Thanks!

Miguel Moncada

05/08/2023, 7:28 AM
I might have missed something, but if I understood correctly, you want to define a pipeline that accepts parameters and can be triggered (e.g. on demand via the API or UI) with this user-defined configuration. Would that be right? In that case you could simply write your process as a parent flow that accepts parameter(s) and then create a deployment from it. When you trigger the deployment to create a flow run, you can enter your parameter values. For instance, in my case my flow accepts a value "env", which I then parse with a task to pull prod/staging variables.
from prefect import flow

@flow(name="parent_flow")
def parent_flow(env: str = "staging"):
    # Logic to fetch runtime values depending on param from secret/variables
    ...
Apologies if this is not what you were looking for 🙇
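For the crawl/parse case discussed above, a parameterized flow might look roughly like the sketch below. This is only an illustration under assumptions: the `actions` shape (step name mapped to that step's keyword arguments), the `enabled_steps` helper, the `crawler_flow` name, and the task bodies are all hypothetical placeholders, and the `ImportError` fallback exists only so the sketch can run without Prefect installed.

```python
from typing import Any, Dict, List

try:
    from prefect import flow, task
except ImportError:
    # Fallback so the sketch runs without Prefect: decorators become no-ops.
    def task(fn=None, **_kwargs):
        return fn if callable(fn) else (lambda f: f)

    flow = task

STEP_ORDER = ("crawl", "parse")  # hypothetical step names, in execution order


def enabled_steps(actions: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return the configured steps in a fixed order, ignoring unknown keys."""
    return [step for step in STEP_ORDER if step in actions]


@task(name="Crawl")
def crawl(url: str) -> str:
    # Placeholder: a real task would run the spider and write raw data to S3.
    return f"raw data from {url}"


@task(name="Parse")
def parse(site: str) -> str:
    # Placeholder: a real task would read from S3, parse, and write to a database.
    return f"parsed data for {site}"


@flow(name="crawler_flow")
def crawler_flow(actions: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    # `actions` maps a step name to that step's keyword arguments, e.g.
    # {"crawl": {"url": "https://example.com"}, "parse": {"site": "example"}}
    steps = {"crawl": crawl, "parse": parse}
    return {name: steps[name](**actions[name]) for name in enabled_steps(actions)}
```

With this shape, passing `{"parse": {"site": "example"}}` as `actions` would run only the parse task, and the same value could be entered as the parameter when triggering a deployment of the flow. Per-website parsing could be handled by swapping in a different `parse` task per site rather than by subclassing.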

Alon Barad

05/08/2023, 7:33 AM
There aren't any flow class APIs?

Miguel Moncada

05/08/2023, 7:42 AM
I'm no expert, so I'm leaving this to anyone else in the community, but I do not see the need for you to have this setup. If you wanted to define a standard flow that can be shared across pipelines, you could create a Python package project structure and have this defined in a "commons" module, if that makes sense 🤔 You could then call the common flow from other flows
.
├── __init__.py
├── _version.py
├── deployments
│   ├── __init__.py
│   ├── first_flow_deployment.py
└── flows
    ├── __init__.py
    ├── first_flow.py
    ├── second_flow.py
    └── utils
        ├── __init__.py
        ├── commons
        │   ├── __init__.py
        │   └── common_flow.py