Adam Roderick
03/30/2022, 4:38 PM
FuETL
03/30/2022, 5:01 PM
Maria
03/30/2022, 5:40 PM
from prefect.storage import GitLab
storage = GitLab(repo="org/repo",
                 path="/hello/hello_k8s.py",
                 ref="prefect")
And when I try to run the flow, I get a 404 "file not found" error, but the file is there. Is there a way to print out the path the Prefect runner is using?
Shuchita Tripathi
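This note concerns Maria's GitLab storage question above. One plausible cause of the 404 is the leading slash in path, though this thread does not confirm it. GitLab's repository files API takes a URL-encoded file path relative to the repository root, so "/hello/hello_k8s.py" is a different lookup from "hello/hello_k8s.py". The sketch below uses only the standard library to build the raw-file API URL for both spellings, so you can print and compare them. It does not call Prefect. The helper name and the gitlab.com host are illustrative assumptions.

```python
from urllib.parse import quote


def gitlab_raw_file_url(host: str, repo: str, path: str, ref: str) -> str:
    """Build a GitLab v4 API URL for a raw file (hypothetical helper).

    Both the project path and the file path are URL-encoded with no safe
    characters, so every "/" becomes "%2F".
    """
    project_id = quote(repo, safe="")
    file_path = quote(path, safe="")
    return (
        f"{host}/api/v4/projects/{project_id}"
        f"/repository/files/{file_path}/raw?ref={quote(ref, safe='')}"
    )


if __name__ == "__main__":
    host = "https://gitlab.com"  # assumption: replace with your GitLab host
    # As written in the storage config: the leading "/" is encoded as "%2F"
    print(gitlab_raw_file_url(host, "org/repo", "/hello/hello_k8s.py", "prefect"))
    # Path relative to the repository root, without the leading slash
    print(gitlab_raw_file_url(host, "org/repo", "hello/hello_k8s.py", "prefect"))
```

To see which URL GitLab resolves, request both (send a PRIVATE-TOKEN header for private projects). If only the second one works, dropping the leading slash from path is the likely fix.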
03/30/2022, 5:45 PM
Chris Reuter
03/30/2022, 6:45 PM
Fina Silva-Santisteban
03/30/2022, 6:52 PM
flow.set_dependencies(
    task=SnowflakeQuery,
    keyword_tasks=dict(query='''SELECT * FROM dummy_table;''')
)
flow.set_dependencies(
    task=save_query_result_as_df,
    keyword_tasks=dict(result_set=SnowflakeQuery)
)
The task save_query_result_as_df currently only does a print() of the result_set. I’m confused about a few things:
• I didn’t provide the SnowflakeQuery with any authentication. Why doesn’t it throw an error about that?
• The print statement prints out <class 'prefect.tasks.snowflake.snowflake.SnowflakeQuery'>, which makes me think the SnowflakeQuery task wasn’t run? If it wasn’t run, that would at least explain why it didn’t throw an error 😅, but how can I make it run?
Scott Aefsky
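This note concerns Fina's SnowflakeQuery question above. The printed value is the class itself, which suggests the class was passed to set_dependencies instead of an instance. Assume Prefect 1.x wraps any non-task value it is handed as a constant, which matches the output shown. In that case nothing ever executed a query, so no authentication error could occur. The standard-library toy model below reproduces that behaviour. Task, Constant, as_task and FakeQuery are simplified stand-ins, not Prefect's actual classes.

```python
class Task:
    """Minimal stand-in for a task: something with a run() method."""

    def run(self, **kwargs):
        raise NotImplementedError


class Constant(Task):
    """Wraps a plain value; running it just returns the value."""

    def __init__(self, value):
        self.value = value

    def run(self):
        return self.value


def as_task(obj):
    # Only Task *instances* stay tasks; anything else, including a Task
    # *class*, is wrapped as a constant value.
    return obj if isinstance(obj, Task) else Constant(obj)


class FakeQuery(Task):
    """Hypothetical query task that needs credentials to run."""

    def __init__(self, user=None):
        self.user = user

    def run(self, query=None):
        if self.user is None:
            raise ValueError("missing credentials")
        return [("row",)]


if __name__ == "__main__":
    # Passing the class: it becomes a constant, so run() returns the class itself
    print(as_task(FakeQuery).run())
    # Passing an instance without credentials: the task really runs and fails
    try:
        as_task(FakeQuery()).run(query="SELECT 1")
    except ValueError as exc:
        print("error:", exc)
```

In the real flow, the likely fix is to create one instance with credentials and pass that same instance to both set_dependencies calls. For example, snowflake_query = SnowflakeQuery(account=..., user=..., password=...). Check the exact constructor arguments against your Prefect version.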
03/30/2022, 6:54 PM
Anna Geller
03/30/2022, 7:12 PM
Ievgenii Martynenko
03/30/2022, 9:19 PM
fetch_task = TestMySQLFetchOneValue(connection_name='...', query="select now(6) as time_column", name="Fetch Time")
execute_task = TestMySQLExecute(connection_name='...', name="Write Time")

with Flow("Test Flow") as flow:
    get_time = fetch_task()  # fetch_task().run() works
    execute = execute_task(query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")

flow.run()
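This note concerns Ievgenii's MySQL flow above. Inside with Flow(...), calling fetch_task() does not run the query. In Prefect 1.x it returns a task object that only produces a value when the flow runs. That means get_time.get('time_column') and the string concatenation are evaluated at build time, before any row exists. fetch_task().run() works because .run() executes the task immediately. A common pattern is to move the string building into its own task. The standard-library toy below models deferred execution to show the difference. Deferred, fetch_time and build_update_query are hypothetical names. In a real flow, build_update_query would carry Prefect's @task decorator, and its result would be passed to execute_task(query=...).

```python
class Deferred:
    """Toy stand-in for a task call: records a function and its inputs, and
    only computes a value when result() is called (i.e. at "run time")."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def result(self):
        # Resolve upstream Deferred inputs first, then call the function
        values = [a.result() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*values)


def fetch_time():
    # Hypothetical fetch returning a sample row; the real one queries MySQL
    return {"time_column": "2022-03-30 21:19:00.000000"}


def build_update_query(row):
    # Only called once fetch_time() has produced a dict
    return f"update test_run set run_time = '{row['time_column']}'"


# "Build time": nothing has run yet, so get_time is not a dict
get_time = Deferred(fetch_time)
query = Deferred(build_update_query, get_time)

if __name__ == "__main__":
    print(hasattr(get_time, "get"))  # False: there is no row to read yet
    print(query.result())  # "Run time": the string is built from the fetched row
```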
Gaurav
03/30/2022, 10:48 PM
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "metadata": {
                "labels": {
                    "execution-model": "serverless"
                }
            },
            "spec": {
                "containers": [
                    {
                        "name": "flow"
                    }
                ],
                "nodeSelector": {
                    "execution-model": "serverless"
                },
                "tolerations": [
                    {
                        "key": "virtual-kubelet.io/provider",
                        "operator": "Exists"
                    }
                ]
            }
        }
    }
}
However, the flow fails. When I ran kubectl get events, I noticed the following output:
Warning ProviderCreateFailed pod/prefect-job-XXXXX-XXXXX ACI does not support providing args without specifying the command. Please supply both command and args to the pod spec.
Just some more information: I also ran the same flow successfully on an alternate deployment on AWS EKS Fargate, using an AWS Kubernetes Agent.
Any guidance is really appreciated :)
Caleb Ejakait
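This note concerns the ACI error on Gaurav's job template above. The event says ACI rejects a container that has args but no command. In Kubernetes, command overrides the image ENTRYPOINT and args overrides CMD. One hedged workaround is therefore to set command on the flow container to the image's own entrypoint. That should leave behaviour unchanged while satisfying ACI.

This rests on two assumptions:
- The template defines no args, so the Prefect agent presumably injects them into the container named "flow".
- The image ENTRYPOINT is tini -g -- entrypoint.sh. Verify yours with docker inspect --format '{{json .Config.Entrypoint}}' <image>.

The helper below is a standard-library sketch that patches the template dict before it is used.

```python
import copy

# Assumption: the image's ENTRYPOINT; verify with `docker inspect` before relying on it
IMAGE_ENTRYPOINT = ["tini", "-g", "--", "entrypoint.sh"]


def with_explicit_command(job_template: dict, command: list, container_name: str = "flow") -> dict:
    """Return a copy of a Job template whose named container has an explicit command."""
    patched = copy.deepcopy(job_template)  # leave the caller's dict untouched
    for container in patched["spec"]["template"]["spec"]["containers"]:
        if container.get("name") == container_name:
            # Keep a command that is already set; only fill in a missing one
            container.setdefault("command", list(command))
    return patched


if __name__ == "__main__":
    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {"template": {"spec": {"containers": [{"name": "flow"}]}}},
    }
    print(with_explicit_command(job_template, IMAGE_ENTRYPOINT))
```

The patched dict would then be passed wherever the template is used, for example KubernetesRun(job_template=...). Whether ACI accepts the result still depends on the assumptions above.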
03/31/2022, 7:47 AM
Michael Smith
03/31/2022, 9:52 AM
Henning Holgersen
03/31/2022, 11:25 AM
Thomas Mignon
03/31/2022, 11:38 AM
Noam Gal
03/31/2022, 12:44 PM
Shuchita Tripathi
03/31/2022, 1:58 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002A9EA91E100>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
My Prefect server is not at localhost.
If I call client.register(flow, project_name), the error says:
raise ValueError("This flow has no storage to build")
ValueError: This flow has no storage to build
What is the best way to make POST calls that create a new flow?
Prasanth Kothuri
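This note concerns Shuchita's localhost:4200 error above. Prefect 1.x reads its configuration from ~/.prefect/config.toml and from environment variables named PREFECT__<SECTION>__<KEY>. The server address can therefore be set with PREFECT__SERVER__ENDPOINT (for example http://<your-server>:4200) before Prefect is imported. Passing Client(api_server=...) is another option.

The separate "This flow has no storage to build" error means flow.storage was never set. flow.register(project_name=...) normally fills in a default, or you can assign one yourself, such as prefect.storage.Local(). The toy function below only illustrates how the double-underscore convention maps onto nested config keys. It is a simplified re-implementation, not Prefect's own loader, and the host in the usage line is hypothetical.

```python
import os


def prefect_env_overrides(environ=None) -> dict:
    """Toy parser turning PREFECT__SECTION__KEY variables into a nested dict.

    Illustrates the naming convention only; real Prefect 1.x also casts
    values and interpolates references, which this sketch skips.
    """
    environ = os.environ if environ is None else environ
    config: dict = {}
    for name, value in environ.items():
        if not name.startswith("PREFECT__"):
            continue  # unrelated environment variable
        keys = name[len("PREFECT__"):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})  # descend into (or create) the section
        node[keys[-1]] = value
    return config


if __name__ == "__main__":
    example = {"PREFECT__SERVER__ENDPOINT": "http://my-prefect-server:4200"}  # hypothetical host
    print(prefect_env_overrides(example))
```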
03/31/2022, 2:19 PM
Atul Anand
03/31/2022, 2:48 PM
Atul Anand
03/31/2022, 2:48 PM
Atul Anand
03/31/2022, 2:50 PM
Joshua Weber
03/31/2022, 4:39 PM
Joshua Weber
03/31/2022, 4:40 PM
Joshua Weber
03/31/2022, 4:40 PM
Bruno Nunes
03/31/2022, 4:48 PM
prefect orion kubernetes-manifest | kubectl apply -f -
I got an error pulling the image because it didn't find docker.io/prefecthq/prefect:dev-python3.8. I've edited the deployment manifest to use docker.io/prefecthq/prefect:latest-python3.8 instead, but I'm guessing that this is not the correct image either, since I'm now getting the following error in the pod logs:
Usage: prefect [OPTIONS] COMMAND [ARGS]...
Try 'prefect -h' for help.
Error: No such command 'orion'.
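This note concerns Bruno's manifest above. The "No such command 'orion'" error comes from the Prefect CLI inside the container, not from Kubernetes. At the time, prefecthq/prefect:latest-python3.8 shipped Prefect 1.x, whose CLI has no orion command. The deployment therefore needs an image that contains a Prefect 2.0 (Orion) release. The exact tag depends on the release you installed, so it is left as a placeholder. The standard-library filter below rewrites the image line of the generated manifest. Its regular expression assumes the manifest references the image as image: ...prefecthq/prefect:<tag>, and the script name in the usage comment is hypothetical.

```python
import re
import sys

# Matches lines such as "      image: docker.io/prefecthq/prefect:dev-python3.8"
# (assumed manifest format; an optional leading "- " list marker is allowed)
IMAGE_LINE = re.compile(
    r"^([ \t]*(?:-[ \t]+)?image:[ \t]*)\S*prefecthq/prefect:\S+[ \t]*$",
    re.MULTILINE,
)


def set_prefect_image(manifest: str, image: str) -> str:
    """Replace every Prefect image reference in a YAML manifest with `image`."""
    # A function replacement avoids backslash-escape surprises in `image`
    return IMAGE_LINE.sub(lambda m: m.group(1) + image, manifest)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (hypothetical script name; pass your Prefect 2.0 image as the argument):
    #   prefect orion kubernetes-manifest | python set_image.py <prefect-2-image> | kubectl apply -f -
    sys.stdout.write(set_prefect_image(sys.stdin.read(), sys.argv[1]))
```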
Jose Daniel Posada Montoya
03/31/2022, 5:43 PM
Patrick Tan
03/31/2022, 7:02 PM
Milton
03/31/2022, 7:08 PM
Horatiu Bota
03/31/2022, 7:20 PM
Joshua Weber
03/31/2022, 7:25 PM
Ievgenii Martynenko
03/31/2022, 7:30 PM
with Flow("Test ETL Flow") as flow:
    start_task_result = start_task()
    truncate_task_result = truncate_task()
    fetch_task_result = fetch_task()
    update_template_result = update_template_task(variables=fetch_task_result)
    merge_task_result = merge_task()
    complete_task_result = complete_task()
    truncate_task.set_upstream(start_task_result)
    fetch_task.set_upstream(truncate_task_result)
    merge_task.set_upstream(update_template_result)
    complete_task.set_upstream(merge_task_result)
flow.visualize()
Kevin Kho
03/31/2022, 7:34 PM
truncate_task.set_upstream(start_task_result)
Instead you want:
truncate_task_result.set_upstream(start_task_result)
so that the upstream is set on the instance. truncate_task.set_upstream(start_task_result) refers to the task, while truncate_task_result is the instance of the task.
Ievgenii Martynenko
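This note illustrates Kevin's point above. In Prefect 1.x, calling a task inside with Flow(...) binds a copy of that task to the flow and returns the copy. Calling set_upstream on the original task object therefore pulls the original into the flow as an extra node, which then appears in flow.visualize(). The standard-library toy model below reproduces the node count. Flow, Task, bind and set_upstream here are simplified stand-ins, not Prefect's implementation.

```python
import copy


class Flow:
    def __init__(self):
        self.tasks = []  # task objects in the graph (compared by identity)
        self.edges = []  # (upstream, downstream) pairs

    def add_task(self, task):
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)

    def add_edge(self, upstream, downstream):
        self.add_task(upstream)
        self.add_task(downstream)
        self.edges.append((upstream, downstream))


class Task:
    def __init__(self, name):
        self.name = name

    def bind(self, flow):
        # Like calling a task inside `with Flow(...)`: a *copy* joins the flow
        instance = copy.copy(self)
        flow.add_task(instance)
        return instance

    def set_upstream(self, upstream, flow):
        flow.add_edge(upstream, self)


def build(use_instances: bool) -> Flow:
    flow = Flow()
    start_task, truncate_task = Task("start"), Task("truncate")
    start_task_result = start_task.bind(flow)
    truncate_task_result = truncate_task.bind(flow)
    if use_instances:
        truncate_task_result.set_upstream(start_task_result, flow)  # edge between bound copies
    else:
        truncate_task.set_upstream(start_task_result, flow)  # drags the original into the flow
    return flow


if __name__ == "__main__":
    print(len(build(use_instances=False).tasks))  # 3: start copy, truncate copy, original truncate
    print(len(build(use_instances=True).tasks))  # 2: the two copies, linked by one edge
```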
03/31/2022, 7:37 PM
Kevin Kho
03/31/2022, 7:38 PM