Gustavo de Paula
09/08/2021, 3:31 PM
Filip Lindvall
09/08/2021, 3:35 PM
class ReadTable(Task):
    def __init__(
        self,
        # table: str = None,
        app: str = None,
        formula: str = None,
        api_key_name: str = "default_cred",
        project_id: str = "some_id",
        **kwargs: Any,
    ):
        # self.table = table
        self.app = app
        self.formula = formula
        self.api_key_name = api_key_name
        self.project_id = project_id
        super().__init__(**kwargs)

    @defaults_from_attrs("app", "formula", "api_key_name", "project_id")
    def run(
        self,
        table: str = None,
        app: str = None,
        formula: str = None,
        api_key_name: str = None,
        project_id: str = None,
    ) -> List[Any]:
        # some code
        return json.dumps(ret)
Trying to call this, passing table as a prefect.Parameter, it does not "resolve" to a string. However, wrapping the call in a task:
readTable = ReadTable(app="some_app_space")

@task
def wrapper(table: str) -> List[Any]:
    return readTable.run(table=table)
Then it works and table gets resolved correctly to its string value. Why is this? I've been looking all over for documentation.
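A hedged sketch of one likely explanation, assuming Prefect 0.x: a Parameter is itself a task and only resolves at run time. Calling readTable.run(table=...) directly inside the Flow block executes the method immediately at build time, while the Parameter is still a Parameter object; calling the task *instance* registers it with the flow, so the value is resolved before run() is invoked (the flow name here is hypothetical):

from prefect import Flow, Parameter

readTable = ReadTable(app="some_app_space")

with Flow("read-table") as flow:
    table = Parameter("table")
    # Calling the instance (not .run) defers execution to run time,
    # so `table` arrives as its resolved string value.
    rows = readTable(table=table)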
An Hoang
09/08/2021, 4:10 PM
How do I get the result of a mapped task from the state = flow.run() object? I always get a KeyError when trying to do state.result[mapped_task].result, but state.result[not_mapped_task].result works fine.
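A minimal sketch of reading mapped results from a local run, assuming Prefect 0.15.x: the key into state.result must be the task object returned inside the Flow block, and for a mapped task the .result attribute is a list with one entry per child run.

from prefect import Flow, task

@task
def plus_one(x):
    return x + 1

with Flow("mapped-results") as flow:
    mapped_task = plus_one.map([1, 2, 3])

state = flow.run()
# For the mapped task, .result is a list of child results.
print(state.result[mapped_task].result)  # [2, 3, 4]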
Kostas Chalikias
09/08/2021, 4:21 PM
Martim Lobao
09/08/2021, 5:23 PM
An Hoang
09/08/2021, 6:49 PM
# input: a list of X: [X1, X2, X3, ...]
# input: total_times = 1_000_000
# property: x.do_work(1_000_000) == 1000 * x.do_work(1000)
# output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000), ...]
@task
def long_running_task(x, n_times):
    result = x.do_work(n_times)
    return result
I have hundreds of Dask workers and want to split this work into units of x.do_work(1000) to get the output as fast as possible by maximizing parallelism. How should I write my mapping functions to achieve this? Do I just generate a list X_list = [x] * 1000 for each X and a list iteration_list = [1000] * 1000, then do long_running_task.map(x=flatten(X_list), n_times=flatten(iteration_list))?
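A hedged sketch of one way to build that fan-out, assuming the additive property above holds; the per-x regrouping of the chunk results is left out, and chunks_for is a hypothetical helper name:

from prefect import Flow, task, flatten

CHUNK = 1000
N_CHUNKS = 1_000_000 // CHUNK

@task
def chunks_for(xs):
    # one (x, CHUNK) unit per chunk per x -> len(xs) * N_CHUNKS parallel units
    return [[(x, CHUNK) for _ in range(N_CHUNKS)] for x in xs]

@task
def do_chunk(unit):
    x, n = unit
    return x.do_work(n)

with Flow("chunked-work") as flow:
    units = chunks_for(X_list)  # X_list as in the question
    chunk_results = do_chunk.map(flatten(units))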
Jose Chen
09/08/2021, 7:19 PM
David Jenkins
09/08/2021, 7:24 PM
random_string = GenerateRandomData(data=data)
3. random_string from flow1 will then be a parameter sent to flow2, so the parameter is set dynamically at runtime, since the value of random_string is different on each execution of the three flows.
I learn best from reading code, so a basic example would be most helpful.
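A minimal sketch of that pattern, assuming Prefect 0.15.x; the flow and project names, and the uuid-based payload, are hypothetical. A task output computed at runtime is passed straight in as the child flow's parameters:

from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run

@task
def generate_random_data():
    import uuid
    return {"random_string": uuid.uuid4().hex}

with Flow("flow1") as flow:
    params = generate_random_data()
    # The task output becomes the child flow's parameters at run time.
    create_flow_run(
        flow_name="flow2",
        project_name="my-project",
        parameters=params,
    )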
Erik Amundson
09/08/2021, 8:49 PM
import prefect
@prefect.task
def do_nothing(n):
    pass

with prefect.Flow("Dummy Flow") as flow:
    listy = list(range(200000))
    do_nothing.map(listy)
The scheduler pod runs out of memory after around 300 tasks; a screenshot of the Dask dashboard is attached.
Has anyone run into this issue, or does anyone have ideas for a fix? We normally run Prefect 0.14.16, but I've tried both 0.14.16 and latest (0.15.5), with the same results.
Aric Huang
09/08/2021, 11:44 PM
prefect.context object or elsewhere?
• The flow ID used in the URL https://cloud.prefect.io/paravision/flow/<ID> seems to be different from the flow ID that's output when registering a flow. Is it possible to get that ID during a flow run through prefect.context? prefect.context.flow_id looks like the same one that's returned when registering the flow, but I would like to also get this other flow ID.
• When registering a flow, it increments the version number and archives the previous flow version. Is it possible to un-archive a previous version so it can be run again?
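A hedged sketch for the second bullet: the ID in the Cloud URL is the flow *group* ID, while prefect.context.flow_id is version-specific. Assuming the GraphQL schema exposes flow_group_id on the flow table, it can be looked up from inside a run:

import prefect
from prefect import Client

client = Client()
result = client.graphql(
    """
    query($id: uuid!) {
      flow_by_pk(id: $id) { flow_group_id }
    }
    """,
    variables={"id": prefect.context.flow_id},
)
flow_group_id = result["data"]["flow_by_pk"]["flow_group_id"]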
09/09/2021, 5:48 AMFileNotFoundError, No such file or directory
while using Git storage to a private git server. I set Secret as ENV and specify git_token_secret_name
. Does this mean that the agent can connect to the repository but don't find the Flow python file?Eddie Atkinson
09/09/2021, 5:57 AM
Does StartFlowRun support passing parameters as lists of dictionaries in Python? I am currently getting the following error when I try to do that:
prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: https://api.prefect.io/
This is likely caused by a poorly formatted GraphQL query or mutation but the response could not be parsed for more details
I tried performing the same operation via the interactive GraphQL API and I needed to escape the { and " characters to get the query to work. Looking at the source, it seems as though this doesn't happen, which might be the cause of my problem.
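A minimal sketch of what should work, assuming Prefect 0.15.x; flow and project names are hypothetical. Parameters are serialized to JSON by the client, so a plain Python structure should need no manual escaping of braces or quotes:

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_child = StartFlowRun(flow_name="child-flow", project_name="my-project")

with Flow("parent") as flow:
    # A list of dicts passed as an ordinary parameter value.
    start_child(parameters={"records": [{"id": 1, "old": {}, "new": {}}]})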
Ben Muller
09/09/2021, 7:13 AM
schedule = Schedule(
    clocks=[
        CronClock(
            cron="*/7 * * * *",
            parameter_defaults=dict(account_identifier="inrun"),
            start_date=datetime.now(tz=timezone("Australia/Brisbane")),
        )
    ]
)
with Flow(
    name="betfair_flow",
    storage=Storage().in_a_s3_bucket(),
    run_config=RunConfig().fargate_on_ecs(cpu=512, memory=2048),
    schedule=schedule,
    executor=LocalDaskExecutor(scheduler="threads"),
) as flow:
    betfair_account_identifier = Parameter("account_identifier")
Error in 🧵
Filip Lindvall
09/09/2021, 8:57 AM
I have def some_task(id: str, old: Dict, new: Dict) and data as [{"id": "some_id", "old": {...}, "new": {...}}, ...]. Is there some easy way to explode the dicts into the task's arguments, or is it best to write some wrapper task whose only responsibility is to take the data and pass it to the intended task?
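A sketch of the thin-wrapper option, assuming there is no built-in kwargs-exploding for map; run_record is a hypothetical name and some_task stands in for the original function:

from typing import Dict
from prefect import Flow, task

def some_task(id: str, old: Dict, new: Dict):
    ...

@task
def run_record(record: Dict):
    # Explode the dict into keyword arguments.
    return some_task(**record)

with Flow("explode") as flow:
    data = [{"id": "some_id", "old": {}, "new": {}}]
    run_record.map(data)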
Issam Assafi
09/09/2021, 11:48 AM
Martim Lobao
09/09/2021, 12:19 PM
Run (attractive-bug) of flow 821098cf-c921-49ef-93d2-dd741b440aca failed example SLA (STARTED_NOT_FINISHED) after 60 seconds. See the UI for more details. ee53f0cf-6d07-4dee-afb6-d1205baf141c
Rafael
09/09/2021, 12:29 PM
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

result = LocalResult(dir="./prefect-results")

@task
def hello_task(named_params):
    logger = prefect.context.get("logger")
    logger.info(named_params)
    return 'asd'

with Flow("hello-flow", result=result) as flow:
    named_params = Parameter("named_params")
    hello_task(named_params)

flow.run(named_params='param')
I was expecting to find a file in the ./prefect-results folder... the folder is there but it is empty. I also tried:
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

@task(checkpoint=True, result=LocalResult(dir="./prefect-results"))
def hello_task(named_params):
    logger = prefect.context.get("logger")
    logger.info(named_params)
    return 'asd'

with Flow("hello-flow") as flow:
    named_params = Parameter("named_params")
    hello_task(named_params)

flow.run(named_params='param')
The folder is also empty. Am I doing something wrong, or have I misunderstood how this feature works?
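A hedged explanation: with a plain local flow.run() (no Cloud/Server backend), checkpointing is off by default in Prefect 0.x, so the LocalResult is never written. Enabling it through the environment before Prefect is imported should make the files appear:

import os

# Must be set before Prefect reads its config at import time.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

import prefect  # config is loaded here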
Abhishek
09/09/2021, 12:54 PM
Michael
09/09/2021, 2:45 PM
Is there an {upstream} context option for a given task that somehow serializes the upstream dependencies into the result / target file name at run time? I know there is a {parameters} context value (so I could use this in the worst case), but not every task in a particular flow depends on all of that flow's parameters, so this would result in unnecessary computation.
Right now I implement something like this inside the task's run logic itself, but I imagine it's a helluva lot slower than letting the orchestrator deal with skipping tasks, rather than entering every task to perform a file existence check before exiting (and also a lot less clean).
Cheers for any help!
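A hedged partial workaround, short of a built-in {upstream} key: since targets are rendered with str.format against prefect.context, indexing into the parameters dict selects only the parameters a given task actually depends on. "region" here is a hypothetical parameter name:

from prefect import task

@task(target="{flow_name}/{task_name}-{parameters[region]}.prefect")
def transform(data):
    ...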
Sean Talia
09/09/2021, 2:53 PM
@task
def custom_task_1(input) -> str:
    return f"Custom Input: {input}"

with Flow(...) as flow:
    custom_task_1_results = custom_task_1("Hello World!")
    custom_task_2 = CustomTask2(message=custom_task_1_results)
but when I look at my flow's schematic, it doesn't show custom_task_1 as being upstream of CustomTask2... I don't remember ever having run into this, so I feel like there's something quite obvious that I must be missing. I appreciate any help!
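A sketch of the likely cause, assuming CustomTask2 is an imperative-API Task subclass: CustomTask2(message=...) passes the value to __init__ at build time, which registers no edge in the flow. Binding happens when the task *instance* is called inside the Flow block:

from prefect import Flow

custom_task_2 = CustomTask2()  # configure defaults in __init__ if needed

with Flow("example") as flow:
    custom_task_1_results = custom_task_1("Hello World!")
    # Calling the instance creates the upstream edge.
    result = custom_task_2(message=custom_task_1_results)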
Alex
09/09/2021, 3:07 PM
ModuleNotFoundError. To make this work, will I need to create a Docker image that includes all the dependencies? Also, does this mean that I will have to use another option for storage?
Jack Sundberg
09/09/2021, 3:17 PM
Michael
09/09/2021, 6:04 PM
prefect.config.flows.checkpointing = True somewhere in the code). But I can't for the life of me get it to work when I use my Docker agent that's connected to Prefect Cloud. I set --env PREFECT__FLOWS__CHECKPOINTING=true as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I'm more than happy to provide more info as needed.
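A hedged alternative worth trying: pin the variable on the flow's run config so it is set inside the flow-run container itself, independent of how the agent was started. The image name is hypothetical:

from prefect import Flow
from prefect.run_configs import DockerRun

flow = Flow(
    "checkpointed-flow",
    run_config=DockerRun(
        image="my-image:latest",
        env={"PREFECT__FLOWS__CHECKPOINTING": "true"},
    ),
)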
Jack Sundberg
09/09/2021, 7:00 PM
client.create_project(project_name="Hello, World!")
is_successful = client.delete_project(project_name="Hello, World!")
client.create_project(project_name="Hello, World!") # FAILS HERE
The second call to create_project fails with a ClientError due to a uniqueness violation -- but in raising that error, the client is unable to tell which existing project causes the uniqueness violation (because I just deleted it).
Would anyone have insight as to what's going on here? I've tried sleeping between each line too, so I don't think this is a race condition, but I'm not totally sure.
joshua mclellan
09/09/2021, 7:17 PM
Maikel Penz
09/09/2021, 8:29 PM
NoCredentialsError('Unable to locate credentials') from the job execution.
In the past I used the ECS agent and I was able to pass a task_role_arn to it. However, I see that both the KubernetesAgent and the Kubernetes run config KubernetesRun don't have a "role" parameter to set.
My next thought was that either the role attached to the EKS cluster or the Fargate profile should do the work, but as a test I gave admin to both and I still get the credentials issue.
What am I missing?
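A hedged guess: on EKS Fargate, pod credentials come from IAM Roles for Service Accounts (IRSA) rather than the cluster or profile role, so the flow-run job may need to run under an annotated service account, for example via a custom job template. The service account name is hypothetical and must be bound to an IAM role that allows the AWS calls the flow makes:

from prefect.run_configs import KubernetesRun

run_config = KubernetesRun(
    job_template={
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {"serviceAccountName": "prefect-flow-runner"}
            }
        },
    }
)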
Constantino Schillebeeckx
09/09/2021, 8:47 PM
f"{os.path.dirname(os.path.realpath(__file__))}/layouts", however this causes the following (when executing from Prefect Cloud):
Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
How come that global isn't available?
Abhas P
09/09/2021, 10:33 PM
sean williams
09/09/2021, 10:38 PM
"The LogRocket software has a feature that allows for the 'video-like' replay of a previous user session"
It looks like disabling telemetry might not disable LogRocket's session recording? Am I interpreting that correctly, or am I missing a config to disable it? If anything, LogRocket seems more invasive than GA telemetry. Please let me know if this belongs in a different channel. Thanks.
Jacq Crous
09/10/2021, 7:23 AM
Kevin Kho
09/10/2021, 2:07 PM
create_flow_run task. This should work, but why is the context of the child flow changing?
Also, if that interface is giving you problems, you could use the KV Store to persist a location and then retrieve the location downstream to use it. KV Store Docs
Jacq Crous
09/13/2021, 3:33 PM
Kevin Kho
09/13/2021, 3:35 PM
A flow started with flow.run() will not appear on Prefect Cloud. How are you running the flow?
Jacq Crous
09/14/2021, 6:27 AM
Kevin Kho
09/14/2021, 2:03 PM
flow.run() is for local testing only. You may have a StartFlowRun or create_flow_run somewhere in the script? Would you like to show me the code?
Jacq Crous
09/15/2021, 1:06 PM
import pandas as pd
import sqlite3
import os
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.aws.s3 import S3Download, S3List
from prefect.executors import LocalDaskExecutor
#from prefect.agent.local import LocalAgent
from prefect.engine.results import LocalResult
import re
from dotenv import dotenv_values
import sys

# Extract:
@task
def get_aws_credentials():
    """
    Load aws credentials from a local .env file.
    """
    return cred_dict

@task
def get_s3_data_list(bucket_name, source_prefix, cred_dict):
    """
    Get a list of available tables in the specified S3 bucket with
    the source_prefix prefix.
    """
    return table_list

@task()
def get_table_data(bucket_name, source_prefix, table_name, cred_dict):
    """
    Download the specified table from the S3 bucket and convert the data to a
    data frame. A payload dictionary is returned containing the constructed
    dataframe and the associated table name to allow for mapping of this task.
    """
    return payload

# Transform:
def generate_sqlite_table_name(table_name, source_prefix):
    """
    Generate the sqlite table name by removing the db_name prefix from the
    AWS S3 bucket name.
    """
    return sql_table_name

with Flow('AWS downloads') as flow:
    # Define parameters:
    source_prefix = Parameter('source_prefix', required=True, default=':memory')
    db_name = Parameter('db_name', required=True, default=':memory:')
    db_path = Parameter('db_path', default=os.getcwd())
    bucket_name = Parameter('bucket_name', required=True)

    # Define flow:
    cred_dict = get_aws_credentials()
    table_list = get_s3_data_list(bucket_name, source_prefix, cred_dict)
    payload_list = get_table_data.map(unmapped(bucket_name), unmapped(source_prefix), table_list, unmapped(cred_dict))

if len(sys.argv) > 1:
    if len(sys.argv) < 3:
        raise Exception('Three inputs are required to run pipeline: <aws bucket name> <source prefix> <sqlite_dbname>')
    flow.run(parameters={'bucket_name': sys.argv[1], 'source_prefix': sys.argv[2], 'db_name': sys.argv[3]})

if __name__ == '__main__':
    """
    Bucket Name: AWS bucket name where data is stored.
    Source prefix: A prefix used to search the storage bucket. The prefix is the name of the actual source file.
    """
    flow_parameters = {
        'bucket_name': 'bucket_name',
        'source_prefix': 'some_prefix',
        'db_name': 'db_name',
    }
    executor = LocalDaskExecutor(scheduler="threads")
    flow.register(project_name='AWS ETL Pipelines')
    flow.run(parameters=flow_parameters)
So currently, when I run this flow, it does not show anything on the cloud. The question related to the parent and child flow is about running something like the following in folder B:
import pandas as pd
from prefect import Flow, task
from prefect.run_configs import LocalRun
from prefect.core.task import Parameter
from prefect.tasks.prefect import StartFlowRun, create_flow_run, get_task_run_result
from prefect.backend import FlowRunView
from prefect.engine.results.s3_result import S3Result
import os

data_dict1 = {
    'bucket_name': 'bucket_name',
    'source_prefix': 'prefix_1'
}

with Flow('parent_flow') as flow:
    child_flow_id = create_flow_run(
        project_name='AWS ETL Pipelines',
        flow_name='AWS downloads',
        parameters=data_dict1,
        run_name='test_run1'
    )
    child_flow_data = get_task_run_result(child_flow_id, task_slug='get_table_data-1')

flow.register(project_name='AWS ETL Pipelines')
flow.run()
So if I run the first flow directly in folder A, it does not show up on Prefect Cloud. If I run the second script, the child flow runs, but when I try to retrieve the data I get the following error:
[2021-09-15 15:37:37+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Starting task run...
[2021-09-15 15:38:00+0200] ERROR - prefect.TaskRunner | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
result = TaskRunView.from_task_slug(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
cls._query_for_task_run(
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': '6553576c-cdf6-4306-9cc0-d848afc03203'}, 'map_index': {'_eq': 1}}
[2021-09-15 15:38:00+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Finished task run for task with final state: 'Failed'
[2021-09-15 15:38:00+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Kevin Kho
09/15/2021, 2:12 PM
flow.run() does not trigger a flow run with a backend (Cloud or Server). flow.run() just runs your script, but the Prefect context is not filled. It's the backend that injects things like the flow_run_name and flow_run_id. Only scheduled runs and runs triggered in the UI will appear in the UI. flow.run() is not a real flow run.
I also don't suggest using sys.argv, because that's not going to be available when you are running on Prefect Cloud. Prefect will take care of passing the parameters for you. As long as you pass the Parameters in the schedule or through the UI, the flow run will use them. You also set them with required=True, so there will be an error if they are not supplied.
The reason flow A shows in the UI when you run flow B is that the create_flow_run task hits the GraphQL API to start a flow run. Runs on a schedule and runs in the UI also hit the GraphQL API; flow.run does not hit the API. This is so that if you run flow B on a schedule, you will see all of flow A's runs in the UI as well.
In order to get flow A to appear in the UI, you need to register it and run it in the UI or on a schedule.
Jacq Crous
09/17/2021, 9:40 AM
Kevin Kho
09/17/2021, 2:30 PM
The flow_run_id works, so could you check whether the task slug in the subflow is right?
Did you remove the first flow.run() call when you registered?
Jacq Crous
09/20/2021, 6:15 AM
Kevin Kho
09/20/2021, 9:23 PM
For flow run 'bdf0a323-0143-40c6-941d-08f65724c5a9', did you try map_index=-1? Not sure why yours says 1.
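A minimal sketch of that suggestion, reusing the flow and slug from the thread and assuming Prefect 0.15.x. map_index=-1 (the default) addresses the task run as a whole; individual mapped children are indexed 0, 1, 2, ...:

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result

with Flow("parent_flow") as flow:
    child_run_id = create_flow_run(
        flow_name="AWS downloads", project_name="AWS ETL Pipelines"
    )
    # Fetch the mapped task's result as a whole rather than one child.
    table_data = get_task_run_result(
        child_run_id, task_slug="get_table_data-1", map_index=-1
    )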