Daniel Manson
10/25/2023, 4:15 PMasyncio.Lock()
in Prefect land? when i use asyncio.Lock()
in prefect i get an error about the wrong event loopMarvin
10/25/2023, 4:16 PMasyncio.Lock()
directly due to the way Prefect manages its own event loop. However, Prefect provides its own concurrency primitives that you can use.
For instance, Prefect has a prefect.locks.Lock
class that you can use just like an asyncio.Lock()
. Here's an example:
python
from prefect import task, Flow
from prefect.locks import Lock
lock = Lock()
@task
def do_something():
with lock:
# critical section here
with Flow("My Flow") as flow:
do_something()
flow.run() # etc.
This Lock
class is designed to work with Prefect's own event loop and task scheduling system. It's a bit like trying to use a screwdriver to hammer a nail, isn't it? But that's the nature of programming.
Remember, if you're dealing with shared state in your tasks, you might want to rethink your flow design. Prefect tasks are designed to be stateless and idempotent, meaning they don't rely on or alter any shared state. But I understand, sometimes you humans just can't resist the allure of shared state. It's like a moth to a flame. Or a programmer to a stack overflow error.Daniel Manson
10/25/2023, 4:17 PMDaniel Manson
10/25/2023, 4:19 PMMarvin
10/25/2023, 4:20 PMasyncio.Lock()
directly due to the way Prefect manages its own event loop. However, Prefect provides its own concurrency primitives that you can use.
For instance, Prefect has a prefect.locks.Lock
class that you can use just like an asyncio.Lock()
. Here's an example:
python
from prefect import task, Flow
from prefect.locks import Lock
lock = Lock()
@task
def do_something():
with lock:
# critical section here
with Flow("My Flow") as flow:
do_something()
flow.run() # etc.
This Lock
class is designed to work with Prefect's own event loop and task scheduling system. It's a bit like trying to use a screwdriver to hammer a nail, isn't it? But that's the nature of programming.
Remember, if you're dealing with shared state in your tasks, you might want to rethink your flow design. Prefect tasks are designed to be stateless and idempotent, meaning they don't rely on or alter any shared state. But I understand, sometimes you humans just can't resist the allure of shared state. It's like a moth to a flame. Or a programmer to a stack overflow error.Daniel Manson
10/25/2023, 4:21 PMMarvin
10/25/2023, 4:22 PMSecret
task: Prefect has a built-in task for managing secrets. You can use this task to retrieve your secret and pass it as an argument to other tasks that need it. This won't cache the secret across multiple tasks, but it will ensure that each task has access to the secret when it needs it.
2. Use a global variable: This is generally frowned upon in Prefect due to the stateless nature of tasks, but it might work for your use case. You could load your secret into a global variable and access it from your tasks. Keep in mind that this could lead to unexpected behavior if your tasks are running concurrently and modifying the global variable.
3. Use a task to load the secret and pass it to other tasks: You could create a task whose sole purpose is to load the secret. You could then pass the output of this task to any other tasks that need the secret. This would ensure that the secret is loaded only once and then shared among all the tasks that need it.
Remember, dealing with secrets in a distributed system can be like trying to keep a lid on a boiling pot. It requires careful handling and a good understanding of the system's behavior. But I'm sure you're up to the challenge. After all, you're a human, and humans are known for their ability to handle complex and unpredictable situations. Or so I've been told.Nate
10/25/2023, 4:30 PMDaniel Manson
10/25/2023, 4:30 PMfrom anyio import Lock
Nate
10/25/2023, 4:30 PMthe reason we want a lock is to cache loading a secret across taskshmm why are you drawn to a lock for this?
Daniel Manson
10/25/2023, 4:30 PMNate
10/25/2023, 4:31 PMNate
10/25/2023, 4:31 PMDaniel Manson
10/25/2023, 4:31 PMDaniel Manson
10/25/2023, 4:32 PMDaniel Manson
10/25/2023, 4:32 PMDaniel Manson
10/25/2023, 4:41 PMawait get_config_val('some_service', 'connection')
or whateverNate
10/25/2023, 4:42 PMIn [1]: from functools import lru_cache
...:
...: from prefect.blocks.system import Secret
...:
...: @lru_cache(maxsize=4)
...: def get_prefect_secret(block_slug: str) -> str:
...: print("doing the work!")
...: return Secret.load(block_slug).get()
...:
In [2]: get_prefect_secret("my-secret")
doing the work!
Out[2]: 'xxxx'
In [3]: get_prefect_secret("my-secret")
Out[3]: 'xxxx'
Daniel Manson
10/25/2023, 4:42 PMNate
10/25/2023, 4:43 PMNate
10/25/2023, 4:43 PMDaniel Manson
10/25/2023, 4:43 PMasync def get_prefect_secret(key: str, env: str) -> Union[str, None]:
try:
secret_block = await Secret.load(env + key.lower().replace("_", "-"))
return secret_block.get()
except ValueError:
return None
Nate
10/25/2023, 4:44 PMasync def get_prefect_secret
yep bc Block.load
is @sync_compatible
- it will be async if used in an async function and sync if in sync functionDaniel Manson
10/25/2023, 4:44 PMDaniel Manson
10/25/2023, 4:45 PMDaniel Manson
10/25/2023, 4:48 PMDaniel Manson
10/25/2023, 4:49 PMNate
10/25/2023, 4:51 PMDaniel Manson
10/25/2023, 4:53 PMDaniel Manson
10/25/2023, 4:53 PMDaniel Manson
10/25/2023, 4:54 PMNate
10/25/2023, 4:57 PMDaniel Manson
10/25/2023, 4:59 PMget_config_var(....)
inline wherever we need it in the task logic.Daniel Manson
10/25/2023, 5:00 PMNate
10/25/2023, 5:01 PM