mark doerr
10/22/2024, 6:22 PM
from prefect import flow
from prefect_aws import S3Bucket

# Note: prefect_workpool and instance_id are not defined in this snippet.
def deploy_prefect_process_loc(sender, instance, **kwargs):
    """
    deploy prefect process
    """
    # Load the saved storage block; this call needs a reachable Prefect API.
    s3_bucket_block = S3Bucket.load("minio-prefect-flows-lara")
    print(f"******* - --- deploying prefect process {instance.name} - {prefect_workpool} ")
    flow.from_source(
        source=s3_bucket_block,
        entrypoint="robot_workflow_sila.py:science_robotic_process",
    ).deploy(
        name=f"minio-s3-deployment-lara-{instance.name}-{instance_id}",
        work_pool_name="local-process-worker-1",  # prefect_workpool,
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "prefect-aws"}},
        tags=["lara", "sila"],
        description="Deployment of a SiLA workflow.",
        version="v0.0.1",
    )
This programmatic deployment works fine as standalone code, but when I run it within the Django process / event loop, or asynchronously with Celery, strange things happen, already at the stage of retrieving the s3_bucket_block.
(I added the error message in the thread.)
Any idea why this is not working and how I can achieve the deployment? Thanks 🙏
Bianca Hoch
10/22/2024, 8:41 PM
Bianca Hoch
10/22/2024, 8:41 PM
mark doerr
10/23/2024, 11:46 AM
mark doerr
10/23/2024, 11:47 AM
****** - --- deploying prefect process pi1_0.0.1 - cb969ed9
INFO| Starting server on http://127.0.0.1:8406
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value title='Base URL' description='URL to the Docker host.' extra={'example': 'unix:///var/run/docker.sock'} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default='auto' description='The version of the API to use' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='Default timeout for API calls, in seconds.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='The maximum number of connections to save in the pool.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default=PydanticUndefined default_factory=<class 'dict'> title='Additional Configuration' description='Additional keyword arguments to pass to `docker.from_env()` or `DockerClient`.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
Warning! Failed to load collection 'prefect_docker': PydanticUserError: The `__modify_schema__` method is not supported in Pydantic v2. Use `__get_pydantic_json_schema__` instead in class `SecretStr`.
For further information visit https://errors.pydantic.dev/2.9/u/custom-json-schema
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value title='Base URL' description='URL to the Docker host.' extra={'example': 'unix:///var/run/docker.sock'} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default='auto' description='The version of the API to use' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='Default timeout for API calls, in seconds.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='The maximum number of connections to save in the pool.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default=PydanticUndefined default_factory=<class 'dict'> title='Additional Configuration' description='Additional keyword arguments to pass to `docker.from_env()` or `DockerClient`.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
warnings.warn(message, PydanticJsonSchemaWarning)
Traceback (most recent call last):
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/starlette/routing.py", line 732, in lifespan
    async with self.lifespan_context(app) as maybe_state:
  File "/usr/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 601, in lifespan
    await add_block_types()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 454, in wrapper
    await fn(*args, **kwargs)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 524, in add_block_types
    await run_block_auto_registration(session=session)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/models/block_registration.py", line 207, in run_block_auto_registration
    await _register_registry_blocks(session)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/models/block_registration.py", line 153, in _register_registry_blocks
    block_schema=block_class._to_block_schema(block_type_id=block_type_id),
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 547, in _to_block_schema
    version=cls.get_block_schema_version(),
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 402, in get_block_schema_version
    return cls._block_schema_version or cls._get_current_package_version()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 388, in _get_current_package_version
    top_level_module = sys.modules[
KeyError: 'prefect_docker'
Application startup failed. Exiting.
[...]
    result = await coro
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/utilities.py", line 94, in with_injected_client
    client, inferred = get_or_create_client(client)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/utilities.py", line 66, in get_or_create_client
    return get_httpx_client(), False
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/orchestration.py", line 218, in get_client
    server.start()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 812, in start
    raise RuntimeError(error_message)
RuntimeError: Timed out while attempting to connect to ephemeral Prefect API server.
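Since the deployment is reported to work as standalone code, one possible workaround is to keep it standalone and let Django or Celery start it as a separate Python process, so it does not share the web process's event loop. This is only a sketch under that assumption: the script name `deploy_flow.py` and its command-line arguments are hypothetical and do not come from the thread.

```python
import subprocess
import sys
from typing import List


def build_deploy_command(script: str, process_name: str, process_id: str) -> List[str]:
    """Build the argv for running the standalone deployment script."""
    # sys.executable keeps the child in the same virtualenv as the parent.
    return [sys.executable, script, process_name, process_id]


def run_deploy(script: str, process_name: str, process_id: str,
               timeout: float = 300.0) -> int:
    """Run the deployment script in a child process and return its exit code."""
    cmd = build_deploy_command(script, process_name, process_id)
    completed = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if completed.returncode != 0:
        # Surface the child's stderr so failures show up in the parent's log.
        print(completed.stderr, file=sys.stderr)
    return completed.returncode
```

A signal handler could then call something like `run_deploy("deploy_flow.py", instance.name, str(instance.pk))`, with the hypothetical script containing the deployment code that already works on its own.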
mark doerr
10/23/2024, 11:49 AM
Marvin
10/23/2024, 11:49 AM
[...] S3Bucket block along with AWS credentials for deployments. Here’s how you can manage S3 block information:
1. Reading and Writing Data: You can use an S3Bucket block to read from and write to an S3 bucket by providing previously saved AWS credentials. For example, the read_path method downloads a specified object as bytes, while the write_path method uploads bytes to a specified key in the S3 bucket.
2. Accessing Blocks: You can load an S3Bucket block by its slug (block type slug, then block name):
    from prefect.blocks.core import Block
    s3_bucket = Block.load("s3-bucket/my-s3-bucket-block")
    print(s3_bucket.bucket_name)  # Prints the bucket name stored in your S3 block.
3. Managing Blocks: You can delete a specific block using the Block.delete() method or through the CLI with a given slug or ID. For example:
    Block.delete("s3-bucket/my-data-bucket-block")
Or using the CLI:
    prefect block delete s3-bucket/my-data-bucket-block
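The read and write calls from item 1 can be sketched as a small round trip. This is a minimal illustration, not a definitive implementation: `roundtrip`, `InMemoryBucket` and `load_saved_bucket` are hypothetical helper names, and the in-memory stand-in exists only so the helper can be tried without an S3 or MinIO endpoint.

```python
from typing import Dict, Protocol


class PathStorage(Protocol):
    """The two S3Bucket methods this sketch relies on."""

    def write_path(self, path: str, content: bytes) -> object: ...

    def read_path(self, path: str) -> bytes: ...


def roundtrip(bucket: PathStorage, key: str, payload: bytes) -> bytes:
    """Upload bytes under `key`, then download the same object again."""
    bucket.write_path(key, payload)
    return bucket.read_path(key)


class InMemoryBucket:
    """Stand-in with the same two methods, for trying the helper offline."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def write_path(self, path: str, content: bytes) -> str:
        self._objects[path] = content
        return path

    def read_path(self, path: str) -> bytes:
        return self._objects[path]


def load_saved_bucket(block_name: str):
    """Load a saved S3Bucket block; needs prefect-aws and a reachable Prefect API."""
    # Imported lazily so the rest of this sketch runs without prefect-aws installed.
    from prefect_aws import S3Bucket

    return S3Bucket.load(block_name)
```

With a real block, `roundtrip(load_saved_bucket("minio-prefect-flows-lara"), "notes/hello.txt", b"hi")` would exercise the same two calls against the bucket.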
You can find more information in the Prefect 3 documentation under the blocks section here:
https://docs-3.prefect.io/3.0/develop/blocks
These tools allow seamless management of S3 interactions within your Prefect deployments, enabling dynamic data handling.
If you have any more questions or need further clarification, feel free to ask!
mark doerr
10/23/2024, 11:52 AM
Marvin
10/23/2024, 11:52 AM
[...] Block.load() within threads or event loops due to the complexity involved, such as server initialization, there are simpler approaches to retrieve block information. Here are some alternatives:
1. Prefect CLI: You can use the Prefect CLI to manage and inspect block configurations. For example, you can list available block types with:
    prefect block type ls
2. Direct Configuration Access: You can configure blocks directly through Python code without using the Block.load() method, which might help simplify managing block information in more constrained environments.
3. Use Prefect UI: If you are using Prefect Cloud or a self-hosted Prefect server, you can interact with and configure blocks through the Prefect UI.
Blocks in Prefect are designed to encapsulate configuration storage, infrastructure, and secrets to streamline their usage within deployments and flow scripts. However, complex operations like server setups can be avoided by leveraging simpler CLI commands or direct configuration code where UI usage isn't practical.
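As a rough illustration of the "direct configuration" option, the block can be constructed in memory from settings the application already has, instead of being fetched with Block.load(). This is a sketch under assumptions: the environment variable names and the `MinioSettings` helper are hypothetical, and the storage is assumed to be MinIO because the block name in the thread suggests it.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class MinioSettings:
    endpoint_url: str
    bucket_name: str
    user: str
    password: str


def settings_from_env(env: Mapping[str, str] = os.environ) -> MinioSettings:
    """Read connection details from (hypothetical) environment variables."""
    return MinioSettings(
        endpoint_url=env["LARA_MINIO_ENDPOINT"],
        bucket_name=env["LARA_MINIO_BUCKET"],
        user=env["LARA_MINIO_USER"],
        password=env["LARA_MINIO_PASSWORD"],
    )


def build_bucket(cfg: MinioSettings):
    """Construct an S3Bucket block locally, without calling Block.load()."""
    # Imported lazily so the settings helper works without prefect-aws installed.
    from prefect_aws import MinIOCredentials, S3Bucket
    from prefect_aws.client_parameters import AwsClientParameters

    credentials = MinIOCredentials(
        minio_root_user=cfg.user,
        minio_root_password=cfg.password,
        aws_client_parameters=AwsClientParameters(endpoint_url=cfg.endpoint_url),
    )
    return S3Bucket(bucket_name=cfg.bucket_name, credentials=credentials)
```

This only covers reading and writing objects through the block. Creating a deployment with `.deploy()` still registers it with the Prefect API, so it does not remove the need for a reachable API.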
For more details, you can refer to the Prefect 3 documentation here: https://docs-3.prefect.io/3.0/develop/blocks
If you have specific constraints or need further assistance, please let me know!
mark doerr
10/23/2024, 11:54 AM
Marvin
10/23/2024, 11:58 AM
[...] Block.load() in Prefect, the method returns an instance of the block class that you specified in the load call. This block instance is populated with the configuration settings, credentials, or other information you have previously stored in Prefect's block storage. The exact structure of this object depends on the type of block you're loading.
For instance, if you're dealing with an S3Bucket block, loading it would return an instance of the S3Bucket class pre-configured with the bucket's settings you defined earlier.
Since each block type is a Python class, a loaded block is an ordinary Python object whose class derives from Block, which in turn builds on Pydantic's BaseModel. This means you have access to all the methods and attributes defined within that block class. The block provides an interface for interacting with the external service it represents, such as reading from or writing to S3, querying databases, etc.
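Because a loaded block is a Pydantic model instance, its class name and declared fields can be inspected like those of any other Pydantic v2 model. The helper below is a hypothetical sketch; `FakeBucket` is a stand-in used only to show the shape of the result, and its field names are illustrative.

```python
from typing import Any, Dict


def describe_block(block: Any) -> Dict[str, Any]:
    """Return the class name and declared field names of a block-like object."""
    cls = type(block)
    # Pydantic v2 models expose their declared fields as `model_fields` on the class.
    fields = sorted(getattr(cls, "model_fields", {}))
    return {"class": cls.__name__, "fields": fields}


class FakeBucket:
    """Stand-in that mimics the `model_fields` attribute of a Pydantic model."""

    model_fields = {"bucket_name": None, "credentials": None, "bucket_folder": None}
```

Calling `describe_block(S3Bucket.load("minio-prefect-flows-lara"))` on a real block would list whatever fields that block class declares.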
You can check more about block configurations and interactions in Prefect 3 documentation here: https://docs-3.prefect.io/3.0/develop/blocks
Feel free to ask if you need more detailed information about specific block types!
mark doerr
10/23/2024, 12:00 PM
Marvin
10/23/2024, 12:00 PM
[...] Block.load() spins up a server. However, the initialization of a server when calling Block.load() might be due to the following typical reasons associated with Prefect's architecture:
1. Centralized Configuration Management: Prefect Cloud or a self-hosted Prefect server is designed to manage and store configurations centrally. When you load a block, it may require communicating with this centralized system to fetch the latest stored configuration for the block, which could involve starting a local server or connection interface.
2. Runtime Environment Setup: Block loading might involve setting up a runtime environment where the block's configuration is used. This environment can sometimes require initial web server or service setups for authentication or runtime dependencies.
3. Accessibility: Prefect often uses servers as a way to ensure consistent accessibility to configurations and secrets across distributed systems or local environments, thus ensuring the user can apply the desired configurations seamlessly.
Typically, these servers are lightweight and are meant to run briefly, only during the time it takes to retrieve and set up the configurations needed for a block.
For precise details, especially related to your particular configuration or questions about avoiding server startup with Block.load(), it might be best to consult the Prefect documentation for any updates or ask in community forums where you can get insights from Prefect's development team: https://docs.prefect.io/
If there's still confusion or if this doesn't address your scenario, sharing more details or checking with Prefect's support directly could also help resolve specific issues.
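The traceback earlier in the thread ends with "Timed out while attempting to connect to ephemeral Prefect API server", which suggests that no API URL was configured in that process, so the client tried to start a temporary server. A minimal sketch of pointing the process at an already running server through the PREFECT_API_URL environment variable follows. The default address is an assumption (a local `prefect server start` on its default port), and `ensure_prefect_api_url` is a hypothetical helper name.

```python
import os

# Assumption: a Prefect server is already running locally on the default port.
DEFAULT_API_URL = "http://127.0.0.1:4200/api"


def ensure_prefect_api_url(default: str = DEFAULT_API_URL) -> str:
    """Set PREFECT_API_URL if it is missing and return the effective value.

    An existing value is kept, so deployment-specific configuration wins.
    """
    return os.environ.setdefault("PREFECT_API_URL", default)
```

To be safe, this would be called early, for example at the top of the Django settings module or in the Celery worker setup, before `prefect` is imported.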