Alexander Butler
04/14/2022, 11:55 PMprefect deployment create
says it should create or update a deployment but it is failing when the deployment exists (prefect 2.0) ?Kevin Kho
04/14/2022, 11:55 PMAlexander Butler
04/14/2022, 11:58 PM2.0b3
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/alexanderbutler/Library/Caches/pypoetry/virtualenvs/analytics-pipelines-nKjHKDtD-py3.9/lib/python3.9/site-packages/prefect/cli/deployment.py",
line 251, in create
await spec.create_deployment(validate=False)
File "/Users/alexanderbutler/Library/Caches/pypoetry/virtualenvs/analytics-pipelines-nKjHKDtD-py3.9/lib/python3.9/site-packages/prefect/client.py", line 82,
in with_injected_client
return await fn(*args, **kwargs)
File "/Users/alexanderbutler/Library/Caches/pypoetry/virtualenvs/analytics-pipelines-nKjHKDtD-py3.9/lib/python3.9/site-packages/prefect/deployments.py", line
280, in create_deployment
self.flow_storage._block_id = await client.create_block(
File "/Users/alexanderbutler/Library/Caches/pypoetry/virtualenvs/analytics-pipelines-nKjHKDtD-py3.9/lib/python3.9/site-packages/prefect/client.py", line 856,
in create_block
raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
prefect.exceptions.ObjectAlreadyExists
Failed to create deployment 'elt-salesforce/primary'
prefect.exceptions.ObjectAlreadyExistsKevin Kho
04/15/2022, 12:02 AMAlexander Butler
04/15/2022, 12:13 AMKevin Kho
04/15/2022, 12:14 AMAlexander Butler
04/15/2022, 12:14 AMfrom datetime import timedelta
from pathlib import Path
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule, CronSchedule
from prefect.flow_runners import SubprocessFlowRunner, DockerFlowRunner
FLOW_DIR = Path(__file__).parent / "flows"
# Salesforce ELT
DeploymentSpec(
flow_location=str((FLOW_DIR / "salesforce.py").absolute()),
name="primary",
schedule=IntervalSchedule(interval=timedelta(minutes=5)),
tags=["pipeline"],
flow_runner=SubprocessFlowRunner()
)
deploymentfrom prefect import flow, task
@task(name="extract", description="This task triggers an airbyte extraction job for Salesforce data to BigQuery.", tags=["airbyte", "extract"])
def extract():
print("Data moved.")
@flow(name="elt-salesforce")
def elt():
extract()
flowKevin Kho
04/15/2022, 12:17 AMAlexander Butler
04/15/2022, 12:20 AMKevin Kho
04/15/2022, 12:21 AMAlexander Butler
04/15/2022, 12:28 AMKevin Kho
04/15/2022, 12:29 AMAlexander Butler
04/15/2022, 1:21 AMKevin Kho
04/15/2022, 1:45 AM