# ask-community
Jacq Crous:
Hi all, I'm having some trouble with retrieving data from a child flow. I have a child flow registered that sits in folder A; the child flow is an ETL process that downloads and formats data from an S3 bucket. I want to retrieve the transformed data from the child flow after it has run. I am using create_flow_run to start the child flow and it seems to run, but when I use get_task_run_result to fetch the data from the child flow it just gets stuck. I'm not sure whether I'm passing the wrong task slug or whether the problem is related to the context of the child flow changing. In principle, I want to define an ETL process that I can reuse in other projects and use the data generated by the child flow as the data source in the new project I'm working on. Thanks in advance for the help!
Kevin:
Hey @Jacq Crous, you can get the slug if you go into the Flow page for the `create_flow_run` task. This should work, but why is the context of the child flow changing? Also, if that interface is giving you problems, you could use the KV Store to persist a location and then retrieve that location downstream. KV Store Docs
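Something like this is the rough pattern (the key name is just an example, and the KV Store needs Prefect Cloud):
```python
# Sketch of the KV Store hand-off (hypothetical key name).
# The KV Store is a Prefect Cloud feature.
from prefect import task
from prefect.backend import set_key_value, get_key_value


@task
def persist_output_location(path: str) -> None:
    # In the child flow: record where the transformed data was written.
    set_key_value(key="aws_download_output", value=path)


@task
def load_output_location() -> str:
    # In the parent flow: look up the location the child flow stored.
    return get_key_value(key="aws_download_output")
```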
Jacq Crous:
Hi Kevin, thanks for the response. I looked at the issue again and realised that the flow (when I run it in the folder) executes and creates the right outputs, but the run does not register in Prefect. This is the only flow that behaves this way, so the issue might be something other than create_flow_run. I guess it's a separate issue, but any ideas on what would cause that? I have a local agent registered that runs all my other flows, but this particular flow runs without appearing in the agent's or the Prefect dashboard's run histories. The flow itself is there, but there is no run history recorded for it.
Kevin:
Are you using `flow.run()`? `flow.run()` will not appear on Prefect Cloud. How are you running the flow?
Jacq Crous:
Yes, I am using `flow.run()`. But when I use `flow.run()` for my other flows, they do appear on Prefect Cloud.
Kevin:
It should not, as `flow.run()` is for local testing only. You may have a `StartFlowRun` or `create_flow_run` somewhere in the script? Would you like to show me the code?
Jacq Crous:
Hi Kevin, so in principle what I have looks like this: in folder A I have a script, AWS_download.py, that downloads data from an S3 bucket and then transforms it:
```python
import pandas as pd
import sqlite3
import os
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.aws.s3 import S3Download, S3List
from prefect.executors import LocalDaskExecutor
#from prefect.agent.local import LocalAgent
from prefect.engine.results import LocalResult
import re
from dotenv import dotenv_values

import sys

"""
# Extract:
@task
def get_aws_credentials():
    """
    Load aws credentials from a local .env file. 

    """
    return cred_dict

@task
def get_s3_data_list(bucket_name,source_prefix,cred_dict):
    """
    Get a list of available tables in the specified S3 bucket with 
    the source_prefix prefix
    """

    # ... body omitted ...
    return table_list

@task()
def get_table_data(bucket_name,source_prefix,table_name,cred_dict):
    """
    Download specified table from the S3 bucket and convert data to a
    data frame. A payload dictionary is returned containing the constructed
    dataframe and the associated tablename to allow for mapping of this task.
    """
    # ... body omitted ...
    return payload

# Transform:
def generate_sqlite_table_name(table_name,source_prefix):
    """
    Generate the sqlite table name by removing the db_name prefix from the 
    AWS S3 bucket name.
    """
    return sql_table_name

with Flow('AWS downloads') as flow:
    # Define parameters:
    source_prefix = Parameter('source_prefix',required=True,default=':memory')
    db_name = Parameter('db_name',required=True,default=':memory:')
    db_path = Parameter('db_path',default=os.getcwd())
    bucket_name = Parameter('bucket_name',required=True)

    # Define flow:
    cred_dict = get_aws_credentials()

    table_list = get_s3_data_list(bucket_name,source_prefix,cred_dict)

    payload_list = get_table_data.map(
        unmapped(bucket_name), unmapped(source_prefix), table_list, unmapped(cred_dict)
    )

if len(sys.argv) > 1:
    # Three CLI inputs are expected (sys.argv[1..3]), so argv must have length 4.
    if len(sys.argv) < 4:
        raise Exception('Three inputs are required to run the pipeline: <aws bucket name> <source prefix> <sqlite db name>')

    flow.run(parameters={'bucket_name': sys.argv[1], 'source_prefix': sys.argv[2], 'db_name': sys.argv[3]})
    
if __name__=='__main__':
    """
    Bucket Name: AWS bucket name where data is stored.
    Source prefix: A prefixed used to search the storage bucket. The prefix is the name of the actual source file


    """
    flow_parameters = {
        'bucket_name': 'bucket_name',
        'source_prefix': 'some_prefix',
        'db_name': 'db_name',
        }

    # Attach the executor so the flow actually uses it when run.
    flow.executor = LocalDaskExecutor(scheduler="threads")
    flow.register(project_name='AWS ETL Pipelines')
    flow.run(parameters=flow_parameters)
```
So currently, when I run this flow, it does not show anything on Cloud. What I want to do for the parent and child flow is to run something like the following in folder B:
```python
import pandas as pd
from prefect import Flow, task
from prefect.run_configs import LocalRun
from prefect.core.task import Parameter
from prefect.tasks.prefect import StartFlowRun, create_flow_run, get_task_run_result
from prefect.backend import FlowRunView
from prefect.engine.results.s3_result import S3Result
import os

data_dict1 = {
    'bucket_name':'bucket_name',
    'source_prefix':'prefix_1'
    }

with Flow('parent_flow') as flow:
    child_flow_id = create_flow_run(
        project_name='AWS ETL Pipelines',
        flow_name='AWS downloads',
        parameters=data_dict1,
        run_name='test_run1'
        )

    child_flow_data = get_task_run_result(child_flow_id, task_slug='get_table_data-1')


flow.register(project_name='AWS ETL Pipelines')
flow.run()
```
So if I run the first flow directly in folder A, it does not show up on Prefect Cloud. If I run the second script the child flow runs, but when I try to retrieve the data I get the following error:
```
[2021-09-15 15:37:37+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Starting task run...
[2021-09-15 15:38:00+0200] ERROR - prefect.TaskRunner | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
    task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
    result = TaskRunView.from_task_slug(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
    cls._query_for_task_run(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
    task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
    raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': '6553576c-cdf6-4306-9cc0-d848afc03203'}, 'map_index': {'_eq': 1}}
[2021-09-15 15:38:00+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Finished task run for task with final state: 'Failed'
[2021-09-15 15:38:00+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
```
Kevin:
So `flow.run()` does not trigger a flow run with a backend (Cloud or Server). `flow.run()` just runs your script, but the Prefect context is not filled; it's the backend that injects things like the `flow_run_name` and `flow_run_id`. Only scheduled runs and runs triggered in the UI will appear in the UI, because `flow.run()` is not a real flow run. I also don't suggest `sys.argv`, because that's not going to be available when you are running on Prefect Cloud. Prefect will take care of passing the parameters for you: as long as you pass the Parameters in the schedule or through the UI, the flow run will use them. You also set them with `required=True`, so there will be an error if they are not supplied.

The reason flow A shows in the UI when you run flow B is that the `create_flow_run` task hits the GraphQL API to start a flow run. Runs on a schedule and runs in the UI also hit the GraphQL API; `flow.run` does not. That is why, if you run flow B on a schedule, you will see all of flow A's runs in the UI as well. To get flow A to appear in the UI, you need to register it and run it from the UI or on a schedule.
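Untested sketch of how I'd wire the parent flow with the names from this thread (note that `get_task_run_result` reads the child task's persisted Result, so the child task needs a result location the parent can read, e.g. S3Result):
```python
# Sketch of the parent flow: register it, then trigger it from the UI,
# a schedule, or the CLI rather than calling flow.run().
from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)

with Flow("parent_flow") as flow:
    # Kick off the registered child flow through the backend API.
    child_run_id = create_flow_run(
        flow_name="AWS downloads",
        project_name="AWS ETL Pipelines",
        parameters={"bucket_name": "bucket_name", "source_prefix": "prefix_1"},
    )
    # Block until the child flow run reaches a finished state.
    child_run_done = wait_for_flow_run(child_run_id, raise_final_state=True)
    # Fetch the persisted result only after the child run is done.
    child_data = get_task_run_result(
        child_run_id,
        task_slug="get_table_data-1",
        upstream_tasks=[child_run_done],
    )

flow.register(project_name="AWS ETL Pipelines")
```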
Jacq Crous:
Hi Kevin, thanks for the response. That makes sense; I forgot about that distinction. I still have the same issue retrieving the data from the child flow as before, though: Error during execution of task: ValueError("No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': 'bdf0a323-0143-40c6-941d-08f65724c5a9'}, 'map_index': {'_eq': 1}}") I pulled this error from Prefect Cloud. Could you advise on why this is happening?
Kevin:
Was this a one-time thing, or does it always happen? The set-up for the `flow_run_id` works, so could you check that the task slug in the subflow is right? Did you remove the first `flow.run()` call when you registered?
Jacq Crous:
So this happens every time. I tried a couple of different task slugs based on some GraphQL queries on the flows, but none of them seem to work. I did remove the `flow.run()`; I'm running everything from the command line, so the script only contains the tasks and the flow.
Kevin:
When you navigate to the flow run page for `'bdf0a323-0143-40c6-941d-08f65724c5a9'`, what does the slug show up as? Wait, sorry, is that task mapped? You might want `map_index=-1`? Not sure why yours says 1.
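If it is mapped, something like this inside the parent flow might be what you need (a sketch using the slug from this thread; as far as I know, `map_index=-1` targets the parent of the mapped task, whose result is the list of the children's results):
```python
# All mapped results at once: map_index=-1 points at the mapped task's
# parent, whose result should be the list of child results.
all_payloads = get_task_run_result(
    child_flow_id, task_slug="get_table_data-1", map_index=-1
)

# Or a single mapped child, by its map index:
first_payload = get_task_run_result(
    child_flow_id, task_slug="get_table_data-1", map_index=0
)
```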