Mike Grabbe
07/26/2022, 5:39 PMChris Reuter
07/26/2022, 5:41 PMTaylor Curran
07/26/2022, 5:44 PMfrom prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed
from prefect.blocks.notifications import SlackWebhook
from prefect.tasks import task_input_hash
from datetime import timedelta
from prefect import tags
import os
import random
@task(
name="Always Succeeds Task",
version=os.getenv("GIT_COMMIT_SHA")
)
def always_succeeds_task():
return "foo"
@task(
name="Depnds on AST",
)
def depends_on_ast(ast):
if ast == 'foo':
return "fa"
else:
return "na?"
@task(
name="Often Fails Task",
retries=20
)
def often_fails_task():
"""A task that benefits from task retries"""
outcome = random.choice(['Fail', 'Success', 'Fail', 'Fail', 'Fail'])
if outcome == 'Fail':
raise Exception('Random Choice was Failure')
elif outcome == 'Success':
return 'Success!!'
@task(
name="Very Large Computation",
cache_key_fn=task_input_hash,
cache_expiration=timedelta(days=30)
)
def large_computation(small_int):
"""A task that benefits from """
for i in range(1000000):
j = i + small_int
print('Done large computation!')
return small_int * 5
@task(
name="Follows Large Computation",
)
def follows_large_computation(result_from_lc, succeed=True):
if succeed == True:
output = result_from_lc / 2
return output
else:
raise Exception("I am bad task")
@task(
name="Second After Large Computation",
)
def second_after_large_computation(result_from_flc):
output = result_from_flc
return output
@task(
name="Task with Tag",
tags=['Specific_Tag']
)
def task_with_tag():
"""A task that is called by virtue of its tag."""
print("This is a task with a Specific Tag")
@task(name="Always Fails Task")
def always_fails_task():
raise Exception("I am bad task")
@flow(name="Sub Flow")
def sub_flow():
print("Sub Flow")
@flow(name="Demo Flow")
def demo_flow(desired_outcome='Success'):
ast = always_succeeds_task.submit()
depends_on_ast.submit(ast)
sub_flow()
often_fails_task.submit()
task_result_0 = large_computation.submit(5)
if desired_outcome == 'Fail':
task_result_1 = follows_large_computation.submit(task_result_0, False)
else:
task_result_1 = follows_large_computation.submit(task_result_0)
second_after_large_computation.submit(task_result_1)
# TODO - ca marche pas
if task_result_1.get_state().type != 'COMPLETED':
slack_webhook_block = SlackWebhook.load('demo_slack_block')
slack_webhook_block.notify("Hello from Prefect! Your task failed!! :(")
with tags('Specific_Tag'):
task_with_tag.submit()
if desired_outcome == 'Fail':
always_fails_task.submit()
always_succeeds_task.submit()
print('Done!')
if __name__ == "__main__":
demo_flow('Fail')
"""Where we define the depoloyment object."""
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect.deployments import FlowScript
Deployment(
name="staging",
flow=FlowScript(
path="vanilla_flow/my_flow_code.py",
name="Demo Flow",
),
parameters={
'desired_outcome': (['Fail', 'Success'][1])
},
infrastructure=Process(),
# infrastructure=KubernetesJob(
# image_pull_policy=KubernetesImagePullPolicy.IF_NOT_PRESENT,
# env=dict(PREFECT_LOGGING_LEVEL="DEBUG"),
# )
tags=['Demo']
)
Mike Grabbe
07/26/2022, 5:45 PM