Cole Murray
07/08/2022, 3:27 AMfrom typing import List
from uuid import uuid4
from prefect.client import get_client
from prefect.flow_runners import SubprocessFlowRunner
from prefect.flows import Flow
from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.schedules import CronSchedule
from workflow_etl.flows.flow import hello_world
def main():
prefect_client = get_client()
schedules = [] # TODO
for schedule in schedules:
flow_id = await prefect_client.create_flow(hello_world)
prefect_client.create_deployment(flow_id=flow_id,
name=schedule.id,
schedule=CronSchedule(cron="0 * * * * *"),
parameters={
**schedule.workflow_params
},
tags={
'owner_id': schedule.owner_id,
},
flow_runner=SubprocessFlowRunner(),
flow_data=# UNKNOWN WHAT TO PUT HERE
)
I’m a bit stuck on what to put in the flow_data argument. Anyone tried this / have a link to sample?from typing import List
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
from prefect.orion.schemas.schedules import CronSchedule
from workflow_etl.flows.flow import hello_world
def main():
schedules = [] # Redacted
for schedule in schedules:
spec = DeploymentSpec(
name=schedule.id,
flow=hello_world,
parameters={
**schedule.workflow_params
},
schedule=CronSchedule(cron=schedule.cron),
tags=[
f"owner_id={schedule.owner_id}",
],
flow_runner=SubprocessFlowRunner()
)
spec.validate()
spec.create()
if __name__ == '__main__':
main()
Anna Geller
07/08/2022, 7:41 AM