Michael Law
08/29/2022, 3:54 PM
José Duarte
08/29/2022, 4:03 PM
Michael Law
08/29/2022, 4:10 PM
Jason Thomas
08/29/2022, 5:43 PM
v2 and just looked a little at v1, so I might be misunderstanding.
It sounds like in general you want control over having certain tasks/flows run sequentially instead of concurrently, is that right? I believe the way v2 determines dependencies is through input arguments. So if you have this:
@task
def first():
    # do stuff
    # do more stuff
    ...

@task
def second():
    # query data from `first`
    # do even more stuff
    ...

@flow
def _flow():
    first()
    second()
Prefect won’t know that second depends on first, and will try to run them concurrently. The fix is:
@task
def first():
    # do stuff
    # do more stuff
    return data

@task
def second(data):
    # do even more stuff
    ...

@flow
def _flow():
    data = first()
    second(data)
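To make the idea concrete without needing Prefect installed, here is a rough stdlib-only analogy using `concurrent.futures`. It is not Prefect's API, just a sketch of the same principle: `second` receives the future for `first`'s result, so it cannot proceed until `first` has returned. The `events` list and the sample data are made up for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import time


def first(events: list) -> list:
    time.sleep(0.05)  # pretend to do some work
    events.append("first done")
    return [1, 2, 3]  # hypothetical data


def second(data_future: Future, events: list) -> int:
    data = data_future.result()  # blocks until `first` has returned
    events.append("second start")
    return sum(data)


def run_flow():
    events = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        first_future = pool.submit(first, events)
        # Handing over the future is what ties `second` to `first`.
        second_future = pool.submit(second, first_future, events)
        total = second_future.result()
    return total, events
```

Both functions are submitted right away, but the recorded events always come out as `first done` then `second start`, because the data hand-off forces the order.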
If the dependency is not that simple/straightforward, I think the recommended way would be to use .submit() to get a PrefectFuture, then .wait() to wait for it. But there’s a potential problem with that:
@task
def first():
    # do stuff
    # do more stuff
    ...

@task
def second():
    # query data from `first`
    # do even more stuff
    ...

@task
def third():
    # do unrelated stuff
    ...

@flow
def _flow():
    future = first.submit()  # returns PrefectFuture
    future.wait()  # holds up execution waiting for `first` to return
    second()  # runs after `first`
    third()  # runs after `first`
I think everything will wait for first in this case, so if you have another task third that is independent of the other two, it will still be held up.
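A small stdlib sketch of that hold-up, again using `concurrent.futures` as a stand-in rather than Prefect itself. Blocking on the first future before anything else is submitted means the independent `third` never gets a chance to start while `first` is running. The `seen` dict and the event are illustration-only helpers.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def run_blocking_flow():
    third_started = threading.Event()
    seen = {}

    def first():
        # Record whether `third` had started by the time `first` finished.
        seen["third_started_during_first"] = third_started.is_set()

    def third():
        third_started.set()

    with ThreadPoolExecutor(max_workers=2) as pool:
        future = pool.submit(first)
        future.result()     # block here, playing the role of future.wait()
        pool.submit(third)  # only submitted once `first` has finished
    return seen["third_started_during_first"], third_started.is_set()
```

`third` does run eventually, but never alongside `first`, even though a second worker was free the whole time.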
I would use this workaround:
@task
def first():
    # do stuff
    # do more stuff
    return True

@task
def second(flag):
    # query data from `first`
    # do even more stuff
    ...

@task
def third():
    # do unrelated stuff
    ...

@flow
def _flow():
    flag = first()  # runs before `second`
    second(flag)  # waits for `first`
    third()  # does not wait, can run concurrently
This will work for flows and tasks, and of course you can create a dependency chain:
@task
def first():
    # do stuff
    # do more stuff
    return True

@task
def second(flag):
    # query data from `first`
    # do even more stuff
    return True

@task
def third():
    # do unrelated stuff
    ...

@flow
def subflow(flag1):
    # call tasks
    # do stuff
    ...

@task
def fourth(flag1):
    # more stuff
    ...

@flow
def _flow():
    flag = first()  # runs before `second`
    flag1 = second(flag)  # waits for `first`
    third()  # does not wait, can run concurrently
    subflow(flag1)  # waits for `second`
    fourth(flag1)  # waits for `second`, concurrent with `subflow`
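Here is the flag-passing chain as a runnable stdlib analogy (`concurrent.futures`, not Prefect, and with the subflow left out). To show that `third` really is not held up, this sketch makes `first` wait until `third` has run: if `third` were blocked behind the chain, `first` would time out and the returned flag would be `False`. The event and the timeout are illustration-only assumptions.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import threading


def run_chain():
    third_ran = threading.Event()
    order = []

    def first() -> bool:
        # Finishes as soon as `third` has run (or gives up after 5 s).
        flag = third_ran.wait(timeout=5)
        order.append("first")
        return flag

    def second(flag: Future) -> bool:
        value = flag.result()  # waits for `first`
        order.append("second")
        return value

    def third() -> None:
        third_ran.set()  # independent of the chain

    def fourth(flag1: Future) -> bool:
        value = flag1.result()  # waits for `second`
        order.append("fourth")
        return value

    with ThreadPoolExecutor(max_workers=4) as pool:
        flag = pool.submit(first)
        flag1 = pool.submit(second, flag)
        pool.submit(third)
        result = pool.submit(fourth, flag1).result()
    return result, order
```

One caveat to check against the docs for your version: as far as I know, in released Prefect 2.x a direct task call blocks until the task finishes, and `.submit()` is what hands a task to the task runner. If so, the concurrent parts of the chain would need `.submit()`, with the resulting future passed downstream in place of the flag.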
Michael Law
08/30/2022, 6:57 AM
from datetime import timedelta

# `runner` and `object_py_files` are assumed to be defined elsewhere
def run_table_dependencies(table_name, table_dependencies, finished_tables_set):
    # Skip tables that already have a task
    if table_name in finished_tables_set:
        return None
    run_table = DatabricksSubmitRun(
        databricks_conn_secret=runner.databricks_connection,
        json=runner.job_json(object_py_files[table_name], parameters=[]),
        name=table_name,
        max_retries=3,
        retry_delay=timedelta(seconds=10),
    )
    # Run this table only after every parent that already has a task
    if table_dependencies:
        for parent in table_dependencies:
            if parent in finished_tables_set:
                run_table.set_upstream(finished_tables_set[parent])
    finished_tables_set[table_name] = run_table
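The `set_upstream` calls above only link a table to parents that have already been registered, so the order in which tables are visited matters. A minimal sketch of one way to get a safe visiting order, using the stdlib `graphlib` module (Python 3.9+). The table names and the `table_dependencies` mapping here are hypothetical, not taken from the project.

```python
from graphlib import TopologicalSorter

# Hypothetical mapping: table name -> set of parent tables it depends on
table_dependencies = {
    "customers": set(),
    "products": set(),
    "orders": {"customers", "products"},
    "order_summary": {"orders"},
}


def run_order(dependencies: dict) -> list:
    # static_order() yields every parent before any table that depends on it
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    for name in run_order(table_dependencies):
        print(name)
```

Calling `run_table_dependencies` in this order would mean each parent is already in `finished_tables_set` by the time its children look it up. `TopologicalSorter` also raises `CycleError` if the mapping contains a cycle.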