Boggdan Barrientos
11/10/2022, 7:37 PM@flow(name="XtraIngestion")
def xtra_sensing_extract_load():
s3_sensor(
s3_bucket=s3_bucket,
s3_prefix=s3_prefix,
s3_filename=s3_filename,
incremental=incremental
)
trigger_sync(
poll_interval_s=45,
status_updates=True,
connection_id=airbyte_connection_id,
timeout=300,
)
Nate
11/10/2022, 9:13 PMrun_deployment
to trigger many instances of the same deployment with different parameters values for different airbyte connections, so if you added airbyte_connection_id
as a parameter to the above flow you could do:
from prefect.deployments import run_deployment
from prefect.orion.schemas.core import FlowRun
from prefect.states import Failed
@task
def trigger_EL_flow(params: Dict[str, str]) -> FlowRun:
# this will wait for completion of "XtraIngestion"
flow_run_model = run_deployment(name="xtra-sensing-extract-load/EL-Deployment", parameters=params)
if not flow_run_model.state.name == "Completed":
raise Failed() # or respond to failure however you want
return flow_run_model
@flow
def parent_ELT_flow(airbyte_connection_ids: List[UUID], dbt_job_id: int):
subflow_params = [{"airbyte_connection_id": x} for x in airbyte_connection_ids]
EL_flow_results = trigger_EL_flow.map(subflow_params)
if all(i.result() for i in EL_flow_results):
# trigger dbt, whether in cloud or CLI
pass
Boggdan Barrientos
11/10/2022, 10:17 PMTraceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "./flows/xtra/xtra-data-ingestion.py", line 9, in <module>
from prefect.deployments import run_deployment
ImportError: cannot import name 'run_deployment' from 'prefect.deployments' (/usr/local/lib/python3.9/site-packages/prefect/deployments.py)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.9/site-packages/prefect/cli/deployment.py", line 613, in build
flow = prefect.utilities.import****.import_object(entrypoint)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/import****.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/import****.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at './flows/xtra/xtra-data-ingestion.py' encountered an exception
An exception occurred.
And updating my comment before, I actually have params in the flow. I'm thinking that probably the best is pass all in a dict.
@flow(name="XtraIngestion")
def xtra_sensing_extract_load(s3_bucket="bucket",s3_prefix="path",s3_filename="file",incremental=True,airbyte_connection_id="8ee2883c-1256-46df-bac7-4c90a789849e"):
s3_sensor(
s3_bucket=s3_bucket,
s3_prefix=s3_prefix,
s3_filename=s3_filename,
incremental=incremental
)
trigger_sync(
poll_interval_s=45,
status_updates=True,
connection_id=airbyte_connection_id,
timeout=300,
)
Nate
11/10/2022, 10:18 PMImportError: cannot import name 'run_deployment' from 'prefect.deployments' (/usr/local/lib/python3.9/site-packages/prefect/deployments.py)
what does prefect version
show?Boggdan Barrientos
11/10/2022, 11:01 PMNate
11/10/2022, 11:04 PMBoggdan Barrientos
11/10/2022, 11:10 PMNate
11/10/2022, 11:31 PMrun_deployment
utility was introducedBoggdan Barrientos
11/11/2022, 10:06 PM