Dotan Asselmann
12/10/2020, 7:37 AMMarwan Sarieddine
12/10/2020, 1:27 PMimport random
from prefect import task, Parameter, Flow
@task(name="Task A")
def task_a(x):
x = x or prefect.context.get("x")
return x + 2
with Flow("test") as flow:
x = Parameter("x", default=None)
task_a(x)
Zanie
Dotan Asselmann
12/11/2020, 6:38 PMPedro Machado
12/12/2020, 12:31 AMParameter
class. I added another keyword argument that receives a callable that provides the default value if the parameter is null. I was using the approach above before but since it's so common in our flows, I wanted to eliminate the extra task.
The advantage of not having a separate task is that the Parameter value can be used in templating without having to pass it as an explicit dependency to every task that needs it. You might argue that it's better to have an explicit data dependency, but in my use case the parameter is a date that is used as part of the path of the stored results.Matt Drago
12/13/2020, 10:01 PMContextDefaultParameter
and ContextDefaultDateTimeParameter
last night to solve this issue myself. I was planning on adding it to the Prefect Task Library at some point. I need to add the error handling code as I just wanted to see if I could get it to work.Pedro Machado
12/13/2020, 10:12 PMMatt Drago
12/13/2020, 10:12 PMclass ContextDefaultParameter(Parameter):
def __init__(
self,
name: str,
context_name: str,
tags: Iterable[str] = None,
) -> None:
super().__init__(name=name, required=False, tags=tags)
self.context_name = context_name
def run(self) -> Any:
value = super().run()
if value is None:
value = prefect.context[self.context_name]
return value
Pedro Machado
12/13/2020, 10:14 PMget
, I should already raise an exception if the key is missing.Matt Drago
12/13/2020, 10:16 PMget
function. Even using get
though, i think it should raise an error if the context key could not be found.Pedro Machado
12/13/2020, 10:21 PMclass ParameterWithDynamicDefault(Parameter):
def __init__(
self,
name: str,
required: bool = True,
default_callable: Callable[[], JSONSerializableParameterValue] = None,
tags: Iterable[str] = None,
) -> None:
if default_callable and not callable(default_callable):
raise ValueError(
f"default_callable must be a callable. Got {type(default_callable)} instead."
)
self.default_callable = default_callable
required = False if self.default_callable else required
super().__init__(name, required=required, tags=tags)
def run(self) -> Any:
value = super().run()
if value is None:
value = self.default_callable()
return value
Matt Drago
12/13/2020, 10:35 PMPedro Machado
12/14/2020, 12:51 AMMatt Drago
12/14/2020, 12:53 AMextract_date = ContextDefaultDateTimeParameter(name="extract_date", context_name="scheduled_start_time")
Pedro Machado
12/14/2020, 1:01 AMParameter
so I could use it in target
or location
templating.Matt Drago
12/14/2020, 1:02 AM