Alexandru Anghel
06/08/2022, 2:11 PM
flow.register(project_name='my-project')
inside the Python code. The while loop in the second task blocks execution, so the register call is never reached. I want to register and run the flow using python file.py
More code inside the thread.
Is there a way to reach the register command of the flow inside the Python code? Thank you!
from main.streaming import *
flow_name = 'alerts'
gcp_project = "my-project"
bigquery_dataset = "prefect_streaming"
bigquery_table = "alerts"
infer_schema = True
bigquery_fields = ["alertname", "alertstate", "container", "namespace", "pod", "severity"]
prometheus_query = "ALERTS"
interval_seconds = 10
with Flow(
    name=flow_name,
    storage=GCS(bucket="my-bucket"),
    run_config=KubernetesRun(
        labels=["dev"],
        job_template=job_template,
        image="prefect-image:latest",
        image_pull_policy="Always",
    ),
) as flow:
    table = create_bigquery_table(gcp_project, bigquery_dataset, bigquery_table, infer_schema, prometheus_query, bigquery_fields)
    load_to_bq(table, prometheus_query, interval_seconds, upstream_tasks=[table])
flow.register(project_name='prefct-project')
flow.register(project_name='prefct-project')
is not reached because the load_to_bq task contains a while loop.
prefect register --project my-project --path alerts.py && prefect run --name alerts --project my-project --run-name alerts20220608
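The two CLI commands above can also be driven from a single Python entry point. The following is only a minimal sketch, assuming a separate launcher script (not the flow file itself) and that the `prefect` CLI is on the PATH. The helper names `build_commands` and `register_and_run` are hypothetical; the flags are the ones used in the command above.

```python
import shlex
import subprocess


def build_commands(flow_name, project, path, run_name):
    """Return the register and run commands as argument lists."""
    register_cmd = ["prefect", "register", "--project", project, "--path", path]
    run_cmd = [
        "prefect", "run",
        "--name", flow_name,
        "--project", project,
        "--run-name", run_name,
    ]
    return [register_cmd, run_cmd]


def register_and_run(flow_name, project, path, run_name):
    """Run the commands in order, stopping at the first failure (like `&&`)."""
    for cmd in build_commands(flow_name, project, path, run_name):
        print("running:", shlex.join(cmd))
        # check=True raises CalledProcessError on a non-zero exit code,
        # so the run command is skipped if registration fails.
        subprocess.run(cmd, check=True)


# Example call from a launcher script (values taken from the command above):
# register_and_run("alerts", "my-project", "alerts.py", "alerts20220608")
```

Keeping this in its own script avoids having the flow file start subprocesses when `prefect register --path alerts.py` imports it.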
I am still wondering, though, whether there is a way to reach the register part when there is a while loop inside a task.
Kevin Kho
flow.register()? It just hangs?
Alexandru Anghel
06/08/2022, 3:50 PM
flow.register(project_name='prefct-project')
flow.run()
but I think it's running it locally, as it cannot reach the API I'm calling in the task. Do I need to add some parameters to the run function? I feel I'm missing something here.
Kevin Kho