Emerson Franks
07/12/2022, 5:51 PMAnna Geller
07/12/2022, 5:53 PMEmerson Franks
07/12/2022, 5:59 PMimport prefect
from prefect import task, Flow
from prefect.tasks.fivetran import FivetranSyncTask
logger = prefect.context.get("logger")
def log_message(message):
<http://logger.info|logger.info>(message)
def start_job():
api_key = 'PUT THESE BACK' #These need to come from Key Vault eventually.
api_secret = 'PUT THESE BACK'
connector_id = 'CONNECTOR ID GOES HERE'
poll_status_every_n_seconds = 60
fivetran_task = FivetranSyncTask(connector_id)
status = fivetran_task.run(api_key, api_secret, connector_id=connector_id, poll_status_every_n_seconds=poll_status_every_n_seconds)
return status
@task
def fivetran_flow():
log_message('Kicking off FiveTran task.')
status = start_job()
log_message(f'FiveTran run completed with status of: {status}')
flow = Flow("five-tran-flow", tasks=[fivetran_flow])
flow.register(project_name="five-tran-poc")
Anna Geller
07/12/2022, 6:02 PMfrom prefect import task, Flow, Parameter
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret
fivetran_task = FivetranSyncTask(connector_id="CONNECTOR ID GOES HERE")
with Flow("five-tran-flow") as flow:
api_key = PrefectSecret("FIVETRAN_API_KEY")
api_secret = PrefectSecret("FIVETRAN_API_SECRET")
poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_task(
api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
)
if __name__ == "__main__":
flow.register(project_name="five-tran-poc")
Emerson Franks
07/12/2022, 8:27 PM