Jean-Baptiste Six
02/15/2022, 9:28 AM
Hi! I'm trying to run a subflow from inside a task of my main flow:

@task
def subtask():
    return 1

with Flow("subflow") as subflow:
    subtask()

@task
def main_task():
    subflow.run()

with Flow("main_flow") as main_flow:
    main_task()
But I faced this error:
Unexpected error while running flow: KeyError('Task slug init_dirs-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
To be precise, I had already registered the main_flow (without the subflow inside) and it worked, but then I updated it and registered the subflow, and it failed. Could you help me please? 🙏
The task init_dirs is in the main_flow and finishes in a success state, but this error happens when subflow.run() is called (and init_dirs is not in the subflow).
Anna Geller
You can use the create_flow_run task, which triggers child flows via API calls. So you would have to replace the subflow.run() call with a create_flow_run task (see the sketch after the links below).
To find out more, check out these Discourse topics:
• https://discourse.prefect.io/t/how-can-i-create-a-subflow/88
• https://discourse.prefect.io/t/how-can-i-create-a-subflow-and-block-until-it-s-completed/94
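A minimal sketch of that replacement (the flow names reuse the ones from your snippet, and "your_project_name" is a placeholder; this assumes both flows are registered in the same project):

from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def subtask():
    return 1

# the child flow stays as-is and is registered separately
with Flow("subflow") as subflow:
    subtask()

# the parent flow triggers the child flow via an API call instead of subflow.run()
with Flow("main_flow") as main_flow:
    flow_run_id = create_flow_run(flow_name="subflow", project_name="your_project_name")
    # optional: block until the child flow run finishes and raise on failure
    wait_for_flow_run(flow_run_id, raise_final_state=True)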
Jean-Baptiste Six
02/16/2022, 10:18 AM
I get an error pointing me to the with Flow(...) block: "If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)"
Also tried with .run() but it didn't work (same error), help please 🙏
Anna Geller
Jean-Baptiste Six
02/16/2022, 10:36 AM

with Flow("main flow") as main_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    index(metadatas)  # Error

with Flow("index_flow") as index_flow:
    # Imports
    metadatas = Parameter("metadatas_batch", required=True)
    imports = build_import(metadatas)
    index_weaviate(imports)

@task()
def index(metadatas: List[dict]) -> None:
    metadata_batches = list(batch(metadatas, BATCH_SIZE_INDEX))
    for i, metadata_batch in enumerate(metadata_batches):
        print(f"Process batch {i+1}/{len(metadata_batches)} ...")
        flow_id = create_flow_run(flow_name="index_flow", project_name="Document Pipeline", parameters={"metadatas_batch": metadata_batch})  # Error here
        wait_for_flow_run(flow_id, raise_final_state=True, stream_logs=True)
        print(f"Done !")
    print('Success: All the indexations are done !')

@task()
def build_import(metadatas: List[dict]):
    return weaviate.build_import(metadatas)

@task()
def index_weaviate(imports: dict) -> None:
    client = get_client()
    weaviate.index_multi_documents(client, imports)
I need the metadatas result to iterate over, so I think it needs to be inside a task, doesn't it?
Anna Geller
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
@task
def get_parameter_values():
    return list(batch(metadatas, BATCH_SIZE_INDEX))
and then:
with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
params = get_parameter_values()
mapped_flow_run_ids = create_flow_run.map(
flow_name=unmapped("index_flow"),
project_name=unmapped("Document Pipeline")
parameters=params,
)
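If the parent flow should also block until the child runs finish (as the original index task did with wait_for_flow_run), a sketch of mapping wait_for_flow_run over the returned run IDs might look like this, assuming the same imports and tasks as above:

from prefect.tasks.prefect import wait_for_flow_run

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    params = get_parameter_values()
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=params,
    )
    # wait for every child flow run; raise if any of them ends in a failed state
    wait_for_flow_run.map(
        flow_run_id=mapped_flow_run_ids,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )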
Jean-Baptiste Six
02/17/2022, 10:04 AM

with Flow("sec daily") as sec_daily_flow:
    root_dir = init_dirs("sec", archive=False)
    download_dir = download(root_dir, False)
    metadatas = metadata(download_dir)
    upload(metadatas)
    metadatas_batches = split_batch(metadatas)
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters={"metadatas_batch": metadatas_batches},
        upstream_tasks=[unmapped(metadatas_batches)],
    )

@task(state_handlers=[fail_handler], log_stdout=True)
def split_batch(metadatas: List[dict]) -> List[List[dict]]:
    return list(batch(metadatas, BATCH_SIZE_INDEX))
Anna Geller
@task
def get_parameter_values():
    return list(batch(metadatas, BATCH_SIZE_INDEX))
Jean-Baptiste Six
02/17/2022, 10:07 AM
Anna Geller
metadatas_batches should be a list of dicts; then you can pass it to the parameters with no modification. Additionally, you don't need to set the upstream task dependency, since metadatas_batches is already passed to the parameters as a data dependency.
with Flow("sec daily") as sec_daily_flow:
root_dir = init_dirs("sec", archive=False)
download_dir = download(root_dir, False)
metadatas = metadata(download_dir)
upload(metadatas)
metadatas_batches = split_batch(metadatas)
mapped_flow_run_ids = create_flow_run.map(
flow_name=unmapped("index_flow"),
project_name=unmapped("Document Pipeline"),
parameters=metadata_batches,
)
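A sketch of what split_batch could return so that each mapped child run receives its parameters dict directly (assuming the same batch helper, BATCH_SIZE_INDEX, and fail_handler from the earlier snippets):

@task(state_handlers=[fail_handler], log_stdout=True)
def split_batch(metadatas: List[dict]) -> List[dict]:
    # wrap each batch in the parameter dict expected by "index_flow",
    # so the resulting list can be passed to create_flow_run.map(parameters=...) as-is
    return [{"metadatas_batch": b} for b in batch(metadatas, BATCH_SIZE_INDEX)]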
Jean-Baptiste Six
02/17/2022, 10:15 AM
Task 'create_flow_run[0]': Exception encountered during task execution!
requests.exceptions.HTTPError: 413 Client Error: Request Entity Too Large for url: https://api.prefect.io/
Is it because of the metadata dict?
Anna Geller
Jean-Baptiste Six
02/18/2022, 10:09 AM
Anna Geller
Jean-Baptiste Six
02/18/2022, 10:14 AM
Anna Geller
Jean-Baptiste Six
02/18/2022, 12:57 PM

with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
Anna Geller
from google.cloud import storage as gcs
from google.oauth2 import service_account
from prefect.client.secrets import Secret

# load GCP service account credentials from a Prefect Secret
creds = Secret("GCP_CREDENTIALS").get()

PROJECT_NAME = "your gcp project name"
BUCKET_NAME = "your gcs bucket"
destination_file_name = "path/on/gcs/file.txt"
source_file_name = "local/path/file.txt"

# build an authenticated GCS client and upload the local file to the bucket
credentials = service_account.Credentials.from_service_account_info(creds)
gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
bucket = gcs_client.bucket(BUCKET_NAME)
blob = bucket.blob(blob_name=destination_file_name)
blob.upload_from_filename(source_file_name)