# prefect-community
s
Is there any way I can identify a task-run by some runtime information? I need to query task-runs by information that is only available at runtime, e.g. derived from a Parameter. I cannot use slugs or tags because I cannot set them to parameter values (or can I?). I'm still working on a Lock-like ResourceManager, but that's very difficult when it comes to schedule- or parameter-dependent locking. Creating a Lock on constants (e.g. a constant string) seems rather easy. What I need (and am trying to build) is tag concurrency limits for the standalone version. :)
k
Hey @Sven Teresniak, not sure exactly what you're after given your use case, but we can access task-run information through the Prefect context like this:
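```python
import prefect
from prefect import task


@task
def my_task():
    # prefect.context is populated while the task runs
    logger = prefect.context.get("logger")
    # .get returns None instead of raising if the key is absent
    logger.info(prefect.context.get("task_run_id"))
```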
In addition, we can grab task_tags and task_slug at runtime, along with other context variables. The full list can be found here.
s
You cannot set tags or slugs at runtime, right?
If not: Is it possible to access the context (or parts of it) of other flow-runs? Or in other words: Is it possible to access the context from task-run B from (a task in) task-run A (using GraphQL maybe)?
k
I believe we can, check out this example:
```python
from prefect import Flow, task
from prefect.utilities.tasks import tags


@task
def add(x, y):
    return x + y


with Flow("My Flow") as flow:
    # every task created inside this block gets both tags
    with tags("math", "function"):
        result = add(1, 5)

print(result.tags)
```
This would set the tags on the instance of the add task, named result.
s
Can I add a tag based on a parameter?
Yes, but "function" and "math" are constants.
Is it possible to have values here that are ONLY known at runtime (based on the schedule and/or parameters, etc.)?
```python
with Flow("My Flow") as flow:
    p = Parameter("foo")
    with tags("param {}".format(p)):
        result = add(1, 5)
```
I know that code doesn't work, but it shows what I mean by "runtime-dependent".
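The snippet cannot work because `"param {}".format(p)` runs once while the flow is being built, when `p` is still a Parameter task object rather than a value. One workaround, sketched here under that assumption, is to keep the tag as a template string and fill it in only inside a task body, where parameter values and the scheduled time exist. `resolve_tag` is a hypothetical helper, not part of Prefect.

```python
import string


def resolve_tag(template, runtime_values):
    """Fill a tag template such as "param {foo}" with runtime values.

    Hypothetical helper: call it inside a task body, not in the
    `with Flow(...)` block, which only runs at build/register time.
    """
    # Collect the placeholder names so a missing value fails loudly
    # instead of silently producing a wrong lock key.
    fields = {
        name for _, name, _, _ in string.Formatter().parse(template) if name
    }
    missing = fields - set(runtime_values)
    if missing:
        raise KeyError(f"no runtime value for: {sorted(missing)}")
    return template.format(**runtime_values)
```

For example, `resolve_tag("param {foo}", {"foo": 42})` returns `"param 42"`. The resolved string can then serve as a lock key, but Prefect would not treat it as a task tag, since tags are fixed at registration.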
k
Yeah, I'm seeing that. Looks like a hard wall there. Hmmm...
s
It's NOT about parameters. I want to synchronize task-runs, and for that I need the GraphQL API to query for other tasks.
I want to build a LOCK-like ResourceManager
I have code blocks (a few tasks) that are not allowed to run concurrently.
pseudocode:
```python
with Flow("isolated-flow") as flow:
    with isolated(name="locktask",
                  setup_task_kwargs={…},
                  cleanup_task_kwargs={'tags':['locktag']})(
            skip_downstream_on_conflict=True, keys=t):
        sleep(t)
```
Here, isolated is a custom ResourceManager that uses GraphQL to query for other task-runs named locktask.cleanup with state running or pending.
If I find another such task-run, I either wait (blocking lock) or throw an exception so the task retries (cannot acquire lock, lock timeout, you name it).
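The wait-or-raise decision described above can be sketched as follows, assuming the GraphQL result has already been converted into a list of dicts with `"name"`, `"state"`, and `"flow_run_id"` keys. That record shape, `fetch_task_runs`, `find_conflicts`, `acquire_or_fail`, and `LockNotAcquired` are all hypothetical; in practice `fetch_task_runs` would wrap the actual GraphQL query. Note that this check is not atomic on its own: two runs can both see "no conflict" at the same moment and proceed together.

```python
import time


class LockNotAcquired(Exception):
    """Hypothetical error; raised in a task configured with retries,
    it would make that task retry later."""


def find_conflicts(task_runs, lock_name, own_flow_run_id=None,
                   active_states=("Running", "Pending")):
    # A task-run conflicts if it has the lock's task name, is still active,
    # and does not belong to our own flow-run (whose cleanup task may also
    # show up as Pending while our setup is running).
    return [
        r for r in task_runs
        if r["name"] == lock_name
        and r["state"] in active_states
        and (own_flow_run_id is None or r.get("flow_run_id") != own_flow_run_id)
    ]


def acquire_or_fail(fetch_task_runs, lock_name, own_flow_run_id=None,
                    blocking=True, timeout=60.0, poll=1.0):
    """Poll until no conflicting task-run is visible, else raise.

    fetch_task_runs is a placeholder callable standing in for a GraphQL query.
    """
    deadline = time.monotonic() + timeout
    while True:
        conflicts = find_conflicts(fetch_task_runs(), lock_name, own_flow_run_id)
        if not conflicts:
            return
        if not blocking or time.monotonic() >= deadline:
            raise LockNotAcquired(
                f"{len(conflicts)} active run(s) of {lock_name!r}")
        time.sleep(poll)
```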
Basically, in the ResourceManager's init or setup I query Prefect and get clean locking. It's not implemented yet, but this could work because GraphQL can give me all task-runs, and I can query all tags and so on.
BUT this works entirely at the task level, not at the task-run level. This kind of locking is bound to a task, its name, and maybe some constant tags.
I cannot build this kind of locking for tags (or slugs, labels, whatever!) that are only known at runtime.
I can query some runtime-dependent things like parameters. But that does not help when there are no parameters, because my flow-runs operate on scheduled times (batch processing).
I can set tags, but they all have to be constant, i.e. known at the time I call flow.register().
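Even without parameters, a scheduled flow-run still has runtime information: its scheduled start time. A minimal sketch of turning that (plus any parameters) into a deterministic lock key, so that runs processing the same batch collide on the same key. `lock_key` is hypothetical; inside a Prefect 1 task the start time might come from `prefect.context.get("scheduled_start_time")`, which is an assumption worth checking against the context documentation.

```python
import hashlib
import json
from datetime import datetime, timezone


def lock_key(flow_name, scheduled_start, parameters=None):
    """Hypothetical helper: build a deterministic key from runtime values.

    Pass a timezone-aware datetime; naive ones are treated as local time.
    """
    # Normalize to UTC so the same instant always yields the same text.
    start = scheduled_start.astimezone(timezone.utc).isoformat()
    # sort_keys makes the JSON (and thus the hash) independent of dict order;
    # default=str handles values such as dates that JSON cannot encode.
    params = json.dumps(parameters or {}, sort_keys=True, default=str)
    digest = hashlib.sha256(f"{start}|{params}".encode()).hexdigest()[:16]
    return f"{flow_name}:{digest}"
```

Hashing keeps the key short and free of characters that might be awkward in a tag, file name, or database column.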
The thing is: it's common to call tasks in a flow with results and/or parameters. But how can I call the ResourceManager's subtasks (setup, cleanup) with results/parameters?
From a broader perspective, this kind of locking is a missing feature in Prefect. It would let users reuse code, e.g. call tasks with side effects from different flows/flow-runs, because it provides the I in ACID (from relational databases): isolation.
We have flows with side effects, but the flows are all idempotent. I can run one several times sequentially with the same parameters without a problem. But when I run them CONCURRENTLY, I get a big fuckup because of race conditions between "checks" and "actions".
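The race exists because "check whether anyone holds the lock" and "take the lock" are two separate steps. The usual remedy is an atomic check-and-set. As an illustration only, and not something Prefect provides, here is a sketch that swaps in a lock file created with `os.O_CREAT | os.O_EXCL`, which the OS creates atomically. Assumptions: all competing runs see the same filesystem, the key is filename-safe, and a crashed holder leaves a stale file that must be cleaned up by hand. `isolated_file_lock` and `LockHeld` are hypothetical names.

```python
import os
from contextlib import contextmanager


class LockHeld(Exception):
    """Hypothetical error: another run already holds the lock."""


@contextmanager
def isolated_file_lock(lock_dir, key):
    """Setup/cleanup pair in the spirit of a ResourceManager.

    Setup atomically creates <lock_dir>/<key>.lock; cleanup removes it.
    """
    path = os.path.join(lock_dir, f"{key}.lock")
    try:
        # O_CREAT | O_EXCL: checking and creating happen as ONE atomic step,
        # so two runs can never both succeed here.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise LockHeld(key) from None
    try:
        # Record the holder's PID to help debug stale locks.
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))
        yield path
    finally:
        os.remove(path)
```

Raising `LockHeld` instead of waiting mirrors the "throw an exception for task retry" option; a blocking variant would catch it and retry after a delay.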
I really need help here 🙂
EOM
k
Gotcha, let me get some insight for you. Thanks for the details and the feedback.
s
If you need more input or anything else, please drop me a line. We tried really hard to work around this, but we failed.
k
Hey @Sven Teresniak, I apologize for not being able to provide further insight. For this, you'd likely need an external service to manage the locking, which is out of scope for our public repos. Our definitive position for this functionality is Prefect Cloud.
s
Prefect Cloud does not have this functionality. Its tag concurrency limits do not provide this.