Ryan Peden
03/30/2023, 7:44 PM
create_table_artifact() function:
from prefect import task, flow
from prefect.artifacts import create_table_artifact


@task
def my_table_task():
    # Each dict is one row; the keys become the column headers.
    table_data = [
        {"id": 0, "name": "Dublin", "lat": 53.3498, "lon": -6.2603},
        {"id": 1, "name": "London", "lat": 51.5074, "lon": -0.1278},
        {"id": 2, "name": "New York", "lat": 40.7128, "lon": -74.0060},
        {"id": 3, "name": "Oslo", "lat": 59.9139, "lon": 10.7522},
        {"id": 4, "name": "Paris", "lat": 48.8566, "lon": 2.3522},
        {"id": 5, "name": "Rome", "lat": 41.9028, "lon": 12.4964},
        {"id": 6, "name": "Tokyo", "lat": 35.6895, "lon": 139.6917},
        {"id": 7, "name": "Vancouver", "lat": 49.2827, "lon": -123.1207},
    ]
    return create_table_artifact(
        key="cities-table",
        table=table_data,
        description="A table of cities and their coordinates",
    )


@flow
def my_flow():
    table = my_table_task()
    return table


if __name__ == "__main__":
    my_flow()
You can view your artifacts in the Artifacts page of the Prefect UI, easily search the data in your new table artifact, and toggle between a rendered and raw version of your data - see the second screenshot attached below.
• See the documentation to learn more about Artifacts!
Configure result storage keys
When persisting results, Prefect stores data at a unique, randomly-generated path. While this is convenient for ensuring the result is never overwritten, it limits organization of result files. In this release, we've added configuration of result storage keys, which gives you control over the result file path. Result storage keys can be dynamically formatted with access to all of the modules in prefect.runtime and the run's parameters.
For example, you can name each result to correspond to the flow run that produced it and a parameter it received:
from prefect import flow, task


@flow()
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")


@task(
    persist_result=True,
    result_storage_key="hello__{flow_run.name}__{parameters[name]}.json",
)
def hello_world(name: str = "world"):
    return f"hello {name}"


my_flow()
This persists three result files in the storage directory:
$ ls ~/.prefect/storage | grep "hello__"
hello__rousing-mushroom__bar.json
hello__rousing-mushroom__foo.json
hello__rousing-mushroom__world.json
See the documentation for more information.
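To make the templating concrete, here is a minimal sketch of how such a key could be filled in, assuming the template follows Python's standard str.format rules (attribute access with a dot, dictionary lookup with square brackets). The helper render_storage_key and the SimpleNamespace stand-in for prefect.runtime.flow_run are hypothetical and only for illustration; this is not Prefect's own implementation.

```python
from types import SimpleNamespace

# Template copied from the task decorator in the example above.
TEMPLATE = "hello__{flow_run.name}__{parameters[name]}.json"


def render_storage_key(template, flow_run_name, parameters):
    """Hypothetical helper: fill a storage key template using str.format."""
    # Stand-in object exposing a .name attribute, like prefect.runtime.flow_run.
    flow_run = SimpleNamespace(name=flow_run_name)
    return template.format(flow_run=flow_run, parameters=parameters)


# One key per task call in the example flow, sorted the way `ls` would list them.
keys = sorted(
    render_storage_key(TEMPLATE, "rousing-mushroom", {"name": name})
    for name in ("world", "foo", "bar")
)
print("\n".join(keys))
```

Note that two task runs that render the same key would target the same file, so a template should include enough run-specific fields to keep results apart.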
Expanded prefect.runtime
The prefect.runtime module is now the preferred way to access information about the current run. In this release, we've added the following attributes:
• prefect.runtime.task_run.id
• prefect.runtime.task_run.name
• prefect.runtime.task_run.task_name
• prefect.runtime.task_run.tags
• prefect.runtime.task_run.parameters
• prefect.runtime.flow_run.name
• prefect.runtime.flow_run.flow_name
• prefect.runtime.flow_run.parameters
See the documentation for more information!
Contributors
We have several first-time contributors in this release. Let's give them a round of applause!
• @andreadistefano made their first contribution in #8942
• @knl made their first contribution in #8974
• @thomas-te made their first contribution in #8959
Here are a few key enhancements and fixes:
Enhancements
• Add unique integers to worker thread names for inspection - #8908
• Add support to JSONSerializer for serialization of exceptions so they are persisted even on failure - #8922
• Add Gzip middleware to the UI and API FastAPI apps for compressing responses - #8931
• Update the runtime to detect flow run information from task run contexts - #8951
Fixes
• Fix imports in copytree backport for Python 3.7 - #8925
• Retry on sqlite operational errors - #8950
• Add 30 second timeout to shutdown of the log worker thread - #8983
See the release notes for full details on the updates in Prefect 2.9.0!