Alexandru Anghel
06/06/2022, 3:08 PM
from main.batch import job_template, create_bigquery_table, load_to_bq, KubernetesRun, Parameter, Flow, GCS, Schedule, CronClock
schedule = Schedule(clocks=[CronClock("*/5 * * * *")])  # every 5 minutes
with Flow(name='alerts', schedule=schedule, storage=GCS(bucket="aino-prefect-batch"), run_config=KubernetesRun(labels=["dev"], job_template=job_template, image="docker.io/alexanghel23/prefect-v0.2", image_pull_policy="Always")) as flow:
    gcp_project = Parameter(name = 'gcp_project', default = 'myproject')
    bigquery_dataset = Parameter(name = 'bigquery_dataset', default = 'batch_landing')
    bigquery_table = Parameter(name = 'bigquery_table')
    infer_schema = Parameter(name = 'infer_schema', default = True)
    bigquery_fields = Parameter(name = 'bigquery_fields')
    prometheus_query = Parameter(name = 'prometheus_query')
    prometheus_start_date = Parameter(name = 'prometheus_start_date')
    prometheus_end_date = Parameter(name = 'prometheus_end_date')
    prometheus_step = Parameter(name = 'prometheus_step')
    a = create_bigquery_table(gcp_project, bigquery_dataset, bigquery_table, infer_schema, prometheus_query, bigquery_fields)
    load_to_bq(a, prometheus_query, prometheus_start_date, prometheus_end_date, prometheus_step, upstream_tasks=[a])
job_template = {
    "spec": {
        "template": {
            "spec": {
                "volumes": [
                    {
                        "name": "google-cloud-keys",
                        "secret": {
                            "secretName": "gcp-cred"
                        }
                    }
                ],
                "containers": [
                    {
                        "name": "flow",
                        "env": [
                            {
                                "name": "GOOGLE_APPLICATION_CREDENTIALS",
                                "value": "/var/secrets/google/key.json"
                            }
                        ],
                        "volumeMounts": [
                            {
                                "name": "google-cloud-keys",
                                "mountPath": "/var/secrets/google"
                            }
                        ]
                    }
                ]
            }
        }
    }
}
Kevin Kho
06/06/2022, 4:08 PM
Alexandru Anghel
06/07/2022, 6:40 AM
Kevin Kho
06/07/2022, 1:56 PM
Alexandru Anghel
06/07/2022, 2:31 PM
Error: GraphQL error: Can not schedule a flow that has required parameters.
Still, I don't want to enable the schedule manually each time. Isn't it supposed to be enabled as soon as you pass the schedule parameter in the flow properties?
schedule = Schedule(clocks=[CronClock("*/5 * * * *")])  # every 5 minutes
with Flow(name='alerts', schedule=schedule ...
Thanks!
Kevin Kho
06/07/2022, 2:46 PM
Add required=False to your parameters
Alexandru Anghel
06/07/2022, 3:49 PM
Kevin Kho
06/07/2022, 4:14 PM
Alexandru Anghel
06/07/2022, 4:59 PM
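The error in this thread suggests that a schedule cannot be attached while any flow parameter is still required. The sketch below is a rough plain-Python model of that rule, not Prefect code: `ParamSpec`, `required_params` and the `_NO_DEFAULT` sentinel are hypothetical names introduced only for illustration, and the rule that a parameter without a default counts as required is an assumption inferred from the error message and the `required=False` suggestion.

```python
from dataclasses import dataclass
from typing import Any, List, Optional

_NO_DEFAULT = object()  # hypothetical sentinel meaning "no default was given"


@dataclass
class ParamSpec:
    """Hypothetical stand-in for a flow parameter; not a Prefect class."""
    name: str
    default: Any = _NO_DEFAULT
    required: Optional[bool] = None

    def is_required(self) -> bool:
        # Assumption: an explicit `required` wins; otherwise a parameter
        # is treated as required exactly when it has no default.
        if self.required is not None:
            return self.required
        return self.default is _NO_DEFAULT


def required_params(params: List[ParamSpec]) -> List[str]:
    """Return the names that would block scheduling under this model."""
    return [p.name for p in params if p.is_required()]


# Parameters as declared in the flow at the top of the thread.
original = [
    ParamSpec("gcp_project", default="myproject"),
    ParamSpec("bigquery_dataset", default="batch_landing"),
    ParamSpec("bigquery_table"),
    ParamSpec("infer_schema", default=True),
    ParamSpec("bigquery_fields"),
    ParamSpec("prometheus_query"),
    ParamSpec("prometheus_start_date"),
    ParamSpec("prometheus_end_date"),
    ParamSpec("prometheus_step"),
]

# The same parameters with required=False applied to each of them.
fixed = [ParamSpec(p.name, p.default, required=False) for p in original]

print(required_params(original))
print(required_params(fixed))
```

Under this model, the six parameters declared without a default are the ones that block scheduling, and none remain once `required=False` is set. The tasks that consume those parameters would then presumably need to cope with a missing value.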