# ask-community
h
Hi, new to Prefect and Python, so apologies if what I’m about to describe is a pathetically trivial/obvious problem. Here’s my environment:

1. Prefect Cloud for orchestration.
2. An Azure Kubernetes Prefect Agent. Note I don’t have an executor defined in Azure, as I’m not sure how to do this yet or whether I even need to, but it’s important that the entire Flow runs in Azure, ideally on the existing Agent if possible. I have imported `KubernetesRun` from `prefect.run_configs` in my Flow, can see the Agent from Prefect Cloud, and was able to run "hello world" successfully via it too.
3. An Azure PostgreSQL database I need to run SQL against as part of a Prefect Task (I’ve used a Secret in Prefect Cloud to create a dictionary with all the database login parameters).
4. A private API I need to call as part of a Prefect Task.
5. GitHub storage for the whole Flow script (I created a GitHub Access Token Secret so that Prefect Cloud can access the registered script).
6. I have run the `prefect backend cloud` CLI command on my local Mac, to hopefully force everything to execute in Azure rather than locally.

Here’s the journey I’ve been on:

1. I got the basic "hello world" flow executing on a local agent (yay!), with the Flow registered in Prefect Cloud and the code being pulled from GitHub.
2. I then got it working against the Kubernetes agent, but for some reason the "hello world" message didn’t appear in the logs. Is that because I need to set `prefect.config.cloud.send_flow_run_logs = True` somewhere/somehow, or something else?
3. I tried importing `psycopg2` and creating a connection to the PostgreSQL database to retrieve a very simple count result. I’m not sure whether I should be using the `PostgresExecute` task (which itself uses psycopg2) rather than importing psycopg2 into my Flow. Thoughts/recommendations welcome!
4. I also tried importing the `requests` Python module to call the API.
5. When I run the script to register the Flow it appears to work (it finishes with exit code 0), but when I look in the Flows tab in Prefect Cloud it’s not there. Any idea why, please?

Sorry to bother you all about this, but I’m at a loss on how to move forward if I can’t even see the Flow I want to execute. I suspect it has something to do with importing those libraries/modules and them not being available in the execution environment, but from what I’ve read so far I have no idea what I need to do to get that working. Any advice (even RTFM, if you can point me to the right topic) would be gratefully received. Thanks in advance!

UPDATE: I have posted the script to the thread below, as requested by @Kevin Kho.
Huw
k
Hi @Huw Ringer, could you move the code block into this thread so that we don’t crowd the main channel? I’ll look at this.
h
Thanks Kevin
```python
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret
# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor
# used for making API calls
import requests
# have already run prefect backend cloud CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")


@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)


@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    # trailing space keeps "user=..." and "password=..." as separate parameters
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    cur = con.cursor()
    # count(*) is an expression, not a column name, so compose it with sql.SQL
    field_list = [sql.SQL("count(*)")]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(field_list),
        sql.Identifier("customer"))
    print(qry_str.as_string(con))
    cur.execute(qry_str)
    rs = cur.fetchone()
    rs


with Flow("api-postgres-test") as flow:
    flow.storage = GitHub(
        repo="MyUser/MyRepo",
        path="Project/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"])
    hello_task
    api_test
    sql_test

flow.register(project_name="Project")
```
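One detail in `sql_test` is easy to miss: adjacent f-string literals are concatenated with no separator, so unless the first fragment ends in a space, `user=...` and `password=...` run together in the connection string. Below is a minimal plain-Python sketch that builds a libpq keyword/value string from the secret's dictionary instead. The helper names and sample values are hypothetical; the quoting follows the libpq rule that a value may be wrapped in single quotes, with `'` and `\` inside it escaped by a backslash.

```python
def quote_conninfo_value(value) -> str:
    """Quote one value for a libpq keyword/value connection string.

    Backslashes and single quotes are escaped with a backslash, and the whole
    value is wrapped in single quotes so spaces or empty values are safe.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def build_conninfo(params: dict) -> str:
    """Join key=value pairs with a single space so no two parameters merge."""
    return " ".join(f"{key}={quote_conninfo_value(val)}" for key, val in params.items())


# Hypothetical stand-in for Secret("fsdb").get(); real values come from Prefect Cloud.
dbconfig = {
    "dbname": "mydb",
    "host": "example.postgres.database.azure.com",
    "user": "alice",
    "password": "s3cr et'pw",
}
conninfo = build_conninfo({**dbconfig, "sslmode": "require"})
print(conninfo)
```

With psycopg2 you can also skip the string-building entirely by passing keyword arguments, e.g. `connect(**dbconfig, sslmode="require", cursor_factory=RealDictCursor)`, assuming the secret holds only the `dbname`, `host`, `user` and `password` keys.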
k
You’re saying you don’t see the Flow in the Prefect Cloud UI? Are you looking at the right project? Do you register with `python file.py`? Your flow looks good. Maybe you want to close the cursor though?
Do you see a message saying it was successful?
h
Can’t see it there, sorry. I registered it using the `flow.register(project_name="Project")` call at the end of the script above.
k
But this is the Smythson project, right? As seen in the top right? Do you have a project named Project? `flow.register(project_name="Project")` will register this flow in the Project project.
h
I changed the name of the Project in the script I posted, for obfuscation reasons
However, I think I can see what the issue is. Every time I hit run in PyCharm, I was actually re-registering the hello world script over and over again
This is because I did a ‘Save As’ for the revised code to my local GitHub repo directory and didn’t update the PyCharm Project settings
Doh!
So basically, what you’re saying is that if I actually run the right script in Python it should all work?
k
Yeah, this script should register correctly. Nothing is wrong with your code, except maybe close your cursor? You should see something like this when you register.
h
Good catch about closing the cursor! Will do! Thanks Kevin
k
Wrong image posted earlier. Here is the right one.
I registered with `flow.register("coiled-prefect")` and this was the output when I ran it. This should tell you it registered successfully.
h
I have spent hours wading through documentation trying to figure out what I was doing wrong, and all along it was a stupid IDE issue. Thanks for putting me out of my misery!
k
Ah, sorry, there is one thing to note here. The place of execution needs `psycopg2`. You can pass an `image` to `KubernetesRun` to specify the container image the flow runs on. I think you may need an image with `psycopg2` (and prefect) to get this working.
Happy to help! 🙂
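To make the point about the place of execution concrete: a task can only import what is installed in the environment where the flow run executes (here, the Kubernetes job's container), not on the Mac that registered the flow. A small stdlib sketch, with a hypothetical helper `missing_modules` (not a Prefect API), that reports which required modules are absent; it could be run at the top of a task or inside the container itself:

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found.

    importlib.util.find_spec locates a module without importing it and
    returns None when the module is not installed.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


# Import names, not pip names: the psycopg2-binary package installs "psycopg2".
required = ["psycopg2", "requests"]
absent = missing_modules(required)
if absent:
    print("Not installed in this environment:", ", ".join(absent))
else:
    print("All required modules are available")
```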
h
Ah! That’s what I was afraid you were going to say
How do I go about doing that please? I take it the Prefect Agent alone can’t do it?
If you can please just point me in the right direction on how to set that up, I will take it from there
k
Let me see if there’s an easier way first, one sec.
Going back
```python
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()

flow.storage = GitHub(
    repo="MyUser/MyRepo",
    path="Project/flows/api-postgres-test.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)
flow.run_config = KubernetesRun(labels=["prod"])
```
You need to call your tasks in your flow like this, then set the storage and run config outside the `with` block. You can also use the run config to install extra packages through the `EXTRA_PIP_PACKAGES` environment variable:
```python
flow.run_config = KubernetesRun(env={"EXTRA_PIP_PACKAGES": "psycopg2"})
```
You won’t need to make your own image this way
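`EXTRA_PIP_PACKAGES` holds a single space-separated string of pip requirements. As I understand it, Prefect's official images run `pip install` on it when the container starts, so the install is repeated for every flow-run pod instead of being baked into an image. A sketch of a hypothetical helper `extra_pip_env` that assembles the `env` dictionary and rejects specifiers containing spaces, which would otherwise be split into separate arguments:

```python
def extra_pip_env(packages):
    """Build an env dict for KubernetesRun(env=...) from pip requirement strings.

    Hypothetical helper: the value is one space-separated string, in the same
    form as "psycopg2-binary requests" used later in this thread.
    """
    cleaned = [pkg.strip() for pkg in packages if pkg.strip()]
    for pkg in cleaned:
        if " " in pkg:
            # "requests >= 2.25" would be split into three separate arguments
            raise ValueError(f"remove spaces from requirement: {pkg!r}")
    return {"EXTRA_PIP_PACKAGES": " ".join(cleaned)}


env = extra_pip_env(["psycopg2-binary", "requests>=2.25"])
print(env)  # {'EXTRA_PIP_PACKAGES': 'psycopg2-binary requests>=2.25'}
# In the flow script this would be passed as:
# flow.run_config = KubernetesRun(labels=["prod"], env=env)
```

`psycopg2-binary` is generally the safer choice here: building `psycopg2` from source needs a C compiler and PostgreSQL's `pg_config`, which a slim base image may not include, whereas `psycopg2-binary` installs from prebuilt wheels.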
h
That sounds fantastic, thanks, Kevin - I’ll give it a try. Have a great weekend!
Fixed my PyCharm Project settings so was able to deploy the right code this time (🙄) and implemented Kevin’s suggestions above (thanks again Kevin!). Flow appeared to run successfully, but when I looked in the Logs there was no trace of any of the print output requested by the Tasks. Here’s the latest version of the code, for reference:
```python
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True


@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")


@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)


@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    # trailing space keeps "user=..." and "password=..." as separate parameters
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    # count(*) is an expression, not a column name, so compose it with sql.SQL
    field_list = [sql.SQL("count(*)")]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(field_list),
        sql.Identifier("sap_customer"))
    print(qry_str.as_string(con))
    with con.cursor() as cur:
        cur.execute(qry_str)
        for record in cur:
            print(record)


with Flow("api-postgres-test") as flow:
    hello_task
    api_test
    sql_test


flow.storage = GitHub(
    repo="Ringerrr/SmartClient",
    path="Smythson/flows/api-postgres-test.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)
flow.run_config = KubernetesRun(labels=["prod"], env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="Smythson")
```
Whilst I’d really like to believe it all did actually work OK, I’d also like to see evidence of it with my own eyes in the log output (does that make me a bad person?). Any ideas where I’m going wrong above? Thanks in advance for your assistance…
Here’s a copy of the Flow Run Log, for reference:
z
Change from `@task` to `@task(log_stdout=True)`. You want to provide that setting to the task decorator, not your own function.
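The difference is about where the option is read. A toy stdlib sketch (not Prefect's code; `task` here is a hypothetical stand-in) showing why a setting passed to the decorator changes behaviour, while an identically named parameter on the function is just an ordinary argument the decorator never sees:

```python
import contextlib
import functools
import io
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("task-sketch")
logger.setLevel(logging.INFO)


def task(fn=None, *, log_stdout=False):
    """Toy decorator, NOT Prefect's implementation.

    When log_stdout is enabled, text the wrapped function prints is captured
    and re-emitted through `logger`; otherwise print() goes to stdout as usual.
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not log_stdout:
                return func(*args, **kwargs)
            buffer = io.StringIO()
            with contextlib.redirect_stdout(buffer):
                result = func(*args, **kwargs)
            for line in buffer.getvalue().splitlines():
                logger.info(line)
            return result
        return wrapper

    # Support both bare @task and @task(log_stdout=True)
    return decorate(fn) if fn is not None else decorate


@task
def not_captured(log_stdout=True):  # ordinary parameter; the decorator never sees it
    print("printed to stdout only")


@task(log_stdout=True)  # the option reaches the decorator
def captured():
    print("re-emitted as a log record")


not_captured()
captured()
```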
h
Thanks very much for the suggestion Michael, but it doesn’t appear to have made any difference to the Logs output:
Am I using the prefect.config.cloud.send_flow_run_logs setting correctly? Or is there something else I’ve done wrong you think?
k
Are you calling the tasks? I think you might have to do:
```python
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()
```
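Why calling matters: in Prefect 1.x's functional API, calling a task inside `with Flow(...)` is what adds it to the flow, while a bare name such as `hello_task` is an expression statement that does nothing. A flow with no tasks has nothing to run, which would fit it "succeeding" without any output. A toy stdlib model of that behaviour, where `Flow`, `Task` and `task` are simplified stand-ins rather than Prefect's classes:

```python
class Flow:
    """Toy deferred-execution flow (NOT Prefect's implementation).

    Inside `with Flow(...)`, calling a task does not run it; the call only
    records the task so a runner can execute it later.
    """
    _active = None

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        Flow._active = self
        return self

    def __exit__(self, *exc):
        Flow._active = None
        return False

    def run(self):
        return [t.fn() for t in self.tasks]


class Task:
    def __init__(self, fn):
        self.fn = fn

    def __call__(self):
        # Calling the task inside an active flow context registers it there.
        if Flow._active is not None:
            Flow._active.tasks.append(self)
        return self


def task(fn):
    return Task(fn)


@task
def hello_task():
    return "Hello world!"


with Flow("referenced-only") as flow_a:
    hello_task    # just a name lookup: nothing is added

with Flow("called") as flow_b:
    hello_task()  # the call registers the task with the flow

print(len(flow_a.tasks), len(flow_b.tasks))  # 0 1
print(flow_b.run())                          # ['Hello world!']
```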
h
@Kevin Kho 🙏 that did the trick. Many thanks for your invaluable assistance