Tim Helfensdörfer
05/30/2022, 10:31 AM
Joshua Greenhalgh
05/30/2022, 11:45 AM
Rei Mendel
05/30/2022, 12:33 PM
Mateo Merlo
05/30/2022, 2:24 PM
{
  "ACCESS_KEY": "abcdef",
  "SECRET_ACCESS_KEY": "ghijklmn"
}
If I'm using pandas to read a file in S3:
df = pd.read_csv(f"s3://{s3_bucket_name}/{filename}")
Do I need to pass the credentials as a parameter to read_csv, or are they read automatically from the Cloud?
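One way the credentials could be handed over explicitly is the storage_options argument of read_csv, which pandas forwards to s3fs. A minimal sketch, assuming the secret is a JSON document shaped like the one above; the helper name s3_storage_options is hypothetical and not part of pandas or Prefect:

```python
import json


def s3_storage_options(secret_json: str) -> dict:
    """Map the secret's fields onto the key/secret arguments that s3fs accepts."""
    creds = json.loads(secret_json)
    return {"key": creds["ACCESS_KEY"], "secret": creds["SECRET_ACCESS_KEY"]}


# Placeholder values copied from the secret shown in the message.
secret_json = '{"ACCESS_KEY": "abcdef", "SECRET_ACCESS_KEY": "ghijklmn"}'
options = s3_storage_options(secret_json)

# With pandas >= 1.2 and s3fs installed, the options would be passed like this
# (commented out because it needs real credentials and network access):
# df = pd.read_csv(f"s3://{s3_bucket_name}/{filename}", storage_options=options)
```

If storage_options is omitted, s3fs falls back to the usual botocore credential lookup (environment variables, the shared credentials file, or an attached role), so the values are not picked up from the secret on their own.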
Currently I'm getting this error:
"botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden"
Thanks!
Shriram Holla
05/30/2022, 2:38 PM
flow.run(), it works just fine. However, I’m running a local backend server and when I try to register the task using flow.register(), I get this error:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "/Users/shriramholla/.pyenvs/python3.6/lib/python3.6/site-packages/absl/flags/_flagvalues.py", line 677, in __getstate__
    raise TypeError("can't pickle FlagValues")
TypeError: can't pickle FlagValues
Note: Nothing is being passed into or being returned by any task
Andreas
05/30/2022, 3:16 PM
flow.add_edge(upstream_task, downstream_task, key)
is there any way to specify which of the multiple return values from the upstream task should pass to the downstream task?
Ilya Sapunov
05/30/2022, 5:23 PM
FuETL
05/30/2022, 6:17 PM
Slackbot
05/30/2022, 6:27 PM
Dung Khuc
05/30/2022, 7:18 PM
Shriram Holla
05/30/2022, 7:18 PM
Kayvan Shah
05/30/2022, 8:16 PM
Danyl Hrynko
05/30/2022, 9:25 PM
prefect.context.get("logger") as logger. Could anyone guess how to fix this? Thanks
Thomas Agung Santoso
05/31/2022, 2:31 AM
Muddassir Shaikh
05/31/2022, 5:35 AM
Camilo Fernandez
05/31/2022, 7:59 AM
Tim Helfensdörfer
05/31/2022, 8:55 AM
Horatiu Bota
05/31/2022, 10:20 AM
Dileep Damodaran
05/31/2022, 10:49 AM
Martin T
05/31/2022, 12:10 PM
flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW), that I execute with flow_run.map(parameters=parameters, run_name=run_name). (Here parameters and run_name are generated by other map() tasks, and are of the same length.)
When I run the wrapper task locally, targeting the sub-task in Prefect cloud, I get:
└── 12:48:22 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:22 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:23 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
└── 12:48:23 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:23 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:24 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
Note the same GUIDs!!! When we visit the cloud UI, only the first mapped task is created!
If the wrapper task is also running in cloud, it works as expected.
I'm not sure how to create a minimal reproducible example for this, since StartFlowRun can't trigger a local flow (I don't use prefect server, only prefect cloud).
Nikolaus Landgraf
05/31/2022, 12:38 PM
Vackar Afzal
05/31/2022, 1:16 PM
storage.files.update(THE_FILE) to bring in the dependency
What’s the recommended approach to achieve something similar in prefect 2?
Sander
05/31/2022, 1:20 PM
Sylvain Hazard
05/31/2022, 1:26 PM
Sander
05/31/2022, 1:51 PM
John Mizerany
05/31/2022, 2:12 PM
Lucas Hosoya
05/31/2022, 2:15 PM
RuntimeError: Missing dependency kubectl. Please install kubectl following the instructions for your OS.
Does anyone know about this error with K8s tolerations?
Lucas Rodrigues
05/31/2022, 2:37 PM
from prefect.client import Client

flow_id = "..."
client = Client()
client.graphql(
    """
    mutation {
      delete_flow(input: {flow_id: "%s"}) {
        success
      }
    }
    """ % flow_id
)
But how can I delete all versions of a given flow? It looks like the Flow Group ID is constant across the flow versions. I tried substituting it into the snippet above, but that didn't work.
Marcin Grzybowski
05/31/2022, 2:51 PM
Patrick Tan
05/31/2022, 3:04 PM