Anna Geller
--prefetch-seconds
flag, which is especially useful when interacting with serverless infrastructure.
Imagine that you want to schedule a specific flow to run at 9 AM, but it roughly takes a minute to provision a serverless container. With this new CLI flag, your Prefect agent will begin flow run submission earlier than scheduled (in the example below, 60 seconds earlier) to ensure your container starts on time:
prefect agent start --prefetch-seconds 60 -q default
The release includes many bug fixes and a new prefect-vault
collection allowing you to interact with Hashicorp Vault - big thanks to Pavel Chekin for contributing it gratitude thank you
As always, you can find more details in the release notes.
Happy engineering! prefect duckAnna Geller
orjson
serialization (this is important for logging to external systems such as Datadog - more on that soon!)
β’ avoid duplication when configuring logging e.g. to configure logs to be JSON-formatted, you can now use PREFECT_LOGGING_HANDLERS_CONSOLE_FORMATTER="json" python myflow.py
Apart from that, the prefect cloud login
CLI has been given a serious upgrade. β¨ For a detailed description of that new Cloud CLI login UX, and to see a full list of fixes and enhancements, see the release notes.
Happy Engineering! PAnna Geller
Will Raphaelson
12/07/2022, 8:23 PMAnna Geller
PREFECT_EXTRA_ENTRYPOINTS
setting for module injection to allow registration of custom blocks (e.g. during CI/CD)
β’ improved timeout handling on the KubernetesJob
block - this timeout is now disabled by default (i.e. job_watch_timeout_seconds
is set to None
)
β’ fix for the prefect agent start --prefetch-seconds
command
β’ fix for the flow run URL from notification blocks
...and several more intricate enhancements and fixes you can read about in the release notes.
Big thanks to t-yuki
and padbk
for their first contributions. gratitude thank you
Happy engineering! prefect duckWill Raphaelson
12/13/2022, 5:51 PMKalise Richmond
12/13/2022, 8:26 PMKalise Richmond
12/14/2022, 5:36 PMAnna Geller
reschedule=True
argument to the pause_flow_run
utility to gracefully stop the infrastructure process and put the flow run back into a Scheduled
state. This way, you can free up the infrastructure resources instead of keeping the original flow run process running while waiting for approval.
π‘ Note that: this kind of rescheduling pauses works only with flow runs created from deployments and requires results persistence using either theYou can now also manually pause a flow run in progress using the new Pause button directly from the flow run UI page (see the screenshot below). Resuming a run can be performed also from the UI, or from code:option on the flow decorator, or by setting it globally:persist_result=True
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT='True'
from prefect import resume_flow_run, pause_flow_run
id_ = "pasy_your_flow_run_uuid_here"
pause_flow_run(id_, timeout=4200, reschedule=True)
# once you're ready to resume (you can do it from the UI, too):
resume_flow_run(id_)
Other enhancements and improvements in this release:
β’ Two new Prefect Collections: prefect-kubernetes kubernetes and prefect-bitbucket πͺ£
β’ Task Run Concurrency UI page now includes details and shows active task runs currently using allocated concurrency slots (see the second screenshot)
β’ New Opsgenie notifications block - kudos to Julien Lutran for contributing! gratitude thank you
β’ Fixes to imports and their performance
β’ New SecretDict
block field that allows to obfuscate nested values in a dictionary on blocks such as GcpCredentials
or FugueEngine
β’ If you have a flow call (e.g. myflow()
) in your flow script, not protected by an if __name__ == "__main__:
block, any run from that deployment will now work without giving you a warning about that unprotected call
β’ Several documentation improvements incl. new with_options
usage examples, recipe and tutorial references, Automations docs, and more!
For all enhancements and fixes, check the release notes - it even includes a Marvin joke. marvin duck
Happy engineering! πprefect duckChris Reuter
12/21/2022, 5:20 PMJeff Hale
12/22/2022, 10:37 PMJeff Hale
12/29/2022, 11:41 PM--start-in
or --start-at
e.g. prefect deployment run foo/test --start-at "3pm tomorrow"
β’ Get the logs for a flow run using prefect flow-run logs <flow run UUID>
e.g. prefect flow-run logs 7aec7a60-a0ab-4f3e-9f2a-479cd85a2aaf
Collection updates
β’ Added BigQueryWarehouse
block in prefect-gcp
v0.2.1
β’ Added AirbyteConnection
block in prefect-airbyte
v0.2.0
β’ Added dbt Cloud metadata API client to DbtCloudCredentials
in prefect-dbt
v0.2.7
As always, see the release notes for details.
Thank you to everyone for your feedback and especially to @ohadch and @mohitsaxenaknoldus for their first contributions!
Happy New Year! π₯³Kingsley Blatter
Anna Geller
validate
flag on the Block.load
method to facilitate block document migration, and PR 8056 allowed schema migration of block documents using Block.save
β’ when you register new block types with the prefect block register
CLI command, you will now get a link to the UI Blocks page to easily create new blocks (8017)
β’ when you try to save a block with an invalid name, you'll see better error messages (8038)
Apart from that, there are improvements and fixes to (among others!):
β’ flow timeouts (7993)
β’ prefect cloud login
(8034)
β’ Kubernetes job (8018) and its documentation (8044)
β’ SecretDict
serialization (8074)
β’ annotations (7263) and their serialization (8037)
β’ task run cache key size (7275)
β’ RRule schedule length - there is a limit now (7762)
β’ deployment troubleshooting - we added a log message when Prefect downloads flow code from storage and from which storage location (8075)
β’ the result factory, which now avoids creating unnecessary DB connections (8072)
β’ documentation - lots of improvements here, including not only docs but also a new parser and renderer (7855)
For all enhancements and fixes, check the release notes.
Happy engineering and have a great weekend!prefect duckAnna Geller
refresh_cache
on the task
decorator - this allows you to rerun the task and refresh the cached result. You can think of it as manually invalidating the cache for a given task when needed. You may also temporarily change your Prefect Setting to refresh all tasks without changing your code using the command: prefect config set PREFECT_TASKS_REFRESH_CACHE=true
3. Support for memory limits and privileged containers on the DockerContainer
infrastructure block.
Apart from that, there are improvements to client retriable exceptions, documentation, orjson
serialization, and more.
As always, check the release notes for more information.
Big thanks to toro-berlin
, muddi900
, ddelange
, and Ewande
for making their first contributions! PWill Raphaelson
01/18/2023, 8:13 PMAnna Geller
linux/amd64
and arm64
-based development images directly from DockerHub, for example:
docker pull prefecthq/prefect-dev:main-python3.10 # default: linux/amd64
docker pull prefecthq/prefect-dev:main-python3.10 --platform linux/arm64
There is also a new Prefect OpenAI collection for all AI fans!
We've also:
β’ G added a new interface to filter .git
files when using the GitHub
block
β’ terminal added the --head
and --num-logs
flags to the flow-run logs
CLI:
β¦ prefect flow-run logs run_uuid --head
will give you 20 log messages for a given flow run UUID
β¦ prefect flow-run logs run_uuid --num-logs 5
will give you 5 log messages for a given flow run UUID
β’ π fixed several bugs, a.o. one for blocks and one for Windows subprocesses
β’ π added a new recipe contribution documentation page
β’ aws added a new AWS Chalice recipe
Big thanks to ddelange for consistently working on contributing several PRs to add the arm64
-based Docker images as well as to mj0nez
, and ohadch
for contributing to Prefect!P
For all enhancements and fixes, check the release notes.
Happy engineering!prefect duckChris Reuter
01/20/2023, 7:34 PMAnna Geller
linux/amd64
and arm64
-based* images directly from DockerHub* as follows:
docker pull prefecthq/prefect:2-python3.10 # default: linux/amd64
docker pull prefecthq/prefect:2-python3.10 --platform linux/arm64
And to validate the Prefect version and architecture:
docker run --platform linux/arm64 prefecthq/prefect:2-latest prefect version
Shoutout to @ddelange who led implementation of the feature (#7902).
Further enhancements and fixes added in this release:
β’ added is_schedule_active
to the Deployment
object
β’ added Kubernetes permissions to the prefect agent YAML template to enable it to query the kube-system
namespace and retrieve the namespace UUID
β’ improved handling of long-running Kubernetes jobs with the KubernetesJob
infrastructure block
β’ added support for obscuring secrets in nested block fields in the UI
β’ several documentation fixes and enhancements incl. KubernetesJob
options, contribution page, keep-alive settings, and Prefect Cloud Quickstart guide.
Collection updates:
β’ prefect-openai got a new release v0.1.1
with a very cool new use case - you can expect a blog post about it soon!
β’ prefect-daskβs landing page has been entirely updated
For all enhancements and fixes, check the release notes. P
Happy engineering!prefect duckChris Reuter
01/31/2023, 4:43 PMjustabill
PREFECT_LOGGING_ORION_WHEN_MISSING_FLOW
to allow configuration of this behavior to silence the warning or raise an error as before. This means that you can attach Prefect's logging handler to existing loggers without breaking your workflows. Check it out:
from prefect import flow
import logging
my_logger = logging.getLogger("my-logger")
<http://my_logger.info|my_logger.info>("outside the flow")
@flow
def foo():
<http://my_logger.info|my_logger.info>("inside the flow")
if __name__ == "__main__":
foo()
You can even see messages from my-logger
in the UI with PREFECT_LOGGING_EXTRA_LOGGERS
its beautiful:
$ PREFECT_LOGGING_EXTRA_LOGGERS="my-logger" python example.py
example.py:6: UserWarning: Logger 'my-logger' attempted to send logs to Orion without a flow run id. The Orion log handler can only send logs within flow run contexts unless the flow run id is manually provided.
<http://my_logger.info|my_logger.info>("outside the flow")
18:09:30.518 | INFO | my-logger - outside the flow
18:09:31.028 | INFO | prefect.engine - Created flow run 'elated-curassow' for flow 'foo'
18:09:31.104 | INFO | my-logger - inside the flow
18:09:31.179 | INFO | Flow run 'elated-curassow' - Finished in state Completed()
Collection updates:
β’ prefect-shell got a new ShellOperation
block that runs a shell operation
β’ prefect-census got CensusSync
block that runs a census sync
Check out the release notes for information on the other enhancements and fixers in this release.
Special thanks to Chia Berry, @Hans Lellelid, and Tomek for their contributions to this release! β
Happy Engineering! marvinjustabill
flow_run_name
for flows, task_run_name
for tasks) accepts a string that will be used to create a run name for each run of the function. For example:
from datetime import datetime
from prefect import flow, task
@task(task_run_name="custom-static-name")
def my_task(name):
print(f"hi {name}")
@flow(flow_run_name="custom-but-fixed-name")
def my_flow(name: str, date: datetime):
return my_task(name)
my_flow()
In order to make these names dynamic, you can template them using the parameter names of the task or flow function, using all of the basic rules of Python string formatting as follows:
from datetime import datetime
from prefect import flow, task
@task(task_run_name="{name}")
def my_task(name):
print(f"hi {name}")
@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime):
return my_task(name)
my_flow()
Check out the Prefect release notes for more information.Ryan Peden
02/09/2023, 10:06 PMdefault-agent-pool
. Creating a new work pool can be done via the Work Pools page in the UI or via the CLI.
To create a new work pool named βmy-poolβ via the CLI:
prefect work-pool create "my-pool"
Each work pool starts out with a default queue. New queues can be added to a work pool via the UI or the CLI.
To create a new work queue in a work pool via the CLI:
prefect work-queue create "high-priority" --pool "my-pool"
Deployments can now be assigned to a work queue in a specific work pool. Use the --pool
flag to specify the work pool and the --queue
flag to specify the work queue when building a deployment.
prefect deployment build \
--pool my-pool \
--queue high-priority \
--name high-priority \
high_priority_flow.py:high_priority_flow
Once a deployment has been created and is scheduling flow runs on a work queue, you can start an agent to pick up those flow runs by starting an agent with the --pool
flag.
prefect agent start --pool my-pool
Starting an agent with the --pool
command allows the agent to pick up flow runs for the entire pool even as new queues are added to the pool. If you want to start an agent that only picks up flow runs for a specific queue, you can use the --queue
flag.
prefect agent start --pool my-pool --queue high-priority
A few other important parts of this release:
β’ The ability to create a flow run from the UI with parameters from a previous run β #8405
β’ A generic Webhook
block β #8401
β’ Override customizations functionality for CLI deployments vβ #8349
β’ It's now possible to reset concurrency limits in CLI to purge existing runs from taking concurrency slots β #8408
To learn more about work pools, and everything else we've added or updated, check out the docs or see the relevant pull requests in the release notes.Chris Reuter
02/16/2023, 3:19 PMRyan Peden
02/16/2023, 7:12 PMprefect config validate
command.
A few other key fixes and enhancements:
β’ We added a MattermostWebhook
notification block.
β’ You can now pass RRule strings to --rrule
option in prefect set-schedule
command. The JSON input it used to require still works, too!
β’ Default deployment parameters now populate correctly in the UI.
β’ We fixed the ability to use anchor dates when setting an interval schedule with the prefect set-schedule
command.
β’ The docs now have examples of custom automation triggers!
Finally, we had two new contributors:
β’ @qheuristics made their first contribution in #8478
β’ @KernelErr made their first contribution in #8485
See the release notes for a complete list of enhances, fixes, and deprecations in Prefect 2.8.1.Kalise Richmond
02/16/2023, 8:13 PMprefect-dbt
v0.3.0 was just released! π
This release includes:
β’ two new blocks: DbtCloudJob
and DbtCoreOperation
, making it easier to store configs and trigger+monitor both dbt Cloud jobs and Core CLI commands.
β’ run_dbt_job_flow
which incorporates the DbtCloudJob
block.
β’ support for the new SqlAlchemyConnector
block when using dbt Core with Postgres database.
β’ bug fixes, including saving / loading target config profiles, proper typing of log_format in GlobalConfigs
β’ Check out the release notes for more!
Check out the new landing page to get started on these new blocks!Ryan Peden
02/17/2023, 11:18 PMprefect
module init β #8569
Documentation
β’ Fix logging format override example β #8565
Experimental
β’ Add events client to PrefectClient
β #8546Ryan Peden
02/24/2023, 1:01 AMCompleted
or Failed
state. This is great for any case where you want to execute code without involvement of the Prefect API.
Both flows and tasks include on_completion
and on_failure
options where a list of callable hooks can be provided. The callable will receive three arguments:
β’ flow
, flow_run
, and state
in the case of a flow hook
β’ task
, task_run
, and state
in the case of a task hook
For example, here we add completion hooks to a flow and a task:
from prefect import task, flow
def my_completion_task_hook_1(task, task_run, state):
print("This is the first hook - Task completed!!!")
def my_completion_task_hook_2(task, task_run, state):
print("This is the second hook - Task completed!!!")
def my_completion_flow_hook(flow, flow_run, state):
print("Flow completed!!!")
@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
print("This is the task!")
@flow(on_completion=[my_completion_flow_hook])
def my_flow():
my_task()
if __name__ == "__main__":
my_flow()
Next, weβll include a failure hook as well. Itβs worth noting that you can supply both on_completion
and on_failure
hooks to a flow or task. Only the hooks that are relevant to the final state of the flow or task will be called.
from prefect import task, flow
def my_task_completion_hook(task, task_run, state):
print("Our task completed successfully!")
def my_task_failure_hook(task, task_run, state):
print("Our task failed :(")
@task(on_completion=[my_task_completion_hook], on_failure=[my_task_failure_hook])
def my_task():
raise Exception("Oh no!")
@flow
def my_flow():
my_task.submit()
if __name__ == "__main__":
my_flow()
Key enhancements and fixes
Other highlights of this weekβs enhancements and fixes include:
β’ We added light and dark mode color and contrast enhancements to the UI. #8629
β’ The Task.map
type hint now provides type-checker compatibility for async tasks. β #8607
β’ The Kubernetes Job block now correctly handles timeouts when streaming logs. β #8618
β’ We fixed date range filter selection on the flow runs UI page. β #8616
See the release notes on GitHub for a full list of this weekβs changes!
Contributors
Thanks to the external contributors to this weekβs release!
β’ New contributor @jefflaporte added support for log streaming from multiple containers in a K8s job.
β’ New contributor @AzemaBaptiste added the `prefect-sifflet` collection to our collections catalog.
β’ Veteran contributor @darrida improved the Docker infrastructure blockβs handling of Docker version names.Chris Reuter
02/28/2023, 6:28 PM