Fancy Arora
11/07/2022, 5:42 PM
Jeff Hale
11/07/2022, 5:54 PM
Lukasz Mentel
11/07/2022, 6:44 PM
io.StringIO objects between tasks in Prefect 2.0? Here's a minimal example I'm trying to get to work, but I'm getting weird results. I'd appreciate any help on this.
import io
from prefect import task, flow

@task
def string_to_io():
    return io.StringIO("hello prefect")

@task
def consume(string_io):
    print(type(string_io))
    return string_io.read()

@flow
def test():
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)
    print(r)

test()
When I run it I get:
19:38:59.511 | INFO | prefect.engine - Created flow run 'congenial-longhorn' for flow 'test'
19:38:59.708 | INFO | Flow run 'congenial-longhorn' - Created task run 'string_to_io-78a25967-0' for task 'string_to_io'
19:38:59.709 | INFO | Flow run 'congenial-longhorn' - Executing 'string_to_io-78a25967-0' immediately...
19:38:59.768 | INFO | Task run 'string_to_io-78a25967-0' - Finished in state Completed()
19:38:59.789 | INFO | Flow run 'congenial-longhorn' - Created task run 'consume-884672dc-0' for task 'consume'
19:38:59.790 | INFO | Flow run 'congenial-longhorn' - Executing 'consume-884672dc-0' immediately...
<class 'list'>
19:38:59.828 | ERROR | Task run 'consume-884672dc-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1222, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/lukasz/projects/zlotyryjek/crap.py", line 18, in consume
    return string_io.read()
AttributeError: 'list' object has no attribute 'read'
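The traceback shows consume receiving a list instead of the io.StringIO object. One possible workaround, sketched here as an assumption rather than a confirmed Prefect recommendation, is to pass the plain string between tasks and rebuild the file-like object inside the consumer. The functions are shown without the @task and @flow decorators so the sketch runs on its own; string_to_text is a hypothetical replacement for string_to_io.

```python
import io


def string_to_text() -> str:
    # Return the underlying string rather than the io.StringIO object,
    # so only a plain str crosses the task boundary.
    return io.StringIO("hello prefect").getvalue()


def consume(text: str) -> str:
    # Rebuild the file-like object locally, inside the consumer.
    string_io = io.StringIO(text)
    return string_io.read()


result = consume(string_to_text())
print(result)
```

In the flow above, each function would keep its @task decorator; only the value passed between them changes.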
So for some reason the StringIO is converted to an empty list 🤔
Kalise Richmond
11/07/2022, 7:07 PM
Nick Coy
11/07/2022, 9:12 PM
prefect.agent - An error occured while monitoring flow run <flow_run_id> The flow run will not be marked as failed, but an issue may have occurred.
Philip MacMenamin
11/07/2022, 9:28 PM
with Flow(
    "...",
    state_handlers=[utils.notify_completion, utils.notify_running],...
with
def notify_running(flow: Flow, old_state, new_state) -> State:
And I could ask what the new state is doing, e.g.
if new_state.is_running()
I'm reading https://docs.prefect.io/concepts/states/ to try to work out how to capture these flow-level state changes and am struggling.
Any pointers?
Tim Ricablanca
11/07/2022, 10:09 PM
alembic.util.exc.CommandError: Can't locate revision identified by '41e5ed9e1034'
I'll post some details in a thread about what I tried before nuking the database and starting over, but 1) is there guidance on how I should be using prefect orion database stamp or … revision before upgrading Orion from 2.6.4 -> 2.6.6 (or any other minor version)? and 2) should I be running … downgrade from the 2.6.6 package or the 2.6.4 package if I wanted to go from 2.6.6 -> 2.6.4?
Zac Hooper
11/07/2022, 11:53 PM
Crash detected! Execution was interrupted by an unexpected exception.
Looking over the logs produced by the agent I can't see the Exception thrown.
The flow looks like below. I've changed the variables used but the premise is that two lambdas are invoked but the 2nd needs to wait until the first is completed.
import os
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.orion.schemas.schedules import CronSchedule
from utils.aws import invoke_lambda
# --- Flow Details --- #
client = "example_client"
project = "example_project"
flow_name = "Example Name"
description = """
Example Description
"""
tags = ["example"] # Optional list of string tags for the flow. Helps to filter in UI
schedule = CronSchedule(cron="* 7-19 * * MON-FRI", timezone="Australia/Sydney")
# --- Other Flow Details (Leave as is) --- #
work_queue = "example_workqueue"
storage = S3.load("s3-storage")
output_file_name = f"{os.path.splitext(os.path.basename(__file__))[0]}.yaml"
# Imports used by invoke_lambda
import json
import boto3
from botocore.config import Config

def invoke_lambda(function_full_name, payload, blocking=False, **kwargs):
    """
    Invokes the given lambda function in AWS.
    Args:
        function_full_name (str): Exact name of the lambda function
        payload (dict): Dictionary of data you may want to pass to lambda
        blocking (bool, optional): If you want to wait for a response from the lambda. Defaults to False.
    """
    config = Config(max_pool_connections=250, read_timeout=900, connect_timeout=900)
    lambda_client = boto3.client("lambda", config=config)
    # Change client's region
    if kwargs.get("region"):
        lambda_client = boto3.client(
            "lambda", config=config, region_name=kwargs.get("region")
        )
    # DO NOT BLOCK IF WILL RUN OVER 30 SECONDS
    payload = bytes(json.dumps(payload), encoding="utf8")
    if blocking:
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="RequestResponse",
            LogType="Tail",
            Payload=payload,
        )
        res_payload = response.get("Payload").read()
        return json.loads(res_payload)
    else:
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="Event",
            LogType="None",
            Payload=payload,
        )
        return response
@task
def first_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("first_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""

@task
def second_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("second_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""

@flow
def example_flow():
    x = first_lambda_to_invoke.submit()
    y = second_lambda_to_invoke.submit(wait_for=[x])
if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=example_flow,
        name=flow_name,
        work_queue_name=work_queue,
        tags=[client, project] + tags,
        schedule=schedule,
        description=description,
        output=output_file_name,
        storage=storage,
        apply=True,
        path="/",
    )
Here is an example of the log output from a failed run
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Crash detected! Execution was interrupted by an unexpected exception.
09:32:10 AM
first_lambda_to_invoke-ec90046a-0
{'statusCode': 200}
09:32:21 AM
first_lambda_to_invoke-ec90046a-0
Crash detected! Execution was interrupted by an unexpected exception.
Note the {'statusCode': 200} response from the Lambda showing that it ran successfully.
Ben Muller
11/08/2022, 1:57 AM
.set_dependencies(
    upstream_tasks=[my_upstream_task]
)
Oh - I can see in the message above there is a wait_for kwarg in the submit handler... Ignore me 👀
Faheem Khan
11/08/2022, 4:09 AM
BrokenPipeError: [Errno 32] Broken pipe
ConnectionResetError: [Errno 104] Connection reset by peer.
Any leads would be much appreciated, cheers
Evgeny Ivanov
11/08/2022, 5:46 AM
cache_key_fn=task_input_hash and cache_expiration=timedelta(days=1). Now for some reason I want to clear the cache and run the task again (suppose I've changed the logic in a module I'm using from the task).
My options (including non-working):
1. Deleting cache files doesn't work. If I run a task after deleting the files I get an error. Prefect doesn't check whether the file exists before deciding to use the cache.
2. Deleting flow runs or task runs does work. But it's not convenient for two reasons:
a. I have to delete the run history.
b. I have to remember which flow/task run to delete, or delete all of them.
3. Adding an extra parameter cache_num to the task should work. I can just change its value every time I want to avoid using the cache. But it generates extra boilerplate code, and I have to change the flow/task code to change the parameter value. It feels like duct tape to me.
4. Changing data in OrionDB may be possible, but I'm sure it's strongly discouraged, and I'd like to avoid it.
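Option 3 can also be approximated without adding a parameter to the task: a custom cache key function can mix a version string into the key, so bumping the version makes old entries unreachable. A minimal sketch, assuming the function is passed as cache_key_fn (which Prefect calls with the task run context and the call parameters); versioned_cache_key and the CACHE_VERSION environment variable are hypothetical names, and the hashing is a plain hashlib stand-in, not task_input_hash itself.

```python
import hashlib
import json
import os


def versioned_cache_key(context, parameters):
    # CACHE_VERSION is a hypothetical environment variable; changing its
    # value changes every key, so previously cached results are skipped.
    version = os.environ.get("CACHE_VERSION", "0")
    # Serialize the call parameters deterministically (sorted keys);
    # default=str is a crude fallback for non-JSON values.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    return hashlib.sha256(f"{version}:{payload}".encode("utf-8")).hexdigest()


# Same inputs, different versions: the keys differ.
os.environ["CACHE_VERSION"] = "1"
key_v1 = versioned_cache_key(None, {"path": "data.csv"})
os.environ["CACHE_VERSION"] = "2"
key_v2 = versioned_cache_key(None, {"path": "data.csv"})
print(key_v1 != key_v2)
```

The run history stays intact with this approach, but the version still has to be changed somewhere outside the task code, so it is a variation on option 3 rather than the CLI or API asked for.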
Maybe there is a better option I don't know about? A perfect solution would be a CLI or API that takes the flow/task name as an input parameter.
Michiel Verburg
11/08/2022, 8:10 AM
Ikkyu Choi
11/08/2022, 8:13 AM
is_schedule_active means the status of the flow (turned on or turned off)?
I got a flow's is_schedule_active bool value as true using GraphQL, but in my Prefect UI this flow is in the turned-off state. Am I thinking about this wrong?
Rohith
11/08/2022, 11:05 AM
Rohith
11/08/2022, 11:05 AM
Piotr Bródka
11/08/2022, 11:17 AM
Evgeny Ivanov
11/08/2022, 11:36 AM
param=value, 'param': 'value' or something else?
Anyway, after editing this field I cannot create the Block. Chrome gives me the following error.
UPD. I've found out that I have to fill Query and Connect Args either with some values or with null. And the right format for Connect Args is JSON:
{
    "param": "value"
}
P.S. Btw, I cannot create a Block in Firefox. Is this browser not supported?
Denis
11/08/2022, 1:08 PM
flow_run_ids = []
for param in params:
    flow_run_id = create_flow_run(
        flow=flow_model,
        parameters=run_params,
        name=run_name,
        state=state
    )
    flow_run_ids.append(flow_run_id)
wait_for_flows(flow_run_ids)
Thanks for any help
Sunjay
11/08/2022, 2:14 PM
Florian Kühnlenz
11/08/2022, 2:19 PM
Gordon Silvera
11/08/2022, 2:51 PM
AttributeError: module 'prefect.tasks' has no attribute 'dbt'. I've tried to run this locally and within the jupyter-scipy Docker image. Any thoughts why Prefect cannot find the module?
# dbt_shell_task.py
import sys
import prefect
from prefect import task, flow, get_run_logger

@flow
def dbt_flow(cmd='dbt run'):
    # Execute specified command
    task = prefect.tasks.dbt.dbt.DbtShellTask(
        command=cmd,
        profile_name='default',
        environment='Development',
        dbt_kwargs={'type': 'bigquery'},
        overwrite_profiles=False,
        profiles_dir='/home/jovyan/.dbt/profiles.yml'
    )
    logger = get_run_logger()
    logger.info("Command Run: %s!", cmd)
    return task

if __name__ == "__main__":
    cmd = sys.argv[1]
    dbt_flow(cmd)
# packages
prefect==2.6.6
prefect-dbt==0.2.4
prefect-shell==0.1.3
dbt-bigquery==1.3.0
dbt-core==1.3.0
dbt-extractor==0.4.1
Jared Robbins
11/08/2022, 3:45 PM
Axel Sundberg
11/08/2022, 3:55 PM
@task annotation. I have defined two tasks in a flow like this:
@task(name="Backup product catalogue", log_stdout=True)
def backup_task():
    backup_product_catalogue_to_gcs()

@task(name="Update Product Catalogue", log_stdout=True, trigger=triggers.all_finished)
def upsert_pack_products_task():
    upsert_pack_products()

with Flow(
    name="Product catalogue",
    run_config=get_run_config(),
    storage=get_storage("product_catalogue/flow.py"),
) as product_catalogue_flow:
    backup_task()
    upsert_pack_products_task(upstream_tasks=[backup_task])
I renamed the function name upsert_pack_products_task and re-deployed. Now I can see two Backup product catalogue tasks in my flow on Prefect and they both run (see image) when the flow is triggered. How can I remove that extra task? And how can I debug what is going on?
Sunjay
11/08/2022, 4:13 PM
James Brady
11/08/2022, 5:19 PM
Crash detected! Execution was interrupted by an unexpected exception. (full logs in thread). I'm trying to figure out the best way to debug this further.
In general – and to help diagnose this particular case – I'd like to shuttle logs to our DataDog instance: I found this discussion on the topic, but is there something more recent / more complete for Prefect v2?
Aside from getting logs out, any other ideas for diagnosing this terse error?
Madison Schott
11/08/2022, 5:23 PM
sync = await fivetran_sync_flow(
^
SyntaxError: 'await' outside async function
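The error above is raised when await appears inside a regular def rather than an async def. A minimal sketch of the usual fix, assuming the awaited call is a coroutine: move the call into an async function and start it with asyncio.run. fivetran_sync_flow_stub is a hypothetical stand-in for the real flow, whose arguments are not shown in the message.

```python
import asyncio


async def fivetran_sync_flow_stub(connector_id: str) -> str:
    # Hypothetical stand-in for the awaited flow; it only simulates async work.
    await asyncio.sleep(0)
    return f"synced {connector_id}"


async def main() -> str:
    # 'await' is legal here because main is declared with 'async def'.
    sync = await fivetran_sync_flow_stub("my_connector")
    return sync


# Synchronous entry point: asyncio.run drives the coroutine to completion.
result = asyncio.run(main())
print(result)
```

If the calling function is itself a Prefect flow, declaring that flow with async def should have the same effect, though that depends on how the surrounding code is structured.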
Blake Stefansen
11/08/2022, 7:27 PM
Orion logging error with 2.6.6 for our k8s jobs
Will post logs in the thread
David Beck
11/08/2022, 8:08 PM
Paco Ibañez
11/08/2022, 10:43 PM
Rohith
11/09/2022, 8:40 AM