# prefect-community
j
Hey 🙂 I tried to implement a subflow in my main flow (like this) :
@task
def subtask():
   return 1

with Flow("subflow") as subflow:
   subtask()

@task
def main_task():
   subflow.run()

with Flow("main_flow") as main_flow:
   main_task()
But I got this error: `Unexpected error while running flow: KeyError('Task slug init_dirs-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')` To be precise, I had already registered `main_flow` (without the subflow inside) and it worked, but then I updated it and registered the subflow, and it failed. Could you help me please? 🙏 The task `init_dirs` is in main_flow and finishes in a success state, but this error happens when subflow.run() is called (and init_dirs is not in the subflow).
a
There is no true concept of subflows in Prefect 1.0, but in Orion you can do something very similar to what you described. In Prefect 1.0 there is an orchestrator pattern that uses the `create_flow_run` task to trigger child flows via API calls. So you would have to replace `subflow.run()` with a `create_flow_run` task. To find out more, check out these Discourse topics:
• https://discourse.prefect.io/t/how-can-i-create-a-subflow/88
• https://discourse.prefect.io/t/how-can-i-create-a-subflow-and-block-until-it-s-completed/94
j
Hi @Anna Geller, I tried, but it seems impossible to call `create_flow_run` inside a task. I get the following error: ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(...)`
I also tried with .run() but it didn't work (same error). Help please 🙏
a
That's true, you shouldn't be calling this task within another task. Can you explain the problem a bit more? Why are you trying to call tasks within other tasks?
Also, can you share your flow(s)?
j
Because I need the result of the previous task so that I can iterate over it in a loop
I can share, I'll try to simplify them
with Flow("main flow") as main_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    index(metadatas) # Error


with Flow("index_flow") as index_flow:
    # Imports
    metadatas = Parameter("metadatas_batch", required=True)
    imports = build_import(metadatas)
    index_weaviate(imports)


@task()
def index(metadatas: List[dict]) -> None:
    metadata_batches = list(batch(metadatas, BATCH_SIZE_INDEX))
    for i, metadata_batch in enumerate(metadata_batches):
        print(f"Process batch {i+1}/{len(metadata_batches)} ...")
        flow_id = create_flow_run(flow_name="index_flow", project_name="Document Pipeline", parameters={"metadatas_batch":metadata_batch}) # Error here
        wait_for_flow_run(flow_id, raise_final_state=True, stream_logs=True)
        print(f"Done !")

    print('Success: All the indexations are done !')
And I have the tasks :
@task()
def build_import(metadatas: List[dict]):
    return weaviate.build_import(metadatas)


@task()
def index_weaviate(imports: dict) -> None:
    client = get_client()
    weaviate.index_multi_documents(client, imports)
My issue is that I need the `metadatas` result in order to iterate, so I think it needs to be inside a task, doesn't it?
a
instead of using a for loop, you can leverage mapping and call the create_flow_run and wait_for_flow_run as shown here:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
so you need to wrap this into a task and return the list of parameter values, ideally convert this task to return a list of dictionaries `{"metadatas_batch":metadata_batch}`:
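@task
def get_parameter_values():
    # `metadatas` is assumed to be available in this scope, as in the original sketch.
    # Return one parameters dict per child flow run.
    return [{"metadatas_batch": b} for b in batch(metadatas, BATCH_SIZE_INDEX)]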
and then:
with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    params = get_parameter_values()
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=params,
    )
additional benefit is that you'll get parallelism while the for loop would trigger those flow runs sequentially
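As a rough mental model of the mapping described above (plain Python, not Prefect's implementation), a mapped call produces one child call per element of the mapped argument, while unmapped values are repeated for every call. The names `fan_out` and `fake_create_flow_run` are hypothetical stand-ins invented for this sketch, and the thread pool only illustrates that the calls are independent and can run concurrently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def fake_create_flow_run(flow_name: str, project_name: str, parameters: Dict[str, Any]) -> str:
    # Hypothetical stand-in: returns a label instead of calling any API.
    return f"{project_name}/{flow_name}:{sorted(parameters)}"


def fan_out(fn: Callable[..., str], mapped: List[Dict[str, Any]], **shared: Any) -> List[str]:
    # One call per element of `mapped`; `shared` plays the role of unmapped values.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fn, parameters=item, **shared) for item in mapped]
        # Collect in submission order so results line up with the inputs.
        return [f.result() for f in futures]


params = [{"metadatas_batch": [1, 2]}, {"metadatas_batch": [3]}]
run_ids = fan_out(
    fake_create_flow_run,
    params,
    flow_name="index_flow",
    project_name="Document Pipeline",
)
```

Each element of `params` ends up as the `parameters` of exactly one call, which is why the mapped argument has to be a list with one entry per child run.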
j
Thank you 🙂 I did something like this but I still have an issue (KeyError[ 0]). Where did I make a mistake?
with Flow("sec daily") as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    metadatas_batches = split_batch(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters={"metadatas_batch": metadatas_batches},
        upstream_tasks=[unmapped(metadatas_batches)]
    )

@task(state_handlers=[fail_handler], log_stdout=True)
def split_batch(metadatas: List[dict]) -> List[List[dict]]:
    return list(batch(metadatas, BATCH_SIZE_INDEX))
a
convert this task to return a list of dictionaries in the format `{"metadatas_batch":metadata_batch}`:
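@task
def get_parameter_values():
    # `metadatas` is assumed to be available in this scope, as in the original sketch.
    # Return one parameters dict per child flow run.
    return [{"metadatas_batch": b} for b in batch(metadatas, BATCH_SIZE_INDEX)]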
j
Mhh I see
a
`metadatas_batches` should be a list of dicts; then you can pass it to `parameters` with no modification. Additionally, you don't need to set an upstream task dependency, since metadatas_batches is already passed as a data dependency to the parameters.
with Flow("sec daily") as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    metadatas_batches = split_batch(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=metadatas_batches,
    )
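To make the shape difference concrete, here is a small plain-Python sketch (no Prefect involved). It assumes, as a simplification, that mapping picks out elements by position; under that assumption a dict wrapping the whole list has no element `0`, which would be consistent with the `KeyError` reported earlier. The sample values are made up.

```python
batches = [[{"id": 1}, {"id": 2}], [{"id": 3}]]  # made-up sample batches

# Shape from the failing attempt: a single dict that wraps the whole list.
wrong = {"metadatas_batch": batches}

# Shape that can be iterated: one parameters dict per child flow run.
right = [{"metadatas_batch": b} for b in batches]

try:
    wrong[0]  # positional lookup on a dict keyed by strings
    lookup_error = None
except KeyError as exc:
    lookup_error = exc

first_run_parameters = right[0]
```

With the list-of-dicts shape, element `0` is already a complete parameters dict for the first child run.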
j
It works !! Thank you so much for your time 🙏🙏🙏
@Anna Geller Small question: I have the following error :
Task 'create_flow_run[0]': Exception encountered during task execution!
requests.exceptions.HTTPError: 413 Client Error: Request Entity Too Large for url: https://api.prefect.io/
Is it because of the metadata dict ?
a
Yes, it can be that your Parameter value payload is too large to be stored in the backend. This page provides more info about payload size limits
j
Oh 😕 Thus this solution is not working in my case 😔
a
Are you sure there is no other way to e.g. retrieve this data within the respective child flow rather than passing the entire large payload via parameter? How big is this Parameter value?
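One way to answer the size question is to measure the JSON-encoded parameters before triggering the child flow. This is a minimal sketch: the helper name `payload_size_bytes` is invented here, and it assumes the parameters are JSON-serializable. It deliberately encodes no backend limit, since the exact limit is not stated in this thread.

```python
import json
from typing import Any, Dict


def payload_size_bytes(parameters: Dict[str, Any]) -> int:
    # Size of the parameters once serialized to UTF-8 JSON.
    return len(json.dumps(parameters).encode("utf-8"))


# Made-up sample payloads for comparison.
small = {"metadatas_batch": [{"id": 1}]}
large = {"metadatas_batch": [{"text": "x" * 1000} for _ in range(100)]}

small_size = payload_size_bytes(small)
large_size = payload_size_bytes(large)
```

Logging this number per batch would show how far each payload is from whatever limit applies to the backend in use.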
j
~50Mb for 1 metadata_batch
I could retrieve the result stored in Cloud Storage
And pass the path as a parameter instead of the data itself
a
You mean 50 MB? Yes, this is too large. And exactly, this is a great idea: your parameter can point to the Cloud Storage location of the data, but you shouldn't pass large data objects themselves as parameters.
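The idea of passing a location instead of the data can be sketched in plain Python. Here a local temporary directory stands in for Cloud Storage, and the helper names `store_batch` and `load_batch` are hypothetical; in the real pipeline the write and the read would go through the storage client instead.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def store_batch(batch: List[Dict[str, Any]], directory: Path, index: int) -> Dict[str, str]:
    # Parent side: persist the batch and return small parameters holding only its path.
    path = directory / f"metadatas_batch_{index}.json"
    path.write_text(json.dumps(batch), encoding="utf-8")
    return {"metadatas_batch_path": str(path)}


def load_batch(parameters: Dict[str, str]) -> List[Dict[str, Any]]:
    # Child side: resolve the path parameter back into the data.
    path = Path(parameters["metadatas_batch_path"])
    return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    batch = [{"id": 1, "text": "x" * 1000}]  # made-up sample batch
    parameters = store_batch(batch, Path(tmp), index=0)
    restored = load_batch(parameters)
```

The parameters handed to the child flow then stay a short string regardless of how large the batch is.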
j
Is it possible to get the location of the Task result inside the Flow itself ?
Do you have an example? I'm not sure how to use it 😕 I use GCSResult to store my task results:
with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
a
I think for your use case it would be much easier to just return the destination GCS path using something like this in your task (i.e. uploading and returning the relevant path yourself):
from google.cloud import storage as gcs
from google.oauth2 import service_account
from prefect.client.secrets import Secret

creds = Secret("GCP_CREDENTIALS").get()
PROJECT_NAME = "your gcp project name"
BUCKET_NAME = "your gcs bucket"
destination_file_name = "path/on/gcs/file.txt"
source_file_name = "local/path/file.txt"
credentials = service_account.Credentials.from_service_account_info(creds)
gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
bucket = gcs_client.bucket(BUCKET_NAME)
blob = bucket.blob(blob_name=destination_file_name)
blob.upload_from_filename(source_file_name)
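If the upload above is wrapped in a task, the value it returns could be a `gs://` URI that the child flow receives as its parameter. A minimal sketch of composing and splitting such a URI follows; the helper names are invented for illustration, and `my-bucket` is a placeholder bucket name.

```python
from typing import Tuple


def to_gcs_uri(bucket_name: str, blob_name: str) -> str:
    # Compose the URI a task could return after uploading.
    return f"gs://{bucket_name}/{blob_name}"


def from_gcs_uri(uri: str) -> Tuple[str, str]:
    # Split a gs:// URI back into (bucket name, blob name).
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    bucket_name, _, blob_name = uri[len(prefix):].partition("/")
    return bucket_name, blob_name


uri = to_gcs_uri("my-bucket", "path/on/gcs/file.txt")
bucket_name, blob_name = from_gcs_uri(uri)
```

The child flow can then use the two parts with `bucket(...)` and `blob(...)` to download the data itself.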