Nicolas van de Walle
07/09/2020, 2:55 PMprefect agent install kubernetes -t <MY_TOKEN> -n prefect --label prefect-namespace --rbac | kubectl apply -n prefect -f -Laura Lorenz (she/her)
07/09/2020, 3:07 PM--labelsprefect-namespaceNicolas van de Walle
07/09/2020, 3:22 PMenvironment = DaskKubernetesEnvironment(labels=["prefect-namespace"])
with Flow("Example Flow Without Mail", environment=environment) as flow:
    ...Laura Lorenz (she/her)
07/09/2020, 3:30 PMlabels=["prefect-namespace"]ScheduledNicolas van de Walle
07/09/2020, 3:32 PM____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/
[2020-07-09 14:50:43,735] INFO - agent | Starting KubernetesAgent with labels ['prefect-namespace']
[2020-07-09 14:50:43,736] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2020-07-09 14:50:43,736] INFO - agent | Agent connecting to the Prefect API at <https://api.prefect.io>
[2020-07-09 14:50:44,019] INFO - agent | Waiting for flow runs...Nicolas van de Walle
07/09/2020, 3:34 PMLaura Lorenz (she/her)
07/09/2020, 3:40 PM"prefect-namespace"Nicolas van de Walle
07/09/2020, 3:42 PMNicolas van de Walle
07/09/2020, 3:46 PMimport prefect
from prefect import task, Flow
from random import choice
from prefect.tasks.control_flow.case import case
from prefect.tasks.control_flow.conditional import merge
from time import sleep
from prefect.schedules.schedules import IntervalSchedule
from datetime import timedelta
from prefect import Client
from prefect.environments import LocalEnvironment
from prefect.environments import DaskKubernetesEnvironment
@task
def hello_task():
    log('Setting up tasks and waiting for 1s')
    sleep(1)
@task
def is_true_task() -> None:
    log('Choosing branch')
    return choice([True, False]) # Chooses a random boolean value
@task
def first_task() -> int:
    log('Executing first_task')
    return 1234
@task
def second_task() -> int:
    log('Executing second_task')
    return 5678
@task
def end_task(x: int):
    log('This is the end. Final value is {}'.format(x))
@task
def run_in_parrallel():
    log('Running concurrently')
def log(msg: str) -> None:
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(msg)
environment = DaskKubernetesEnvironment(labels=["prefect-namespace"])
with Flow("Example Flow", environment=environment) as flow:
    hello = hello_task()
    cond = is_true_task()
    hello.set_downstream(cond) 
    with case(cond, True):
        val1 = first_task()
    with case(cond, False):
        val2 = second_task()
    parallel = run_in_parrallel()
    val = merge(val1, val2, parallel)
    
    end = end_task(val)
flow.register(project_name="Test project 1", labels=["prefect-namespace"])Laura Lorenz (she/her)
07/09/2020, 3:50 PMNicolas van de Walle
07/09/2020, 3:53 PMLaura Lorenz (she/her)
07/09/2020, 4:03 PMNicolas van de Walle
07/09/2020, 4:27 PMLaura Lorenz (she/her)
07/09/2020, 4:46 PM