# prefect-community
Hi All, I could be going about this wrong but I think I want the capability of running a flow from a flow. My use case is similar to something that’s been discussed previously, where the final set of tasks in a flow are a re-usable flow. Eg.
X1 -> X2 -> A -> B -> C
Y1 -> A -> B -> C
. The idea is that
A -> B -> C
are the methods for loading data into the database, so they take the transformed data from upstream and only run if they were successful. Does this make any sense or is there a more Prefect way to attack this?
Hi @Chris O'Brien, I think your use case may be solved by Mapping: https://docs.prefect.io/core/concepts/mapping.html#mapping
X1 -> X2 ->
would return a list that
A -> B -> C
can map over and tasks would only be created if upstream tasks succeed
Let me know if this solves your usecase! If not, I’d be interested to dive a little deeper into your flow-to-flow dependency
Thanks @Dylan I will give it a go and see how it goes
I don’t think that quite matches what I need. Here is my flow, where everything from
A -> B -> C
is a flow I want to create from the upstream flow AND that only executes once the upstream
is complete, if that helps with more context? Essentially I want to be able to use that flow in other ETLs
Happy to share code excerpts if that helps?
Sort of hoping for something like this?
import prefect

# assumes the functions below (backup_to_staging, etc.) are @task-decorated
# elsewhere; ordering between them would still need to be wired up
# (e.g. via set_upstream) before this flow could run
def create_write_flow(dataframe, run_date, schema_name, table_name):
    with prefect.Flow('database_flow') as db_flow:
        backup = backup_to_staging(schema_name, table_name, run_date)
        delete = delete_from_warehouse(schema_name, table_name, run_date)
        insert = write_to_warehouse(dataframe, schema_name, table_name, ['subscription_state'], run_date)
        restore = restore_from_staging(schema_name, table_name, run_date)
        drop_backup = drop_staging(schema_name, table_name)
        cleanup = vacuum_and_analyze(schema_name, table_name)
    return db_flow
Haha #RedshiftLife
One thing that’s possible right now is for a flow to call the Prefect Cloud API using the python client
If the downstream flow is parameterized, this would work
However, there are a few kinks with this solution at the moment
1. every time you redeploy, a new version of a flow is created with a new ID
Meaning a redeploy of the downstream flow would necessitate a redeploy of the upstream flow
We hope to address this issue soon
Ah interesting
Also, parameters must be JSON-serializable
Which would mean you wouldn’t be able to directly pass the DataFrame, I believe
But I’m not 100% sure on that
We’re also investigating flow-to-flow dependencies as a first class concept in Prefect Cloud
Which would solve the first problem, but not the second
Cool, yea, it would be really cool to generate a flow off the back of another flow based on inputs
Now, all that being said…..if you built your flows in a single docker container you could share those tasks across flows
So at least you’re sharing code in a DRY way and your flows don’t run into issues at deployment time
Which makes sense, we just have quite a few different ETLs which would be nice to have running in different containers to minimize the impact of deployments
Totally makes sense as well
Thanks for all the help though! Really love the product!
So pandas does have a way to export a DataFrame as JSON
Not sure how easy it is to convert it back, though
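For what it’s worth, the round trip is fairly direct (sketch assuming pandas; `orient="split"` preserves column order):

```python
import io
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# serialize to a JSON string, e.g. to pass as a flow parameter
payload = df.to_json(orient="split")

# ...and reconstruct it on the other side
restored = pd.read_json(io.StringIO(payload), orient="split")
```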
You’d be better off uploading it to cloud storage (GCS/S3) and passing a URI
> every time you redeploy, a new version of a flow is created with a new ID
Here’s a simple utility function we wrote to get the latest named Flow that is not archived:
from prefect import Client

def get_latest_non_archived_flow(flow_name):
    client = Client()
    flows = client.graphql(
        """
        query {
          flow(where: { _and: [ { name: { _eq: "%s" } },
                                { archived: { _eq: false } } ] },
               order_by: { version: desc }) {
            id
            name
            version
          }
        }
        """
        % flow_name  # fields selected above are illustrative; adjust as needed
    )
    if flows:
        return flows.data.flow[0]
    return None