Khuyen Tran
11/10/2022, 7:39 PM
You can now pass wait_for to subflows so that they can wait for upstream tasks. In the example below, the subflow didn't run because the fails task failed.
This feature is available in Prefect 2.6.6.
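A minimal sketch of the pattern (the fails task and downstream_subflow names are illustrative, not from the original example):
from prefect import flow, task

@task
def fails():
    raise ValueError("upstream failure")

@flow
def downstream_subflow():
    print("only runs if upstream tasks succeed")

@flow
def parent():
    upstream = fails.submit()
    # wait_for makes the subflow wait on the upstream task, so the
    # subflow does not run when `fails` ends in a failed state.
    downstream_subflow(wait_for=[upstream])

if __name__ == "__main__":
    parent()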
Khuyen Tran
11/15/2022, 4:59 PM
import asyncio
from prefect import flow, get_run_logger

@flow
async def test_async_flow():
    await asyncio.sleep(3)
    logger = get_run_logger()
    logger.debug("a debug")
    logger.info("my info")
    logger.warning("some warning")
    logger.critical("super critical")
    logger.error("error!")
    print('all done')

@flow
def test_sync_flow():
    test_async_flow()

test_sync_flow()
View more here. Kudos to @Andrew Huang for this cool feature.
Khuyen Tran
11/17/2022, 4:11 PM
If you set timeout_seconds=n on a task and the task takes longer than n seconds to execute, Prefect will raise a timeout error.
View more here. Kudos to @Peyton Runyan for this feature.
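A minimal sketch of the behavior described above (the sleepy task and its durations are illustrative):
import time
from prefect import flow, task

@task(timeout_seconds=2)
def sleepy():
    # Sleeps longer than the 2-second limit, so Prefect fails
    # the task run with a timeout error.
    time.sleep(10)

@flow
def demo():
    # return_state=True keeps the flow from re-raising; the state is Failed.
    state = sleepy(return_state=True)
    print(state)

if __name__ == "__main__":
    demo()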
Boris Tseytlin
11/28/2022, 3:13 PM
I save a block with block.save("name", overwrite=True), but I don't see it in the UI. How can I know what is wrong?
Boris Tseytlin
11/28/2022, 3:14 PM
from prefect.filesystems import RemoteFileSystem

storage_block = RemoteFileSystem(
    basepath="s3://prefect-flows",
    key_type="hash",
    settings=dict(
        use_ssl=False,
        key=config.STORAGE_MINIO_USER,
        secret=config.STORAGE_MINIO_USER,
        client_kwargs=dict(endpoint_url=f"http://{config.STORAGE_MINIO_URL}"),
    ),
)
storage_block.save("flow-storage", overwrite=True)
logging.info("Created storage block")
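One way to sanity-check a save like this is to load the block back by name, assuming the standard Prefect 2 Block API (the block name below is taken from the snippet above). If the load succeeds but the block still isn't visible, the client may be pointed at a different API than the UI you are looking at (check PREFECT_API_URL):
from prefect.filesystems import RemoteFileSystem

# Loading by the same name used in .save(); raises if the block
# document does not exist on the configured Prefect API.
block = RemoteFileSystem.load("flow-storage")
print(block.basepath)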
Sebastian Steele
11/29/2022, 7:10 PM
import helper_file
'''
flow that uses helper_file
'''
Khuyen Tran
11/30/2022, 4:30 PM
from prefect import flow, get_run_logger

@flow
def cow_say():
    logger = get_run_logger()
    logger.info("I'm in [red]danger[/red]!")

if __name__ == "__main__":
    cow_say()

View more styles in Rich's documentation.
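One caveat, stated as an assumption rather than taken from the thread: for the [red]…[/red] tags to be rendered rather than logged literally, Rich markup has to be enabled in Prefect's logging settings, e.g. with the PREFECT_LOGGING_MARKUP setting (prefect config set PREFECT_LOGGING_MARKUP=True). A sketch of doing the same from Python:
import os

# Assumption: this Prefect 2 setting controls whether Rich markup in log
# messages is rendered; set it before Prefect loads its settings.
os.environ["PREFECT_LOGGING_MARKUP"] = "True"

from prefect import flow, get_run_logger

@flow
def cow_say():
    get_run_logger().info("I'm in [red]danger[/red]!")

if __name__ == "__main__":
    cow_say()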
Dragan
12/01/2022, 11:10 AM
How can we wait until a flow finishes? If it is running in the same flow, we can check triggers, but can we do it on the flow level?
Nils
12/08/2022, 2:27 PM
I registered some blocks with register_type_and_schema(). However, now I want to delete those blocks but I cannot find a way to do so.
Bryce Morrow
12/08/2022, 6:07 PM
• I persist the results of my expensive tasks with persist_result=True, which helps speed up my workflow considerably. For simpler data transformations on the results of these expensive tasks, I don't tend to persist the results.
• For the tasks that I am persisting, their implementations often change as I iterate on ideas. So, for example, maybe I change the implementation of the task one to return 10 instead of 5. Whenever I change the implementation of one of my tasks with persisted results, I change the cache key. For this example, I'd change lambda *args: "one" to lambda *args: "one1".
A simplified example:
from prefect import flow, task

@task(persist_result=True, cache_key_fn=lambda *args: "one")
async def one():
    return 5

@task(persist_result=True, cache_key_fn=lambda *args: "two")
async def two():
    return 2

@flow(cache_result_in_memory=False)
async def flow_test():
    a = await one()
    b = await two()
    return a + b

await flow_test()
This seems to be working at the start, but eventually, I start running into an issue where tasks fail to run due to an error in decrypting their block data. I'll share the exact error log in the replies. I haven't yet determined the exact cause of the problem but I have some suspicions / notes.
The first concerning thing is that when I re-run this cell, say after changing a task implementation, I get some warnings that look like this:
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'one' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:1' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'two' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:5' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
And this makes sense, since I'm defining a task with the same name again. However, when I look at the path /var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py, it doesn't exist. There are, though, various folders stored at /var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ with names of the form ipykernel_<some_number>. If I look at the contents of these directories, they contain small snippets of code from my notebook, but they usually aren't the task definitions themselves. So I think the ipykernel is storing this information temporarily but then it changes locations, contents, disappears, etc.
I've also tested running the notebook end to end, having everything work fine, then restarting the conda kernel and re-running the notebook. This causes the error I'll share below. This strongly suggests to me that the way the kernel is managing the location of the notebook source code is the culprit here.
Another point is that if I clear the Orion database using the command prefect orion database reset -y, then my issues are resolved. However, I would like to determine the issue and fix it in some better way.
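A side note on the cache-key pattern described above: Prefect 2 also ships a task_input_hash helper that derives the cache key from the task's inputs instead of a hard-coded string. Like the hard-coded keys, it does not change when the task's source changes, so this is not a fix for the decryption error, just a sketch of the alternative (the task below is illustrative):
from prefect import task
from prefect.tasks import task_input_hash

# The cache key is a hash of the task's inputs, so repeated runs with
# the same arguments reuse the persisted result.
@task(persist_result=True, cache_key_fn=task_input_hash)
async def one(x: int = 1):
    return x + 4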