Huw Ringer
07/02/2021, 4:53 PM
prefect.run_configs in my Flow, and can see the Agent from Prefect Cloud, and was able to run ‘hello world’ successfully via it also.
3. Azure PostgreSQL database I need to run SQL against as part of a Prefect Task (have used a Secret in Prefect Cloud to create a dictionary with all the database login parameters)
4. Private API I need to call as part of a Prefect Task
5. GitHub storage for the whole Flow script (created a GitHub Access Token Secret to enable PrefectCloud to access the registered script)
6. Have run the `prefect backend cloud` CLI command on my local Mac, to hopefully force everything to execute in Azure rather than locally
Here’s the journey I’ve been on:
1. I got the basic “hello world” flow executing on a local agent (yay!), with the Flow registered in the Prefect Cloud, and the code being pulled from GitHub
2. I then got it working against the Kubernetes agent, but for some reason the ‘hello world’ message didn’t appear in the logs. Is that because I need to set `prefect.config.cloud.send_flow_run_logs = True` somewhere/how, or something else?
3. Tried importing psycopg2 and creating a connection to the PostgreSQL database to retrieve a very simple count result. Am not sure if I need to be using the PostgresExecute API call (which itself uses Psycopg2) rather than importing psycopg2 into my Flow. Thoughts/recommendations welcome!
4. Also tried importing the requests Python module to call the API.
5. When I try running the script to register the Flow it appears to work (finished with exit code 0), but when I look in my Prefect Cloud Flows tab it’s not there. Any idea why, please?
Sorry to bother you all about this, but am kind of at a loss on how to move forwards with this if I can’t even see the Flow I want to execute. Suspect it may be something to do with importing those libraries/modules and them not being available in the Execution environment, but have no idea from what I’ve read so far what I need to do to get that working. Any advice (even RTFM, if you can point me to the right topic) would be gratefully received. Thanks in advance!
* UPDATE * Have posted the script to the below thread as requested by @Kevin Kho
Kevin Kho
Huw Ringer
07/02/2021, 5:06 PM
Huw Ringer
07/02/2021, 5:07 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# have already run prefect backend cloud CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")


@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)


@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser}"
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    cur = con.cursor()
    field_list = ["count(*)"]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(map(sql.Identifier, field_list)),
        sql.Identifier("customer"))
    print(qry_str.as_string(con))
    cur.execute(qry_str)
    rs = cur.fetchone()
    rs


with Flow("api-postgres-test") as flow:
    flow.storage = GitHub(
        repo="MyUser/MyRepo",
        path="Project/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"])
    hello_task
    api_test
    sql_test

flow.register(project_name="Project")
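One detail in the script above that may be worth checking: Python joins adjacent string literals with no separator, so the two f-strings passed to `connect(...)` run `user=...` straight into `password=...`. A minimal sketch with made-up values (no database needed) showing the effect and one way around it:

```python
# Hypothetical values, for illustration only.
dbconfig = {"dbname": "mydb", "host": "example.invalid", "user": "me", "password": "s3cret"}

# Same shape as the connect() argument in the script: no space between the two literals.
joined = (f"dbname={dbconfig['dbname']} host={dbconfig['host']} user={dbconfig['user']}"
          f"password={dbconfig['password']} sslmode=require")

# Building the string from key/value pairs avoids the missing separator.
params = dict(dbconfig, sslmode="require")
dsn = " ".join(f"{key}={value}" for key, value in params.items())

print(joined)
print(dsn)
```

psycopg2's `connect` also accepts these settings as keyword arguments (`dbname=...`, `host=...`, `user=...`, `password=...`, `sslmode="require"`), which sidesteps string building altogether.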
Kevin Kho
`python file.py`? Your flow looks good. Maybe you want to close the cursor though?
Kevin Kho
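A minimal sketch of the cursor-closing suggestion, using the standard-library `sqlite3` module as a stand-in for psycopg2 so that it runs without a database server; the table and rows are made up. `contextlib.closing` calls `close()` when the block exits. psycopg2 cursors can be used directly in a `with` statement to the same effect.

```python
import sqlite3
from contextlib import closing


def count_rows(con, table):
    # The cursor is closed when the with-block exits, even if execute() raises.
    with closing(con.cursor()) as cur:
        cur.execute(f'SELECT count(*) FROM "{table}"')
        return cur.fetchone()[0]


# In-memory database with a hypothetical "customer" table.
with closing(sqlite3.connect(":memory:")) as con:
    con.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
    con.executemany("INSERT INTO customer (name) VALUES (?)", [("a",), ("b",), ("c",)])
    n = count_rows(con, "customer")

print(n)
```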
Huw Ringer
07/02/2021, 5:10 PM
Kevin Kho
Kevin Kho
`flow.register(project_name="Project")` will register this flow in the Project project.
Huw Ringer
07/02/2021, 5:15 PM
Huw Ringer
07/02/2021, 5:16 PM
Huw Ringer
07/02/2021, 5:17 PM
Huw Ringer
07/02/2021, 5:17 PM
Huw Ringer
07/02/2021, 5:18 PM
Kevin Kho
Huw Ringer
07/02/2021, 5:20 PM
Kevin Kho
Kevin Kho
`flow.register("coiled-prefect")` and this was the output when I ran. This should tell you it registered successfully.
Huw Ringer
07/02/2021, 5:21 PM
Kevin Kho
`psycopg2`. You can add a container to KubernetesRun to specify an image to run the flow on. I think you may need an image with `psycopg2` (and prefect) to get this working.
Kevin Kho
Huw Ringer
07/02/2021, 5:24 PM
Huw Ringer
07/02/2021, 5:24 PM
Huw Ringer
07/02/2021, 5:25 PM
Kevin Kho
Kevin Kho
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()
    flow.storage = GitHub(
        repo="MyUser/MyRepo",
        path="Project/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"])
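Why the parentheses matter: inside the `with Flow(...)` block a task is added to the flow when it is called, whereas a bare name such as `hello_task` is just an expression that is evaluated and discarded. The sketch below mimics that pattern in plain Python with a hypothetical `MiniFlow` class and `mini_task` decorator. It is an illustration only, not Prefect's implementation.

```python
class MiniFlow:
    """Hypothetical stand-in for a flow: collects tasks called inside its with-block."""
    current = None

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        MiniFlow.current = self
        return self

    def __exit__(self, *exc):
        MiniFlow.current = None
        return False


class mini_task:
    """Hypothetical stand-in for @task: calling the object registers it with the active flow."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self):
        if MiniFlow.current is not None:
            MiniFlow.current.tasks.append(self.fn.__name__)
        return self


@mini_task
def hello_task():
    print("Hello world!")


with MiniFlow("referenced") as referenced:
    hello_task      # bare name: evaluated and discarded, nothing is registered

with MiniFlow("called") as called:
    hello_task()    # call: the task is added to the flow

print(referenced.tasks, called.tasks)
```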
Kevin Kho
flow.run_config = KubernetesRun(env={"EXTRA_PIP_PACKAGES": "psycopg2"})
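`EXTRA_PIP_PACKAGES` holds a single string, and several packages go in it separated by spaces (the later script in this thread uses `"psycopg2-binary requests"`). A small sketch of building that value from a list; `extra_pip_env` is a hypothetical helper, not part of Prefect, and the package names are examples:

```python
def extra_pip_env(packages):
    # Join package names into the space-separated form used for EXTRA_PIP_PACKAGES.
    return {"EXTRA_PIP_PACKAGES": " ".join(packages)}


env = extra_pip_env(["psycopg2-binary", "requests"])
print(env)
```

The resulting dict would be passed as `KubernetesRun(env=env)`. `psycopg2-binary` ships prebuilt wheels, while plain `psycopg2` is built from source at install time and so needs a compiler and the PostgreSQL client headers in the image.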
You won’t need to make your own image this way.
Huw Ringer
07/02/2021, 5:49 PM
Huw Ringer
07/02/2021, 11:42 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")


@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)


@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser}"
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    field_list = ["count(*)"]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(map(sql.Identifier, field_list)),
        sql.Identifier("sap_customer"))
    print(qry_str.as_string(con))
    with con.cursor() as cur:
        cur.execute(qry_str)
        for record in cur:
            print(record)


with Flow("api-postgres-test") as flow:
    hello_task
    api_test
    sql_test
    flow.storage = GitHub(
        repo="Ringerrr/SmartClient",
        path="Smythson/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"], env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})

flow.register(project_name="Smythson")
Whilst I’d really like to believe it all did actually work OK, I’d also like to see evidence of it with my own eyes in the log output (does that make me a bad person?).
Any ideas please where I’m going wrong above? Thanks in advance for your assistance…
Huw Ringer
07/02/2021, 11:50 PM
Zanie
`@task` to `@task(log_stdout=True)`
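The distinction here is between an option passed to the decorator and a parameter of the decorated function. In the script, `log_stdout=True` is a default argument of `api_test` and `sql_test`, so the `@task` decorator never sees it. A plain-Python sketch with a hypothetical `mini_task` decorator (not Prefect's implementation) that captures `print` output only when the option is given to the decorator itself:

```python
import contextlib
import io

captured = []  # stands in for the log sink


def mini_task(fn=None, *, log_stdout=False):
    """Hypothetical decorator usable as @mini_task or @mini_task(log_stdout=True)."""
    def wrap(func):
        def run(*args, **kwargs):
            if not log_stdout:
                return func(*args, **kwargs)
            buffer = io.StringIO()
            # Redirect print() output into the buffer, then forward it to the "log".
            with contextlib.redirect_stdout(buffer):
                result = func(*args, **kwargs)
            captured.extend(buffer.getvalue().splitlines())
            return result
        return run
    return wrap(fn) if fn is not None else wrap


@mini_task
def ignored(log_stdout=True):   # a function default: the decorator does not see it
    print("not captured")


@mini_task(log_stdout=True)     # a decorator option: stdout is captured
def logged():
    print("captured")


ignored()
logged()
print(captured)
```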
Zanie
Huw Ringer
07/03/2021, 12:29 AM
Huw Ringer
07/03/2021, 12:30 AM
Kevin Kho
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()
Huw Ringer
07/03/2021, 7:37 PM