Joël Luijmes
01/15/2021, 3:15 PM

from datetime import datetime

from prefect import Flow
from prefect.engine import state
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.schedules import CronSchedule
from prefect.storage import Docker
from prefect.utilities.notifications import slack_notifier

executor = DaskExecutor(cluster_kwargs={"n_workers": 6, "threads_per_worker": 1, "memory_limit": "1GB"})
schedule = CronSchedule(cron, start_date=datetime.utcnow())
handler = slack_notifier(only_states=[state.Failed])

storage_gcr = Docker(
    base_image="prefecthq/prefect:0.14.1-python3.8",
    registry_url="...",
    python_dependencies=["pandas"],
)

with Flow(
    f"CloudSQL to BigQuery ({cron})",
    storage=storage_gcr,
    executor=executor,
    schedule=schedule,
    run_config=KubernetesRun(
        # prefect_job_template contains a job spec describing resource limits, secrets, node selector, ... (sketched below)
        job_template=prefect_job_template
    ),
) as flow:
    # Class with @resource_manager which spawns a pod and service running the Cloud SQL Proxy (sketched below)
    with CloudSQLProxyManager(...) as service_name:
        # Task whose run() calls RunNamespacedJob.run() (sketched below)
        job = CreateSyncJob(
            job_spec_template,
            state_handlers=[cleanup_on_failure, handler],
        )
        # Link the task's dependency on the CloudSQLProxyManager
        task = job(service_name)
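
For context on the job_template passed to KubernetesRun above: it is an ordinary Kubernetes Job manifest that the agent uses as the base for the flow-run job. A rough sketch of what such a template might contain (the node pool, resource figures and secret name are illustrative, not the actual spec):

prefect_job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "nodeSelector": {"cloud.google.com/gke-nodepool": "flows"},  # illustrative node selector
                "containers": [
                    {
                        "name": "flow",  # container name matching Prefect's default job template
                        "resources": {
                            "requests": {"cpu": "500m", "memory": "1Gi"},
                            "limits": {"cpu": "1", "memory": "2Gi"},
                        },
                        "envFrom": [{"secretRef": {"name": "flow-secrets"}}],  # illustrative secret reference
                    }
                ],
            }
        }
    },
}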
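
The CloudSQLProxyManager referenced above follows Prefect's @resource_manager pattern: setup() returns the resource that the with-block binds (here the proxy's service name), and cleanup() tears it down when the tasks inside the block finish. A minimal sketch, with placeholder parameters and bodies rather than the real pod/service creation logic:

from prefect import resource_manager

@resource_manager
class CloudSQLProxyManager:
    def __init__(self, instance_connection_name, namespace="default"):
        # Placeholder parameters; the real class takes whatever it needs
        # to render the proxy pod and service manifests.
        self.instance_connection_name = instance_connection_name
        self.namespace = namespace

    def setup(self):
        # Create the Cloud SQL Proxy pod and service here (e.g. via the
        # kubernetes client). Whatever setup() returns is what the
        # `with CloudSQLProxyManager(...) as service_name` block receives.
        return "cloudsql-proxy"  # placeholder service name

    def cleanup(self, service_name):
        # Delete the proxy pod and service once the block's tasks are done.
        ...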
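
And CreateSyncJob, per the comment, is a Task whose run() delegates to RunNamespacedJob. A rough sketch under that assumption (the env-var patching is purely illustrative; the real template handling will differ):

import copy

from prefect import Task
from prefect.tasks.kubernetes import RunNamespacedJob

class CreateSyncJob(Task):
    def __init__(self, job_spec_template, **kwargs):
        self.job_spec_template = job_spec_template
        super().__init__(**kwargs)

    def run(self, service_name):
        body = copy.deepcopy(self.job_spec_template)
        # Illustrative only: point the sync container's database host at the proxy service.
        body["spec"]["template"]["spec"]["containers"][0].setdefault("env", []).append(
            {"name": "DB_HOST", "value": service_name}
        )
        # Launch the sync job and block until it completes.
        return RunNamespacedJob(body=body, namespace="default").run()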