Daniil Ponizov
11/25/2021, 10:39 AM
Amanda Wee
11/25/2021, 10:49 AM
Anna Geller
import pendulum
from prefect import task, Flow, Parameter
@task
def get_start_date(param_start_date: str) -> str:
    """Resolve the start of the extraction interval.

    An explicitly supplied value is passed through unchanged. When the
    value is ``None``, the ISO-8601 string for yesterday in the
    ``America/New_York`` timezone is returned instead.
    """
    if param_start_date is not None:
        return param_start_date
    return pendulum.yesterday(tz="America/New_York").isoformat()
@task
def get_end_date(param_end_date: str) -> str:
    """Resolve the end of the extraction interval.

    An explicitly supplied value is passed through unchanged. When the
    value is ``None``, the ISO-8601 string for today in the
    ``America/New_York`` timezone is returned instead.
    """
    if param_end_date is not None:
        return param_end_date
    return pendulum.today(tz="America/New_York").isoformat()
@task(log_stdout=True)
def extract_data(start_date: str, end_date: str):
    """Placeholder extract step: report the interval being processed.

    The task is declared with ``log_stdout=True`` and currently only prints
    the interval bounds; real extract logic is expected to replace it.
    """
    # your extract logic based on those dates
    message = f"Backloading data for the interval {start_date} - {end_date}"
    print(message)
with Flow("backfilling_flow") as flow:
    # Start of the interval: optional parameter, resolved by get_start_date.
    custom_start_date = Parameter("start_date", default=None)
    start_date = get_start_date(custom_start_date)

    # End of the interval: optional parameter, resolved by get_end_date.
    custom_end_date = Parameter("end_date", default=None)
    end_date = get_end_date(custom_end_date)

    # Run the extract step over the resolved interval.
    extract_data(start_date=start_date, end_date=end_date)
To backfill specific dates, you can trigger that flow once per interval from a second flow:
from prefect import task, Flow, unmapped
from prefect.tasks.prefect import create_flow_run
with Flow("trigger") as flow:
    # One parameter set per day-long interval to backfill; each entry
    # becomes the ``parameters`` of one mapped flow run below.
    backfill_intervals = [
        {
            "start_date": "2021-11-11T00:00:00-05:00",
            "end_date": "2021-11-12T00:00:00-05:00",
        },
        {
            "start_date": "2021-11-12T00:00:00-05:00",
            "end_date": "2021-11-13T00:00:00-05:00",
        },
        {
            "start_date": "2021-11-13T00:00:00-05:00",
            "end_date": "2021-11-14T00:00:00-05:00",
        },
        {
            "start_date": "2021-11-14T00:00:00-05:00",
            "end_date": "2021-11-15T00:00:00-05:00",
        },
        {
            "start_date": "2021-11-15T00:00:00-05:00",
            "end_date": "2021-11-16T00:00:00-05:00",
        },
        {
            "start_date": "2021-11-16T00:00:00-05:00",
            "end_date": "2021-11-17T00:00:00-05:00",
        },
    ]

    # Map over the intervals; the flow and project names are wrapped in
    # ``unmapped`` so the same value is used for every mapped run.
    create_flow_run.map(
        flow_name=unmapped("backfilling_flow"),
        project_name=unmapped("p"),
        parameters=backfill_intervals,
    )
Alternatively, you can use the KV store for that purpose: https://docs.prefect.io/orchestration/concepts/kv_store.html#using-key-value-pairs-in-flows
Daniil Ponizov
11/25/2021, 11:50 AM
Daniil Ponizov
11/25/2021, 12:07 PM
Anna Geller
Anna Geller
from prefect import task, Flow, Parameter
@task(log_stdout=True)
def hello_world(name):
    """Print a greeting addressed to ``name``."""
    greeting = f"hello {name}"
    print(greeting)
with Flow("mini.example") as flow:
    # "name" can be overridden per run; it falls back to "your_name".
    name = Parameter("name", default="your_name")
    hw = hello_world(name)


# Execute the flow locally only when this file is run as a script.
if __name__ == "__main__":
    flow.run()
Daniil Ponizov
11/25/2021, 12:23 PM