# prefect-community
c
Hi Community, I’m using Prefect 1.0, and I want to achieve parallel runs using Dask. Here is the setup: we have a flow-of-flows framework as below, where each of A, B, C, D, E is a flow that schedules some transformation jobs. For flows C, D, and E, we want to pass a list of org_id values to the flow (which schedules dbt jobs), so that with parallelism, for example, org_1, org_2, and org_3 can run in parallel and accelerate our ETL process. Questions:
1. How and where can I set the list of org_id? We will add more org_ids in the future, so it cannot be hard coded and needs some flexibility.
2. To achieve the parallel runs, where do I set the DaskExecutor: in each single flow, or in the flow orchestrator?
k
DaskExecutor is attached to a flow but if C is a flow, maybe you can just use
Copy code
create_flow_run.map()
and pass in the list of org ids there. create_flow_run can take a parameters argument, so you can map over parameter values
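Roughly like this, as a minimal sketch (the child flow name, project name, and the small parameter-building task are placeholders, not your actual setup):
Copy code
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task
def build_params(org_id):
    # one parameter dict per org, so each mapped child run gets its own org_id
    return {"org_id": org_id}

with Flow("orchestrator") as flow:
    org_ids = Parameter("org_ids", default=["org_1", "org_2", "org_3"])
    create_flow_run.map(
        flow_name=unmapped("flow-C"),         # placeholder child flow name
        project_name=unmapped("my-project"),  # placeholder project name
        parameters=build_params.map(org_ids),
    )
That way the org id list lives in a Parameter, so you can add more orgs at run time without hard coding them.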
c
thanks, we have bought Prefect Cloud; if we want to deploy at scale, how can we utilize the DaskExecutor instead of the LocalDaskExecutor?
also, can I write something like this, and will it achieve parallel runs?
Copy code
@task
def dbt_run(param_org, param_start_date, param_end_date):
   …


with Flow(executor=DaskExecutor()) as flow:
  param_org = Parameter("org_id")
  param_start_date = Parameter("start_date")
  param_end_date = Parameter("end_date")

  dbt_run(param_org, param_start_date, param_end_date).map()

flow.run(
  parameters=dict(
    org_id = [org_1, org_2, org_3, … org_n],
    start_date = "2022-07-21",
    end_date = "2022-07-22",
  )
)
k
You just need to attach the DaskExecutor to the parent flow, but you may not even need it here because each create_flow_run call kicks off its own flow run (its own process) already, so it is parallelized
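For the parent flow, that would look something like this (just a sketch of where the executor goes; the flow name is a placeholder):
Copy code
from prefect import Flow
from prefect.executors import DaskExecutor

# the executor goes on the parent (flow-of-flows) orchestrator, not on each child flow
with Flow("orchestrator", executor=DaskExecutor()) as flow:
    ...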
More like:
Copy code
dbt_run.map(param_org, param_start_date, param_end_date)
yes, it will be parallel if you use DaskExecutor or LocalDaskExecutor
c
ok, nice!!
if param_org has multiple values and the dates are just single values, would that be fine?
Copy code
dbt_run.map(param_org=[org1,org2,org3], param_start_date=unmapped(a date), param_end_date=unmapped(a date))
like this?
k
yes exactly
c
hi Kevin, I tested something like this. The code can run, but I don't know why two of the orgs (org_1 and org_3) do not get run (data not loaded to Snowflake) when using mapping. They run fine when I only input one org_id at a time
Copy code
with Flow(name="xxx", executor=LocalDaskExecutor(scheduler="threads", num_workers=8)) as flow:

    org_id_param = Parameter("org_id")
    start_date_param = Parameter("start_date", default="2012-01-01 +0000")
    end_date_param = Parameter("end_date", default="2012-01-03 +0000")
        
    dbt_run.map(       
        org_id = org_id_param,
        start_date = unmapped(start_date_param),
        end_date = unmapped(end_date_param),
     )


flow.run(
    parameters = dict(
        org_id=["org_1","org_2","org_3"],
        start_date="2022-01-17 +0000",
        end_date="2022-01-18 +0000"
    )
)
Is this the right format? If yes, I guess the problem may be on my dbt end; I just want to make sure I'm using the Prefect Dask executor the right way
k
This seems right. Do you get an error?
c
no error message, all successful. I think it works now! I'm wondering: if we want to use the DaskExecutor rather than the local one, is there something I need to configure, and how would I do that? I just want to make sure I'm fully aware of it, in case it brings extra costs or security issues to our company. Also, we use Terraform, AWS, and Snowflake; not sure if that relates to the answer to my question
k
DaskExecutor will be a local cluster (similar to LocalDaskExecutor) if you don't have a cluster to connect to
We won’t spin up a temporary cluster unless you configure it
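For example, connecting to your own cluster vs. asking Prefect to create a temporary one looks roughly like this (the address and Fargate settings are placeholders, not a recommendation for your AWS setup):
Copy code
from prefect.executors import DaskExecutor

# connect to a Dask cluster you already run (placeholder address)
executor = DaskExecutor(address="tcp://dask-scheduler:8786")

# or let Prefect spin up a temporary cluster per flow run, e.g. on AWS Fargate
# (requires the dask-cloudprovider package; these kwargs are placeholders)
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4},
)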
c
sure! Also, when I set
executor=LocalDaskExecutor(scheduler="threads", num_workers=8)
for my flow, it will only run for one org_id, and I don't know why, even though the log says both orgs have been run. If I don't specify the executor, all orgs run properly. Is there something I'm missing? How can I fix that? Thanks!!
k
This is complicated but I think your task is not thread safe. This is a good read on that
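The gist, as a generic illustration (nothing Prefect- or dbt-specific): when several threads write to the same shared target, the last writer wins.
Copy code
import threading

# generic illustration of a non-thread-safe pattern: every worker writes to the
# same shared target, so only the last writer's value survives
shared_target = {}

def load(org_id):
    shared_target["latest"] = org_id

threads = [threading.Thread(target=load, args=(org,)) for org in ("org_1", "org_2", "org_3")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared_target)  # only one org's value is left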
c
1. Alright, I guess in my case the dbt tasks (we use Snowflake for storage) are in the same situation? I think the second task to run somehow overwrites the first task's output data. The logs show both tasks executed successfully with the expected durations, but I can only see the data from the last task's output; I think that's the reason?
2. Also, if it is not thread safe, can I use "processes" instead of "threads" in the Dask setting?
k
I think so, yes, with processes, because each process gets its own copy
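e.g., just switching the scheduler (a minimal sketch):
Copy code
from prefect.executors import LocalDaskExecutor

# "processes" gives each mapped task its own worker process instead of a thread
executor = LocalDaskExecutor(scheduler="processes", num_workers=8)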
🙌 1
c
Thanks!! When I tried processes I got this. It seems to be a warning rather than an error, and it did not interrupt my flow; will this impact me?
Copy code
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

    if __name__ == '__main__':
        freeze_support()
        ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
k
This is a Dask thing. You can avoid it with:
Copy code
if __name__ == "__main__":
    flow.run()
c
Thank you so much! A sad thing is that when I used "processes", it still does not work. Same as with "threads": the output gets overwritten by the last task 😭 In this case, how can parallelism be achieved?
k
Can you show me the actual dbt_run definition?
c
Copy code
@task(name="dbt run")
def dbt_run(org_id, start_date, end_date): 
    command = (
            f"dbt run --vars '{{org_id: {org_id},"
            f" start_date: {start_date},"
            f" end_date: {end_date}}}'"
            f" --select path/to/my/model"
        )

    return subprocess.run( 
        command,
        check=True,
        stderr=True,
        stdout=True,
        shell=True, 
        cwd="myDbtProject/",
    )
we use subprocess instead of ShellTask/DbtShellTask since some colleagues on Windows have trouble with those two tasks; I think the problem is not coming from there
k
This should be thread safe I think on the Python side. I think there might be something with dbt that prevents parallelization? Not sure though. This looks right
Could you explain a bit more about the behavior you see when trying to parallelize?
c
Sure! For example, if I use Dask, the dbt logs look completely normal; say we have two org_ids, which produce two mapped dbt_run tasks, then the last one to complete overwrites the data in the Snowflake DB. If I don't use Dask, there is no overwriting problem and all the data is there
k
Ah I’m not sure this can be parallelized then because it looks like stuff is being overwritten?
c
Yes, I'm also not sure why this cannot be parallelized. Do you happen to know why?
k
I think the issue is the writing side. The later stuff is overwriting the earlier stuff?
c
yes, the later stuff is overwriting the earlier stuff
and if I inspect the database during the flow run, I can see that at first task 1's data is being produced, but when everything is finished, only the later task's data is left (which means the previous data has been overwritten)
k
But it works when run sequentially?
c
yes, exactly
I have four tables (say A, B, C, D) that need data inserted. When running in parallel, during the flow A may have org_1's data and B may have org_2's data, but when everything is done, every table only holds the data of the latest org that was run
k
I am honestly not sure if it breaks on the dbt side or the Snowflake side, but the task definition at least looks correct for parallelization
🙏 1
❤️ 1
c
sure, really appreciate your help, Kevin!! I feel really warmed by your endless help and answers!!! I'll let you know if I find the reason or get it working
Hi Kevin, the problem (data being overwritten) is because each time I build the tables from scratch with dbt commands, dbt treats it as a full refresh (even though I did not specify this) since there is no existing table. That means Prefect creates two parallel runs, each executing its own dbt job in full-refresh mode, so the data gets overwritten.