Hello again, Thanks for the help so far, i've bee...
# prefect-community
a
Hello again, Thanks for the help so far, i've been able to build the foundations for orchestrating some pipelines. However, i'm coming back with a question. I have a flow containing two tasks: the first one is creating a bigquery table and the second one is calling an API at seconds interval (while loop with sleep) and inserting the payload into the bigquery table. The problem i encounter is when i try to register the flow using
flow.register(project_name='my-project')
inside the python code. The while loop from the second task is preventing the register and it is unreachable. I want to register and run the flow using
python file.py
More code inside the thread. Is there a way to reach the register command of the flow inside the python code? Thank you!
Copy code
from main.streaming import *

flow_name = 'alerts'

gcp_project = "my-project" 
bigquery_dataset = "prefect_streaming"
bigquery_table = "alerts" 
infer_schema= True
bigquery_fields = ["alertname", "alertstate", "container", "namespace", "pod", "severity"]
prometheus_query = "ALERTS"
interval_seconds = 10


with Flow(
        name=flow_name, 
        storage=GCS(bucket="my-bucket"), 
        run_config=KubernetesRun(
            labels=["dev"], 
            job_template=job_template, 
            image="prefect-image:latest", 
            image_pull_policy="Always")
    ) as flow:


    table = create_bigquery_table(gcp_project, bigquery_dataset, bigquery_table, infer_schema, prometheus_query, bigquery_fields)
    
    load_to_bq(table, prometheus_query, interval_seconds, upstream_tasks=[table])

flow.register(project_name='prefct-project')
flow.register(project_name='prefct-project')
is not reached because load_to_bq task contains a while loop
A workaround would be to remove the flow.register part and run like this:
prefect register --project my-project --path alerts.py &&  prefect run --name alerts --project my-project --run-name alerts20220608
I am still wondering though if there is a way to reach the register part when there is a while loop inside a task.
k
I dont think the while loop inside a task should matter because it’s not executed during registration. What happens when you use
flow.register()
? It just hangs?
a
@Kevin Kho, It's actually registering the flow but no run is triggered. I have another similar flow without an infinite loop and it's triggering runs when i register it. I tried to add:
Copy code
flow.register(project_name='prefct-project')
flow.run()
but i think it's running it locally as it cannot reach the API i'm calling in the task. Do i need to add some parameters in the run function? I feel i'm missing something here.
k
Register should not trigger a run. You can trigger a run from the UI and it will go to an agent. So when you register, you don’t use `flow.run()`'