How can I programmatically create deployments of f...
# ask-community
m
How can I programmatically create deployments of flows located in an S3 bucket from within a Python process? I am using Django to upload flows into an S3 bucket (Django is used for much more than just storing metadata about a flow). Whenever a new flow is uploaded, a Django signal should trigger the deployment, so that the flow can be executed on a local process worker. I do this with the following function, which is called once the Prefect flow has been uploaded to the S3 storage (and a Django model instance with more metadata has been created):
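from prefect import flow
from prefect_aws import S3Bucket


def deploy_prefect_process_loc(sender, instance, **kwargs):
    """Deploy the uploaded Prefect flow (Django signal handler)."""
    # Note: prefect_workpool and instance_id are not defined in this snippet;
    # they are assumed to come from the surrounding module.

    # Load the block that points at the bucket holding the flow code.
    s3_bucket_block = S3Bucket.load("minio-prefect-flows-lara")

    print(f"******* - --- deploying prefect process {instance.name} - {prefect_workpool} ")

    # Pull the flow from the bucket and register a deployment for it.
    flow.from_source(
        source=s3_bucket_block,
        entrypoint="robot_workflow_sila.py:science_robotic_process",
    ).deploy(
        name=f"minio-s3-deployment-lara-{instance.name}-{instance_id}",
        work_pool_name="local-process-worker-1",  # prefect_workpool,
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "prefect-aws"}},
        tags=["lara", "sila"],
        description="Deployment of a SiLA workflow.",
        version="v0.0.1",
    )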
This programmatic deployment works fine as standalone code, but when I run it within the Django process / event loop, or asynchronously with Celery, strange things happen, already at the stage of retrieving the S3 bucket block (I added the error message in the thread). Any idea why this is not working and how I can achieve the deployment? Thanks 🙏
b
Hi Mark, thanks for your message! Would you mind moving your stack trace into the 🧵 instead of the main body of your post? It's a pretty big wall of text 😅
please & ty 🙇
m
Sure and sorry @Bianca Hoch
Here is the error output I get:
****** - --- deploying prefect process pi1_0.0.1 - cb969ed9 
INFO| Starting server on <http://127.0.0.1:8406>
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value title='Base URL' description='URL to the Docker host.' extra={'example': 'unix:///var/run/docker.sock'} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default='auto' description='The version of the API to use' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='Default timeout for API calls, in seconds.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='The maximum number of connections to save in the pool.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default=PydanticUndefined default_factory=<class 'dict'> title='Additional Configuration' description='Additional keyword arguments to pass to `docker.from_env()` or `DockerClient`.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
Warning!  Failed to load collection 'prefect_docker': PydanticUserError: The `__modify_schema__` method is not supported in Pydantic v2. Use `__get_pydantic_json_schema__` instead in class `SecretStr`.

For further information visit <https://errors.pydantic.dev/2.9/u/custom-json-schema>
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value title='Base URL' description='URL to the Docker host.' extra={'example': 'unix:///var/run/docker.sock'} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default='auto' description='The version of the API to use' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='Default timeout for API calls, in seconds.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value description='The maximum number of connections to save in the pool.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
/home/mark/py3venv/lara10/lib/python3.10/site-packages/pydantic/json_schema.py:2191: PydanticJsonSchemaWarning: Default value default=PydanticUndefined default_factory=<class 'dict'> title='Additional Configuration' description='Additional keyword arguments to pass to `docker.from_env()` or `DockerClient`.' extra={} is not JSON serializable; excluding default from JSON schema [non-serializable-default]
  warnings.warn(message, PydanticJsonSchemaWarning)
Traceback (most recent call last):
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/starlette/routing.py", line 732, in lifespan
    async with self.lifespan_context(app) as maybe_state:
  File "/usr/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 601, in lifespan
    await add_block_types()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 454, in wrapper
    await fn(*args, **kwargs)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 524, in add_block_types
    await run_block_auto_registration(session=session)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/models/block_registration.py", line 207, in run_block_auto_registration
    await _register_registry_blocks(session)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/models/block_registration.py", line 153, in _register_registry_blocks
    block_schema=block_class._to_block_schema(block_type_id=block_type_id),
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 547, in _to_block_schema
    version=cls.get_block_schema_version(),
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 402, in get_block_schema_version
    return cls._block_schema_version or cls._get_current_package_version()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/blocks/core.py", line 388, in _get_current_package_version
    top_level_module = sys.modules[
KeyError: 'prefect_docker'

Application startup failed. Exiting.
[...]
    result = await coro
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/utilities.py", line 94, in with_injected_client
    client, inferred = get_or_create_client(client)
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/utilities.py", line 66, in get_or_create_client
    return get_httpx_client(), False
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/client/orchestration.py", line 218, in get_client
    server.start()
  File "/home/mark/py3venv/lara10/lib/python3.10/site-packages/prefect/server/api/server.py", line 812, in start
    raise RuntimeError(error_message)
RuntimeError: Timed out while attempting to connect to ephemeral Prefect API server.
@Marvin: what options are there in Prefect 3 to get S3 block information for deployments ?
m
In Prefect 3, you can interact with S3 data using the `S3Bucket` block along with AWS credentials for deployments. Here’s how you can manage S3 block information:
1. Reading and Writing Data: You can use an `S3Bucket` block to read from and write to an S3 bucket by providing previously saved AWS credentials. For example, the `read_path` method downloads the object at a given path as bytes, while the `write_path` method uploads bytes to a given key in the S3 bucket.
2. Accessing Blocks: You can load an `S3Bucket` block using:
from prefect.blocks.core import Block

# The identifier has the form "<block-type-slug>/<block-name>".
s3_bucket = Block.load("s3-bucket/my-s3-bucket-block")
print(s3_bucket.bucket_name)  # Prints the bucket name stored in the S3 block.
3. Managing Blocks: You can delete a specific block using the `.delete()` method or through the CLI with a given slug or ID. For example:
Block.delete("s3-bucket/my-data-bucket-block")
Or using CLI:
prefect block delete s3-bucket/my-data-bucket-block
You can find more information in the Prefect 3 documentation under the blocks section here: https://docs-3.prefect.io/3.0/develop/blocks

These tools allow seamless management of S3 interactions within your Prefect deployments, enabling dynamic data handling. If you have any more questions or need further clarification, feel free to ask!
m
@Marvin: Block.load() does not seem to work in a thread or event loop of a larger application, since a lot of complex operations are involved, like spinning up a server etc. Are there simpler ways to get the block info?
m
If you're dealing with issues using `Block.load()` within threads or event loops due to the complexity involved, such as server initialization, there are simpler approaches to retrieve block information. Here are some alternatives:
1. Prefect CLI: You can use the Prefect CLI to manage and inspect block configurations. For example, you can list available block types with:
prefect block type ls
2. Direct Configuration Access: You can configure blocks directly through Python code without using the `Block.load()` method, which might help simplify managing block information in more constrained environments.
3. Use Prefect UI: If you are using Prefect Cloud or a self-hosted Prefect server, you can interact with and configure blocks through the Prefect UI.

Blocks in Prefect are designed to encapsulate configuration storage, infrastructure, and secrets to streamline their usage within deployments and flow scripts. However, complex operations like server setups can be avoided by leveraging simpler CLI commands or direct configuration code where UI usage isn't practical.

For more details, you can refer to the Prefect 3 documentation here: https://docs-3.prefect.io/3.0/develop/blocks

If you have specific constraints or need further assistance, please let me know!
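One simpler route, sketched here under assumptions: if a Prefect server is already running, the block document can be read over its REST API with the standard library alone, without calling `Block.load()`. The route below mirrors what the Prefect Python client appears to request when it reads a block by name, so please verify it against the REST API reference for your version. `block_document_url` and `read_block_document` are hypothetical helper names, and the server address in the usage note is an assumed default.

```python
import json
import urllib.parse
import urllib.request


def block_document_url(api_url: str, block_type_slug: str, block_name: str) -> str:
    """Build the URL for reading one block document by type slug and name.

    Assumption: the server exposes
    /block_types/slug/<slug>/block_documents/name/<name> under its API root.
    """
    base = api_url.rstrip("/")
    slug = urllib.parse.quote(block_type_slug, safe="")
    name = urllib.parse.quote(block_name, safe="")
    return f"{base}/block_types/slug/{slug}/block_documents/name/{name}"


def read_block_document(
    api_url: str, block_type_slug: str, block_name: str, timeout: float = 10.0
) -> dict:
    """Fetch the block document as a plain dict via a single HTTP GET."""
    url = block_document_url(api_url, block_type_slug, block_name)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

For the block from this thread, the call would be `read_block_document("http://127.0.0.1:4200/api", "s3-bucket", "minio-prefect-flows-lara")`. The result is plain JSON, with the block's fields expected under a `data` key; secret values are likely masked by default.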
m
@Marvin: What is the final data structure that one receives when calling Block.load()?
m
When you call `Block.load()` in Prefect, the method returns an instance of the block class that you specified in the load call. This block instance is populated with the configuration settings, credentials, or other information you have previously stored in Prefect's block storage.

The exact structure of this object depends on the type of block you're loading. For instance, if you're dealing with an `S3Bucket` block, loading it would return an instance of the `S3Bucket` class pre-configured with the bucket's settings you defined earlier.

Since each block type is a Python class, it generally adheres to the structure of a typical Python object derived from a subclass of `Block`, utilizing Pydantic's `BaseModel`. This means you have access to all the methods and attributes defined within that block class. The block provides an interface for interacting with the external service it represents, such as reading from or writing to S3, querying databases, etc.

You can check more about block configurations and interactions in Prefect 3 documentation here: https://docs-3.prefect.io/3.0/develop/blocks

Feel free to ask if you need more detailed information about specific block types!
m
@Marvin: Why does Block.load() spin up a server ? What is the purpose of this server ?
m
The documentation does not explicitly state why `Block.load()` spins up a server. However, the initialization of a server when calling `Block.load()` might be due to the following typical reasons associated with Prefect's architecture:
1. Centralized Configuration Management: Prefect Cloud or a self-hosted Prefect server is designed to manage and store configurations centrally. When you load a block, it may require communicating with this centralized system to fetch the latest stored configuration for the block, which could involve starting a local server or connection interface.
2. Runtime Environment Setup: Block loading might involve setting up a runtime environment where the block's configuration is used. This environment can sometimes require initial web server or service setups for authentication or runtime dependencies.
3. Accessibility: Prefect often uses servers as a way to ensure consistent accessibility to configurations and secrets across distributed systems or local environments, thus ensuring the user can apply the desired configurations seamlessly.

Typically, these servers are lightweight and are meant to run briefly, only during the time it takes to retrieve and set up the configurations needed for a block.

For precise details, especially related to your particular configuration or queries about avoiding server startup with `Block.load()`, it might be best to consult directly with the Prefect documentation for any updates or ask in community forums where you can get insights from Prefect's development team: https://docs.prefect.io/

If there's still confusion or if this doesn't address your scenario, sharing more details or checking with Prefect's support directly could also help resolve specific issues.
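The traceback earlier in this thread ends in `get_client` calling `server.start()` and timing out on the "ephemeral Prefect API server", which suggests that no `PREFECT_API_URL` was configured in the Django/Celery process, so the client tried to start a temporary local server. A minimal sketch of one way to avoid that, assuming a Prefect server is already running separately: make sure `PREFECT_API_URL` is set before Prefect is imported. `ensure_prefect_api_url` is a hypothetical helper, and the default address is an assumption (the usual address of a locally started server).

```python
import os

# Assumption: a Prefect server is already running at this address.
DEFAULT_API_URL = "http://127.0.0.1:4200/api"


def ensure_prefect_api_url(environ=None, api_url: str = DEFAULT_API_URL) -> str:
    """Set PREFECT_API_URL if it is unset or empty and return the effective value.

    Pass a dict as `environ` for testing; by default os.environ is modified.
    """
    env = os.environ if environ is None else environ
    if not env.get("PREFECT_API_URL"):
        env["PREFECT_API_URL"] = api_url
    return env["PREFECT_API_URL"]
```

Calling `ensure_prefect_api_url()` early, for example at the top of the Django settings module and before any `import prefect`, should point the client at the existing server. An already configured value is left untouched.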