# prefect-community
b
Hi all! What's the best way to implement ELT flows in Prefect 2? I'm done with the EL part: I have a sensor task that waits for an S3 file and then starts an Airbyte sync, all inside one flow. I have multiple files and each file has its own Airbyte connection, so I made one deployment per file. Now I want to run a dbt job after all the EL deployments have finished. I'm not sure how to wait for all the EL flows. I think I need two subflows, one for EL and one for T, but I'm not sure how to pass the parameters to only one of them, or how to reference the deployments I already created.
Currently I have something like this:
@flow(name="XtraIngestion")
def xtra_sensing_extract_load():
    
    s3_sensor(
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        s3_filename=s3_filename,
        incremental=incremental
    )

    trigger_sync(
        poll_interval_s=45,
        status_updates=True,
        connection_id=airbyte_connection_id,
        timeout=300,
    )
n
Hi @Boggdan Barrientos, you can probably use `run_deployment` to trigger many runs of the same deployment with different parameter values, one per Airbyte connection. If you add `airbyte_connection_id` as a parameter to the flow above, you could do:
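from typing import Dict, List
from uuid import UUID

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.orion.schemas.core import FlowRun

@task
def trigger_EL_flow(params: Dict[str, str]) -> FlowRun:
    # run_deployment blocks until the "XtraIngestion" flow run finishes;
    # the name is "<flow name>/<deployment name>" ("EL-Deployment" is a placeholder)
    flow_run_model = run_deployment(name="XtraIngestion/EL-Deployment", parameters=params)
    if not flow_run_model.state.is_completed():
        # Failed() builds a State object, which can't be raised, so raise a real
        # exception here (or respond to the failure however you want)
        raise RuntimeError(f"EL flow run {flow_run_model.name} ended in state {flow_run_model.state.name}")
    return flow_run_model

@flow
def parent_ELT_flow(airbyte_connection_ids: List[UUID], dbt_job_id: int):
    # one parameter dict (and so one deployment run) per Airbyte connection
    subflow_params = [{"airbyte_connection_id": str(x)} for x in airbyte_connection_ids]

    EL_flow_results = trigger_EL_flow.map(subflow_params)

    # .result() waits for every mapped task and re-raises if any EL run failed
    if all(i.result() for i in EL_flow_results):
        # trigger dbt, whether in cloud or CLI
        pass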
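The control flow of parent_ELT_flow (fan out one EL run per connection, wait for all of them, then transform) can be sketched without Prefect using the standard library's concurrent.futures. This is a swapped-in illustration, not Prefect's API: extract_load and transform are hypothetical stand-ins for run_deployment and the dbt step, and the "Completed" string stands in for the flow run's state name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def run_elt(
    connection_ids: List[str],
    extract_load: Callable[[Dict[str, str]], str],  # hypothetical stand-in for run_deployment
    transform: Callable[[], str],                   # hypothetical stand-in for the dbt job
) -> str:
    """Run every EL job concurrently, then transform only if all of them completed."""
    params = [{"airbyte_connection_id": c} for c in connection_ids]
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(extract_load, p) for p in params]
        # result() blocks until each job finishes and re-raises its exception,
        # so a crashed EL job stops us before the transform runs
        states = [f.result() for f in futures]
    if all(s == "Completed" for s in states):
        return transform()
    raise RuntimeError(f"not all EL jobs completed: {states}")


# usage with fake jobs that always succeed
print(run_elt(["conn-a", "conn-b"], lambda p: "Completed", lambda: "dbt done"))
```

The key point carried over from the Prefect version is that the transform step only starts after every EL result has been collected, so one slow file delays dbt rather than letting it run on partial data.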
b
Hi @Nate, thanks for the quick answer. I'm trying to set up the deployments but I got:
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "./flows/xtra/xtra-data-ingestion.py", line 9, in <module>
    from prefect.deployments import run_deployment
ImportError: cannot import name 'run_deployment' from 'prefect.deployments' (/usr/local/lib/python3.9/site-packages/prefect/deployments.py)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/deployment.py", line 613, in build
    flow = prefect.utilities.import****.import_object(entrypoint)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/import****.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/import****.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at './flows/xtra/xtra-data-ingestion.py' encountered an exception
An exception occurred.
And to update my earlier comment: the flow does actually take parameters. I'm thinking it's probably best to pass them all in a dict.
@flow(name="XtraIngestion")
def xtra_sensing_extract_load(
    s3_bucket="bucket",
    s3_prefix="path",
    s3_filename="file",
    incremental=True,
    airbyte_connection_id="8ee2883c-1256-46df-bac7-4c90a789849e",
):

    s3_sensor(
        s3_bucket=s3_bucket,
        s3_prefix=s3_prefix,
        s3_filename=s3_filename,
        incremental=incremental
    )

    trigger_sync(
        poll_interval_s=45,
        status_updates=True,
        connection_id=airbyte_connection_id,
        timeout=300,
    )
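Passing everything in a dict fits run_deployment well, since its parameters argument is itself a dict whose keys become the flow's keyword arguments. A minimal sketch of building one such dict per file, assuming a hypothetical FILES list (the prefixes, filenames and connection IDs below are placeholders, not values from this thread):

```python
from typing import Any, Dict, List

# Hypothetical per-file settings; every value here is a placeholder.
FILES: List[Dict[str, Any]] = [
    {"s3_prefix": "path/a", "s3_filename": "file_a",
     "airbyte_connection_id": "<connection-id-a>"},
    {"s3_prefix": "path/b", "s3_filename": "file_b",
     "airbyte_connection_id": "<connection-id-b>", "incremental": False},
]


def build_el_parameters(
    files: List[Dict[str, Any]], s3_bucket: str, incremental: bool = True
) -> List[Dict[str, Any]]:
    """One parameter dict per file, keyed by xtra_sensing_extract_load's argument names."""
    return [
        {
            "s3_bucket": s3_bucket,
            "s3_prefix": f["s3_prefix"],
            "s3_filename": f["s3_filename"],
            # a per-file override wins over the shared default
            "incremental": f.get("incremental", incremental),
            "airbyte_connection_id": f["airbyte_connection_id"],
        }
        for f in files
    ]


params = build_el_parameters(FILES, s3_bucket="bucket")
```

Each dict in params could then be handed to trigger_EL_flow.map(...) in place of the single-key dicts, so one deployment with parameters may be enough instead of one deployment per file.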
n
hmm this is weird
ImportError: cannot import name 'run_deployment' from 'prefect.deployments' (/usr/local/lib/python3.9/site-packages/prefect/deployments.py)
What does `prefect version` show?
b
this shows.
n
When you got the traceback you sent above, what exactly did you run? Was it a script to create the deployment or a CLI command?
b
Yes, we use a script to build the deployment. We use a Jenkins agent for CI/CD in our Prefect repo.
n
Okay, do you know which version of Prefect your CI/CD process is using? It looks like it's a version from before the `run_deployment` utility was introduced.
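To catch this kind of mismatch in CI before the deployment build runs, a pre-flight check could report the installed Prefect version and whether run_deployment is importable. A minimal sketch, assuming the Jenkins job can run a Python step in the same environment that builds the deployments; the function names are hypothetical:

```python
import importlib
import importlib.metadata


def prefect_version() -> str:
    """Installed Prefect version as reported by package metadata."""
    try:
        return importlib.metadata.version("prefect")
    except importlib.metadata.PackageNotFoundError:
        return "not installed"


def has_run_deployment() -> bool:
    """True if `from prefect.deployments import run_deployment` would succeed."""
    try:
        deployments = importlib.import_module("prefect.deployments")
    except ImportError:
        # Prefect itself (or the deployments module) is missing
        return False
    return hasattr(deployments, "run_deployment")


print(f"prefect {prefect_version()}, run_deployment available: {has_run_deployment()}")
```

A CI step could call has_run_deployment() first and fail the job with the printed version when it returns False, which surfaces an outdated agent image more clearly than the ScriptError traceback above.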
b
Thanks @Nate, the team and I updated the Prefect version in Jenkins.