
Chris O'Brien

11/18/2019, 11:12 PM
Hi All, I could be going about this wrong, but I think I want the capability of running a flow from a flow. My use case is similar to something that’s been discussed previously, but where the final set of tasks in a flow are a re-usable flow. E.g.
X1 -> X2 -> A -> B -> C
and
Y1 -> A -> B -> C
The idea is that A -> B -> C are the methods for loading data into the database, so they take the transformed data from upstream and only run if the upstream tasks were successful. Does this make any sense, or is there a more Prefect way to attack this?

Dylan

11/19/2019, 12:38 AM
Hi @Chris O'Brien, I think your use case may be solved by Mapping: https://docs.prefect.io/core/concepts/mapping.html#mapping
X1 -> X2 -> would return a list that A -> B -> C can map over, and mapped tasks would only be created if the upstream tasks succeed
Let me know if this solves your use case! If not, I’d be interested to dive a little deeper into your flow-to-flow dependency
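For illustration, a minimal sketch of that mapping pattern (the flow and task names here are made up; this assumes Prefect Core 0.x, which is what this thread is about):

from prefect import Flow, task

@task
def transform():
    # stand-in for the X1 -> X2 portion: returns a list of records
    return [{"id": 1}, {"id": 2}]

@task
def load(record):
    # stand-in for the A -> B -> C loading step
    print("loading", record)

with Flow("mapping-example") as flow:
    records = transform()
    # one `load` task is spawned per element, and only if `transform` succeeds
    load.map(records)

flow.run()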

Chris O'Brien

11/19/2019, 1:00 AM
Thanks @Dylan, I will give it a go and see how it goes
I don’t think that quite matches what I need. Here is my flow, where everything from backup_to_staging onwards is a flow I want to create from the upstream flow AND that only executes once join_subscription_table is complete, if that helps with more context? Essentially I want to be able to use that flow in other ETLs
Happy to share code excerpts if that helps?
Sort of hoping for something like this?
import prefect

# backup_to_staging, delete_from_warehouse, write_to_warehouse, restore_from_staging,
# drop_staging and vacuum_and_analyze are tasks defined elsewhere
def create_write_flow(dataframe, run_date, schema_name, table_name):
    with prefect.Flow('database_flow') as db_flow:
        backup = backup_to_staging(schema_name, table_name, run_date)
        delete = delete_from_warehouse(schema_name, table_name, run_date)
        insert = write_to_warehouse(dataframe, schema_name, table_name, ['subscription_state'], run_date)
        restore = restore_from_staging(schema_name, table_name, run_date)
        drop_backup = drop_staging(schema_name, table_name)
        cleanup = vacuum_and_analyze(schema_name, table_name)

        # ordering: backup -> delete -> insert, then restore and drop_backup
        # both follow insert, with cleanup last
        backup.set_upstream(dataframe)
        delete.set_upstream(backup)
        insert.set_upstream(delete)

        restore.set_upstream(insert)
        restore.set_downstream(cleanup)

        drop_backup.set_upstream(insert)
        drop_backup.set_downstream(cleanup)
    return db_flow
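The hoped-for usage would be something like this (the schema and table names are just placeholders, and df stands in for the transformed DataFrame produced upstream):

db_flow = create_write_flow(df, run_date, 'analytics', 'subscriptions')
db_flow.run()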

Dylan

11/19/2019, 2:03 AM
🧐
Interesting!

Chris O'Brien

11/19/2019, 2:06 AM
Haha #RedshiftLife

Dylan

11/19/2019, 2:06 AM
One thing that’s possible right now is for a flow to call the Prefect Cloud API using the Python client
If the downstream flow is parameterized, this would work
However, there are a few kinks with this solution at the moment:
1. Every time you redeploy, a new version of a flow is created with a new ID, meaning a redeploy of the downstream flow would necessitate a redeploy of the upstream flow
We hope to address this issue soon
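As a rough sketch of that pattern (the flow ID and parameter name below are placeholders; this assumes the Prefect Core 0.x Client and its create_flow_run method):

from prefect import task, Client

@task
def trigger_downstream_flow(dataframe_uri):
    # kick off a run of the already-deployed, parameterized downstream flow
    client = Client()
    return client.create_flow_run(
        flow_id="<downstream-flow-id>",               # changes on every redeploy (the kink above)
        parameters={"dataframe_uri": dataframe_uri},  # parameters must be JSON-serializable
    )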

Chris O'Brien

11/19/2019, 2:08 AM
Ah interesting

Dylan

11/19/2019, 2:09 AM
Also, parameters must be JSON-serializable
(hope I spelled that correctly)
Which would mean you wouldn’t be able to directly pass the DataFrame, I believe
But I’m not 100% sure on that
We’re also investigating flow-to-flow dependencies as a first class concept in Prefect Cloud
Which would solve the first problem, but not the second

Chris O'Brien

11/19/2019, 2:12 AM
Cool, yea, it would be really cool to generate a flow off the back of another flow based on inputs

Dylan

11/19/2019, 2:12 AM
Now, all that being said… if you built your flows in a single Docker container, you could share those tasks across flows
So at least you’re sharing code in a DRY way, and your flows don’t run into issues at deployment time
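A rough sketch of that code-sharing layout (the module, flow, and table names are made up):

# shared_tasks.py -- tasks reused by every ETL flow in the container
from prefect import task

@task
def backup_to_staging(schema_name, table_name, run_date):
    ...

# subscriptions_etl.py -- one of the flows built from the shared tasks
from prefect import Flow
from shared_tasks import backup_to_staging

with Flow('subscriptions_etl') as flow:
    backup_to_staging('analytics', 'subscriptions', '2019-11-19')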

Chris O'Brien

11/19/2019, 2:14 AM
Which makes sense; we just have quite a few different ETLs that would be nice to have running in different containers, to minimize the impact of deployments

Dylan

11/19/2019, 2:14 AM
Totally makes sense as well

Chris O'Brien

11/19/2019, 2:15 AM
Thanks for all the help though! Really love the product!
So pandas does have a way to export a DataFrame as JSON
Not sure how easy it is to convert it back, though
You’d be better off uploading it to cloud storage (GCS/S3) and passing a URI
👍 1
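A rough sketch of that hand-off (the bucket and key are placeholders; this assumes pandas and boto3 are available in the flow’s environment):

import boto3
import pandas as pd

df = pd.DataFrame({'subscription_state': ['active', 'churned']})

# round-tripping through JSON works for simple frames
payload = df.to_json(orient='records')
restored = pd.read_json(payload, orient='records')

# or upload the JSON to S3 and pass the URI as a (JSON-serializable) parameter
boto3.client('s3').put_object(
    Bucket='my-bucket',                # placeholder bucket
    Key='staging/subscriptions.json',  # placeholder key
    Body=payload,
)
dataframe_uri = 's3://my-bucket/staging/subscriptions.json'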

Joe Schmid

11/19/2019, 1:29 PM
> every time you redeploy, a new version of a flow is created with a new ID
Here's a simple utility function we wrote to get the latest named Flow that is not archived:
from prefect import Client

def get_latest_non_archived_flow(flow_name):
    client = Client()
    flows = client.graphql(
        """
        query {
          flow(where: { _and: [ { name: { _eq: "%s" } }
                                { archived: { _eq: false } } ] }
               order_by: { version: desc }) {
            id
            name
            archived
            version
          }
        }
        """
        % flow_name
    )
    # graphql() returns a truthy result object even when no flow matches,
    # so check the flow list itself
    if flows.data.flow:
        return flows.data.flow[0]
    return None
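For illustration, this pairs with the client-triggered run mentioned above (the flow name and parameters are placeholders):

flow = get_latest_non_archived_flow('database_flow')
if flow:
    Client().create_flow_run(
        flow_id=flow.id,
        parameters={'dataframe_uri': 's3://my-bucket/staging/subscriptions.json'},
    )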