Edmondo Porcu
08/11/2022, 11:58 PM
Matt Conger
08/12/2022, 12:19 AM
Edmondo Porcu
08/12/2022, 1:10 AM
Edmondo Porcu
08/12/2022, 1:11 AM
Anna Geller
Anna Geller
Anna Geller
Anna Geller
Edmondo Porcu
08/12/2022, 2:34 PM
Edmondo Porcu
08/12/2022, 2:35 PM
Anna Geller
Anna Geller
Edmondo Porcu
08/12/2022, 4:19 PM
Edmondo Porcu
08/12/2022, 4:19 PM
Edmondo Porcu
08/12/2022, 4:20 PM
Anna Geller
Edmondo Porcu
08/12/2022, 7:41 PM
Edmondo Porcu
08/12/2022, 7:41 PM
Anna Geller
Edmondo Porcu
08/12/2022, 11:29 PM
import os

from prefect import Flow, Parameter

# create_jobs_git_source, setup_database and ingestion are tasks defined
# elsewhere in the project (not shown in this thread).

def build_flow():
    with Flow('my-flow') as flow:
        spark_version = Parameter('spark_version')
        project_name = Parameter('project_name')
        github_repo = Parameter('github_repo')
        git_ref = Parameter('git_ref')
        # Connection details are read from the environment when the flow is built
        databricks_connstring = {
            'host': os.environ['DATABRICKS_HOST'],
            'token': os.environ['DATABRICKS_TOKEN'],
        }
        cluster_id = '0602-201750-t8xnk4qp'
        git_source = create_jobs_git_source(
            github_repo=github_repo,
            git_ref=git_ref,
        )
        setup_task = setup_database(
            git_source=git_source,
            databricks_connstring=databricks_connstring,
            git_ref=git_ref,
            cluster_id=cluster_id,
            is_test=True,
        )
        # upstream_tasks makes ingestion wait until the database setup has finished
        ingestion_task = ingestion(
            job_name=project_name,
            spark_version=spark_version,
            git_source=git_source,
            git_ref=git_ref,
            databricks_connstring=databricks_connstring,
            cluster_id=cluster_id,
            is_test=True,
            upstream_tasks=[setup_task],
        )
    return flow
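The `upstream_tasks=[setup_task]` argument adds a dependency edge, so the ingestion task only starts once the setup task has finished. That ordering guarantee can be sketched in plain Python with the standard-library `graphlib`; this is a toy model of the dependency graph, not Prefect's own scheduler, and the node names are just labels borrowed from the flow and its logs.

```python
from graphlib import TopologicalSorter

# Each key maps a task label to the set of tasks it depends on (its upstream tasks).
# The edges mirror the flow: the git source is built from parameters, setup uses the
# git source, and ingestion additionally lists setup in upstream_tasks.
dependencies = {
    "create_jobs_git_source": {"github_repo", "git_ref"},
    "setup database": {"create_jobs_git_source", "git_ref"},
    "phenotype ingestion": {
        "create_jobs_git_source", "git_ref", "project_name",
        "spark_version", "setup database",
    },
}

# static_order() yields every node after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Nodes without a path between them may come out in any order; only the upstream-before-downstream constraint is guaranteed.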
Anna Geller
Edmondo Porcu
08/13/2022, 3:19 PM
Edmondo Porcu
08/13/2022, 4:03 PM
Edmondo Porcu
08/13/2022, 4:03 PM
Edmondo Porcu
08/13/2022, 4:03 PM
Anna Geller
Anna Geller
Anna Geller
Edmondo Porcu
08/13/2022, 6:43 PM
Edmondo Porcu
08/13/2022, 7:46 PM
[2022-08-13 09:05:06-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 09:05:07-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
## cropped lines here, waiting for the task to succeed
INFO:prefect.TaskRunner:Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 09:09:40-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
INFO:prefect.TaskRunner:Task 'phenotype ingestion': Starting task run...
[2022-08-13 09:09:40-0700] INFO - prefect.DatabricksSubmitMultitaskRun | Run submitted with run_id: 1811743
INFO:prefect.DatabricksSubmitMultitaskRun:Run submitted with run_id: 1811743
It looks like the second time DatabricksSubmitMultitaskRun is executed, it doesn't submit a new job. The run_id should be different!
But I am creating two separate instances of the task, so I don't understand why.
from typing import Any, Dict

from prefect import task
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

# GitSource, setup and build_pheno_tasks come from the project's own code
# (their imports are not shown in this thread).

@task(name='setup database')
def setup_database(
    *,
    git_source: GitSource,
    databricks_connstring: Dict[str, Any],
    git_ref: str,
    cluster_id: str,
    is_test: bool,
):
    tasks = setup.build_tasks(
        cluster_id=cluster_id,
        git_ref=git_ref,
        is_test=is_test,
    )
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name="Setup",
        tasks=tasks,
    )
    # Calling .run() inside the task is what actually submits the job
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source,
    )

@task(name='phenotype ingestion')
def phenotype_ingestion(
    *,
    job_name: str,
    spark_version: str,
    git_source: GitSource,
    git_ref: str,
    databricks_connstring: Dict[str, Any],
    cluster_id: str,
    is_test: bool,
):
    tasks = build_pheno_tasks(git_ref, is_test)
    # Build the run name once (previously the suffix was appended twice)
    run_name = f'{job_name} - Tests - {spark_version} - {git_ref}'
    databricks_job = DatabricksSubmitMultitaskRun(
        run_name=run_name,
        tasks=tasks,
    )
    databricks_job.run(
        databricks_conn_secret=databricks_connstring,
        git_source=git_source,
    )
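One possible explanation for the repeated `run_id` (an assumption, not confirmed in this thread): the Databricks Jobs `runs/submit` endpoint accepts an `idempotency_token`, and when a run with the same token already exists it returns that run's ID instead of creating a new one. If both submissions end up sharing a token, for instance a default derived from something common to the whole flow run, the second call would report the first run. A minimal sketch of deriving a distinct token per step; `make_idempotency_token` is a hypothetical helper, and its result would be passed as `idempotency_token=` to each `DatabricksSubmitMultitaskRun` (check that your Prefect version exposes that argument).

```python
import hashlib


def make_idempotency_token(flow_run_id: str, step_name: str) -> str:
    """Derive a per-step idempotency token (hypothetical helper).

    Databricks limits idempotency tokens to 64 characters, so the combined
    key is hashed; a SHA-256 hex digest is exactly 64 characters long.
    """
    key = f"{flow_run_id}:{step_name}".encode("utf-8")
    return hashlib.sha256(key).hexdigest()


setup_token = make_idempotency_token("flow-run-123", "setup database")
ingestion_token = make_idempotency_token("flow-run-123", "phenotype ingestion")
# Same flow run, different steps: different tokens, so each submit gets its own run.
print(setup_token != ingestion_token)  # True
```

Because the token is deterministic, a retry of the same step within the same flow run still maps to the same Databricks run, which keeps the duplicate-submission protection the token exists for.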
Edmondo Porcu
08/13/2022, 7:47 PM
Edmondo Porcu
08/13/2022, 7:50 PM
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'phenotype-homogenization'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'git_ref': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'project_name': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'github_repo': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'spark_version': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'create_jobs_git_source': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'setup database': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Starting task run...
[2022-08-13 12:49:47-0700] INFO - prefect.TaskRunner | Task 'phenotype ingestion': Finished task run for task with final state: 'Success'
[2022-08-13 12:49:47-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Edmondo Porcu
08/13/2022, 7:59 PM
Anna Geller
from prefect import task, Flow
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun
brick = DatabricksSubmitMultitaskRun(your_init_args_go_here)
with Flow('databricks') as flow:
    brick(your_run_args_go_here)
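This snippet separates the two phases of a Prefect 1 task class: the constructor configures the task, and calling the instance inside `with Flow(...)` only adds it to the flow; the actual work happens in `run()` when the flow itself runs. The toy model below mimics those two phases in plain Python. `ToyFlow` and `ToyTask` are hypothetical names, and Prefect's real mechanism is more involved.

```python
from typing import Any, List, Optional, Tuple


class ToyFlow:
    """Toy model of a flow: task calls made inside `with` are recorded, not executed."""

    active: Optional["ToyFlow"] = None  # the flow currently being built, if any

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[Tuple["ToyTask", Tuple[Any, ...]]] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.active = self
        return self

    def __exit__(self, *exc_info: Any) -> None:
        ToyFlow.active = None

    def run(self) -> List[Any]:
        # Only here is each recorded task's run() executed, in the order it was called.
        return [task.run(*args) for task, args in self.calls]


class ToyTask:
    """Toy model of a task class: configuration in __init__, work in run()."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def run(self, value: str) -> str:
        return f"{self.prefix}{value}"

    def __call__(self, *args: Any) -> "ToyTask":
        # Calling the instance while a flow is being built only records the call.
        if ToyFlow.active is None:
            raise RuntimeError("call tasks inside a `with ToyFlow(...)` block")
        ToyFlow.active.calls.append((self, args))
        return self


brick = ToyTask(prefix="submitted: ")  # init args, like the constructor call above
with ToyFlow("databricks") as flow:
    brick("setup")       # run args: recorded, nothing executes yet
    brick("ingestion")

print(len(flow.calls))  # 2
print(flow.run())       # ['submitted: setup', 'submitted: ingestion']
```

In this model, a task that merely returns a `ToyTask` instance from its own `run()` would hand back an object that nobody ever runs.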
Edmondo Porcu
08/14/2022, 3:38 PM
import os
from prefect import Flow, task
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

@task
def setup_brick(init_arg_1, init_arg_2, init_arg_3):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2, init_arg_3)

@task
def ingestion_brick(init_arg_1, init_arg_2):
    return DatabricksSubmitMultitaskRun(init_arg_1, init_arg_2)

def create_flow():
    with Flow('flow-1') as flow:
        brick_1 = setup_brick(os.environ['P1'], os.environ['P2'], os.environ['P3'])
        brick_2 = ingestion_brick(os.environ['P3'], os.environ['P4'], upstream_tasks=[brick_1])
    return flow
?
Anna Geller
Edmondo Porcu
08/14/2022, 6:10 PM
DatabricksSubmitMultitaskRun.run(brick_1)? Without the run, the two Databricks tasks brick_1 and brick_2 are not run. I mean, they run as function tasks, but no job is submitted to Databricks.
Edmondo Porcu
08/14/2022, 6:14 PM
: brick_1() missing 3 required keyword-only arguments: init_arg_1, init_arg_2, init_arg_3
Anna Geller
DatabricksSubmitMultitaskRun(init_args).run(run_method_args)
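The `DatabricksSubmitMultitaskRun(init_args).run(run_method_args)` pattern relies on how Prefect 1 task-library classes usually work: `run()` accepts the same arguments as the constructor and falls back to the constructor's values when an argument is not passed (Prefect 1 does this with its `defaults_from_attrs` decorator). Below is a simplified, hypothetical re-implementation of that idea; `defaults_from_init` and `SubmitRun` are illustrative names, and `SubmitRun.run()` returns the payload instead of calling any API.

```python
import functools
from typing import Any, Callable, Optional


def defaults_from_init(*attr_names: str) -> Callable:
    """Fill missing run() keyword arguments from same-named attributes set in __init__."""
    def decorator(run: Callable) -> Callable:
        @functools.wraps(run)
        def wrapper(self: Any, **kwargs: Any) -> Any:
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run(self, **kwargs)
        return wrapper
    return decorator


class SubmitRun:
    """Hypothetical stand-in for a task class such as DatabricksSubmitMultitaskRun."""

    def __init__(self, run_name: Optional[str] = None, tasks: Optional[list] = None) -> None:
        self.run_name = run_name
        self.tasks = tasks

    @defaults_from_init("run_name", "tasks")
    def run(self, *, run_name: Optional[str] = None, tasks: Optional[list] = None) -> dict:
        # Instead of calling an API, return the payload that would be submitted.
        return {"run_name": run_name, "tasks": tasks}


job = SubmitRun(run_name="Setup", tasks=["create_db"])  # init args
print(job.run())                      # {'run_name': 'Setup', 'tasks': ['create_db']}
print(job.run(run_name="Ingestion"))  # {'run_name': 'Ingestion', 'tasks': ['create_db']}
```

Run-time arguments win over constructor arguments, so one configured instance can be reused with different run-time values.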
Edmondo Porcu
08/14/2022, 8:03 PM
Edmondo Porcu
08/14/2022, 8:26 PM
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

@task
def task_1_builder(arg_1):
    # Returns a ShellTask object; the echo command is not executed here
    return ShellTask(command=f"echo {arg_1}", stream_output=True)

@task
def task_2_builder(arg_2):
    return ShellTask(command=f"echo {arg_2}", stream_output=True)

def build_flow():
    with Flow("list_files") as flow:
        arg_1 = Parameter('arg_1')
        arg_2 = Parameter('arg_2')
        task_1 = task_1_builder(arg_1)
        task_2 = task_2_builder(arg_2, upstream_tasks=[task_1])
    return flow

if __name__ == '__main__':
    flow = build_flow()
    state = flow.run(
        arg_1="hello",
        arg_2="world"
    )
    assert state.is_successful(), 'One of the steps in the pipeline failed'
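Here the builder tasks return ShellTask objects and nothing calls their `run()`, so the flow can report success without any command executing. For comparison, a rough standard-library stand-in for what `ShellTask(command=..., stream_output=True).run()` does: run the command through a shell, print each output line as it arrives, and return the last line. `run_shell` is a hypothetical helper, not Prefect's implementation, and ShellTask's defaults (which shell it uses, what it returns) may differ in detail.

```python
import subprocess
from typing import Optional


def run_shell(command: str) -> Optional[str]:
    """Run `command` via /bin/sh, stream each output line, and return the last line."""
    last_line = None
    with subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the streamed output
        text=True,
    ) as proc:
        for line in proc.stdout or []:  # stream output as it arrives
            last_line = line.rstrip("\n")
            print(last_line)
        returncode = proc.wait()
    if returncode != 0:
        raise RuntimeError(f"command failed with exit code {returncode}: {command}")
    return last_line


result = run_shell("echo hello")  # prints: hello
print(result == "hello")  # True
```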
Edmondo Porcu
08/14/2022, 8:34 PM
@task
def task_2_builder(arg_2):
    t = ShellTask(command=f"echo {arg_2}", stream_output=True)
    # Calling .run() executes the echo command and returns its output
    return t.run()
This now works, but not with Databricks: there it literally treats the two tasks as the same task.
Edmondo Porcu
08/14/2022, 8:36 PM
Edmondo Porcu
08/14/2022, 8:40 PM
Anna Geller
Edmondo Porcu
08/19/2022, 12:35 AM
Edmondo Porcu
08/19/2022, 12:36 AM
Anna Geller