Jeff Baatz
06/30/2021, 8:34 PM
--gpus flag. But setting that flag doesn't seem to be supported. Has anyone run into and implemented anything similar?
Dan Zhao
06/30/2021, 9:03 PM
Darshan
06/30/2021, 10:06 PM
Ben Muller
07/01/2021, 5:47 AM
task with arguments?
I want to test the function's logic, but unfortunately when pytest
imports the function it calls the decorator's logic for the result=
argument, which calls an external dependency.
I have tried to mock the task decorator but I can't seem to crack it.
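One workaround (a minimal sketch, assuming Prefect 1.x; the module path flows/my_flow.py, the S3Result, and the add task are illustrative, not from the thread) is to patch the Result class before importing the module so constructing the result= object never reaches the external dependency, then call .run() on the task to exercise only the function body:
```python
# test_my_flow.py -- a minimal sketch, assuming a hypothetical module
# flows/my_flow.py that defines: @task(result=S3Result(...)) def add(a, b)
from unittest import mock

def test_add_logic():
    # Patch the Result class *before* importing the module, so the result=
    # object (which reaches out to an external dependency) is never built.
    with mock.patch("prefect.engine.results.S3Result", autospec=True):
        from flows.my_flow import add

    # .run() on a Prefect 1.x Task is the plain, undecorated function body.
    assert add.run(1, 2) == 3
```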
Fabrice Toussaint
07/01/2021, 8:07 AM
Laura Vaida
07/01/2021, 8:39 AM
/opt/prefect/healthcheck.py:151: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
flows = cloudpickle_deserialization_check(flow_file_paths)
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 151, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'snowflake.sqlalchemy'
Does anybody have experience with that? I tried the following:
flow.storage = Docker(
    registry_url="xxx",
    image_name="xxx",
    python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "oauth2client",
                         "snowflake-connector-python", "pyarrow", "fastparquet"],
    secrets=["GCP_CREDENTIALS"],
    files={"C:/Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages/sqlalchemy": "/modules/sqlachemy.py"},
)
Piotr Karnasiewicz
07/01/2021, 10:02 AM
KeyError: 'Task slug []-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
And this error always occurs only for the flow which was registered first. Any help?
Diogo Martins
07/01/2021, 10:03 AM
Sumit Kumar Rai
07/01/2021, 12:09 PM
Matthias Roels
07/01/2021, 2:05 PM
Brad I
07/01/2021, 3:24 PM
subscription {
  flow_run_state_by_pk(id: "848a9771-4962-405b-86f1-2d4561ffffff") {
    state
  }
}
result:
{
  "errors": [
    {
      "path": [
        "flow_run_state_by_pk"
      ],
      "message": "Cannot read property 'flow_run_state_by_pk' of undefined",
      "extensions": {
        "code": "INTERNAL_SERVER_ERROR"
      }
    }
  ],
  "data": {
    "flow_run_state_by_pk": null
  }
}
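If the subscription endpoint is the sticking point, one workaround (a sketch, assuming a standard Prefect 1.x backend; the run ID is the one from the example above) is to poll the state with a plain query, e.g. via the Python client:
```python
# a minimal sketch: poll the flow run state with a regular query instead of a subscription
from prefect.client import Client

client = Client()
result = client.graphql(
    """
    query {
      flow_run(where: {id: {_eq: "848a9771-4962-405b-86f1-2d4561ffffff"}}) {
        state
      }
    }
    """
)
print(result)  # e.g. {'data': {'flow_run': [{'state': 'Success'}]}}
```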
thebuleon29
07/01/2021, 3:47 PM
import prefect
from prefect import Flow, task
from prefect.tasks.kubernetes.job import RunNamespacedJob

start_body = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "start"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "start",
                        "image": "alpine",
                        "command": ["echo", "start"],
                    }
                ],
                "restartPolicy": "Never",
            }
        },
        "backoffLimit": 4,
    },
}

@task
def print_kube_res(x):
    print(str(x))

with Flow("Kubernetes Job") as flow:
    start = RunNamespacedJob(body=start_body, kubernetes_api_key_secret=None)
    p = print_kube_res(start)
    start.set_downstream(p)
But here the print task prints 'None'. How do I get the result from a Kubernetes job?
Leon Kozlowski
07/01/2021, 3:50 PM
registry_url?
Ben Collier
07/01/2021, 3:59 PM
Laura Vaida
07/01/2021, 4:09 PM
Ok, I tried with the following:
@task(log_stdout=True)
def create_engine(snowflake_salesforce):
    config = configparser.ConfigParser()
    engine = create_engine(URL(**config[snowflake_salesforce]))

with Flow('UWG-Mail') as flow:
    snowflake_credentials = PrefectSecret("Snowflake_Salesforce")
    connection = create_engine(snowflake_salesforce=snowflake_credentials)
error:
Unexpected error: TypeError("unhashable type: 'dict'")
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "<input>", line 36, in create_engine
File "/usr/local/lib/python3.8/configparser.py", line 959, in __getitem__
if key != self.default_section and not self.has_section(key):
File "/usr/local/lib/python3.8/configparser.py", line 668, in has_section
return section in self._sections
TypeError: unhashable type: 'dict'
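For reference, the dict returned by PrefectSecret is being used as a configparser section key (hence "unhashable type: 'dict'"), and the task also shadows sqlalchemy's create_engine with its own name. One way to restructure it (a sketch, assuming the secret resolves to a dict of Snowflake connection parameters):
```python
# a minimal sketch, assuming the PrefectSecret resolves to a dict of
# Snowflake connection parameters (account, user, password, ...)
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

@task(log_stdout=True)
def build_engine(credentials: dict):
    # Pass the secret dict straight to URL() instead of looking it up in
    # configparser, which expects a string section name and raises
    # "unhashable type: 'dict'" when given a dict.
    return create_engine(URL(**credentials))

with Flow("UWG-Mail") as flow:
    snowflake_credentials = PrefectSecret("Snowflake_Salesforce")
    engine = build_engine(credentials=snowflake_credentials)
```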
Nicholas Chammas
07/01/2021, 5:50 PM
Parameter being fed into a DatabricksRunNow Task as a notebook_param. I can see the edge between the parameter and the task in the UI fine, but for whatever reason the Databricks notebook is not receiving the parameter value when I run the flow.
What should I try to troubleshoot the reason why the notebook task is not receiving parameter values?
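One thing worth checking (a sketch, assuming Prefect 1.x's DatabricksRunNow; the job id, parameter name, and connection setup are placeholders): pass notebook_params when calling the task inside the flow rather than when constructing it, so the Parameter is resolved at runtime:
```python
# a minimal sketch; Databricks connection/auth configuration is omitted
from prefect import Flow, Parameter
from prefect.tasks.databricks import DatabricksRunNow

run_notebook = DatabricksRunNow(job_id="123")

with Flow("databricks-run-now") as flow:
    run_date = Parameter("run_date", default="2021-07-01")
    # Passing notebook_params at call time wires the Parameter into the run,
    # instead of freezing a value when the task object is built at registration.
    run = run_notebook(notebook_params={"run_date": run_date})
```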
Gabriel Montañola
07/01/2021, 6:48 PM
Github Storage + KubernetesRun, and I'm not sure how to handle this in an elegant way.
Should I structure my project as a Python package and pip install -e . inside a Dockerfile?
I want to use tasks/functions defined in /tasks in my flows to avoid repetition.
my-happy-project/
├── flows/
│ ├── flow_1.py
│ └── flow_2.py
│
└── tasks/
├── shared_stuff_1.py
└── shared_stuff_2.py
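If it helps, one common approach (a sketch; the names are illustrative) is exactly that: make the repo an installable package so flows can do `from tasks.shared_stuff_1 import ...`, then run `pip install -e .` (or `pip install .`) in the Dockerfile. A minimal setup.py at the project root could look like:
```python
# setup.py -- a minimal sketch; tasks/ and flows/ each need an __init__.py
from setuptools import setup, find_packages

setup(
    name="my-happy-project",
    version="0.1.0",
    packages=find_packages(),  # picks up the tasks and flows packages
)
```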
Ben Muller
07/01/2021, 7:34 PM
Dave Nielsen
07/01/2021, 10:56 PM
Felipe Saldana
07/01/2021, 11:01 PM
Amanda Wee
07/02/2021, 12:12 AM
get_etl_flow function that takes a callback function that sets up the ETL-specific tasks, and thus defines and returns a flow by setting up these common tasks and calling the callback function. Then I have a register_flow function and a run_flow function that calls get_etl_flow and registers or runs the returned flow, respectively. Each group of related ETLs is grouped into a Prefect project in a single Python script, and register_flow is called for each ETL in the main function.
If I want to run a particular flow manually, I have to change that particular flow's register_flow call to run_flow and comment out the other register_flow calls, which is cumbersome.
How can I make use of the new CLI for running flows such that my infrastructure can still register flows and run them via an agent, while I can, for debugging etc., run individual flows using agentless execution without having to comment out code, despite there being multiple flows defined in a single file?
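One way to avoid commenting code in and out (a self-contained sketch; the flow names, stub task, and --run flag are illustrative stand-ins for the real get_etl_flow setup) is to pick register vs. run from the command line:
```python
# a minimal sketch of choosing "register everything" vs. "run one flow locally"
import argparse

from prefect import Flow, task

@task
def extract():
    return [1, 2, 3]

def get_etl_flow(name):
    # Stand-in for the real factory that wires common tasks plus a callback.
    with Flow(name) as flow:
        extract()
    return flow

def main():
    flows = {name: get_etl_flow(name) for name in ("etl-a", "etl-b")}
    parser = argparse.ArgumentParser()
    parser.add_argument("--run", choices=sorted(flows), help="run one flow locally, agentless")
    args = parser.parse_args()

    if args.run:
        flows[args.run].run()  # agentless local run for debugging
    else:
        for flow in flows.values():
            flow.register(project_name="my-project")

if __name__ == "__main__":
    main()
```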
davzucky
07/02/2021, 7:18 AM
Igor Kaluđer
07/02/2021, 8:58 AM
DK
07/02/2021, 4:21 PM
with case(results, None):
    return  # End the Flow
The examples with the Case Control Flow class always show an alternative task to run:
with case(cond, "a"):
run_if_cond_is_a()
with case(cond, "b"):
run_if_cond_is_b()
But I don't always have another task; sometimes I just need the flow to end.
I could check the inverse of 'results' to see if there is data instead of checking for None, and I could then run the subsequent tasks, but then I would need another task to check that condition. That's fine if that's how it needs to be done, but I just wanted to check if there was a simpler solution that I'm missing.
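For what it's worth, no second branch is required (a sketch, assuming Prefect 1.x; has_data and process are illustrative): anything inside a case block whose condition is not met is skipped, so the flow simply ends on its own.
```python
# a minimal sketch: wrap the downstream work in a single case block; when the
# condition is False everything inside is skipped and the flow just finishes
from prefect import Flow, case, task

@task
def get_results():
    return None  # or real records

@task
def has_data(results) -> bool:
    return results is not None

@task
def process(results):
    print(f"processing {results}")

with Flow("end-early") as flow:
    results = get_results()
    with case(has_data(results), True):
        process(results)
```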
Darshan
07/02/2021, 4:49 PM
Huw Ringer
07/02/2021, 4:53 PM
prefect.run_configs in my Flow, and can see the Agent from Prefect Cloud, and was able to run ‘hello world’ successfully via it also.
3. Azure PostgreSQL database I need to run SQL against as part of a Prefect Task (have used a Secret in Prefect Cloud to create a dictionary with all the database login parameters)
4. Private API I need to call as part of a Prefect Task
5. GitHub storage for the whole Flow script (created a GitHub Access Token Secret to enable PrefectCloud to access the registered script)
6. Have run the prefect backend cloud
CLI command on my local Mac, to hopefully force everything to execute in Azure rather than locally
Here’s the journey I’ve been on:
1. I got the basic “hello world” flow executing on a local agent (yay!), with the Flow registered in the Prefect Cloud, and the code being pulled from GitHub
2. I then got it working against the Kubernetes agent, but for some reason the ‘hello world’ message didn’t appear in the logs. Is that because I need to set prefect.config.cloud.send_flow_run_logs = True
somewhere/how, or something else?
3. Tried importing psycopg2 and creating a connection to the PostgreSQL database to retrieve a very simple count result. Am not sure if I need to be using the PostgresExecute API call (which itself uses Psycopg2) rather than importing psycopg2 into my Flow. Thoughts/recommendations welcome!
4. Also tried importing the requests Python module to call the API.
5. When I try running the script to register the Flow it appears to work (finished with exit code 0), but when I look in my Prefect Cloud Flows tab it’s not there. Any idea why, please?
Sorry to bother you all about this, but am kind of at a loss on how to move forwards with this if I can’t even see the Flow I want to execute. Suspect it may be something to do with importing those libraries/modules and them not being available in the Execution environment, but have no idea from what I’ve read so far what I need to do to get that working. Any advice (even RTFM, if you can point me to the right topic) would be gratefully received. Thanks in advance!
* UPDATE * Have posted the script to the below thread as requested by @Kevin Kho
Huw
Alex Furrier
07/02/2021, 10:03 PM
prefect.engine.serializers.PandasSerializer and passing that to the task as a local result. Like so:
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def my_df_task(df: DataFrame) -> DataFrame:
However using this method for the previously described task (input DF, output List) fails as it's trying to use the PandasSerializer to serialize the list.
What's the most sensible way around this?
My hacky workaround is to create a serializer that uses PandasSerializer methods for deserializing the DataFrame and PickleSerializer methods for serializing the list.
I feel like there has to be a smarter way to do this.
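In case it's useful: the result serializer only applies to a task's return value, so one option (a sketch; the tasks are illustrative) is to give the list-returning task its own pickle-based result, while DataFrame-returning tasks keep the PandasSerializer:
```python
# a minimal sketch: per-task results, chosen to match each task's return type
from typing import List

from pandas import DataFrame
from prefect import task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer, PickleSerializer

@task(result=LocalResult(serializer=PandasSerializer("csv", serialize_kwargs={"index": False})))
def make_df() -> DataFrame:
    return DataFrame({"a": [1, 2, 3]})

@task(result=LocalResult(serializer=PickleSerializer()))
def df_to_list(df: DataFrame) -> List:
    return df["a"].tolist()
```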
Ben Muller
07/02/2021, 10:20 PM
15.0 in the API keys section.
I currently authenticate my agents with prefect auth login -t "foo", but in the UI I definitely have used values that are API keys.
Does that mean I just need to update the prefect version on my agent and change the -t argument to --key?
Ben Muller
07/03/2021, 2:35 AM
great expectations task that you have built in.
I am getting an exception when I don't provide the
validation_operators:
  action_list_operator:
in my great_expectations.yml, but then after providing it I receive a warning:
WARNING great_expectations.data_context.types.base:base.py:1016 You appear to be using a legacy capability with the latest config version (3.0).
Your data context with this configuration version uses validation_operators, which are being deprecated. Please update your configuration to be compatible with the version number 3.
Is this something to do with the version of ge that Prefect is using in the task?
Omar Sultan
07/03/2021, 3:39 AM