# prefect-community
k
Are there any example flows involving pulling from different branches of the same repo using mapping?
k
No…..that…sounds odd though lol. What are you trying to do?
k
For some context, I’m using ECSRun + pygit2 to pull repos. I’m having some trouble executing the above since I would be cloning into a folder of the same name:
ValueError: 'dbt' exists and is not an empty directory
k
Ah I see. Then in that case you could just have each mapped task create its own directory before cloning, right?
I dunno if you’ll still run into issues
Try return_all=True maybe for DbtShellTask? But I think what you have should be enough. What is your executor for this Flow? That affects whether the tasks run in separate environments, but LocalExecutor and LocalDaskExecutor will share the same filesystem.
DaskExecutor does not share the same filesystem. I think you might write into the workers’ disk space, but you’ll have issues when two concurrent tasks are on the same worker.
pygit2 clone_repository takes in a local path. I think you can just use that to rename the destination folder?
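As a minimal sketch of that idea (the repo URL here is a placeholder), cloning each branch into its own destination folder avoids the “exists and is not an empty directory” collision:

import pygit2

REPO_URL = "https://github.com/some-org/analytics"  # placeholder URL

def pull_repo(branch_name: str) -> pygit2.Repository:
    # Use the branch name as the local destination path so that
    # concurrent mapped clones never target the same directory.
    return pygit2.clone_repository(REPO_URL, branch_name)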
k
I like the idea of cloning into a different destination folder, lemme check it out
I think that works!!!
k
Nice!
k
The reason I say I think is bc my downstream task is a run_dbt task, but I don’t see any logs from it. So I’m not sure if everything is running correctly
I had to wrap my DbtShellTask in another task, do you think this affects logging?
@task(log_stdout=True)
def run_dbt(schema_name, branch_name, command):
    DbtShellTask(
        return_all=True,
        profile_name="drsquatch_dev",
        environment="prod",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd {}/dbt".format(branch_name),
        log_stderr=True,
        stream_output=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": secret['SNOWFLAKE_ACCOUNT'],
            "user": secret['SNOWFLAKE_USERNAME'],
            "password": secret['SNOWFLAKE_PASSWORD'],
            "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
            "database": "ANALYTICS_DEV",
            "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
            "schema": schema_name,
            "threads": 12,
            "client_session_keep_alive": False,
        },
    ).run(command=command)
##============
I have log_stdout=True, log_stderr=True, stream_output=True, do you see any reason why I wouldn’t be seeing logs? I’m unsure if it’s a mapped run thing or a DbtShellTask wrapped in another task thing
k
Oh btw I tried the Pine Tar. It’s so funny it’s so black haha. And then I tried the Bay Rum candle too. The soaps are really good
I don’t think that affects logging. You should still see some unless you are on DaskExecutor
k
Pine Tar haircare stuff scared me the first few times I used it!!! I feel like I’m in a grunge emo rock band 🤣🤣
Hm, I only see Prefect logs, not the logs from DbtShellTask itself 🤔
k
Ohh, because it’s not a task anymore when you call .run(). It’s just the Python under the hood (a normal function call inside a task)
k
Oooh, so what is the workaround to get the logs? I’m thinking: 1. Instead of wrapping DbtShellTask in another task, I somehow pass parameters into DbtShellTask (couldn’t find a way to do this initially) 2. Somehow bring logs from the DbtShellTask out to the wrapper task
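A rough sketch of what option 2 could look like, assuming return_all=True so that DbtShellTask.run() hands back the captured output lines (profile and dbt_kwargs settings are omitted here for brevity); the subclassing route discussed next is what was ultimately used:

import prefect
from prefect import task
from prefect.tasks.dbt.dbt import DbtShellTask

@task(log_stdout=True)
def run_dbt(branch_name, command):
    logger = prefect.context.get("logger")
    # Calling .run() here is a plain function call, not a Prefect task,
    # so re-log its returned output under the wrapper task instead.
    lines = DbtShellTask(
        return_all=True,
        helper_script="cd {}/dbt".format(branch_name),
    ).run(command=command)
    for line in lines:
        logger.info(line)
    return lines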
k
Well I think the wrapper task would be the one you’re interested in right? Yes you can likely subclass DbtShellTask and pass your parameters.
k
Could you expand more on what that means?
k
Yeah one second let me type an example
🙌 1
Does something like this make sense to you?
class MyDbtShellTask(DbtShellTask):

    @defaults_from_attrs("command", "env", "helper_script", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        env: dict = None,
        helper_script: str = None,
        dbt_kwargs: dict = None,
        path_to_clone: str = None
    ) -> str:
        pygit2.clone_repository(.., path_to_clone, ..)

        return super().run(command, env, helper_script, dbt_kwargs)
We get the DbtShellTask and make our own copy. The init will be inherited. You override the run, and then use the path_to_clone, for example, before feeding the rest of the kwargs to DbtShellTask.run() by using super(). And then you have a modified task that gets Prefect logs.
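Usage would then look roughly like this (flow name and task settings are placeholders), with the subclass instantiated once and called inside the Flow block:

from prefect import Flow

run_dbt = MyDbtShellTask(
    return_all=True,
    log_stdout=True,
    stream_output=True,
)

with Flow("dbt-example") as flow:
    # path_to_clone is the extra run() argument added in the subclass above
    result = run_dbt(command="dbt run", path_to_clone="dbt")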
k
Ahhh I see! So now when I run dbt, I would use MyDbtShellTask instead of DbtShellTask once I’ve made my own copy?
k
Yeah, and overridden the run method to do the additional things you want (and you can add logging there)
In fact, DbtShellTask just inherits ShellTask and does the same thing by calling super().run() at the end
k
Okay reading through that I’m starting to understand. It’s my first time doing it though so I’ll mess around and probably come back with some questions in a little bit!
k
Sure! I should be around
k
Could you explain/point me to resources that explain these bits in your example (I also see them in the DbtShellTask code as well): the @defaults_from_attrs decorator and the ) -> str: return annotation?
I also don’t quite understand how MyDbtShellTask is able to take in new inputs. Edit: found this to read about the decorator https://docs.prefect.io/api/latest/utilities/tasks.html
Like in my current setup:
@task(log_stdout=True)
def run_dbt(schema_name, branch_name, command):
    DbtShellTask(
        return_all=True,
        profile_name="drsquatch_dev",
        environment="prod",
        overwrite_profiles=True,
        log_stdout=True,
        helper_script="cd {}/dbt".format(branch_name),
        log_stderr=True,
        stream_output=True,
        dbt_kwargs={
            "type": "snowflake",
            "account": secret['SNOWFLAKE_ACCOUNT'],
            "user": secret['SNOWFLAKE_USERNAME'],
            "password": secret['SNOWFLAKE_PASSWORD'],
            "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
            "database": "ANALYTICS_DEV",
            "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
            "schema": schema_name,
            "threads": 12,
            "client_session_keep_alive": False,
        },
    ).run(command=command)
It’s able to take in branch_name, and format that into the helper_script parameter. And then it’s able to take in schema_name, and add the value to the dbt_kwargs dictionary . I can’t quite understand how to emulate the same thing when subclassing DbtShellTask
k
defaults_from_attrs takes the init parameters and passes them to the run method as the defaults if you didn’t specify them. It’s what lets you do:
pg = PostgresExecute(db=...,pw=...)
with Flow(...) as flow:
    pg()
and then pg() will use the init kwargs
when you override the run method, you can add the decorator as well to your new run so it can inherit the init stuff. does that make sense?
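A toy illustration of how the decorator behaves, not tied to dbt (the task class and attribute names here are invented for the example):

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs

class GreetTask(Task):
    def __init__(self, greeting: str = "hello", **kwargs):
        self.greeting = greeting
        super().__init__(**kwargs)

    @defaults_from_attrs("greeting")
    def run(self, greeting: str = None, name: str = "world") -> str:
        # If `greeting` isn't supplied at call time, the decorator fills
        # it in from the value stored on the instance in __init__.
        return "{} {}".format(greeting, name)

So GreetTask(greeting="hi")() inside a Flow would use "hi", while passing greeting="hey" at call time overrides the init value.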
k
Oo I see, then what about new parameters you want to add? Where can you add logic to process those new parameters?
k
If you want them in the init, you need to overwrite the init method as well, but for you maybe just adding them to the run as additional inputs will suffice. Then you don’t need them in defaults_from_attrs because there is no default from the init.
Then you add the logic to process before you call super().run(). The DbtShellTask has a ton of logic and then it calls ShellTask.run() at the end.
k
Got it, let’s say I wanted to pass schema_name so that it can be used in dbt_kwargs, where would I add code to say schema_name goes into dbt_kwargs?
I think I'm struggling to understand the syntax and where things should go. The DbtShellTask code is an example that I can look at, but there are lots of things going on so I can't isolate each element and understand it as easily
k
Yeah so let’s say we’re purely concerned with modifying
schema_name
with some logic and we want to inject that in
dbt_kwargs
and that motivates our new task
Copy code
class MyDbtShellTask(DbtShellTask):

    def run(self, command=None, env=None, helper_script=None, dbt_kwargs=None, schema_name_suffix: str = None):
        dbt_kwargs['schema_name'] = dbt_kwargs['schema_name'] + schema_name_suffix
        return super().run(command, env, helper_script, dbt_kwargs)
So dbt_kwargs is a dict and you can update/pull out stuff before you call the super
k
Ooo I seeeee
I can put previously defined functions in there as well (e.g. pull_repo function)
k
Yes!
k
Quick update on this, I got everything working, but the 2 dbt jobs don’t seem to be running in parallel. One was run after the other with no overlap. I think it might be due to the way we set up dbt, but do you think it could be due to something else (maybe DbtShellTask itself or ECSRun functionality)?
k
parallelization only happens when you use LocalDaskExecutor. are you doing that?
k
oh I didn’t know that! I am not using the LocalDaskExecuter, lemme check that out
Looking more into it, thats involving a bit more set up than I anticipated at first to have it run on prefect cloud. Instead of utilising parallel computing to run tasks at the same time, I think it might be simpler to just have 2 separate flows being kicked off at the same time. I’m thinking perhaps having 2 create_flow_runs that passes parameters from parent flow to 2 children flow, thoughts?
k
Why is it more involved? Just add
flow.executor = LocalDaskExecutor()
and it should run in parallel
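For instance (the extra executor kwargs here are optional and just illustrative):

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("my-flow") as flow:
    ...  # tasks go here

# Attach the executor after building the flow...
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
# ...or pass executor=LocalDaskExecutor() to the Flow constructor instead.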
k
Oh I’m silly, I completely misunderstood the LocalDaskExecutor
If I have my tasks mapped and my parameters set up like so:
Copy code
with Flow("data-quality-tracking-parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE, executor=LocalDaskExecutor()) as flow:
    branch_name = Parameter('branch_name', default=["branch1", "branch2"], required=False)
    schema_name = Parameter('schema_name', default=["schema1", "schema2"], required=False)
Do parallel runs take elements from each parameter at the same index? In other words, will flow runs always have branch1 run with schema1, or is it random and I might get branch1 run with schema2 as well?
k
If using mapping, order of the list is preserved
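In other words (a toy example; the task here is hypothetical), mapped children are built index by index, so the first branch pairs with the first schema:

from prefect import Flow, Parameter, task

@task
def pair(branch, schema):
    # Child i receives branch[i] and schema[i]; ordering is preserved.
    return (branch, schema)

with Flow("pairing-example") as flow:
    branches = Parameter("branch_name", default=["branch1", "branch2"])
    schemas = Parameter("schema_name", default=["schema1", "schema2"])
    pairs = pair.map(branch=branches, schema=schemas)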
k
That’s good to know, then I think my issues is due to DbtShellTask not handling mapped runs too well. Both parallel run seems to take on the dbt_kwargs of one of them. For example, I have a MyDbtShellTask that takes in mapped schema_name argument and passes it into dbt_kwargs, but both run seem to have the same dbt_kwargs. Check out the below screenshots of filtered logs of 2 separate flows. Ideally, one line should say
table model *heap_change*.subscription_renewals
and the other should say
table model *heap_stage*.subscription_renewals
. But as you can see, each task in both flows seem to just pick 1, and the picking seems random
k
Can i see the full code? Just remove sensitive stuff
Full Flow code*
k
##=========================================##
##================ CONFIGS ================##
##=========================================##
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.utilities.tasks import defaults_from_attrs
from prefect.executors import LocalDaskExecutor
from prefect.storage import GitHub
from prefect.run_configs import ECSRun
from prefect.client import Secret
import pygit2

STORAGE = GitHub(repo="dr-squatch/prefect",
                 path="/flows/data_quality_tracking/data_quality_tracking_parallel_dbt_run_flow.py",
                 access_token_secret="GITHUB_ACCESS_TOKEN")

RUN_CONFIG = ECSRun(labels=['dev'],
                    task_role_arn=Secret("ECS_TASK_ROLE_ARN").get(),
                    execution_role_arn=Secret("ECS_EXECUTION_ROLE_ARN").get(),
                    image="kendrsq/prefect-dbt:latest")

logger = prefect.context.get("logger")
##=========================================##
##=========================================##

##=========================================##
##=========== PULL_DBT_REPO TASK ==========##
##=========================================##
@task(name="Clone DBT")
def pull_dbt_repo(branch_name, schema_name):
    git_token = Secret("GITHUB_ACCESS_TOKEN").get()
    dbt_repo_name = "analytics"
    dbt_repo_https = (
        f"https://{git_token}:x-oauth-basic@github.com/dr-squatch/{dbt_repo_name}"
    )

    dbt_repo = pygit2.clone_repository(dbt_repo_https, branch_name)
    branch = dbt_repo.branches['origin/' + branch_name]
    # logger.info("Pulling the {} branch from analytics repo".format(branch_name))
    logger.info("[PULL_REPO] Running dbt task from the {} branch, into the ANALYTICS_DEV.{} schema".format(branch_name, schema_name))
    ref = dbt_repo.lookup_reference(branch.name)
    dbt_repo.checkout(ref)
    logger.info("Cloning of the {} branch complete".format(branch_name))
##=========================================##
##=========================================##

##=========================================##
##=========== DBTSHELLTASK SETUP ==========##
##=========================================##
from prefect.tasks.dbt.dbt import DbtShellTask

secret = Secret("PREFECT_SNOWFLAKE_DETAILS").get()

class MyDbtShellTask(DbtShellTask):

    @defaults_from_attrs("command", "env", "helper_script", "dbt_kwargs")
    def run(
        self,
        command: str = None,
        env: dict = None,
        helper_script: str = None,
        dbt_kwargs: dict = None,
        branch_name: str = None,
        schema_name: str = None,
        # Above 2 arguments are additional arguments to be added on top of regular DbtShellTask arguments
    ) -> str:

        helper_script = "cd {}/dbt".format(branch_name)
        dbt_kwargs['schema'] = schema_name
        logger.info("[DBT] Running dbt task from the {} branch, into the ANALYTICS_DEV.{} schema".format(branch_name, schema_name))
        
        super().run(command=command, env=env, helper_script=helper_script, dbt_kwargs=dbt_kwargs)


run_dbt = MyDbtShellTask(
    return_all=True,
    profile_name="drsquatch_dev",
    environment="prod",
    overwrite_profiles=True,
    log_stdout=True,
    log_stderr=True,
    stream_output=True,
    dbt_kwargs={
        "type": "snowflake",
        "account": secret['SNOWFLAKE_ACCOUNT'],
        "user": secret['SNOWFLAKE_USERNAME'],
        "password": secret['SNOWFLAKE_PASSWORD'],
        "role": "PREFECT_DATA_QUAL_TRCK_ROLE",
        "database": "ANALYTICS_DEV",
        "warehouse": secret['SNOWFLAKE_WAREHOUSE'],
        "threads": 12,
        "client_session_keep_alive": False,
    },
)
##=========================================##
##=========================================##

##=========================================##
##============= OUTPUT_PRINT ==============##
##=========================================##
@task
def output_print(output):
    logger.info(output)
##=========================================##
##=========================================##

##=========================================##
##================= FLOW ==================##
##=========================================##
with Flow("data-quality-tracking-parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE, executor=LocalDaskExecutor()) as flow:
    branch_name = Parameter('branch_name', default=["parallel_testing_branch", "parallel_testing_branch2"], required=False)
    schema_name = Parameter('schema_name', default=["heap_stage", "heap_change"], required=False)
    dbt_command = Parameter('dbt_command', default="dbt run -m subscriptions")

    # output_print.map("The inputted branch is {} and the inputted schema is {}".format(branch_name, schema_name))

    pull_repo = pull_dbt_repo.map(
        schema_name=schema_name,
        branch_name=branch_name
        )

    deps = run_dbt.map(
        schema_name=schema_name,
        branch_name=branch_name,
        command=unmapped("dbt deps"),
        task_args={"name": "DBT: Dependencies"},
        upstream_tasks=[pull_repo],
    )

    run_model = run_dbt.map(
        schema_name=schema_name,
        branch_name=branch_name,
        command=unmapped(dbt_command),
        task_args={"name": "DBT: Run model"},
        upstream_tasks=[deps],
    )
##=========================================##
##=========================================##

flow.register(project_name="data_quality_tracking")
k
Can you try upstream_tasks=pull_repo with no brackets?
k
Should I do upstream_tasks=deps without brackets as well?
k
yes but i was just asking for one to test cuz i’m not sure if that’s the issue
k
Getting this error:
for t in upstream_tasks or []:
  File "/Users/kennguyen/Documents/prefect/master_data_qual_env/lib/python3.9/site-packages/prefect/core/task.py", line 998, in __iter__
    raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
k
Ok let me read a bit more
🙌 1
Wait though, you are saying they take one set of dbt_kwargs, but I am not seeing a second passed in?
Ah never mind I see the intent one sec
k
Ya, under the class MyDbtShellTask(DbtShellTask): I got a dbt_kwargs['schema'] = schema_name in there
k
Unsure on first glance. Can you just log the dbt_kwargs to see if the schema name change is taking effect? I suspect it is
Yeah it has to if one is working…cuz there is nowhere else to get it
k
Yep I see that it prints out 2 different schemas for dbt_kwargs, but the execution of the dbt itself only shows 1 schema.
Like the dbt task seems to use whichever dbt_kwargs was printed last
k
Can you try leaving out the task name first?
deps = run_dbt.map(
        schema_name=schema_name,
        branch_name=branch_name,
        command=unmapped("dbt deps"),
        upstream_tasks=pull_repo,
    )
k
lemme try it out
It still seems DbtShellTask is executing with only one dbt_kwargs
k
This is very puzzling. I can’t pin it down immediately. We know it’s making it in right? It’s just that DbtShellTask is still using the old one?
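One possibility worth ruling out here, purely as a guess: defaults_from_attrs hands every mapped child the same dbt_kwargs dict from the task’s __init__, so mutating it in place inside run() means concurrent children could overwrite each other’s schema before dbt reads it. A sketch of a defensive copy, assuming that is what’s happening:

from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.utilities.tasks import defaults_from_attrs

class MyDbtShellTask(DbtShellTask):

    @defaults_from_attrs("command", "env", "helper_script", "dbt_kwargs")
    def run(self, command=None, env=None, helper_script=None,
            dbt_kwargs=None, branch_name=None, schema_name=None) -> str:
        # Copy before mutating so each mapped child works on its own
        # dict rather than the single dict shared via __init__.
        dbt_kwargs = dict(dbt_kwargs or {})
        dbt_kwargs["schema"] = schema_name
        return super().run(
            command=command,
            env=env,
            helper_script="cd {}/dbt".format(branch_name),
            dbt_kwargs=dbt_kwargs,
        )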
k
I don’t it’s using the old one, it’s the opposite actually
it;s using the latest one
Yep I have logger.info() in there that proves that it’s making it in
Thank you for your help, I gotta be AFK now, hope we can pick this back up later??
k
Yeah I need to hop off as well but I should be more free later
🙌 1