Hey folks, in Prefect 2.0, is there any way to use...
# prefect-community
m
Hey folks, in Prefect 2.0, is there any way to use a method like Prefect 1.0's 'set_upstream', but on a subflow? The reason I ask is that the 'prefect-databricks' library has a convenience method that fires off a run and monitors its status until it completes, but it is a @flow instead of a @task. Is it possible to set many flows to be upstream of a flow?
j
Hey Michael, take a look into this discussion https://prefect-community.slack.com/archives/CL09KU1K7/p1660916071451349?thread_ts=1660903250.753849&cid=CL09KU1K7 It seems to be somewhat close to your request
m
Yeah, kinda. The issue I have is that in the old world 'DatabricksSubmitRun' was a task, but in this brave new world in the prefect-databricks repository it is now a flow, which restricts my ability to manage concurrency.
Specifically, the method in flows.py called jobs_run_submit_and_wait_for_completion replaced DatabricksSubmitRun from the old world. Do you know why it was changed from a task to a flow?
Scratching the surface of @task too: does it have a method like .set_upstream? From what I can see it seems to have been removed altogether.
j
Hi Michael, apologies if I’m oversimplifying your question. Also, I started Prefect with `v2` and just looked a little at `v1`, so I might be misunderstanding. It sounds like in general you want control over having certain tasks/flows run sequentially instead of concurrently, is that right? I believe the way `v2` determines dependencies is through input arguments. So if you have this:
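```python
from prefect import flow, task

@task
def first():
    # do stuff
    # do more stuff
    ...

@task
def second():
    # query data from `first`
    # do even more stuff
    ...


@flow
def _flow():
    # both are submitted with nothing linking them
    first.submit()
    second.submit()
```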
Prefect won’t know that `second` depends on `first`, and if you submit them with `.submit()` it will try to run them concurrently. The fix is:
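```python
from prefect import flow, task

@task
def first():
    # do stuff
    # do more stuff
    data = ...  # placeholder for whatever `first` produces
    return data

@task
def second(data):
    # do even more stuff
    ...


@flow
def _flow():
    data = first.submit()  # returns PrefectFuture
    second.submit(data)    # waits for `first` because it receives its future
```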
If the dependency is not that simple/straightforward I think the recommended way would be to use `.submit()` to get a `PrefectFuture`, then `.wait()` to await it. But there’s a potential problem with that:
```python
from prefect import flow, task

@task
def first():
    # do stuff
    # do more stuff
    ...

@task
def second():
    # query data from `first`
    # do even more stuff
    ...

@task
def third():
    # do unrelated stuff
    ...


@flow
def _flow():
    future = first.submit()  # returns PrefectFuture
    future.wait()            # holds up execution waiting for `first` to return
    second()                 # runs after `first`
    third()                  # runs after `first`
```
I think everything will wait for `first` in this case, so if you have another task `third` that is independent of the other two it will still be held up. I would use this workaround:
```python
from prefect import flow, task

@task
def first():
    # do stuff
    # do more stuff
    return True

@task
def second(flag):
    # query data from `first`
    # do even more stuff
    ...

@task
def third():
    # do unrelated stuff
    ...


@flow
def _flow():
    flag = first.submit()  # runs before `second`
    second.submit(flag)    # waits for `first`
    third.submit()         # does not wait, can run concurrently
```
This will work for flows and tasks, and of course you can create a dependency chain:
```python
from prefect import flow, task

@task
def first():
    # do stuff
    # do more stuff
    return True

@task
def second(flag):
    # query data from `first`
    # do even more stuff
    return True

@task
def third():
    # do unrelated stuff
    ...

@flow
def subflow(flag1):
    # call tasks
    # do stuff
    ...

@task
def fourth(flag1):
    # more stuff
    ...


@flow
def _flow():
    flag = first.submit()        # runs before `second`
    flag1 = second.submit(flag)  # waits for `first`
    third.submit()               # does not wait, can run concurrently
    fourth.submit(flag1)         # waits for `second`, concurrent with `subflow`
    subflow(flag1)               # waits for `second`; the subflow call blocks, so it goes last
```
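If passing a dummy flag around feels awkward, I believe Prefect 2 also accepts a `wait_for` keyword on `.submit()`, which is probably the closest thing to v1's `set_upstream`. Here is a minimal sketch, assuming a reasonably recent Prefect 2 release. The task names are placeholders, and `make_flow` is a hypothetical helper that exists only so the Prefect import happens lazily and the snippet loads even where Prefect isn't installed:

```python
def make_flow():
    """Build a flow whose ordering is set with `wait_for` instead of dummy arguments."""
    # Assumption: Prefect 2.x is installed; imported lazily so this file loads without it.
    from prefect import flow, task

    @task
    def first():
        return "first done"

    @task
    def second():
        # takes no input from `first`, but must run after it
        return "second done"

    @task
    def third():
        return "third done"

    @flow
    def wait_for_flow():
        f1 = first.submit()
        f2 = second.submit(wait_for=[f1])  # explicit upstream, no data passed
        f3 = third.submit()                # independent, free to run concurrently
        return [f.result() for f in (f1, f2, f3)]

    return wait_for_flow
```

Calling `make_flow()()` would run it. As far as I know, subflow calls take `wait_for` too in later 2.x versions, though the subflow call itself still blocks the parent flow.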
m
Thanks for your input @Jason Thomas. I think I understand that, and also the ability to await groups of tasks or flows when their configuration is understood up front. My issue is that we build a DAG of tasks from JSON input, which lets us dynamically 'set_upstream' based on the JSON received. I don't see how this is possible in the v2 world. I think I'm going to have to give this some thought.
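```python
from datetime import timedelta

from prefect.tasks.databricks import DatabricksSubmitRun  # Prefect 1.x task library

# `runner` and `object_py_files` are assumed to be defined elsewhere


def run_table_dependencies(table_name, table_dependencies, finished_tables_set):
    # skip tables whose task has already been created
    if table_name in finished_tables_set:
        return None

    run_table = DatabricksSubmitRun(databricks_conn_secret=runner.databricks_connection,
                                    json=runner.job_json(object_py_files[table_name], parameters=[]),
                                    name=table_name,
                                    max_retries=3,
                                    retry_delay=timedelta(seconds=10)
                                    )

    # wire in every parent that already has a task as an upstream dependency
    if table_dependencies:
        for parent in table_dependencies:
            if parent in finished_tables_set:
                run_table.set_upstream(finished_tables_set[parent])

    # record the task so later tables can depend on it
    finished_tables_set[table_name] = run_table
```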
This is currently what we do to 'set_upstream' for tasks which have already been completed, which builds our dependency graph.
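For what it's worth, the same JSON-driven wiring looks expressible in v2 by keeping a dict of futures and passing the parents' futures as `wait_for` when each table is submitted. Below is a rough sketch under stated assumptions: `submit_in_dependency_order`, `make_table_flow`, `run_table` and `table_dag` are all hypothetical names, the dependency mapping is assumed to be `{table: [parent tables]}`, and `run_table` is only a stand-in for the real Databricks call. The ordering helper is plain Python; the Prefect import is lazy so the helper can be tried without Prefect installed.

```python
from typing import Any, Callable, Dict, List


def submit_in_dependency_order(
    dependencies: Dict[str, List[str]],
    submit: Callable[[str, List[Any]], Any],
) -> Dict[str, Any]:
    """Call `submit(name, upstream_handles)` for every table, parents first."""
    handles: Dict[str, Any] = {}

    def visit(name: str, path: tuple = ()) -> Any:
        if name in handles:
            return handles[name]  # already submitted
        if name in path:
            raise ValueError(f"dependency cycle at {name!r}")
        # submit all parents first and collect their handles
        upstream = [visit(p, path + (name,)) for p in dependencies.get(name, [])]
        handles[name] = submit(name, upstream)
        return handles[name]

    for table in dependencies:
        visit(table)
    return handles


def make_table_flow():
    """Hypothetical Prefect 2 wiring; `run_table` stands in for the real job."""
    # Assumption: Prefect 2.x is installed; imported lazily on purpose.
    from prefect import flow, task

    @task
    def run_table(table_name: str) -> str:
        return table_name  # placeholder for submitting and monitoring a job

    @flow
    def table_dag(dependencies: Dict[str, List[str]]):
        futures = submit_in_dependency_order(
            dependencies,
            # each table waits for the futures of its parents
            lambda name, upstream: run_table.submit(name, wait_for=upstream),
        )
        return [f.result() for f in futures.values()]

    return table_dag
```

With Prefect installed, something like `make_table_flow()({"c": ["a", "b"], "a": [], "b": ["a"]})` should submit `a`, then `b`, then `c`. Note this only covers tasks: the prefect-databricks helper discussed above is a flow, which has no `.submit()`, so it would still need a different approach.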