# prefect-community
e
Hi, I am using Prefect 1.0 and I tried to create two tasks that share a lot of common code. At a certain point, both invoke a function that uses the task library to create an instance of a task and run it. This function is not annotated with @task, while the two invoking functions are. I observed a weird behavior where only one of the two tasks was run. Should mytask.run() only be invoked within a function annotated with @task?
m
Hi Edmondo, could you share the basic structure of your code, please? That would help us get a better understanding of the setup.
e
Sure @Matt Conger
a
you don't need to call the run method, Prefect calls it automatically for you
moving the with Flow() block out of the function to the top level may help clarify the dependencies
also if you're just getting started, I recommend switching to the default Prefect version which is now Prefect 2.0
docs.prefect.io gives more info about this new product version
e
I need to call run to pass the secret, I cannot pass the databricks connection secret in the constructor... I think
A month ago the tasks were not migrated yet to 2.0, so that's why we started with 1.0
a
Was lack of Databricks task in 2.0 collections really the only reason you started with 1.0?
e
Yes
this is not going to help us, we need to submit databricks jobs
we would need to refactor
a
I let our Integrations team know that this is important for you, we'll check when we can start working on the Databricks collection in 2.0
e
Thank you!
I am still struggling, looks like only one task is run 😞
a
What's your current flow code, can you share it? In the worst case, can you draw what your flow is supposed to look like? We can then see how to set dependencies to get there
e
Copy code
import os

from prefect import Flow, Parameter


def build_flow():
    with Flow('my-flow') as flow:
        spark_version = Parameter('spark_version')
        project_name = Parameter('project_name')
        github_repo = Parameter('github_repo')
        git_ref = Parameter('git_ref')
        databricks_connstring = {
            'host': os.environ['DATABRICKS_HOST'],
            'token': os.environ['DATABRICKS_TOKEN'],
        }
        cluster_id = '0602-201750-t8xnk4qp'
        git_source = create_jobs_git_source(
            github_repo=github_repo,
            git_ref=git_ref
        )
        setup_task = setup_database(
            git_source=git_source,
            databricks_connstring=databricks_connstring,
            git_ref=git_ref,
            cluster_id=cluster_id,
            is_test=True,
        )
        ingestion_task = phenotype_ingestion(
            job_name=project_name,
            spark_version=spark_version,
            git_source=git_source,
            git_ref=git_ref,
            databricks_connstring=databricks_connstring,
            cluster_id=cluster_id,
            is_test=True,
            upstream_tasks=[setup_task]
        )
    return flow
a
Looks good, again moving this out of this function and visualizing may help you troubleshoot
e
Do you mean creating the flow in the main of the script? So the code in the function "build_flow" should be in the "main" part of the script?
This is the dag, phenotype ingestion is not submitted, in reality. Only setup database is submitted
😞
a
yes, I meant using it the same way as in the docs
do you want to build a dummy example I could reproduce that would show what you have, and draw what you would like to see? otherwise, maybe look at examples here https://discourse.prefect.io/tag/state-dependencies
e
Thank you! Will have a look
🙌 1
This is what I am doing... but what I see in the log is the following
Copy code
[2022-08-13 09:05:06-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 09:05:07-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
## cropped lines here , waiting for the task to succeed
INFO:prefect.TaskRunner:Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 09:09:40-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
INFO:prefect.TaskRunner:Task 'phenotype ingestion': Starting task run...
[2022-08-13 09:09:40-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
It's as if the second time the DatabricksSubmitMultitaskRun is executed, it doesn't submit a new job. The run_id should be different! But I am creating two instances of the task, so I don't understand why.
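A small sketch for spotting this symptom in a longer log: it pulls the run_id out of each "Run submitted" line and reports any id that shows up more than once. The helper name `repeated_run_ids` and the choice to look only at the timestamped lines (so the duplicate `INFO:` handler output is not counted twice) are assumptions for illustration; the sample lines are copied from the log above.

```python
import re
from collections import Counter

# Sample lines copied from the log above, including the duplicate handler output.
LOG_LINES = [
    "[2022-08-13 09:05:07-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743",
    "INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743",
    "[2022-08-13 09:09:40-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743",
    "INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743",
]

RUN_ID_PATTERN = re.compile(r"Run submitted with run_id: (\d+)")


def repeated_run_ids(lines):
    """Return run_ids that appear in more than one timestamped 'Run submitted' line."""
    counts = Counter()
    for line in lines:
        # Only count the timestamped handler so each submission is seen once.
        if not line.startswith("["):
            continue
        match = RUN_ID_PATTERN.search(line)
        if match:
            counts[match.group(1)] += 1
    return sorted(run_id for run_id, n in counts.items() if n > 1)


print(repeated_run_ids(LOG_LINES))
```

Any id in the result means two separate submissions came back with the same run, which is what the two tasks above show.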
Copy code
@task(name='setup database')
def setup_database(
    *,
    git_source:GitSource,
    databricks_connstring:Dict[str,Any],
    git_ref:str,
    cluster_id:str,
    is_test:bool):
    tasks = setup.build_tasks(
        cluster_id=cluster_id,
        git_ref=git_ref,
        is_test=is_test,
    )
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name="Setup",
        tasks=tasks,
    )
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source
    )
@task(name='phenotype ingestion')
def phenotype_ingestion(
    *,
    job_name:str,
    spark_version:str,
    git_source:GitSource,
    git_ref:str,
    databricks_connstring:Dict[str,Any],
    cluster_id:str,
    is_test:bool
):
    tasks = build_pheno_tasks(git_ref, is_test)
    # Build the run name once; formatting it a second time would repeat the suffix.
    job_name = f'{job_name} - Tests - {spark_version} - {git_ref}'
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name=job_name,
        tasks=tasks,
    )
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source
    )
It looks like invoking run from within the task function can cause wrong behavior? Let me try to return the job.
If I remove the run call, the code succeeds without submitting any real job to Databricks.
Copy code
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'phenotype-homogenization'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Do I need to do something specific when invoking a Task from the task library within a function annotated with @task ?
a
the way you should call tasks from the task library is: you initialize the task at module scope and invoke the initialized task within the Flow. You shouldn't call the task from another task, tasks should be called from a Flow
Copy code
from prefect import task, Flow
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

brick = DatabricksSubmitMultitaskRun(your_init_args_go_here)

with Flow('databricks') as flow:
    brick(your_run_args_go_here)
e
I see, however if brick is defined in a task function?
Copy code
@task
def setup_brick(init_arg_1, init_arg_2, init_arg_3):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2, init_arg_3)

@task
def ingestion_brick(init_arg_1, init_arg_2):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2)

def create_flow():
    with Flow('flow-1') as flow:
        brick_1 = setup_brick(os.environ['P1'], os.environ['P2'], os.environ['P3'])
        brick_2 = ingestion_brick(os.environ['P3'], os.environ['P4'], upstream_tasks=[brick_1])
        # ?
a
You shouldn't do it, but if you have to, you would need to do it this way: DatabricksSubmitMultitaskRun(...).run(...)
e
Do you mean,
DatabricksSubmitMultitaskRun.run(brick_1)
? Without the run, the two brick_1, brick_2 databricks tasks are not run. I mean, they are run as a function task, but no job is submitted to databricks.
If I explicitly invoke brick_1.run() instead, I get an error:
Copy code
: brick_1() missing 3 required keyword-only arguments: init_arg_1, init_arg_2, init_arg_3
a
as mentioned before, the syntax is:
Copy code
DatabricksSubmitMultitaskRun(init_args).run(run_method_args)
e
The error does not mention missing args for the DatabricksSubmitMultitaskRun task, but for the function task that is created by the @task decorator...
This is a fully reproducible example
Copy code
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask


@task
def task_1_builder(arg_1):
    return ShellTask(command=f"echo {arg_1}",  stream_output=True)

@task
def task_2_builder(arg_2):
    return ShellTask(command=f"echo {arg_2}",  stream_output=True)

def build_flow():
    with Flow("list_files") as flow:
        arg_1 = Parameter('arg_1')
        arg_2 = Parameter('arg_2')
        task_1 = task_1_builder(arg_1)
        task_2 = task_2_builder(arg_2, upstream_tasks=[task_1])
    return flow

if __name__ == '__main__':
    flow = build_flow()
    state = flow.run(
        arg_1="hello",
        arg_2="world"
    )
    assert state.is_successful(), 'One of the steps in the pipeline failed'
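A plain-Python sketch of why a builder that only returns a task object produces no output. `EchoTask` and the two builder functions are hypothetical stand-ins written for illustration, not Prefect code; the only assumption carried over from the example above is that the work happens in `run()`, never in the constructor.

```python
class EchoTask:
    """Toy stand-in for a library task such as ShellTask (not Prefect code)."""

    def __init__(self, message: str) -> None:
        # Construction only stores configuration; nothing is executed here.
        self.message = message

    def run(self) -> str:
        # The actual work happens only when run() is called.
        return f"echo {self.message}"


def builder_returning_task(message: str) -> EchoTask:
    # Mirrors task_1_builder: the object is created and returned, run() is never called.
    return EchoTask(message)


def builder_running_task(message: str) -> str:
    # Mirrors the init-then-run form: Task(init_args).run(run_args).
    return EchoTask(message).run()


not_run = builder_returning_task("hello")
ran = builder_running_task("hello")

print(type(not_run).__name__)  # an unexecuted task object
print(ran)                     # the result of actually running it
```

The first builder finishes successfully but hands back an object whose `run()` was never invoked, which matches a flow that reports Success without doing the work.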
I noticed that if I do change the builder function like so:
Copy code
@task
def task_2_builder(arg_2):
    t = ShellTask(command=f"echo {arg_2}", stream_output=True)
    return t.run()
this now works, but not on Databricks. It literally believes the two "Shell tasks" are the same tasks.
OOOH! I found it. It's a risky thing in the Databricks Task @Anna Geller, apologies !
🙌 1
Let me open a bug, the problem is here: https://github.com/PrefectHQ/prefect/blob/1.x/src/prefect/tasks/databricks/databricks_submitjob.py#L1066. Because the idempotency_token defaults to the flow id, it's impossible to submit two Databricks jobs within the same flow unless you set the idempotency token explicitly. This is the reason why I was struggling 😞. Are you still accepting fixes and contributions on Prefect 1.0?
💯 1
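A toy model of the root cause described above, assuming a submit endpoint that returns the already-existing run whenever it sees a token it knows. `FakeJobsService`, its `submit` method, and the token strings are hypothetical and are not the Databricks or Prefect API; the sketch only illustrates why two submissions sharing one default token end up with the same run_id.

```python
import itertools
from typing import Dict, Optional


class FakeJobsService:
    """Toy idempotent 'submit run' endpoint (illustration only, not a real API)."""

    def __init__(self) -> None:
        self._next_id = itertools.count(1)
        self._runs_by_token: Dict[str, int] = {}

    def submit(self, run_name: str, idempotency_token: Optional[str] = None) -> int:
        # A known token short-circuits: the earlier run is returned, nothing new starts.
        if idempotency_token is not None and idempotency_token in self._runs_by_token:
            return self._runs_by_token[idempotency_token]
        run_id = next(self._next_id)
        if idempotency_token is not None:
            self._runs_by_token[idempotency_token] = run_id
        return run_id


service = FakeJobsService()
flow_run_id = "flow-run-abc"  # hypothetical id shared by every task in one flow run

# Both tasks fall back to the same default token, so the second submit is a no-op.
setup_run = service.submit("Setup", idempotency_token=flow_run_id)
ingestion_run = service.submit("Ingestion", idempotency_token=flow_run_id)

# Giving each task its own token lets both jobs be submitted.
setup_run_fixed = service.submit("Setup", idempotency_token=f"{flow_run_id}-setup")
ingestion_run_fixed = service.submit("Ingestion", idempotency_token=f"{flow_run_id}-ingestion")

print(setup_run == ingestion_run)
print(setup_run_fixed != ingestion_run_fixed)
```

In the real task, the equivalent workaround would be to set a distinct idempotency token for each Databricks task in the flow, as the message above notes.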
a
Oh wow, well done getting to the root cause of the issue! We absolutely accept PRs for 1.0 and we'll keep releasing fixes
e
All done @Anna Geller, apologies for the long conversation here. The behavior was really surprising
a
nice! thanks for linking! I see Alex even already reviewed it