João Amorim
06/30/2021, 2:19 PM
Prathamesh
06/30/2021, 3:46 PM
Kevin Kho
06/30/2021, 4:02 PM
Yanina Libenson
06/30/2021, 6:12 PM
Jeff Baatz
06/30/2021, 8:34 PM
--gpus flag. But setting that flag doesn't seem to be supported. Has anyone run into and implemented anything similar?
Dan Zhao
06/30/2021, 9:03 PM
Darshan
06/30/2021, 10:06 PM
Ben Muller
07/01/2021, 5:47 AM
task with arguments?
I want to test the function's logic, but unfortunately when pytest imports the function it runs the decorator's logic for the result= argument, which calls an external dependency.
I have tried to mock the task decorator, but I can't seem to crack it.
Fabrice Toussaint
07/01/2021, 8:07 AM
Laura Vaida
07/01/2021, 8:39 AM
/opt/prefect/healthcheck.py:151: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules https://docs.prefect.io/api/latest/storage.html#docker
  flows = cloudpickle_deserialization_check(flow_file_paths)
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 151, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'snowflake.sqlalchemy'
Does anybody have experience with that? I tried the following:
flow.storage = Docker(registry_url="xxx", image_name="xxx",
                      python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "oauth2client",
                                           "snowflake-connector-python", "pyarrow", "fastparquet"],
                      secrets=["GCP_CREDENTIALS"], files={"C:/Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages/sqlalchemy": "/modules/sqlachemy.py"})
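The healthcheck fails because snowflake.sqlalchemy cannot be imported inside the image. The python_dependencies list names snowflake-connector-python but nothing that obviously supplies that module; as far as I know it ships in the separate snowflake-sqlalchemy distribution on PyPI, so that is worth verifying. Below is a minimal pre-flight sketch, using only the standard library, that reports which dotted module names are not importable in the environment where it runs. The helper name missing_modules and the example module list are hypothetical.

```python
import importlib.util


def missing_modules(module_names):
    """Return the dotted module names that cannot be located in this environment."""
    missing = []
    for name in module_names:
        try:
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            # Raised when a parent package (e.g. "snowflake") is itself absent.
            spec = None
        if spec is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Hypothetical list: replace with the modules your flow actually imports.
    required = ["json", "sqlalchemy", "snowflake.sqlalchemy"]
    print(missing_modules(required))
```

Run inside the built image, an empty list would suggest that every listed import can at least be located there.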
Piotr Karnasiewicz
07/01/2021, 10:02 AM
KeyError: 'Task slug []-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
This error only ever occurs for the flow that was registered first. Any help?
Diogo Martins
07/01/2021, 10:03 AM
Sumit Kumar Rai
07/01/2021, 12:09 PM
Matthias Roels
07/01/2021, 2:05 PM
Brad I
07/01/2021, 3:24 PM
subscription {
  flow_run_state_by_pk(id: "848a9771-4962-405b-86f1-2d4561ffffff") {
    state
  }
}
result:
{
  "errors": [
    {
      "path": [
        "flow_run_state_by_pk"
      ],
      "message": "Cannot read property 'flow_run_state_by_pk' of undefined",
      "extensions": {
        "code": "INTERNAL_SERVER_ERROR"
      }
    }
  ],
  "data": {
    "flow_run_state_by_pk": null
  }
}
thebuleon29
07/01/2021, 3:47 PM
import prefect
from prefect import Flow, task
from prefect.tasks.kubernetes.job import RunNamespacedJob
start_body = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "name": "start"
    },
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "start",
                        "image": "alpine",
                        "command": [
                            "echo",
                            "start"
                        ]
                    }
                ],
                "restartPolicy": "Never"
            }
        },
        "backoffLimit": 4
    }
}
@task
def print_kube_res(x):
    print(str(x))
with Flow("Kubernetes Job") as flow:
    start = RunNamespacedJob(body=start_body, kubernetes_api_key_secret=None)
    p = print_kube_res(start)
    start.set_downstream(p)
But here the print task prints 'None'. How do I get the result from a Kubernetes job?
Leon Kozlowski
07/01/2021, 3:50 PM
registry_url?
Ben Collier
07/01/2021, 3:59 PM
Laura Vaida
07/01/2021, 4:09 PM
OK, I tried with the following:
@task(log_stdout=True)
def create_engine(snowflake_salesforce):
    config = configparser.ConfigParser()
    engine=create_engine(URL(**config[snowflake_salesforce]))
with Flow('UWG-Mail') as flow:
    snowflake_credentials=PrefectSecret("Snowflake_Salesforce")
    connection=create_engine(snowflake_salesforce=snowflake_credentials)
error:
Unexpected error: TypeError("unhashable type: 'dict'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "<input>", line 36, in create_engine
  File "/usr/local/lib/python3.8/configparser.py", line 959, in __getitem__
    if key != self.default_section and not self.has_section(key):
  File "/usr/local/lib/python3.8/configparser.py", line 668, in has_section
    return section in self._sections
TypeError: unhashable type: 'dict'
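The traceback ends inside configparser: the value passed to the task is used as a section name in config[...], and the error message indicates that value is a dict rather than a string. Here is a minimal standard-library sketch of the same failure, assuming the secret resolves to a dict of connection fields. The build_url helper and the credential values are hypothetical stand-ins, not part of Prefect or SQLAlchemy.

```python
import configparser


def section_lookup_error(key):
    """Index an empty ConfigParser with `key` and return the exception raised, if any."""
    config = configparser.ConfigParser()
    try:
        config[key]
    except (TypeError, KeyError) as exc:
        return exc
    return None


def build_url(account, user, password):
    # Hypothetical stand-in for a URL builder that takes keyword arguments.
    return f"snowflake://{user}:{password}@{account}/"


if __name__ == "__main__":
    creds = {"account": "acme", "user": "alice", "password": "s3cret"}  # made-up values
    # A dict used as a section name fails the hash lookup inside ConfigParser.
    print(type(section_lookup_error(creds)).__name__)
    # Unpacking the dict directly avoids ConfigParser altogether.
    print(build_url(**creds))
```

If the secret already holds the connection fields, the ConfigParser step may simply be unnecessary.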
Nicholas Chammas
07/01/2021, 5:50 PM
Parameter being fed into a DatabricksRunNow Task as a notebook_param. I can see the edge between the parameter and the task in the UI fine, but for whatever reason the Databricks notebook is not receiving the parameter value when I run the flow.
What should I try to troubleshoot the reason why the notebook task is not receiving parameter values?
Gabriel Montañola
07/01/2021, 6:48 PM
Github Storage + KubernetesRun and I'm not sure of how to handle this in an elegant way.
Should I structure my project as a Python package and pip install -e . inside a Dockerfile?
I want to use tasks/functions defined at /tasks in my flows to avoid repetition.
my-happy-project/
├── flows/
│   ├── flow_1.py
│   └── flow_2.py
│
└── tasks/
    ├── shared_stuff_1.py
    └── shared_stuff_2.py
Ben Muller
07/01/2021, 7:34 PM
Dave Nielsen
07/01/2021, 10:56 PM
Felipe Saldana
07/01/2021, 11:01 PM
Amanda Wee
07/02/2021, 12:12 AM
get_etl_flow function that takes a callback function that sets up the ETL-specific tasks, and thus defines and returns a flow by setting up these common tasks and calling the callback function. Then I have a register_flow function and a run_flow function that call get_etl_flow and register or run the flow returned, respectively. Each group of related ETLs is grouped into a Prefect project in a single Python script, and register_flow is called for each ETL in the main function. If I want to run a particular flow manually, I have to change that particular flow's register_flow call to run_flow and comment out the other register_flow calls, which is cumbersome.
How can I make use of the new CLI for running flows such that my infrastructure can still register flows and run them via an agent, while I can, for debugging etc., run individual flows using agentless execution, without having to comment out code, despite there being multiple flows defined in a single file?
davzucky
07/02/2021, 7:18 AM
Igor Kaluđer
07/02/2021, 8:58 AM
DK
07/02/2021, 4:21 PM
with case(results, None):
    return # End the Flow
The examples with the Case Control Flow class always show an alternative task to run:
with case(cond, "a"):
    run_if_cond_is_a()
with case(cond, "b"):
    run_if_cond_is_b()
But I don't always have another task; sometimes I just need the flow to end.
I could check the inverse of 'results' to see if there is data instead of checking for None, and then run the subsequent tasks, but then I would need another task to check that condition. That's fine if that's how it needs to be done, but I just wanted to check if there was a simpler solution that I'm missing.
Darshan
07/02/2021, 4:49 PM
Huw Ringer
07/02/2021, 4:53 PM
prefect.run_configs in my Flow, and can see the Agent from Prefect Cloud, and was able to run ‘hello world’ successfully via it also.
3. Azure PostgreSQL database I need to run SQL against as part of a Prefect Task (have used a Secret in Prefect Cloud to create a dictionary with all the database login parameters)
4. Private API I need to call as part of a Prefect Task
5. GitHub storage for the whole Flow script (created a GitHub Access Token Secret to enable PrefectCloud to access the registered script)
6. Have run the prefect backend cloud CLI command on my local Mac, to hopefully force everything to execute in Azure rather than locally
Here’s the journey I’ve been on:
1. I got the basic “hello world” flow executing on a local agent (yay!), with the Flow registered in the Prefect Cloud, and the code being pulled from GitHub
2. I then got it working against the Kubernetes agent, but for some reason the ‘hello world’ message didn’t appear in the logs. Is that because I need to set prefect.config.cloud.send_flow_run_logs = True somewhere/how, or something else?
3. Tried importing psycopg2 and creating a connection to the PostgreSQL database to retrieve a very simple count result. Am not sure if I need to be using the PostgresExecute API call (which itself uses Psycopg2) rather than importing psycopg2 into my Flow. Thoughts/recommendations welcome!
4. Also tried importing the requests Python module to call the API.
5. When I try running the script to register the Flow it appears to work (finished with exit code 0), but when I look in my Prefect Cloud Flows tab it’s not there. Any idea why, please?
Sorry to bother you all about this, but am kind of at a loss on how to move forwards with this if I can’t even see the Flow I want to execute. Suspect it may be something to do with importing those libraries/modules and them not being available in the Execution environment, but have no idea from what I’ve read so far what I need to do to get that working. Any advice (even RTFM, if you can point me to the right topic) would be gratefully received. Thanks in advance!
* UPDATE * Have posted the script to the below thread as requested by @Kevin Kho
Huw
Kevin Kho
07/02/2021, 5:05 PM
Huw Ringer
07/02/2021, 5:06 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret
# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor
# used for making API calls
import requests
# have already run prefect backend cloud CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)
@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    cur = con.cursor()
    field_list = ["count(*)"]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(map(sql.Identifier, field_list)),
        sql.Identifier("customer"))
    print(qry_str.as_string(con))
    cur.execute(qry_str)
    rs = cur.fetchone()
    rs
with Flow("api-postgres-test") as flow:
    flow.storage = GitHub(
        repo="MyUser/MyRepo",
        path="Project/flows/api-postgres-test.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    flow.run_config = KubernetesRun(labels=["prod"])
    hello_task
    api_test
    sql_test
flow.register(project_name="Project")
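A side note on the connect() call in the script: its connection string is built from two adjacent f-string literals. Python joins adjacent literals with no separator, so the boundary between the user= and password= parts needs an explicit space. A minimal sketch of that detail follows; build_dsn and the values passed to it are hypothetical and not part of psycopg2.

```python
def build_dsn(dbname, host, user, password):
    """Assemble a libpq-style keyword/value connection string."""
    # Adjacent string literals are concatenated with no separator, so the
    # first literal ends with a space to keep "user=..." and "password=..." apart.
    return (f"dbname={dbname} host={host} user={user} "
            f"password={password} sslmode=require")


if __name__ == "__main__":
    # Made-up values for illustration only.
    print(build_dsn("shop", "db.example.com", "app", "s3cret"))
```

Printing the assembled string once (with the password masked) is a cheap way to check the boundaries before handing it to the driver.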
Kevin Kho
07/02/2021, 5:08 PM
python file.py? Your flow looks good. Maybe you want to close the cursor though?
Huw Ringer
07/02/2021, 5:10 PM
Kevin Kho
07/02/2021, 5:11 PM
flow.register(project_name="Project") will register this flow in the Project project.
Huw Ringer
07/02/2021, 5:15 PM
Kevin Kho
07/02/2021, 5:19 PM
Huw Ringer
07/02/2021, 5:20 PM
Kevin Kho
07/02/2021, 5:20 PM
flow.register("coiled-prefect") and this was the output when I ran it. This should tell you it registered successfully.
Huw Ringer
07/02/2021, 5:21 PM
Kevin Kho
07/02/2021, 5:22 PM
psycopg2. You can add a container to KubernetesRun to specify an image to run the flow on. I think you may need an image with psycopg2 (and prefect) to get this working.
Huw Ringer
07/02/2021, 5:24 PM
Kevin Kho
07/02/2021, 5:25 PM
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()
flow.storage = GitHub(
    repo="MyUser/MyRepo",
    path="Project/flows/api-postgres-test.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)
flow.run_config = KubernetesRun(labels=["prod"])
flow.run_config = KubernetesRun(env={"EXTRA_PIP_PACKAGES": "psycopg2"})
You won’t need to make your own image this way
Huw Ringer
07/02/2021, 5:49 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
from prefect.client import Secret
# used for running PostgreSQL commands
import psycopg2
from psycopg2 import connect, sql
import psycopg2.extras
from psycopg2.extras import RealDictCursor
# used for making API calls
import requests
# have already run `prefect backend cloud` CLI command on localhost to set orchestration backend for Prefect Cloud
# set flow config to log to cloud server
prefect.config.cloud.send_flow_run_logs = True
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
@task
def api_test(log_stdout=True):
    api_call = requests.get("https://www.askpython.com/")
    print(api_call.status_code)
@task
def sql_test(log_stdout=True):
    dbconfig = Secret("fsdb").get()
    dbname = dbconfig['dbname']
    dbhost = dbconfig['host']
    dbuser = dbconfig['user']
    dbpassword = dbconfig['password']
    con = connect(f"dbname={dbname} host={dbhost} user={dbuser} "
                  f"password={dbpassword} sslmode=require",
                  cursor_factory=RealDictCursor)
    field_list = ["count(*)"]
    qry_str = sql.SQL("SELECT {} FROM {}").format(
        sql.SQL(",").join(map(sql.Identifier, field_list)),
        sql.Identifier("sap_customer"))
    print(qry_str.as_string(con))
    with con.cursor() as cur:
        cur.execute(qry_str)
        for record in cur:
            print(record)
with Flow("api-postgres-test") as flow:
    hello_task
    api_test
    sql_test
flow.storage = GitHub(
    repo="Ringerrr/SmartClient",
    path="Smythson/flows/api-postgres-test.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)
flow.run_config = KubernetesRun(labels=["prod"],env={"EXTRA_PIP_PACKAGES": "psycopg2-binary requests"})
flow.register(project_name="Smythson")
Whilst I’d really like to believe it all did actually work OK, I’d also like to see evidence of it with my own eyes in the log output (does that make me a bad person?).
Any ideas please where I’m going wrong above? Thanks in advance for your assistance….
Michael Adkins
07/03/2021, 12:22 AM
@task to @task(log_stdout=True)
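The distinction behind this suggestion is that log_stdout is an option of the decorator, whereas def api_test(log_stdout=True) only declares an unused parameter of the function itself. The sketch below illustrates the difference with a toy decorator written for this note. It is not Prefect's implementation; the task stand-in, its logs list and the two example functions are all hypothetical.

```python
import contextlib
import io


def task(fn=None, *, log_stdout=False):
    """Toy stand-in for a task decorator (NOT Prefect's implementation).

    With log_stdout=True, anything the function prints is captured into
    `wrapper.logs` instead of going to the real stdout.
    """
    def decorate(func):
        def wrapper(*args, **kwargs):
            if not log_stdout:
                return func(*args, **kwargs)
            buffer = io.StringIO()
            with contextlib.redirect_stdout(buffer):
                result = func(*args, **kwargs)
            wrapper.logs.extend(buffer.getvalue().splitlines())
            return result
        wrapper.logs = []
        return wrapper
    # Support both bare @task and @task(log_stdout=True).
    return decorate(fn) if fn is not None else decorate


@task
def as_function_default(log_stdout=True):
    # log_stdout here is just an unused parameter of the function body.
    print("status 200")


@task(log_stdout=True)
def as_decorator_option():
    print("status 200")


if __name__ == "__main__":
    as_function_default()
    as_decorator_option()
    print(as_function_default.logs)
    print(as_decorator_option.logs)
```

Only the second function's output ends up in its logs list, because only there does the decorator see the option.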
Huw Ringer
07/03/2021, 12:29 AM
Kevin Kho
07/03/2021, 2:39 AM
with Flow("api-postgres-test") as flow:
    hello_task()
    api_test()
    sql_test()
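Why the parentheses matter: inside a with block, a bare name such as hello_task is an expression that is evaluated and discarded, so nothing is told about it to the surrounding context. Only a call gives the task object a chance to register itself. The sketch below shows this with a deliberately simplified stand-in. ToyFlow and toy_task are hypothetical and do not reproduce how Prefect builds flows.

```python
class ToyFlow:
    """Simplified stand-in for a flow context (NOT Prefect's implementation)."""
    current = None  # the flow whose `with` block is active, if any

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow.current = None
        return False


def toy_task(fn):
    """Wrap fn so that calling it inside a ToyFlow block registers it."""
    def register(*args, **kwargs):
        if ToyFlow.current is not None:
            ToyFlow.current.tasks.append(fn.__name__)
    register.__name__ = fn.__name__
    return register


@toy_task
def hello_task():
    print("Hello world!")


with ToyFlow("referenced-only") as referenced:
    hello_task      # bare name: evaluated and discarded, nothing registered

with ToyFlow("called") as called:
    hello_task()    # call: the task is added to the active flow

if __name__ == "__main__":
    print(referenced.tasks)
    print(called.tasks)
```

In this toy model, the first flow ends up with no tasks at all, which would be consistent with a run that finishes without producing any task logs.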
Huw Ringer
07/03/2021, 7:37 PM