Hi, I have a flow scheduled to run daily, and I tried to get the current timestamp inside the flow. Why is the exact same timestamp written into the database every day?
@task
def write_timestamp_into_database():
    ...

with Flow("sample-flow", daily_schedule) as flow:
    timestamp = str(datetime.datetime.now())
    write_timestamp_into_database(timestamp)
Anna Geller
01/16/2022, 10:46 PM
Because this value was frozen at registration time. During registration, Prefect reads your Flow and builds a computational graph (DAG) of your tasks and their dependencies. Code defined inside your tasks is executed at runtime, but str(datetime.datetime.now()) sits directly in the Flow block, so it is evaluated once, when the flow is built. The result is then passed to the task as a constant rather than as a data dependency, which is why every run writes the same value.
Here is how you could fix this:
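import datetime
from prefect import task, Flow

@task
def get_current_date():
    # Runs at flow run time, so each run gets a fresh timestamp
    return str(datetime.datetime.now())

@task
def write_timestamp_into_database(dt: str):
    # Placeholder: write dt to your database here
    pass

with Flow("sample-flow") as flow:
    # Only the dependency between the two tasks is recorded here
    timestamp = get_current_date()
    write_timestamp_into_database(timestamp)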
You can see that the date is now generated dynamically inside a task and passed as a data dependency.
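If it helps to see the difference without Prefect involved, here is a minimal plain-Python sketch of the same idea. It is only an illustration, not how Prefect is implemented: fake_now, build_frozen_flow and build_dynamic_flow are hypothetical names, and fake_now is a counter standing in for datetime.datetime.now() so the output is predictable.

```python
import itertools

# Hypothetical stand-in for datetime.datetime.now():
# every call returns a new, larger value.
_ticks = itertools.count(1)


def fake_now() -> int:
    return next(_ticks)


def build_frozen_flow():
    # Evaluated once, while the "flow" is being built.
    timestamp = fake_now()

    def run():
        # Every run returns the value captured at build time.
        return timestamp

    return run


def build_dynamic_flow():
    def run():
        # Evaluated on every run, like code inside a task.
        return fake_now()

    return run


frozen = build_frozen_flow()
dynamic = build_dynamic_flow()

frozen_results = [frozen(), frozen(), frozen()]
dynamic_results = [dynamic(), dynamic(), dynamic()]

print(frozen_results)   # the same value three times
print(dynamic_results)  # a new value on each run
```

The frozen version mirrors calling datetime.datetime.now() directly in the Flow block, while the dynamic version mirrors moving that call into its own task.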