Alejandro
10/21/2022, 1:23 PMprefect deployment build
CLI. As for the block options, I have defined the following custom LocalFileSystem
block (shown in the image below). However, when building the manifest file with the command prefect deployment build src/flows/example.py:healthcheck --name test --storage-block local-file-system/mount
, I get the following error:
Found flow 'healthcheck'
Traceback (most recent call last):
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/cli/deployment.py", line 853, in build
deployment = await Deployment.build_from_flow(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 720, in build_from_flow
await deployment.upload_to_storage()
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/deployments.py", line 572, in upload_to_storage
file_count = await self.storage.put_directory(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/site-packages/prefect/filesystems.py", line 187, in put_directory
shutil.copytree(
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 568, in copytree
return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/shutil.py", line 467, in _copytree
os.makedirs(dst, exist_ok=dirs_exist_ok)
File "/home/user/Miniconda3/envs/prefect/lib/python3.9/os.py", line 225, in makedirs
mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
An exception occurred.
Any idea why this is happening?
I'm currently using python=3.9.13
and prefect=2.6.4
You can find the healthcheck
flow code in the following link:
https://discourse.prefect.io/t/how-to-deploy-prefect-2-0-flows-to-run-as-a-local-process-docker-container-or-a-kubernetes-job/1246Brandon T. Kowalski
10/21/2022, 1:42 PMBrandon T. Kowalski
10/21/2022, 2:15 PMAlejandro
10/21/2022, 3:10 PMKrishnan Chandra
10/21/2022, 3:39 PMSeth Goodman
10/21/2022, 3:50 PMAmy McClurg
10/21/2022, 4:02 PMTrevor Kramer
10/21/2022, 6:07 PMfrom prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness
def test_flow():
@task
def _add(x: int, y: int) -> int:
get_run_logger().info(f"Adding {x} + {y} in task")
return x + y
@flow(persist_result=True, result_storage="s3/mldc-result-storage", result_serializer="json")
def add(x: int, y: int) -> int:
get_run_logger().info(f"Adding {x} + {y}")
result = _add(x, y)
return result
with prefect_test_harness():
assert add(5, 6) == 11
Ilya Galperin
10/21/2022, 6:22 PMflow a deployment x
triggering flow b deployment y
?Jai P
10/21/2022, 8:12 PMStephen Herron
10/21/2022, 10:09 PMJarvis Stubblefield
10/22/2022, 12:36 AMNoam Cohen
10/22/2022, 8:55 AMNoam Cohen
10/22/2022, 11:17 AM.prefectignore
when I upload to my storage block and I am using a local agent. I am pretty sure that in the past this worked, but anyway -
How can I set the environmental variables of my execution from an .env
file?Manuel Garrido Peña
10/22/2022, 3:13 PMs3fs
is installed, I cant add any kind of block in the UI, do you think the issue is related to docker?Keith
10/22/2022, 4:58 PMrunning
. Nothing is ever messaged back to the logs and it never fails
, some sort of response or indication that the pod is no longer active would be great!
2. When you run a job and it never makes it to the actual Flow, the failure comes through in the Prefect Cloud UI but the kubernetes Pod remains active. I'm assuming this is due to the finished_job_ttl
not getting applied yet, but it is set at the deployment level so not sure why it keeps the pod around on a failure like this.Mtrl_Scientist
10/22/2022, 7:59 PMFlorian Kühnlenz
10/23/2022, 8:27 AMJens Freund
10/23/2022, 8:01 PMC:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
├───.idea
│ some files
│
├───prefect_flows
│ prefect_flows.py
│
└───prefect_tasks
prefect_tasks.py
In addition, I created a remote storage block via the UI named sftp-server
and the base path <sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>
.
After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder prefect_flows
with the file prefect_flows.py
is uploaded to the api_flow
directory of the SFTP server, not the folder prefect_tasks
. Also, Prefect gives the following error:
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!iKeepo w
10/24/2022, 3:49 AMprefect deployment build
?Stephen Herron
10/24/2022, 3:56 AMdbt_projects/*
!dbt_projects/.gitkeep
Tim Enders
10/24/2022, 2:08 PMisinstance(blah, Failed)
but Failed isn't a type. Here is a code snippet:
if isinstance(bq_result, LoadJob) and bq_result.state == "DONE":
return Completed(message="Load Finished!")
elif isinstance(bq_result, Failed):
slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
slack_webhook_block.notify("Hello from Prefect 2.0!")
return bq_result
else:
iñigo
10/24/2022, 4:03 PMDavid Elliott
10/24/2022, 4:08 PMAlejandro
10/24/2022, 4:49 PMBlock
inside the class in which another Block
is defined using the Python API?
class Foo(Block):
target_name: str
_block_type_slug = "foo-block"
def block_initialization(self) -> None:
bar_block = JSON.load("bar-block").value
another_block = JSON.load(target_name).value
I have tried with the above code but I get the following warning, which results in a coroutine being returned instead of a block:
RuntimeWarning: coroutine 'Block.load' was never awaited
redsquare
10/24/2022, 5:22 PMAgent started! Looking for work from queue(s): default-work-queue...Mon, Oct 24 2022 6:17:45 pm17:17:45.536 | ERROR | prefect.agent - Failed to create work queue 'default-work-queue'.Mon, Oct 24 2022 6:17:45 pm
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<https://api.prefect.cloud/api/accounts/9396b5ff-23c0-42b4-91d9-5892450xxxx/workspaces/de803906-c6f7-45%0A77-a3aa-d300b56fxxx/work_queues/>'
Tim Enders
10/24/2022, 6:52 PMif __name__ == "__main__":
try:
flow_result = main()
except Exception as e:
slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
slack_webhook_block.notify(f"{e.__class__}: {str(e)}")
Tim Enders
10/24/2022, 9:13 PMlink89
10/25/2022, 1:19 AMrm -rf ~/.prefect/storage/*
to clear cache so that the task would run again next time.
But I find it won't work this time and I have to clean the whole ~/.prefect
directory instead of just storage to make it work.
Is this a bug or I just miss something? Does prefect provide some command line interface to clear cache?Saurabh Indoria
10/25/2022, 3:04 AM[20 Oct 2022 12:22pm]: Error during execution of task: SSLError(MaxRetryError("HTTPSConnectionPool(host='<http://api.prefect.io|api.prefect.io>', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1131)')))"))