prefect-community
  • j

    João Amorim

    06/30/2021, 2:19 PM
Hi, I'm using loguru to generate the logs for my scripts. Can I stream these log files to Prefect's logger?
    k
    3 replies · 2 participants
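    One possible bridge, as a minimal sketch (assuming loguru's callable-sink API and Prefect 1.x's context logger; the flow and task names are illustrative):
    import prefect
    from loguru import logger as loguru_logger
    from prefect import Flow, task

    @task
    def do_work():
        # Forward loguru records into Prefect's task logger via a callable sink.
        prefect_logger = prefect.context.get("logger")
        sink_id = loguru_logger.add(lambda msg: prefect_logger.info(str(msg).rstrip()))
        try:
            loguru_logger.info("This line should appear in the Prefect logs")
        finally:
            loguru_logger.remove(sink_id)  # avoid duplicate sinks on retries

    with Flow("loguru-bridge") as flow:
        do_work()
    For existing log files (rather than a live logger), a task could read the file and replay each line through prefect.context.get("logger") in the same way.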
  • p

    Prathamesh

    06/30/2021, 3:46 PM
    Hi all I'm using the Prefect GraphQL API programmatically through a Python Client to search for flow and task statuses. Schedule interval is 15 minutes and runs 24/7. It was working fine for a few days but has now suddenly stopped working with the following error: prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured. Does Prefect have API limits for the GraphQL db without an API key? or/and any limit with the API key? Can I use a Service Account API Key? or should I use a User API Key?
    k
    1 reply · 2 participants
  • k

    Kevin Kho

    06/30/2021, 4:02 PM
Jeremiah and I are live! Listen to us talk about Prefect and Coiled, and the new KV Store here.
    👍 1
    👏 2
    👀 2
  • y

    Yanina Libenson

    06/30/2021, 6:12 PM
    hey! can I use prefect.tasks.aws.s3.S3Upload to upload a parquet file?
    k
    3 replies · 2 participants
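    It can work by serializing the DataFrame to Parquet bytes first, since S3Upload uploads a data payload rather than a local file. A minimal sketch (assuming your Prefect version's S3Upload accepts bytes; the bucket, key, and DataFrame are illustrative, and to_parquet needs pyarrow or fastparquet installed):
    import io
    import pandas as pd
    from prefect import Flow, task
    from prefect.tasks.aws.s3 import S3Upload

    upload = S3Upload(bucket="my-bucket")

    @task
    def parquet_bytes() -> bytes:
        df = pd.DataFrame({"a": [1, 2, 3]})
        buf = io.BytesIO()
        df.to_parquet(buf)
        return buf.getvalue()

    with Flow("upload-parquet") as flow:
        upload(data=parquet_bytes(), key="path/to/file.parquet")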
  • j

    Jeff Baatz

    06/30/2021, 8:34 PM
    Hey everyone. I'm working on setting up a Docker agent on an on-prem server so that we can run some flows which require GPU access. Unfortunately, I'm having trouble working out how I can make the GPUs available to the image at startup. The docker version we're using supports passing them through with a
    --gpus
flag, but setting that flag doesn't seem to be supported by Prefect. Has anyone run into this and implemented something similar?
    k
    m
    16 replies · 3 participants
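    For reference, docker's --gpus flag is sugar for a device request on the container's host config. One hedged approach, assuming your Prefect version's DockerRun exposes a host_config argument (newer releases do) and with an illustrative image name:
    from prefect import Flow, task
    from prefect.run_configs import DockerRun

    @task
    def check_gpu():
        import subprocess
        print(subprocess.run(["nvidia-smi"], capture_output=True, text=True).stdout)

    with Flow("gpu-flow") as flow:
        check_gpu()

    # The device_requests entry mirrors the docker SDK's DeviceRequest type,
    # which is what `docker run --gpus all` sets under the hood.
    flow.run_config = DockerRun(
        image="my-cuda-image:latest",
        host_config={
            "device_requests": [
                {"Driver": "nvidia", "Count": -1, "Capabilities": [["gpu"]]}
            ]
        },
    )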
  • d

    Dan Zhao

    06/30/2021, 9:03 PM
Hi everyone, I am currently splitting a giant DAG into logically separate flows, but I am running into an issue: I have 2 flows, and one task in Flow2 needs output from a task in Flow1. Does anyone know the easiest way to do this?
    k
    11 replies · 2 participants
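    One way to wire this up, as a hedged sketch (assuming Prefect >= 0.15 with a Cloud/Server backend; the project name and the "my_task-1" slug are illustrative): Flow2 launches Flow1, waits for it, then pulls the upstream task's result by slug.
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

    with Flow("Flow2") as flow:
        run_id = create_flow_run(flow_name="Flow1", project_name="my-project")
        done = wait_for_flow_run(run_id)
        # fetch the persisted result of Flow1's task by its slug
        upstream_output = get_task_run_result(run_id, "my_task-1", upstream_tasks=[done])
    This requires Flow1's task to persist its result (e.g. an S3Result), since the value is re-hydrated from the result location.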
  • d

    Darshan

    06/30/2021, 10:06 PM
Hello, I'm trying to find resources on using Prefect with Spark (not on Databricks, but EMR). Is there any example in the docs showing how to submit a Spark job from a Prefect task? Appreciate your help!!
    k
    3 replies · 2 participants
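    In the absence of a dedicated EMR task, a Prefect task can submit a step through boto3's EMR API directly. A minimal sketch (the cluster ID, step name, and script path are illustrative, and AWS credentials are assumed to be available to boto3):
    import boto3
    from prefect import Flow, task

    @task
    def submit_spark_step(cluster_id: str) -> str:
        emr = boto3.client("emr")
        # command-runner.jar lets EMR run spark-submit as a cluster step
        response = emr.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                "Name": "my-spark-job",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": ["spark-submit", "--deploy-mode", "cluster",
                             "s3://my-bucket/jobs/job.py"],
                },
            }],
        )
        return response["StepIds"][0]

    with Flow("emr-spark") as flow:
        submit_spark_step("j-XXXXXXXXXXXXX")
    A follow-up task could poll emr.describe_step until the step completes.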
  • b

    Ben Muller

    07/01/2021, 5:47 AM
Hey community, does anyone have a nice way to test functions that are decorated as a
task
with arguments? I want to test the function's logic, but unfortunately when
pytest
imports the function it invokes the decorator's logic for the
result=
argument, which calls an external dependency. I have tried to mock the task decorator but I can't seem to crack it.
    a
    k
    +1
    12 replies · 4 participants
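    One common pattern: the @task decorator keeps the original function on the task's .run attribute, so tests can call the plain logic without touching the engine or any configured results. A minimal sketch:
    from prefect import task

    @task
    def add(a, b):
        return a + b

    def test_add():
        # .run is the undecorated function; no result= machinery is invoked
        assert add.run(2, 3) == 5
    If the decorator arguments themselves (e.g. the result= object) reach out to an external dependency at import time, patching that object's constructor with unittest.mock.patch before importing the module is another route.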
  • f

    Fabrice Toussaint

    07/01/2021, 8:07 AM
Hi everyone, I have a question regarding CSV streaming: is it possible to do this with Prefect? I have a 24TB file which I want to cut into batches, and for each batch do two tasks (some transforming, then writing). The first task would be to stream the CSV file (https://www.heatonresearch.com/content/csv_file.html). The main thing is that I do not want to load everything into RAM!
    j
    k
    2 replies · 3 participants
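    One hedged sketch: keep the streaming inside a single task with pandas' chunked reader, so only one batch is ever in memory (the file paths and the transform are illustrative; mapping Prefect tasks over the chunks would defeat the RAM constraint, since chunk data would be passed between tasks):
    import pandas as pd
    from prefect import Flow, task

    @task
    def transform_and_write(path: str):
        # read_csv with chunksize yields DataFrames lazily, batch by batch
        for i, chunk in enumerate(pd.read_csv(path, chunksize=1_000_000)):
            transformed = chunk.dropna()  # placeholder transform
            transformed.to_parquet(f"out/batch_{i}.parquet")

    with Flow("csv-streaming") as flow:
        transform_and_write("huge_file.csv")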
  • l

    Laura Vaida

    07/01/2021, 8:39 AM
hi everyone! I want to install a module called "sqlalchemy" (Snowflake connector) into my docker_image on Google Cloud, because I'm getting this error:
/opt/prefect/healthcheck.py:151: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules https://docs.prefect.io/api/latest/storage.html#docker
      flows = cloudpickle_deserialization_check(flow_file_paths)
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    ModuleNotFoundError: No module named 'snowflake.sqlalchemy'
    Does anybody have experience with that? I tried the following:
flow.storage = Docker(
    registry_url="xxx",
    image_name="xxx",
    python_dependencies=["pandas", "oauthlib", "requests", "requests_oauthlib",
                         "oauth2client", "snowflake-connector-python",
                         "pyarrow", "fastparquet"],
    secrets=["GCP_CREDENTIALS"],
    files={"C:/Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages/sqlalchemy": "/modules/sqlachemy.py"},
)
    ✅ 1
    m
    8 replies · 2 participants
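    The missing module is snowflake.sqlalchemy, which ships in the snowflake-sqlalchemy pip package, so adding it to python_dependencies is likely simpler and more portable than copying site-packages files into the image. A sketch of the adjusted storage (registry and image names kept as placeholders):
    from prefect import Flow
    from prefect.storage import Docker

    flow = Flow("UWG-Mail")
    flow.storage = Docker(
        registry_url="xxx",
        image_name="xxx",
        python_dependencies=["pandas", "oauthlib", "requests", "requests_oauthlib",
                             "oauth2client", "snowflake-connector-python",
                             "snowflake-sqlalchemy",  # provides snowflake.sqlalchemy
                             "pyarrow", "fastparquet"],
        secrets=["GCP_CREDENTIALS"],
    )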
  • p

    Piotr Karnasiewicz

    07/01/2021, 10:02 AM
Hey. We are evaluating Prefect for our needs and so far it looks really awesome. However, we encountered a strange issue. Let's say we have registered 2 flows which share the same task. Now we receive an error:
    KeyError: 'Task slug []-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
And this error always occurs only for the flow which was registered first. Any help?
    m
    k
    14 replies · 3 participants
  • d

    Diogo Martins

    07/01/2021, 10:03 AM
Hi there, I'm trying to create a flow and project via VS Code against Prefect Cloud, but I'm stuck. I'm following these 2 tutorials: 1- https://docs.prefect.io/orchestration/tutorial/overview.html#create-an-api-key 2- https://docs.prefect.io/orchestration/tutorial/first.html#creating-a-project But I'm getting this error when trying to create a project via VS Code: - ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured. Error creating project And this one when trying to register a flow by running hello_flow.py: - ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured. I can see that the error is because of the API Keys and Service Account Keys, but the tutorial doesn't say how I'm supposed to put these keys in the hello_flow.py script.
    m
    15 replies · 2 participants
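    For reference, this error usually means the client has no valid key configured. A minimal sketch, assuming Prefect >= 0.15 where API keys replaced tokens (the key string and project name are placeholders); running prefect auth login --key <YOUR_KEY> once on the machine is the CLI equivalent:
    from prefect import Client

    client = Client(api_key="<YOUR_API_KEY>")
    client.create_project(project_name="Project")
    With the key stored via prefect auth login, flow.register() and the tutorial scripts pick it up automatically, so nothing needs to be pasted into hello_flow.py itself.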
  • s

    Sumit Kumar Rai

    07/01/2021, 12:09 PM
Hello, how can I have a task that clones a GitHub repository? I can't find one among the GitHub tasks. I want to run a dbt project with ECSRun, and I need to clone the latest project code before running dbt commands.
    s
    k
    2 replies · 3 participants
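    In the absence of a dedicated clone task, ShellTask can run git directly. A hedged sketch (the org/repo names are illustrative, and a GITHUB_ACCESS_TOKEN secret is assumed to exist):
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret
    from prefect.tasks.shell import ShellTask

    shell = ShellTask(return_all=True)

    @task
    def clone_command(token: str) -> str:
        # shallow-clone the latest code, using the token for auth
        return f"git clone --depth 1 https://{token}@github.com/my-org/my-dbt-project.git"

    with Flow("dbt-clone-and-run") as flow:
        token = PrefectSecret("GITHUB_ACCESS_TOKEN")
        cloned = shell(command=clone_command(token))
        shell(command="cd my-dbt-project && dbt run", upstream_tasks=[cloned])
    prefect.tasks.dbt.DbtShellTask could replace the second call if you want dbt-specific handling.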
  • m

    Matthias Roels

    07/01/2021, 2:05 PM
    I was scrolling through the issues on GitHub and found this one: https://github.com/PrefectHQ/prefect/issues/4379. Is there any update on whether this will be picked up soon?
    k
    m
    6 replies · 3 participants
  • b

    Brad I

    07/01/2021, 3:24 PM
    Hi, I was wondering if anyone had a good example of subscribing to a flow’s running status using graphql subscriptions? When we tried it, the api just returns an error, but it works as a query request:
    subscription {
      flow_run_state_by_pk(id: "848a9771-4962-405b-86f1-2d4561ffffff") {
        state
      }
    }
    
    result:
    
    {
      "errors": [
        {
          "path": [
            "flow_run_state_by_pk"
          ],
          "message": "Cannot read property 'flow_run_state_by_pk' of undefined",
          "extensions": {
            "code": "INTERNAL_SERVER_ERROR"
          }
        }
      ],
      "data": {
        "flow_run_state_by_pk": null
      }
    }
    k
    3 replies · 2 participants
  • t

    thebuleon29

    07/01/2021, 3:47 PM
    Hi, I have the following code:
    import prefect
    from prefect import Flow, task
    from prefect.tasks.kubernetes.job import RunNamespacedJob
    
    
    start_body = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "start"
        },
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "start",
                            "image": "alpine",
                            "command": [
                                "echo",
                                "start"
                            ]
                        }
                    ],
                    "restartPolicy": "Never"
                }
            },
            "backoffLimit": 4
        }
    }
    
    @task
    def print_kube_res(x):
        print(str(x))
    
    with Flow("Kubernetes Job") as flow:
        start = RunNamespacedJob(body=start_body, kubernetes_api_key_secret=None)
        p = print_kube_res(start)
        start.set_downstream(p)
But here the print task prints 'None'. How do I get the result from a Kubernetes job?
    k
    1 reply · 2 participants
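    For reference, RunNamespacedJob reports on the job's status rather than returning its output, which is why the downstream task sees None. One hedged option, assuming your Prefect version supports these parameters, is to stream the pod's logs into the Prefect logger and keep the finished job around for inspection:
    from prefect.tasks.kubernetes.job import RunNamespacedJob

    start = RunNamespacedJob(
        body=start_body,                     # the job body defined above
        kubernetes_api_key_secret=None,
        log_level="info",                    # surface pod logs in Prefect's logs
        delete_job_after_completion=False,   # keep the job/pod for inspection
    )
    Actually capturing the output as a task result would mean reading the pod's logs in a separate task (e.g. via the kubernetes client or Prefect's pod tasks).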
  • l

    Leon Kozlowski

    07/01/2021, 3:50 PM
Hi - I am looking to use Docker storage, but I have a separate CI/CD workflow for pushing images to my docker registry — are there any specific configurations I should be passing to denote that? Or should I just omit the
    registry_url
    ?
    k
    9 replies · 2 participants
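    One hedged approach: point Docker storage at the image your CI/CD pipeline already pushed and skip Prefect's build at registration time with build=False (registry, image, and tag are placeholders; the image must contain the flow at the path storage expects, e.g. when using path= with stored_as_script=True):
    from prefect import Flow
    from prefect.storage import Docker

    flow = Flow("ci-built-image")
    flow.storage = Docker(
        registry_url="registry.example.com/team",
        image_name="my-flow",
        image_tag="2021-07-01",
    )
    flow.register(project_name="Project", build=False)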
  • b

    Ben Collier

    07/01/2021, 3:59 PM
Hi all - quick question for you - we're running a flow which runs a lambda function. It works fine in test with the serverless framework, but on AWS itself the .run() function simply freezes and never returns. It doesn't seem to ever time out, and we're not getting an error message back. Does anyone have experience of similar issues? We're assuming it's a timeout due to permissions and are checking IAM roles, security groups and so on, but theoretically these are set correctly. Are there any obvious gotchas or flags we could use to debug this wrapped function?
    k
    16 replies · 2 participants
  • l

    Laura Vaida

    07/01/2021, 4:09 PM
hi @all, I'm trying to use a PrefectSecret from the UI as kwargs for a config, but I'm getting an error with the following code. Does anybody know why? I tried the following:

@task(log_stdout=True)
def create_engine(snowflake_salesforce):
    config = configparser.ConfigParser()
    engine = create_engine(URL(**config[snowflake_salesforce]))

with Flow('UWG-Mail') as flow:
    snowflake_credentials = PrefectSecret("Snowflake_Salesforce")
    connection = create_engine(snowflake_salesforce=snowflake_credentials)
    error:
    Unexpected error: TypeError("unhashable type: 'dict'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "<input>", line 36, in create_engine
      File "/usr/local/lib/python3.8/configparser.py", line 959, in __getitem__
        if key != self.default_section and not self.has_section(key):
      File "/usr/local/lib/python3.8/configparser.py", line 668, in has_section
        return section in self._sections
    TypeError: unhashable type: 'dict'
    ✅ 1
    s
    4 replies · 2 participants
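    For reference, the traceback comes from passing the secret (already a dict) to configparser as a section name. Using the dict directly, and renaming the task so it no longer shadows sqlalchemy's create_engine, is a minimal fix (this sketch assumes the secret's keys match snowflake.sqlalchemy URL's parameters):
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret
    from snowflake.sqlalchemy import URL
    from sqlalchemy import create_engine

    @task(log_stdout=True)
    def make_engine(snowflake_salesforce: dict):
        # PrefectSecret already yields a dict; no configparser lookup is needed
        return create_engine(URL(**snowflake_salesforce))

    with Flow("UWG-Mail") as flow:
        snowflake_credentials = PrefectSecret("Snowflake_Salesforce")
        connection = make_engine(snowflake_salesforce=snowflake_credentials)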
  • n

    Nicholas Chammas

    07/01/2021, 5:50 PM
    I have a
    Parameter
    being fed into a
    DatabricksRunNow
    Task
    as a
    notebook_param
    . I can see the edge between the parameter and the task in the UI fine, but for whatever reason the Databricks notebook is not receiving the parameter value when I run the flow. What should I try to troubleshoot the reason why the notebook task is not receiving parameter values?
    k
    13 replies · 2 participants
  • g

    Gabriel Montañola

    07/01/2021, 6:48 PM
    Hi there folks, how you doing? Could you share some tips or recommended practices to use shared tasks within flows? I'm using
    Github Storage
    +
    KubernetesRun
    and I'm not sure of how to handle this in an elegant way. Should I structure my project as a python package and
    pip install -e .
    inside a Dockerfile? I want to use tasks/functions defined at
    /tasks
    on my flows to avoid repetition.
    my-happy-project/
    ├── flows/
    │   ├── flow_1.py
    │   └── flow_2.py
    │
    └── tasks/
        ├── shared_stuff_1.py
        └── shared_stuff_2.py
    k
    14 replies · 2 participants
  • b

    Ben Muller

    07/01/2021, 7:34 PM
    Morning, is there anyone I can reach out to in order to change my email on my account? I signed up when I was just playing about and now need to change over to my work email.
    n
    1 reply · 2 participants
  • d

    Dave Nielsen

    07/01/2021, 10:56 PM
Hey folks, @Kevin Kho is giving a Prefect talk at Subsurface (a free virtual conference) on July 21st: • Orchestrating Data Validation Workflows with Prefect - by @Kevin Kho. There are other open source Cloud Data Lake talks too, such as Arrow Flight, Iceberg and Nessie, and a panel about Open Data Architecture with some project leads. Full disclosure: I'm one of the organizers
    :upvote: 6
    🏀 1
    👍 3
    :marvin: 5
  • f

    Felipe Saldana

    07/01/2021, 11:01 PM
Hey folks. I feel like I am running into a nuance with label names. In Prefect Cloud, I am seeing "It looks like no currently running Agent has this Flow's full set of labels". I believe we may have had an extra trailing space(?) at some point during the initial flow registration, which we attempted to address a few days back. So I went to the Interactive API section to query the given flow to verify the label, and the GraphQL query is returning null for the label; I was expecting to see some sort of value for the flow label. Using GraphQL I could see that the agent had the correct label with no spaces. Any ideas?
    j
    4 replies · 2 participants
  • a

    Amanda Wee

    07/02/2021, 12:12 AM
Hi friends, I have a bunch of ETLs with a similar set of preliminary tasks and closing tasks, and so I wrote a
    get_etl_flow
    function that takes a callback function that sets up the ETL-specific tasks, and thus defines and returns a flow by setting up these common tasks and calling the callback function. Then I have a
    register_flow
    function and a
    run_flow
    function that calls
    get_etl_flow
    and registers or runs the flow returned, respectively. Each group of related ETLs are grouped into a prefect project in a single Python script, and
    register_flow
    is called for each ETL in the main function. If I want to run a particular flow manually, I have to change that particular flow's
    register_flow
    call to
    run_flow
    and comment out the other
    register_flow
    calls, which is cumbersome. How can I make use of the new CLI for running flows such that my infrastructure can still register flows and run them via an agent, while I can for debugging etc run individual flows using agentless execution, without having to comment out code, despite there being multiple flows defined in a single file?
    d
    k
    2 replies · 3 participants
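    One generic pattern (not Prefect-specific) is a tiny command-line dispatcher in the same file, so nothing ever needs commenting out: no argument registers every flow, while naming a flow runs just that one locally and agentlessly. A sketch with a stand-in factory:
    import sys
    from prefect import Flow, task

    @task
    def noop():
        pass

    def get_etl_flow(name):  # stand-in for the callback-based factory
        with Flow(name) as flow:
            noop()
        return flow

    FLOW_NAMES = ["etl-a", "etl-b"]

    if __name__ == "__main__":
        if len(sys.argv) > 1:
            # `python etls.py etl-a` -> run one flow locally, agentless
            get_etl_flow(sys.argv[1]).run()
        else:
            # `python etls.py` -> register everything, as the infrastructure does
            for name in FLOW_NAMES:
                get_etl_flow(name).register(project_name="my-project")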
  • d

    davzucky

    07/02/2021, 7:18 AM
How can I set a serializer on a task without setting up the result? We can have multiple storages for our flows, which are specified at the flow level. We would like each task to serialize using the Pandas serializer, but it seems we can only do that through the task's result. Any solution?
    k
    m
    15 replies · 3 participants
  • i

    Igor Kaluđer

    07/02/2021, 8:58 AM
    Hi, is there a way to upgrade local agents without interrupting the currently running flows?
    k
    3 replies · 2 participants
  • d

    DK

    07/02/2021, 4:21 PM
    This seems like a basic question, but is there a way to end/return a Flow early based on a condition with the Case control flow class? For example, I have a task that queries a DB and returns the results. The next task creates a file with those results. I'd like to add a condition that does something like this:
    with case(results, None):
        return # End the Flow
    The examples with the Case Control Flow class always show an alternative task to run:
    with case(cond, "a"):
            run_if_cond_is_a()
    
        with case(cond, "b"):
            run_if_cond_is_b()
    But I don't always have another task, sometimes I just need the flow to end. I could check the inverse of 'results' to see if there is data instead of for None, and I could then run the subsequent tasks, but then I would need another task to check that condition. That's fine if that's how it needs to be done, but I just wanted to check if there was a simpler solution that I'm missing.
    k
    2 replies · 2 participants
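    One simpler route, as a hedged sketch: raise the SKIP signal from a checking task. Skips propagate to downstream tasks by default, so the flow simply finishes without a dedicated "end" branch (the task bodies are placeholders):
    from prefect import Flow, task
    from prefect.engine.signals import SKIP

    @task
    def query_db():
        return None  # placeholder for the real query

    @task
    def check(results):
        # SKIP ends this branch without failing; downstream tasks with the
        # default trigger are skipped too, so the flow just finishes
        if results is None:
            raise SKIP("No results; ending the flow early")
        return results

    @task
    def write_file(results):
        print(f"writing {len(results)} rows")

    with Flow("query-and-write") as flow:
        write_file(check(query_db()))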
  • d

    Darshan

    07/02/2021, 4:49 PM
Hello, if I am not using any backend, just the core library, is there a way to get a unique ID generated in the flow context when I run a flow using the "prefect run --path" command?
    m
    18 replies · 2 participants
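    Without a backend there is no server-issued ID, but context can be seeded manually before the run. A hedged sketch of one option (this works for flow.run(); whether prefect run --path merges ambient context the same way is worth verifying on your version):
    import uuid
    import prefect
    from prefect import Flow, task

    @task
    def report_run_id():
        print(prefect.context.get("flow_run_id"))

    with Flow("local-run") as flow:
        report_run_id()

    if __name__ == "__main__":
        # seed a unique id into context for this process's run
        with prefect.context(flow_run_id=str(uuid.uuid4())):
            flow.run()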
  • h

    Huw Ringer

    07/02/2021, 4:53 PM
Hi, new to Prefect and Python so apologies if what I'm about to describe is a pathetically trivial/obvious problem. Here's my environment:
1. Prefect Cloud for orchestration.
2. Azure Kubernetes Prefect Agent (note I don't have an executor defined in Azure as I'm not sure how to do this yet, or whether I even need to, but it's important that the entire Flow runs in Azure - ideally on the existing Agent if possible). Have imported KubernetesRun from prefect.run_configs in my Flow, can see the Agent from Prefect Cloud, and was able to run 'hello world' successfully via it also.
3. Azure PostgreSQL database I need to run SQL against as part of a Prefect Task (have used a Secret in Prefect Cloud to create a dictionary with all the database login parameters).
4. Private API I need to call as part of a Prefect Task.
5. GitHub storage for the whole Flow script (created a GitHub Access Token Secret to enable Prefect Cloud to access the registered script).
6. Have run the prefect backend cloud CLI command on my local Mac, to hopefully force everything to execute in Azure rather than locally.
Here's the journey I've been on:
1. I got the basic "hello world" flow executing on a local agent (yay!), with the Flow registered in Prefect Cloud and the code being pulled from GitHub.
2. I then got it working against the Kubernetes agent, but for some reason the 'hello world' message didn't appear in the logs. Is that because I need to set prefect.config.cloud.send_flow_run_logs = True somewhere/somehow, or something else?
3. Tried importing psycopg2 and creating a connection to the PostgreSQL database to retrieve a very simple count result. Am not sure if I need to be using the PostgresExecute task (which itself uses psycopg2) rather than importing psycopg2 into my Flow. Thoughts/recommendations welcome!
4. Also tried importing the requests Python module to call the API.
5. When I try running the script to register the Flow it appears to work (finishes with exit code 0), but when I look in my Prefect Cloud Flows tab it's not there. Any idea why, please?
Sorry to bother you all about this, but am kind of at a loss on how to move forward if I can't even see the Flow I want to execute. Suspect it may be something to do with importing those libraries/modules and them not being available in the execution environment, but have no idea from what I've read so far what I need to do to get that working. Any advice (even RTFM, if you can point me to the right topic) would be gratefully received. Thanks in advance!
* UPDATE * Have posted the script to the below thread as requested by @Kevin Kho
Huw
    k
    m
    35 replies · 3 participants
k

Kevin Kho

07/02/2021, 5:05 PM
Hi @Huw Ringer, could you move the code block into this thread so that we don’t crowd the main channel? I’ll look at this.
h

Huw Ringer

07/02/2021, 5:06 PM
Thanks Kevin
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")

@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)

@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    cur = con.cursor()
    field_list = ["count(*)"]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(map(sql.Identifier, field_list)),
        sql.Identifier("customer"))
    print(qry_str.as_string(con))
    cur.execute(qry_str)
    rs = cur.fetchone()
    rs

with Flow("api-postgres-test") as flow:
    flow.storage = GitHub(
        repo="MyUser/MyRepo",
        path="Project/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"])
    hello_task
    api_test
    sql_test

flow.register(project_name="Project")
k

Kevin Kho

07/02/2021, 5:08 PM
You're saying you don't see the Flow in the Prefect Cloud UI? Are you looking at the right project? Do you register with
python file.py
? Your flow looks good. Maybe you want to close the cursor, though?
Do you see a message saying it was successful?
h

Huw Ringer

07/02/2021, 5:10 PM
Can’t see it there, sorry. I registered it using the flow.register(project_name=“Project”) call at the end of the script above
k

Kevin Kho

07/02/2021, 5:11 PM
But this is the Smythson project right? As seen in the top right? Do you have a project named Project?
flow.register(project_name="Project")
will register this flow in the Project project.
h

Huw Ringer

07/02/2021, 5:15 PM
I changed the name of the Project in the script I posted, for obfuscation reasons
However, I think I can see what the issue is. Every time I hit run in PyCharm, I was actually re-registering the hello world script over and over again
This is because I did a ‘Save As’ for the revised code to my local GitHub repo directory and didn’t update the PyCharm Project settings
Doh!
So basically, what you’re saying is that if I actually run the right script in Python it should all work?
k

Kevin Kho

07/02/2021, 5:19 PM
Yeah, this script should register correctly. Nothing is wrong with your code, except maybe close your cursor? You should see something like this when you register.
h

Huw Ringer

07/02/2021, 5:20 PM
Good catch about closing the cursor! Will do! Thanks Kevin
k

Kevin Kho

07/02/2021, 5:20 PM
Wrong image posted earlier. Here is the right one
I registered with
flow.register("coiled-prefect")
and this was the output when I ran. This should tell you it registered successfully
h

Huw Ringer

07/02/2021, 5:21 PM
I have spent hours wading through documentation trying to figure out what I was doing wrong, and all along it was a stupid IDE issue. Thanks for putting me out of my misery!
k

Kevin Kho

07/02/2021, 5:22 PM
Ah sorry there is one thing to note here. The place of execution needs
psycopg2
. You can add a container to KubernetesRun to specify an image to run the flow on. I think you may need an image with
psycopg2
(and prefect) to get this working.
Happy to help! 🙂
h

Huw Ringer

07/02/2021, 5:24 PM
Ah! That’s what I was afraid you were going to say
How do I go about doing that please? I take it the Prefect Agent alone can’t do it?
If you can please just point me in the right direction on how to set that up, I will take it from there
k

Kevin Kho

07/02/2021, 5:25 PM
Let me look if there's an easier way first, one sec
Going back:
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()

flow.storage = GitHub(
    repo="MyUser/MyRepo",
    path="Project/flows/api-postgres-test.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)
flow.run_config = KubernetesRun(labels=["prod"])
You need to call your tasks inside the flow like this, then set the storage and run config outside. You can also use the run config to install extra pip packages:
flow.run_config = KubernetesRun(env={"EXTRA_PIP_PACKAGES": "psycopg2"})
You won't need to make your own image this way
h

Huw Ringer

07/02/2021, 5:49 PM
That sounds fantastic, thanks, Kevin - I’ll give it a try. Have a great weekend!
Fixed my PyCharm Project settings so I was able to deploy the right code this time (🙄) and implemented Kevin's suggestions above (thanks again Kevin!). The flow appeared to run successfully, but when I looked in the logs there was no trace of any of the print output requested by the tasks. Here's the latest version of the code, for reference:
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret

# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor

# used for making API calls
import requests

# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True


@task
def hello_task():
   logger = prefect.context.get("logger")
   logger.info("Hello world!")


@task
def api_test(log_stdout=True):
   api_call = requests.get("https://www.askpython.com/")
   print(api_call.status_code)


@task
def sql_test(log_stdout=True):
   dbconfig = Secret("fsdb").get()
   dbname = dbconfig['dbname']
   dbhost = dbconfig['host']
   dbuser = dbconfig['user']
   dbpassword = dbconfig['password']
   con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
              f"password={dbpassword} sslmode=require",
              cursor_factory=RealDictCursor)
   field_list = ["count(*)"]
   qry_str = sql.SQL("SELECT {} FROM {}").format(
      sql.SQL(",").join(map(sql.Identifier, field_list)),
      sql.Identifier("sap_customer"))
   print(qry_str.as_string(con))
   with con.cursor() as cur:
      cur.execute(qry_str)
      for record in cur:
         print(record)


with Flow("api-postgres-test") as flow:
   hello_task
   api_test
   sql_test


flow.storage = GitHub(
   repo="Ringerrr/SmartClient",
   path="Smythson/flows/api-postgres-test.py",
   access_token_secret="GITHUB_ACCESS_TOKEN"
   )
flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="Smythson")
Whilst I'd really like to believe it all actually worked OK, I'd also like to see evidence of it with my own eyes in the log output (does that make me a bad person?). Any ideas where I'm going wrong above? Thanks in advance for your assistance….
Here’s a copy of the Flow Run Log, for reference:
m

Michael Adkins

07/03/2021, 12:22 AM
Change from
@task
to
@task(log_stdout=True)
You want to provide that setting to the task decorator, not your own function
h

Huw Ringer

07/03/2021, 12:29 AM
Thanks very much for the suggestion Michael, but it doesn’t appear to have made any difference to the Logs output:
Am I using the prefect.config.cloud.send_flow_run_logs setting correctly? Or is there something else I've done wrong, do you think?
k

Kevin Kho

07/03/2021, 2:39 AM
Are you calling the tasks? I think you might have to do:
with Flow("api-postgres-test") as flow:
   hello_task()
   api_test()
   sql_test()
h

Huw Ringer

07/03/2021, 7:37 PM
@Kevin Kho 🙏 that did the trick. Many thanks for your invaluable assistance