
Edmondo Porcu

08/11/2022, 11:58 PM
Hi, I am using Prefect 1.0. I tried to create two tasks that share a lot of common code; at a certain point they invoke a function which uses the task library to create an instance of a task and runs it. However, this function is not annotated with @task, while the two invoking functions are. I observed a weird behavior where only one of the two tasks was run. Should mytask.run() only be invoked within functions annotated with @task?

Matt Conger

08/12/2022, 12:19 AM
Hi Edmondo, could you expand a little on the basic structure of your code, please, to help us get a better understanding of the format?

Edmondo Porcu

08/12/2022, 1:10 AM
Sure @Matt Conger

Anna Geller

08/12/2022, 1:47 PM
you don't need to call the run method, Prefect calls it automatically for you
moving with Flow() out of the function to the top level may help clarify the dependencies
also, if you're just getting started, I recommend switching to the default Prefect version, which is now Prefect 2.0
docs.prefect.io gives more info about this new product version
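for illustration, a minimal sketch of the pattern I mean (Prefect 1.0, placeholder names):
from prefect import Flow
from prefect.tasks.shell import ShellTask

# task-library task initialized once at module scope, with its init args
echo = ShellTask(stream_output=True)

with Flow("example") as flow:
    # run arguments are passed at call time; Prefect calls .run() for you
    echo(command="echo hello")

if __name__ == "__main__":
    flow.run()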

Edmondo Porcu

08/12/2022, 2:34 PM
I need to call run to pass the secret; I cannot pass the Databricks connection secret in the constructor... I think
A month ago the tasks had not yet been migrated to 2.0, so that's why we started with 1.0

Anna Geller

08/12/2022, 2:46 PM
Was the lack of a Databricks task in the 2.0 collections really the only reason you started with 1.0?

Edmondo Porcu

08/12/2022, 4:19 PM
Yes
this is not going to help us; we need to submit Databricks jobs
we would need to refactor

Anna Geller

08/12/2022, 5:13 PM
I let our Integrations team know that this is important for you; we'll check when we can start working on the Databricks collection in 2.0

Edmondo Porcu

08/12/2022, 7:41 PM
Thank you!
I am still struggling; it looks like only one task is run 😞

Anna Geller

08/12/2022, 10:43 PM
What's your current flow code, can you share? In the worst case, can you draw how your flow is supposed to look? We can then see how to set dependencies to get there

Edmondo Porcu

08/12/2022, 11:29 PM
def build_flow():
    with Flow('my-flow') as flow:
        spark_version = Parameter('spark_version')
        project_name = Parameter('project_name')
        github_repo = Parameter('github_repo')
        git_ref = Parameter('git_ref')
        databricks_connstring = {
            'host': os.environ['DATABRICKS_HOST'],
            'token': os.environ['DATABRICKS_TOKEN'],
        }
        cluster_id = '0602-201750-t8xnk4qp'
        git_source = create_jobs_git_source(
            github_repo=github_repo,
            git_ref=git_ref
        )
        setup_task = setup_database(
            git_source=git_source,
            databricks_connstring=databricks_connstring,
            git_ref=git_ref,
            cluster_id=cluster_id,
            is_test=True,
        )
        ingestion_task = ingestion(
            job_name=project_name,
            spark_version=spark_version,
            git_source=git_source,
            git_ref=git_ref,
            databricks_connstring=databricks_connstring,
            cluster_id=cluster_id,
            is_test=True,
            upstream_tasks=[setup_task]
        )
    return flow

Anna Geller

08/13/2022, 11:46 AM
Looks good; again, moving this out of the function and visualizing it may help you troubleshoot
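e.g. a quick sketch reusing your build_flow (flow.visualize() needs graphviz installed):
flow = build_flow()
flow.visualize()  # renders the task graph so you can check the dependencies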

Edmondo Porcu

08/13/2022, 3:19 PM
Do you mean creating the flow in the main part of the script? That the code in the "build_flow" function should be in the "main" part of the script?
This is the DAG; in reality, phenotype ingestion is not submitted. Only setup database is submitted
😞

Anna Geller

08/13/2022, 6:40 PM
yes, I meant using it the same way as in the docs
do you want to build a dummy example I could reproduce that shows what you have, and draw what you would like to see? otherwise, maybe look at the examples here https://discourse.prefect.io/tag/state-dependencies

Edmondo Porcu

08/13/2022, 6:43 PM
Thank you! Will have a look
This is what I am doing... but what I see in the log is the following
[2022-08-13 09:05:06-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 09:05:07-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
## cropped lines here, waiting for the task to succeed
INFO:prefect.TaskRunner:Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 09:09:40-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
INFO:prefect.TaskRunner:Task 'phenotype ingestion': Starting task run...
[2022-08-13 09:09:40-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
It's like the second time DatabricksSubmitMultitaskRun is executed, it doesn't submit a new job. The run_id should be different! But I am creating two instances of the task, so I don't understand why.
@task(name='setup database')
def setup_database(
    *,
    git_source: GitSource,
    databricks_connstring: Dict[str, Any],
    git_ref: str,
    cluster_id: str,
    is_test: bool):
    tasks = setup.build_tasks(
        cluster_id=cluster_id,
        git_ref=git_ref,
        is_test=is_test,
    )
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name="Setup",
        tasks=tasks,
    )
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source
    )

@task(name='phenotype ingestion')
def phenotype_ingestion(
    *,
    job_name: str,
    spark_version: str,
    git_source: GitSource,
    git_ref: str,
    databricks_connstring: Dict[str, Any],
    cluster_id: str,
    is_test: bool
):
    tasks = build_pheno_tasks(git_ref, is_test)
    job_name = f'{job_name} - Tests - {spark_version} - {git_ref}'
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name=job_name,
        tasks=tasks,
    )
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source
    )
It looks like invoking run from within the task function can cause wrong behavior? Let me try to return the job.
If I remove the run call, the code succeeds without submitting any real job to Databricks.
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'phenotype-homogenization'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Do I need to do something specific when invoking a Task from the task library within a function annotated with @task?

Anna Geller

08/13/2022, 9:14 PM
the way you should call tasks from the task library is: you initialize them at module scope and invoke the initialized tasks within the Flow. You shouldn't call a task from another task; tasks should be called from a Flow
from prefect import task, Flow
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

brick = DatabricksSubmitMultitaskRun(your_init_args_go_here)

with Flow('databricks') as flow:
    brick(your_run_args_go_here)

Edmondo Porcu

08/14/2022, 3:38 PM
I see. However, what if the brick is defined in a task function?
@task
def setup_brick(init_arg_1, init_arg_2, init_arg_3):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2, init_arg_3)

@task
def ingestion_brick(init_arg_1, init_arg_2):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2)

def create_flow():
    with Flow('flow-1') as flow:
        brick_1 = setup_brick(os.environ['P1'], os.environ['P2'], os.environ['P3'])
        brick_2 = ingestion_brick(os.environ['P3'], os.environ['P4'], upstream_tasks=[brick_1])
        ?

Anna Geller

08/14/2022, 4:48 PM
You shouldn't do it, but if you have to, you would need to do it this way: DatabricksSubmitMultitaskRun(...).run(...)

Edmondo Porcu

08/14/2022, 6:10 PM
Do you mean,
DatabricksSubmitMultitaskRun.run(brick_1)
? Without the run, the two brick_1, brick_2 Databricks tasks are not run. I mean, they are run as function tasks, but no job is submitted to Databricks.
If I invoke brick_1.run() explicitly instead, I get an error:
: brick_1() missing 3 required keyword-only arguments: init_arg_1, init_arg_2, init_arg_3

Anna Geller

08/14/2022, 7:23 PM
as mentioned before, the syntax is:
DatabricksSubmitMultitaskRun(init_args).run(run_method_args)
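for example, a minimal sketch with ShellTask as a stand-in (illustrative names only):
from prefect import task
from prefect.tasks.shell import ShellTask

@task
def run_echo(arg_1):
    # initialize with the init args, then call run() with the run-method args;
    # don't call .run() on the result of the @task-decorated function itself
    return ShellTask(stream_output=True).run(command=f"echo {arg_1}")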

Edmondo Porcu

08/14/2022, 8:03 PM
The error does not mention missing args for the DatabricksSubmitMultitaskRun task, but for the function task that is created by the @task annotation...
This is a fully reproducible example
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask


@task
def task_1_builder(arg_1):
    return ShellTask(command=f"echo {arg_1}",  stream_output=True)

@task
def task_2_builder(arg_2):
    return ShellTask(command=f"echo {arg_2}",  stream_output=True)

def build_flow():
    with Flow("list_files") as flow:
        arg_1 = Parameter('arg_1')
        arg_2 = Parameter('arg_2')
        task_1 = task_1_builder(arg_1)
        task_2 = task_2_builder(arg_2, upstream_tasks=[task_1])
        return flow

if __name__ == '__main__':
    flow = build_flow()
    state = flow.run(
        arg_1="hello",
        arg_2="world"
    )
    assert state.is_successful(), 'One of the step in the pipeline failed'
I noticed that if I change the builder function like so:
@task
def task_2_builder(arg_2):
    t = ShellTask(command=f"echo {arg_2}", stream_output=True)
    return t.run()
this now works, but not with Databricks. It literally believes the two "Shell tasks" are the same task.
OOOH! I found it. It's a risky thing in the Databricks Task @Anna Geller, apologies!
Let me open a bug; the problem is here: https://github.com/PrefectHQ/prefect/blob/1.x/src/prefect/tasks/databricks/databricks_submitjob.py#L1066. By defaulting the idempotency_token to the flow ID, it's impossible to submit two Databricks jobs within the same flow unless you set the idempotency token explicitly. This is the reason why I was struggling 😞. Are you still accepting fixes and contributions on Prefect 1.0?
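In the meantime, a sketch of the workaround, assuming the run method accepts an idempotency_token argument as the linked source suggests (reusing the variables from my code above):
import uuid

from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

databricks_job = DatabricksSubmitMultitaskRun(run_name="Setup", tasks=tasks)
databricks_job.run(
    databricks_conn_secret=databricks_connstring,
    git_source=git_source,
    idempotency_token=str(uuid.uuid4()),  # unique token per submission
)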

Anna Geller

08/14/2022, 9:45 PM
Oh wow, well done getting to the root cause of the issue! We absolutely accept PRs for 1.0 and we'll keep releasing fixes

Edmondo Porcu

All done @Anna Geller, apologies for the long conversation here. The behavior was really surprising

Anna Geller

08/19/2022, 1:53 AM
nice! thanks for linking! I see Alex even already reviewed it