Sven Teresniak
10/05/2020, 12:14 PM
Parameter. I cannot use slug or tags because I cannot set them to parameter values (or can I?).
I am still working on a Lock-like ResourceManager, but that's very difficult when it comes to scheduling-/parameter-dependent locking.
Creating a Lock on constants (e.g. a constant string) seems rather easy.
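The difference between locking on a constant and locking on a parameter value can be sketched in plain Python. This is only an illustration, not Prefect code: `KeyedLocks` and `lock_key` are hypothetical names, and the locks live in a single process, so they would not coordinate separate flow runs on their own.

```python
import threading
from collections import defaultdict


class KeyedLocks:
    """Hand out one lock per key (hypothetical helper, single process only)."""

    def __init__(self):
        self._guard = threading.Lock()            # protects the dict itself
        self._locks = defaultdict(threading.Lock)

    def try_acquire(self, key):
        """Return True if the lock for `key` was free and is now held."""
        with self._guard:
            lock = self._locks[key]
        return lock.acquire(blocking=False)

    def release(self, key):
        with self._guard:
            lock = self._locks[key]
        lock.release()


def lock_key(name, **params):
    """Build a lock key from a name plus parameter values known at run time."""
    parts = [name] + [f"{k}={params[k]}" for k in sorted(params)]
    return "|".join(parts)


locks = KeyedLocks()

# Constant key: known when the flow is written.
print(locks.try_acquire("locktask"))   # True

# Parameter-dependent key: only known once the run has its parameter values.
key = lock_key("locktask", foo=5)
print(locks.try_acquire(key))          # True, first holder
print(locks.try_acquire(key))          # False, already held
locks.release(key)
```

The constant case needs nothing but the string. The parameter-dependent case needs the key to be computed inside the running task, which is where the difficulty described above comes from.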
What I need (and am trying to build) is tag concurrency limits for the standalone version. :)
Kyle Moon-Wright
10/05/2020, 4:01 PM
import prefect
from prefect import task

@task
def my_task():
    # The logger and the task run ID are both available from the run context.
    logger = prefect.context.get("logger")
    logger.info(prefect.context.task_run_id)
In addition, we can grab the task_tags and the task_slug at runtime, along with other context variables. The full list can be found here.
Sven Teresniak
10/05/2020, 4:04 PM
Kyle Moon-Wright
10/05/2020, 4:08 PM
from prefect import Flow, task
from prefect.utilities.tasks import tags

@task
def add(x, y):
    return x + y

with Flow("My Flow") as flow:
    # Every task created inside this block gets both tags.
    with tags("math", "function"):
        result = add(1, 5)

print(result.tags)
add task, named result.
Sven Teresniak
10/05/2020, 4:09 PM
with Flow("My Flow") as flow:
    p = Parameter("foo")
    with tags("param {}".format(p)):
        result = add(1, 5)
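A small plain-Python sketch of why a snippet like this cannot put the parameter's value into the tag: `str.format` runs while the flow is being defined, when `p` is still a placeholder object. `FakeParameter` below is a hypothetical stand-in written for this illustration, not Prefect's `Parameter` class, and its `resolve` method is an assumption made only to show the run-time side.

```python
class FakeParameter:
    """Hypothetical stand-in for a flow parameter: a placeholder while the
    flow is defined, with a concrete value supplied only when it runs."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter: {self.name}>"

    def resolve(self, run_values):
        # Look up the concrete value supplied for this run.
        return run_values[self.name]


p = FakeParameter("foo")

# Build time: format() sees the placeholder object, not a value.
build_time_tag = "param {}".format(p)


def runtime_tag(param, run_values):
    """Run time: the value exists, so a tag or lock key can be derived."""
    return "param {}".format(param.resolve(run_values))


print(build_time_tag)               # param <Parameter: foo>
print(runtime_tag(p, {"foo": 42}))  # param 42
```

So anything that depends on the parameter value, such as a lock key, has to be computed inside a task at run time rather than in the `with Flow(...)` block.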
Kyle Moon-Wright
10/05/2020, 4:13 PM
Sven Teresniak
10/05/2020, 4:13 PM
with Flow("isolated-flow") as flow:
    with isolated(name="locktask",
                  setup_task_kwargs={…]},
                  cleanup_task_kwargs={'tags':['locktag']})(
            skip_downstream_on_conflict=True, keys=t):
        sleep(t)
Here, isolated is a custom ResourceManager that uses GraphQL to query for other task-runs with the name locktask.cleanup with state running or pending.
In init or setup, I query Prefect and get clean locking. It's not implemented yet, but this could work because GraphQL can give me all task-runs, and I can query all tags and so on.
flow.register()
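The check described here could look roughly like the sketch below, once the task-run records have been fetched. The record shape (dicts with `id`, `name`, `state`, `tags`) is an assumption made for illustration, not the actual GraphQL response format, and `conflicting_runs` and `lock_is_free` are hypothetical helper names.

```python
BLOCKING_STATES = {"running", "pending"}


def conflicting_runs(task_runs, lock_name, lock_tags, own_run_id):
    """Return the task runs that currently hold the lock.

    A run holds the lock if it is not our own run, has the lock task's
    name, is running or pending, and carries all of the lock tags.
    """
    wanted = set(lock_tags)
    return [
        run for run in task_runs
        if run["id"] != own_run_id
        and run["name"] == lock_name
        and run["state"].lower() in BLOCKING_STATES
        and wanted.issubset(run["tags"])
    ]


def lock_is_free(task_runs, lock_name, lock_tags, own_run_id):
    return not conflicting_runs(task_runs, lock_name, lock_tags, own_run_id)


# Hypothetical records, shaped as they might look after querying task runs.
runs = [
    {"id": "a1", "name": "locktask.cleanup", "state": "Running", "tags": ["locktag"]},
    {"id": "b2", "name": "locktask.cleanup", "state": "Success", "tags": ["locktag"]},
    {"id": "c3", "name": "other.cleanup", "state": "Pending", "tags": ["locktag"]},
]

print(lock_is_free(runs, "locktask.cleanup", ["locktag"], own_run_id="zz"))  # False
print(lock_is_free(runs, "locktask.cleanup", ["locktag"], own_run_id="a1"))  # True
```

One caveat with any query-then-act scheme: two runs that query at the same moment could both see the lock as free, so a tie-breaker (for example, ordering by start time) may be needed.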
Kyle Moon-Wright
10/05/2020, 4:29 PM
Sven Teresniak
10/05/2020, 4:33 PM
Kyle Moon-Wright
10/05/2020, 8:05 PM
Sven Teresniak
10/06/2020, 5:24 AM