https://prefect.io logo
#prefect-community
Title
# prefect-community
s

Sang Young Noh

05/12/2022, 3:50 PM
Hi all. I'm currently trying to refactor some prefect 1 code into prefect 2, and I'm trying to make use of some states in flows and tasks I'm currently developing, as noted here : https://orion-docs.prefect.io/concepts/states/#state-details However, I'm not entirely sure how this is meant to be implmented in, lets say a @flow function object to get information on that particular flow/task. Is there perhaps an example I can work with to expand on this on my side?
1
z

Zanie

05/12/2022, 3:53 PM
Can you share a little more about what your goal is?
Within the flow/task function, you can get informatinon about the particular flow/task with
get_run_context()
s

Sang Young Noh

05/12/2022, 3:55 PM
Hi. I'll try to get back to you asap.
a

Anna Geller

05/12/2022, 7:26 PM
@Sang Young Noh haven't we talked about it before? we have this Discourse tag with migration examples For flow definition, check this but if you have any specific questions, please ask
s

Sang Young Noh

05/13/2022, 6:45 AM
Hi Anna. It was more of controlling the prefect context to set some log messages, but it seems that I have not quite yet understood the prefect 1 architecture we have currently properly, which is why my questions sounds terrible haha. I will research this today and get back to you
a

Anna Geller

05/13/2022, 11:19 AM
awesome, keep us posted!
s

Sang Young Noh

05/13/2022, 11:19 AM
Hi Anna. just as a follow up, I was thinking on the question and basically, I wish to have a flow where if I have a late flow that has not been activated, then after a certain period I wish to skip it. So in prefect 1, The design is something this: if prefect.context["scheduled_start_time"] + max_late < datetime.datetime.now(tz=pytz.utc): raise SKIP("No backfilling late runs, will lead to server instability") Here, the SKIP from what I presume, will prevent late runs from being run. My impression is that it can be done with https://orion-docs.prefect.io/api-ref/prefect/futures/ and a combination of raising exceptions?
a

Anna Geller

05/13/2022, 11:20 AM
and yes 100%, you can totally customize the control flow using if/else, raising exceptions and retrieving results from a future - in Prefect 2.0 it's basically Python
s

Sang Young Noh

05/13/2022, 3:27 PM
So if I were to set a deployment with a schedule attached, I can access the start_time of that particular flow with the get_run_context() yes? and from that I can use the example you gave to give it an exception or not based on the current datetime?
z

Zanie

05/13/2022, 3:42 PM
Copy code
from datetime import timedelta

import pendulum
from prefect import flow, task
from prefect.context import FlowRunContext, get_run_context
from prefect.states import Failed


MAX_SECONDS_LATE = 10


@task
def my_task():
    pass


@flow
def my_flow():
    ctx: FlowRunContext = get_run_context()
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )

    if seconds_late > 10:
        return Failed(message=f"Flow started {seconds_late} seconds late.")

    my_task()



from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    flow=my_flow,
    flow_runner=SubprocessFlowRunner(),
    schedule={"interval": timedelta(seconds=20)},
)
s

Sang Young Noh

05/13/2022, 3:45 PM
Ah! Thank you. I'll have a look and get back to you.
Hello! When you define a work-queue with lets say a daskrunner, and the deployment was initially deployed with a Subprocessrunner, does the one in the work-queue overwrite the subprocessrunner initially defined?
a

Anna Geller

05/17/2022, 11:12 AM
Task runners such as
DaskTaskRunner
define how to run tasks. On deployment spec you specify flow runners. And the work queue is essentially a filter to determine which flow runs should be picked up e.g. pick up only deployments with tag "ubuntu" on ubuntu machine, or pick up only deployments with DockerFlowRunner on a VM that has Docker configured
s

Sang Young Noh

05/17/2022, 11:14 AM
Ah, so if I understand correctly, if I have a work-queue with the dasktaskrunner defined, then it will only filter out the deployments with dasktaskrunner defined?
a

Anna Geller

05/17/2022, 11:15 AM
you can't have a work queue with a dask task runner
work queue can filter for tags, deployment IDs and flow runners, not task runners
task runners is something you define on your flow decorator
can you explain the problem you try to solve this way? maybe what you are looking for is setting a tag "dask" on the respective deployments? there's also this issue that shows we may add a task runner setting on a deployment spec but I don't think you will be able ti use that for a work queue https://github.com/PrefectHQ/prefect/issues/5560
s

Sang Young Noh

05/17/2022, 11:19 AM
I think I have task runners and flow runners a bit confused here
a

Anna Geller

05/17/2022, 11:19 AM
not possible atm 😄 prove me wrong please, maybe I'm behind
s

Sang Young Noh

05/17/2022, 11:20 AM
So with work-queues, it can filter out between deployments with
UniversalFlowRunner, SubprocessFlowRunner, DockerFlowRunner, KubernetesFlowRunner
, in addition to any tags that deployment has, if my understand is correct
👍 1
a

Anna Geller

05/17/2022, 11:20 AM
maybe you can cross-check the concept docs re task runners and flow runners here https://orion-docs.prefect.io/concepts/task-runners/
correct
s

Sang Young Noh

05/17/2022, 11:21 AM
Ok, so that is a completely independent issue with dasktaskrunners, which are only to do with tasks within a flow
Yeah, that was a senior moment from me. I get it now haha
a

Anna Geller

05/17/2022, 11:23 AM
awesome 😄 haha
I actually thought that this has become a bit more self-explanatory given that task runners = running tasks, flow runners = running flows in Prefect 1.0 this was less obvious because executors were responsible for submitting task runs to the execution layer
s

Sang Young Noh

05/17/2022, 11:41 AM
Yeah I think that may be where the confusion came in - I am trying to refactor prefect 1 to 2 after all
👍 1
Also just in the case of prefect 1 to 2 -> I'm guessing the development of 2 is with the intention of prefect 1 eventually being deprecated? Do you know what the timescale is for this?
a

Anna Geller

05/17/2022, 11:46 AM
you can see that in the Orion docs FAQ - the plan is roughly one year after 2.0 comes out of beta afaik
s

Sang Young Noh

05/17/2022, 11:47 AM
Ok I see