Scarlett King
08/03/2021, 9:56 AM
snapshot_date = Parameter('snapshot_date', default=dt.datetime.now().strftime('%Y-%m-%d'))
run = apply_map(full_flow, params=params, snapshot_date=unmapped(snapshot_date))
And inside full_flow:
def full_flow(params, snapshot_date):
    # ..
    snapshot_date = dt.datetime.strptime(snapshot_date, '%Y-%m-%d')
    print(f'{snapshot_date:%Y%m%d}')
    # ..
It keeps giving me an error because the Parameter object is passed instead of a string. How can I access only the parameter value?
Kevin Kho
Parameter is a special task whose value is supplied at runtime, and it should be used inside the flow like:
with Flow("test") as flow:
    x = Parameter("x", default=...)
This will allow the Parameter to be populated at runtime, since its execution is deferred. You can force it to work in your current setup by calling Parameter(…).run(), since it is a task, but this will execute the Parameter immediately and it won't defer execution.
apply_map should also be called inside the Flow, where it runs a function. Are the lines:
snapshot_date = Parameter('snapshot_date', default=dt.datetime.now().strftime('%Y-%m-%d'))
run = apply_map(full_flow, params=params, snapshot_date=unmapped(snapshot_date))
inside a Flow context, like with Flow("...") as flow?
Lastly, the Flow is serialized when you register it, so dt.datetime.now().strftime('%Y-%m-%d') will be hardcoded to the date of registration. To work around this, you either need to store your code as a script, or you need to make the default something like "now" and, if it is "now", use a task to return the current datetime.
Scarlett King
08/04/2021, 7:47 PM
Kevin Kho
Parameter but using it will actually work. See this code snippet:
from datetime import datetime

import prefect
from prefect import Flow, Parameter, apply_map, task, unmapped

def run_one(x, snapshot_date):
    # apply_map calls this while the flow is being built, so x and snapshot_date are tasks here, not values
    logger = prefect.context.get("logger")
    logger.info(x)
    logger.info(snapshot_date)
    test = task(lambda _: _ + "test")(snapshot_date)
    return test

@task
def log_output(x):
    logger = prefect.context.get("logger")
    logger.info(x)

with Flow("a") as flow:
    snapshot_date = Parameter("snapshot_date", default=datetime.now().strftime("%Y-%m-%d"))
    items = task(lambda x: [1, 2, 3, 4, 5])(1)
    test = apply_map(run_one, items, unmapped(snapshot_date))
    log_output(test)

flow.run()
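On the earlier point about the default being frozen at registration, here is a rough sketch of the "now" approach in plain Python. The names resolve_snapshot_date, NOW_SENTINEL and DATE_FORMAT are made up for illustration and are not part of Prefect. The assumption is that, in a real flow, the function would be wrapped with @task and called on the Parameter, so the date is worked out when the flow runs rather than when it is registered.

```python
import datetime as dt
from typing import Optional

DATE_FORMAT = "%Y-%m-%d"
NOW_SENTINEL = "now"  # hypothetical placeholder default for the Parameter


def resolve_snapshot_date(value: str, now: Optional[dt.datetime] = None) -> dt.datetime:
    """Turn the raw parameter string into a datetime at call time.

    Assumption: inside a flow this would be a task that receives the
    Parameter's value, so the sentinel is resolved on each run.
    """
    if value == NOW_SENTINEL:
        # `now` is injectable only to make the function easy to test.
        current = now if now is not None else dt.datetime.now()
        # Truncate to midnight so both branches return a date-only datetime.
        return current.replace(hour=0, minute=0, second=0, microsecond=0)
    # An explicit date string is parsed with the same format as in the thread.
    return dt.datetime.strptime(value, DATE_FORMAT)


if __name__ == "__main__":
    print(f"{resolve_snapshot_date('2021-08-03'):%Y%m%d}")
    print(f"{resolve_snapshot_date(NOW_SENTINEL):%Y%m%d}")
```

With this shape, the Parameter default would be the string "now" instead of a formatted date, so nothing date-specific is baked in when the flow is serialized.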
Scarlett King
08/05/2021, 8:54 AM