Jacq Crous 09/10/2021, 7:23 AM

Kevin Kho
You can use the create_flow_run task. This should work, but why is the context of the child flow changing?
Also, if that interface is giving you problems, you could use the KV Store to persist a location and then retrieve that location downstream to use it. See the KV Store Docs.
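A minimal sketch of that KV Store pattern, assuming Prefect Cloud (the KV Store is a Cloud-only feature) and a made-up key name:

from prefect import task
from prefect.backend import set_key_value, get_key_value

@task
def write_location(path: str):
    # persist the latest output location under a well-known key
    set_key_value(key="latest_data_location", value=path)

@task
def read_location() -> str:
    # downstream (even in a different flow) read the same key back
    return get_key_value(key="latest_data_location")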
Jacq Crous 09/13/2021, 3:33 PM

Kevin Kho
If you are calling flow.run(), it will not appear on Prefect Cloud. How are you running the flow?
Jacq Crous 09/14/2021, 6:27 AM

Kevin Kho
flow.run() is for local testing only. You may have a StartFlowRun or create_flow_run somewhere in the script? Would you like to show me the code?
Jacq Crous 09/15/2021, 1:06 PM

import pandas as pd
import sqlite3
import os
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.aws.s3 import S3Download, S3List
from prefect.executors import LocalDaskExecutor
#from prefect.agent.local import LocalAgent
from prefect.engine.results import LocalResult
import re
from dotenv import dotenv_values
import sys

# Extract:
@task
def get_aws_credentials():
    """
    Load aws credentials from a local .env file.
    """
    return cred_dict

@task
def get_s3_data_list(bucket_name, source_prefix, cred_dict):
    """
    Get a list of available tables in the specified S3 bucket with
    the source_prefix prefix.
    """
    return table_list

@task()
def get_table_data(bucket_name, source_prefix, table_name, cred_dict):
    """
    Download the specified table from the S3 bucket and convert the data to a
    data frame. A payload dictionary is returned containing the constructed
    dataframe and the associated table name to allow for mapping of this task.
    """
    return payload

# Transform:
def generate_sqlite_table_name(table_name, source_prefix):
    """
    Generate the sqlite table name by removing the db_name prefix from the
    AWS S3 bucket name.
    """
    return sql_table_name

with Flow('AWS downloads') as flow:
    # Define parameters:
    source_prefix = Parameter('source_prefix', required=True, default=':memory')
    db_name = Parameter('db_name', required=True, default=':memory:')
    db_path = Parameter('db_path', default=os.getcwd())
    bucket_name = Parameter('bucket_name', required=True)
    # Define flow:
    cred_dict = get_aws_credentials()
    table_list = get_s3_data_list(bucket_name, source_prefix, cred_dict)
    payload_list = get_table_data.map(
        unmapped(bucket_name), unmapped(source_prefix), table_list, unmapped(cred_dict)
    )

if len(sys.argv) > 1:
    if len(sys.argv) < 4:  # argv[1], argv[2] and argv[3] are all used below
        raise Exception('Three inputs are required to run pipeline: <aws bucket name> <source prefix> <sqlite_dbname>')
    flow.run(parameters={'bucket_name': sys.argv[1], 'source_prefix': sys.argv[2], 'db_name': sys.argv[3]})

if __name__ == '__main__':
    """
    Bucket Name: AWS bucket name where data is stored.
    Source prefix: A prefix used to search the storage bucket. The prefix is the name of the actual source file.
    """
    flow_parameters = {
        'bucket_name': 'bucket_name',
        'source_prefix': 'some_prefix',
        'db_name': 'db_name',
    }
    flow.executor = LocalDaskExecutor(scheduler="threads")  # attach the executor so the flow actually uses it
    flow.register(project_name='AWS ETL Pipelines')
    flow.run(parameters=flow_parameters)
So currently, when I run this flow it does not show anything on the cloud. The question about the parent and child flows relates to running something like the following in Folder B:
import pandas as pd
from prefect import Flow, task
from prefect.run_configs import LocalRun
from prefect.core.task import Parameter
from prefect.tasks.prefect import StartFlowRun, create_flow_run, get_task_run_result
from prefect.backend import FlowRunView
from prefect.engine.results.s3_result import S3Result
import os

data_dict1 = {
    'bucket_name': 'bucket_name',
    'source_prefix': 'prefix_1'
}

with Flow('parent_flow') as flow:
    child_flow_id = create_flow_run(
        project_name='AWS ETL Pipelines',
        flow_name='AWS downloads',
        parameters=data_dict1,  # was `kaeri_data_params`, which is not defined in this snippet
        run_name='test_run1'
    )
    child_flow_data = get_task_run_result(child_flow_id, task_slug='get_table_data-1')

flow.register(project_name='AWS ETL Pipelines')
flow.run()
So if I run the first flow directly in Folder A, it does not show up on Prefect Cloud. If I run the second script, the child flow runs, but when I try to retrieve the data I get the following error:
[2021-09-15 15:37:37+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Starting task run...
[2021-09-15 15:38:00+0200] ERROR - prefect.TaskRunner | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
result = TaskRunView.from_task_slug(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
cls._query_for_task_run(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': '6553576c-cdf6-4306-9cc0-d848afc03203'}, 'map_index': {'_eq': 1}}
[2021-09-15 15:38:00+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Finished task run for task with final state: 'Failed'
[2021-09-15 15:38:00+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
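For context on the retrieval step: get_task_run_result works by loading the child task's persisted Result from storage, so the child task generally needs a result backend the parent flow can reach. A hedged sketch, with a hypothetical bucket name, of what that could look like on the child flow's task:

from prefect import task
from prefect.engine.results.s3_result import S3Result

# hypothetical persisted result on the child task, so a parent flow
# can load the returned payload via get_task_run_result
@task(result=S3Result(bucket="my-results-bucket"))
def get_table_data(bucket_name, source_prefix, table_name, cred_dict):
    ...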
Kevin Kho
flow.run() does not trigger a flow run with a backend (Cloud or Server). flow.run() just runs your script, but the Prefect context is not filled. It's the backend that injects stuff like the flow_run_name and flow_run_id. Only scheduled runs and runs triggered in the UI will appear in the UI. flow.run() is not a real flow run.
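As a hedged illustration of that distinction: one way to kick off a real, backend-tracked run from a plain script (assuming the flow is already registered and you are authenticated against Cloud/Server) is to call the create_flow_run task's .run() method directly, which goes through the GraphQL API:

from prefect.tasks.prefect import create_flow_run

# unlike flow.run(), this creates a flow run on the backend,
# so it shows up in the UI
flow_run_id = create_flow_run.run(
    flow_name="AWS downloads",
    project_name="AWS ETL Pipelines",
    parameters={"bucket_name": "bucket_name",
                "source_prefix": "some_prefix",
                "db_name": "db_name"},
)
print(flow_run_id)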
I also don’t suggest the sys.argv approach because that’s not going to be available when you are running on Prefect Cloud. Prefect will take care of passing the parameters for you. As long as you pass the Parameters in the schedule or through the UI, the flow run will use them. You also set them with required=True, so there will be an error if they are not supplied.
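For instance, a sketch of supplying those parameters on a schedule instead of via sys.argv (the cron string and values here are made up):

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# each clock can carry parameter_defaults, so scheduled runs
# get their parameters from the backend, not the command line
flow.schedule = Schedule(clocks=[
    CronClock(
        "0 6 * * *",  # hypothetical daily 06:00 run
        parameter_defaults={
            "bucket_name": "bucket_name",
            "source_prefix": "some_prefix",
            "db_name": "db_name",
        },
    )
])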
The reason flow A shows in the UI when you run flow B is that the create_flow_run task hits the GraphQL API to start a flow run. Runs on a schedule and runs started in the UI also hit the GraphQL API; flow.run does not hit the API. This is so that if you run flow B on a schedule, you will see all of the flow A runs in the UI as well.
In order to get flow A to appear in the UI, you need to register it and run it from the UI or on a schedule.
Jacq Crous 09/17/2021, 9:40 AM

Kevin Kho
The flow_run_id works, so could you check whether the task slug in the subflow is right?
Did you remove the first flow.run() call when you registered?
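One hedged way to double-check that slug, assuming the child flow object from the first script can be built or imported locally (flow.slugs maps each task to its slug):

# with the 'AWS downloads' Flow object in scope as `flow`
for t, slug in flow.slugs.items():
    print(t.name, slug)  # confirm the mapped task really is 'get_table_data-1'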
Jacq Crous 09/20/2021, 6:15 AM

Kevin Kho
'bdf0a323-0143-40c6-941d-08f65724c5a9'
Kevin Kho
Did you try map_index=-1? Not sure why yours says 1.
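Putting that suggestion back into the parent flow from earlier, a hedged version of the retrieval call with the map_index made explicit:

child_flow_data = get_task_run_result(
    child_flow_id,
    task_slug='get_table_data-1',  # the slug exactly as registered
    map_index=-1,                  # -1 (the default) targets the parent of the mapped runs
)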