# prefect-community
c
Hi All, I could be going about this wrong, but I think I want the capability of running a flow from a flow. My use case is similar to something that's been discussed previously, but where the final set of tasks in a flow are a re-usable flow. E.g.
X1 -> X2 -> A -> B -> C
and
Y1 -> A -> B -> C
. The idea is that
A -> B -> C
are the methods for loading data into the database, so they take the transformed data from upstream and only run if the upstream tasks were successful. Does this make any sense, or is there a more Prefect way to attack this?
d
Hi @Chris O'Brien, I think your use case may be solved by Mapping: https://docs.prefect.io/core/concepts/mapping.html#mapping
X1 -> X2 ->
would return a list that
A -> B -> C
can map over and tasks would only be created if upstream tasks succeed
Let me know if this solves your use case! If not, I'd be interested to dive a little deeper into your flow-to-flow dependency.
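For reference, a minimal sketch of the mapping pattern being suggested, assuming Prefect Core 1.x; the task and flow names here are illustrative, not from this thread:

from prefect import task, Flow

@task
def extract():
    # Stand-in for the X1 -> X2 transform steps
    return [{"id": 1}, {"id": 2}, {"id": 3}]

@task
def load_record(record):
    # Stand-in for the A -> B -> C load steps collapsed into one task
    print(f"loading {record}")

with Flow("mapped-load") as flow:
    records = extract()
    # One load_record task instance is created per element of `records`,
    # and the mapped tasks only run if `extract` succeeds.
    load_record.map(records)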
c
Thanks @Dylan I will give it a go and see how it goes
I don't think that quite matches what I need. Here is my flow, where everything from
backup_to_staging
onwards is a flow I want to create from the upstream flow AND that only executes once
join_subscription_table
is complete, if that helps with more context? Essentially I want to be able to reuse that flow in other ETLs
Happy to share code excerpts if that helps?
Sort of hoping for something like this?
import prefect

def create_write_flow(dataframe, run_date, schema_name, table_name):
    # Build the reusable "load into the warehouse" flow from an upstream result
    with prefect.Flow('database_flow') as db_flow:
        backup = backup_to_staging(schema_name, table_name, run_date)
        delete = delete_from_warehouse(schema_name, table_name, run_date)
        insert = write_to_warehouse(dataframe, schema_name, table_name, ['subscription_state'], run_date)
        restore = restore_from_staging(schema_name, table_name, run_date)
        drop_backup = drop_staging(schema_name, table_name)
        cleanup = vacuum_and_analyze(schema_name, table_name)

        # backup -> delete -> insert, then restore/drop_backup, then cleanup
        backup.set_upstream(dataframe)
        delete.set_upstream(backup)
        insert.set_upstream(delete)

        restore.set_upstream(insert)
        restore.set_downstream(cleanup)

        drop_backup.set_upstream(insert)
        drop_backup.set_downstream(cleanup)
    return db_flow
d
🧐
Interesting!
c
Haha #RedshiftLife
d
One thing that’s possible right now is for a flow to call the Prefect Cloud API using the python client
If the downstream flow is parameterized, this would work
However, there are a few kinks with this solution at the moment
1. every time you redeploy, a new version of a flow is created with a new ID
Meaning a redeploy of the downstream flow would necessitate a redeploy of the upstream flow
We hope to address this issue soon
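For context, a rough sketch of the "call the Prefect Cloud API using the python client" approach described above, assuming Prefect Core 1.x; the flow ID and parameter names are placeholders, not values from this thread:

from prefect import task, Client

@task
def trigger_database_flow(run_date, schema_name, table_name):
    # Starts an already-deployed downstream flow by ID. Parameters must be
    # JSON-serializable, and this flow_id changes on every redeploy
    # (the kink noted above).
    client = Client()
    return client.create_flow_run(
        flow_id="<downstream-flow-id>",  # placeholder
        parameters={
            "run_date": run_date,
            "schema_name": schema_name,
            "table_name": table_name,
        },
    )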
c
Ah interesting
d
Also, parameters must be JSON-serializable
(hope I spelled that correctly)
Which would mean you wouldn’t be able to directly pass the DataFrame, I believe
But I’m not 100% sure on that
We’re also investigating flow-to-flow dependencies as a first class concept in Prefect Cloud
Which would solve the first problem, but not the second
c
Cool, yeah, it would be really useful to generate a flow off the back of another flow based on its inputs
d
Now, all that being said…..if you built your flows in a single docker container you could share those tasks across flows
So at least you’re sharing code in a DRY way and your flows don’t run into issues at deployment time
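A quick sketch of that task-sharing idea, assuming the load steps are ordinary Prefect 1.x tasks that multiple flows can import; the task, helper, and flow names below are simplified placeholders:

from prefect import task, Flow

@task
def backup_to_staging(schema_name, table_name):
    print(f"backing up {schema_name}.{table_name}")

@task
def write_to_warehouse(schema_name, table_name):
    print(f"writing to {schema_name}.{table_name}")

def add_load_tasks(schema_name, table_name):
    # Call inside a `with Flow(...)` block; the tasks attach to the active flow
    backup = backup_to_staging(schema_name, table_name)
    write = write_to_warehouse(schema_name, table_name)
    write.set_upstream(backup)
    return write

with Flow("subscriptions-etl") as flow_x:
    add_load_tasks("analytics", "subscriptions")

with Flow("customers-etl") as flow_y:
    add_load_tasks("analytics", "customers")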
c
Which makes sense, we just have quite a few different ETLs which would be nice to have running in different containers to minimize the impact of deployments
d
Totally makes sense as well
c
Thanks for all the help though! Really love the product!
So pandas does have a way to export a DataFrame as JSON
Not sure how easy it is to convert it back, though
You’d be better off uploading it to cloud storage (GCS/S3) and passing a URI
👍 1
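A small sketch of the two options being weighed here, a JSON round-trip versus handing off a storage URI; the bucket path is a placeholder:

import pandas as pd

df = pd.DataFrame({"subscription_state": ["active", "churned"]})

# Option 1: round-trip through JSON (fine for small frames; dtypes can be lossy)
payload = df.to_json(orient="split")
restored = pd.read_json(payload, orient="split")

# Option 2 (sketch): write the frame to cloud storage and pass only the URI as
# a flow parameter; the path is a placeholder and writing to S3 needs s3fs/pyarrow.
# df.to_parquet("s3://my-bucket/subscriptions/2019-10-01.parquet")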
j
> every time you redeploy, a new version of a flow is created with a new ID
Here's a simple utility function we wrote to get the latest named Flow that is not archived:
from prefect import Client

def get_latest_non_archived_flow(flow_name):
    # Returns the newest non-archived version of the named flow, or None
    client = Client()
    result = client.graphql(
        """
        query {
          flow(
            where: { _and: [ { name: { _eq: "%s" } }
                             { archived: { _eq: false } } ] }
            order_by: { version: desc }
          ) {
            id
            name
            archived
            version
          }
        }
        """
        % flow_name
    )
    flows = result.data.flow
    if flows:
        return flows[0]
    return None
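A hedged usage example tying this back to the earlier flow-run discussion: resolve the current ID by name, then start a run of that version. The parameter values are placeholders, and Client comes from the import above.

flow = get_latest_non_archived_flow("database_flow")
if flow is not None:
    Client().create_flow_run(
        flow_id=flow.id,
        parameters={"run_date": "2019-10-01"},
    )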