Kalise Richmond
10/20/2022, 4:47 PMAlix Cook
10/20/2022, 4:52 PMStefan Rasmussen
10/20/2022, 5:00 PMJohn Ramey
10/20/2022, 5:05 PMget_run_logger
be called in each task/flow? i have a flow with ~20 tasks. and after upgrading to Prefect 2, I have 20 different calls to get_run_logger
ā is there a better way to adhere to DRY?Iuliia Volkova
10/20/2022, 7:41 PMBen Muller
10/20/2022, 10:30 PMcreate_flow_run
if that subflow then triggers more "subsubflows" if one of them fail, will my master flow know about it ? is there a way to propagate any of these signales?TomƔs Emilio Silva Ebensperger
10/21/2022, 12:14 AMresult = self.result.write(value, **formatting_kwargs)
TypeError: LocalResult.write() got multiple values for argument 'self'
Deepanshu Aggarwal
10/21/2022, 5:17 AMNic
10/21/2022, 8:37 AMredsquare
10/21/2022, 9:19 AMRobin WeiĆ
10/21/2022, 9:33 AMRunning
state and start a K8s job & pod, the other 100 Flow Runs just idle around in āPendingā. I have searched everywhere to find hints about why they wonāt start. There is definitely enough K8s computing resources for them to be scheduled, there is no concurrency limits set via tags or on the work queue directly. The K8s work queue just shows them as āPendingā in the UI. Does anyone have any idea where else to look? Cheers šAlejandro
10/21/2022, 1:23 PMprefect deployment build
CLI. As for the block options, I have defined the following custom LocalFileSystem
block (shown in the image below). However, when building the manifest file with the command prefect deployment build src/flows/example.py:healthcheck --name test --storage-block local-file-system/mount
, I get the following error:
Found flow 'healthcheck'
Traceback (most recent call last):
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/deployment.py", line 853, in build
deployment = await Deployment.build_from_flow(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 720, in build_from_flow
await deployment.upload_to_storage()
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 572, in upload_to_storage
file_count = await self.storage.put_directory(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/filesystems.py", line 187, in put_directory
shutil.copytree(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 568, in copytree
return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 467, in _copytree
os.makedirs(dst, exist_ok=dirs_exist_ok)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/os.py", line 225, in makedirs
mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
An exception occurred.
Any idea why this is happening?
I'm currently using python=3.9.13
and prefect=2.6.4
You can find the healthcheck
flow code in the following link:
https://discourse.prefect.io/t/how-to-deploy-prefect-2-0-flows-to-run-as-a-local-process-docker-container-or-a-kubernetes-job/1246Brandon T. Kowalski
10/21/2022, 1:42 PMBrandon T. Kowalski
10/21/2022, 2:15 PMAlejandro
10/21/2022, 3:10 PMKrishnan Chandra
10/21/2022, 3:39 PMSeth Goodman
10/21/2022, 3:50 PMAmy McClurg
10/21/2022, 4:02 PMTrevor Kramer
10/21/2022, 6:07 PMfrom prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness
def test_flow():
@task
def _add(x: int, y: int) -> int:
get_run_logger().info(f"Adding {x} + {y} in task")
return x + y
@flow(persist_result=True, result_storage="s3/mldc-result-storage", result_serializer="json")
def add(x: int, y: int) -> int:
get_run_logger().info(f"Adding {x} + {y}")
result = _add(x, y)
return result
with prefect_test_harness():
assert add(5, 6) == 11
Ilya Galperin
10/21/2022, 6:22 PMflow a deployment x
triggering flow b deployment y
?Jai P
10/21/2022, 8:12 PMStephen Herron
10/21/2022, 10:09 PMJarvis Stubblefield
10/22/2022, 12:36 AMNoam Cohen
10/22/2022, 8:55 AMNoam Cohen
10/22/2022, 11:17 AM.prefectignore
when I upload to my storage block and I am using a local agent. I am pretty sure that in the past this worked, but anyway -
How can I set the environmental variables of my execution from an .env
file?Manuel Garrido PeƱa
10/22/2022, 3:13 PMs3fs
is installed, I cant add any kind of block in the UI, do you think the issue is related to docker?Keith
10/22/2022, 4:58 PMrunning
. Nothing is ever messaged back to the logs and it never fails
, some sort of response or indication that the pod is no longer active would be great!
2. When you run a job and it never makes it to the actual Flow, the failure comes through in the Prefect Cloud UI but the kubernetes Pod remains active. I'm assuming this is due to the finished_job_ttl
not getting applied yet, but it is set at the deployment level so not sure why it keeps the pod around on a failure like this.Mtrl_Scientist
10/22/2022, 7:59 PMFlorian Kühnlenz
10/23/2022, 8:27 AMJens Freund
10/23/2022, 8:01 PMC:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
āāāā.idea
ā some files
ā
āāāāprefect_flows
ā prefect_flows.py
ā
āāāāprefect_tasks
prefect_tasks.py
In addition, I created a remote storage block via the UI named sftp-server
and the base path <sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>
.
After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder prefect_flows
with the file prefect_flows.py
is uploaded to the api_flow
directory of the SFTP server, not the folder prefect_tasks
. Also, Prefect gives the following error:
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!Jens Freund
10/23/2022, 8:01 PMC:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
āāāā.idea
ā some files
ā
āāāāprefect_flows
ā prefect_flows.py
ā
āāāāprefect_tasks
prefect_tasks.py
In addition, I created a remote storage block via the UI named sftp-server
and the base path <sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>
.
After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder prefect_flows
with the file prefect_flows.py
is uploaded to the api_flow
directory of the SFTP server, not the folder prefect_tasks
. Also, Prefect gives the following error:
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
Found flow 'api-flow'
Default '.prefectignore' file written to C:\Users\Testbenutzer\prefect_test_project\.prefectignore
Traceback (most recent call last):
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\base_events.py", line 646, in run_until_complete
return future.result()
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
return await func(*args)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\deployment.py", line 853, in build
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 720, in build_from_flow
await deployment.upload_to_storage()
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 572, in upload_to_storage
file_count = await self.storage.put_directory(
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\filesystems.py", line 359, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\spec.py", line 819, in put_file
callback.relative_update(segment_len)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
Python code:
# prefect_flows.py
from prefect import flow
from prefect_tasks.prefect_tasks import call_api
@flow
def api_flow(url):
fact_json = call_api(url)
print(fact_json)
# prefect_tasks.py
import requests
from prefect import task
@task
def call_api(url):
response = requests.get(url)
print(response.status_code)
return response.json()
Storage block definition:Q
10/24/2022, 11:27 AMpip install fsspec>=2022.10.0
As you can see in the traceback whenever a file gets written to the remote the progress is tracked via a callback that adds the value returned by AbstractFileSystem.open().write
to a counter.
It is assumed that the value represents the number of bytes written to the buffer.
However, in the case of fsspec.implementations.sftp.SFTPFileSystem
the buffer (paramiko.file.BufferedFile
) returns None
when write
is called, which leads to an exception when fsspec.spec.AbstractBufferedFile
attempts to add None
to an int counter.
The offending segment changed in fsspec==2022.10.0
(released 2022/10/19), bumping `fsspec`'s version seems to solve the problem.Jens Freund
10/24/2022, 11:57 AM