# prefect-community
s
Hi all. I'm currently trying to refactor some Prefect 1 code into Prefect 2, and I'm trying to make use of states in the flows and tasks I'm developing, as noted here: https://orion-docs.prefect.io/concepts/states/#state-details However, I'm not entirely sure how this is meant to be implemented in, let's say, a @flow function to get information on that particular flow/task. Is there perhaps an example I can work with and expand on?
z
Can you share a little more about what your goal is?
Within the flow/task function, you can get information about the particular flow/task with get_run_context()
s
Hi. I'll try to get back to you asap.
a
@Sang Young Noh haven't we talked about it before? We have this Discourse tag with migration examples. For flow definition, check this, but if you have any specific questions, please ask
s
Hi Anna. It was more about controlling the Prefect context to set some log messages, but it seems that I have not yet properly understood the Prefect 1 architecture we currently have, which is why my questions sound terrible haha. I will research this today and get back to you
a
awesome, keep us posted!
s
Hi Anna. Just as a follow-up, I was thinking about the question and basically, I want a flow where, if a late flow run has not been started yet, it gets skipped after a certain period. So in Prefect 1, the design is something like this:
if prefect.context["scheduled_start_time"] + max_late < datetime.datetime.now(tz=pytz.utc):
    raise SKIP("No backfilling late runs, will lead to server instability")
Here the SKIP, I presume, will prevent late runs from being run. My impression is that it can be done with https://orion-docs.prefect.io/api-ref/prefect/futures/ and a combination of raising exceptions?
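The lateness check described in this message can be isolated into a small standard-library function, which makes it easy to test without a scheduler. This is only a sketch: the name `is_too_late` and the injectable `now` parameter are hypothetical, and it uses `datetime.timezone.utc` in place of `pytz.utc`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_too_late(
    scheduled_start: datetime,
    max_late: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Return True when `now` is more than `max_late` past the scheduled start."""
    if now is None:
        # Default to the current time as a timezone-aware UTC datetime.
        now = datetime.now(tz=timezone.utc)
    # Same comparison as the Prefect 1 snippet: the deadline has already passed.
    return scheduled_start + max_late < now


# Hypothetical schedule used only for illustration.
scheduled = datetime(2022, 6, 1, 12, 0, tzinfo=timezone.utc)
late = is_too_late(scheduled, timedelta(minutes=5), now=scheduled + timedelta(minutes=10))
on_time = is_too_late(scheduled, timedelta(minutes=5), now=scheduled + timedelta(minutes=1))
print(late, on_time)
```

Passing `now` explicitly keeps the comparison deterministic in tests; inside a flow the default of the current UTC time would presumably be what you want.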
a
and yes 100%, you can totally customize the control flow using if/else, raising exceptions and retrieving results from a future - in Prefect 2.0 it's basically Python
s
So if I were to set up a deployment with a schedule attached, I can access the start_time of that particular flow run with get_run_context(), yes? And from that I can use the example you gave to raise an exception or not based on the current datetime?
z
from datetime import timedelta

import pendulum
from prefect import flow, task
from prefect.context import FlowRunContext, get_run_context
from prefect.states import Failed


MAX_SECONDS_LATE = 10


@task
def my_task():
    pass


@flow
def my_flow():
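    # Inside a flow, get_run_context() returns the FlowRunContext for this run.
    ctx: FlowRunContext = get_run_context()
    # diff(..., abs=False) is negative when the expected start is in the past,
    # so multiply by -1 to get a positive number of seconds late.
    seconds_late = (
        pendulum.now().diff(ctx.flow_run.expected_start_time, abs=False).in_seconds()
        * -1
    )

    # Fail the run instead of executing tasks if it started too late.
    if seconds_late > MAX_SECONDS_LATE:
        return Failed(message=f"Flow started {seconds_late} seconds late.")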

    my_task()



from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner

DeploymentSpec(
    flow=my_flow,
    flow_runner=SubprocessFlowRunner(),
    schedule={"interval": timedelta(seconds=20)},
)
s
Ah! Thank you. I'll have a look and get back to you.
Hello! When you define a work queue with, let's say, a Dask runner, and the deployment was initially deployed with a subprocess runner, does the one in the work queue override the subprocess runner initially defined?
a
Task runners such as DaskTaskRunner define how to run tasks. On a deployment spec you specify flow runners. And the work queue is essentially a filter to determine which flow runs should be picked up, e.g. pick up only deployments with the tag "ubuntu" on an Ubuntu machine, or pick up only deployments with DockerFlowRunner on a VM that has Docker configured
s
Ah, so if I understand correctly, if I have a work queue with the DaskTaskRunner defined, then it will only pick up the deployments with DaskTaskRunner defined?
a
you can't have a work queue with a Dask task runner
a work queue can filter for tags, deployment IDs and flow runners, not task runners
task runners are something you define on your flow decorator
can you explain the problem you are trying to solve this way? maybe what you are looking for is setting a tag "dask" on the respective deployments? there's also this issue that shows we may add a task runner setting on a deployment spec, but I don't think you will be able to use that for a work queue: https://github.com/PrefectHQ/prefect/issues/5560
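The filtering idea above can be pictured with a small plain-Python model. This is only an illustrative sketch and not Prefect's API: `Deployment`, `WorkQueueFilter`, and `matches` are hypothetical names, the flow runner type is stored as a plain string label, and the sketch assumes a deployment must carry every tag the queue lists (the real matching rule may differ). Note that there is deliberately no task runner field to filter on.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Deployment:
    # Hypothetical stand-in for a deployment; not Prefect's own class.
    name: str
    flow_runner_type: str
    tags: FrozenSet[str] = field(default_factory=frozenset)


@dataclass(frozen=True)
class WorkQueueFilter:
    # Hypothetical stand-in; None means "do not filter on this criterion".
    tags: Optional[FrozenSet[str]] = None
    flow_runner_types: Optional[FrozenSet[str]] = None

    def matches(self, deployment: Deployment) -> bool:
        # Assumption: every tag listed on the queue must be on the deployment.
        if self.tags is not None and not self.tags <= deployment.tags:
            return False
        # The deployment's flow runner must be one of the allowed types.
        if (
            self.flow_runner_types is not None
            and deployment.flow_runner_type not in self.flow_runner_types
        ):
            return False
        return True


deployments = [
    Deployment("etl", "SubprocessFlowRunner", frozenset({"ubuntu"})),
    Deployment("ml", "DockerFlowRunner", frozenset({"ubuntu", "dask"})),
    Deployment("report", "DockerFlowRunner"),
]

docker_queue = WorkQueueFilter(flow_runner_types=frozenset({"DockerFlowRunner"}))
ubuntu_queue = WorkQueueFilter(tags=frozenset({"ubuntu"}))

picked_docker = [d.name for d in deployments if docker_queue.matches(d)]
picked_ubuntu = [d.name for d in deployments if ubuntu_queue.matches(d)]
print(picked_docker)
print(picked_ubuntu)
```

In this model, a "dask" tag on the deployment is the only way a queue could single out Dask-based flows, which mirrors the tag suggestion above.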
s
I think I have task runners and flow runners a bit confused here
a
not possible atm 😄 prove me wrong please, maybe I'm behind
s
So a work queue can filter between deployments with UniversalFlowRunner, SubprocessFlowRunner, DockerFlowRunner, or KubernetesFlowRunner, in addition to any tags the deployment has, if my understanding is correct
a
maybe you can cross-check the concept docs re task runners and flow runners here https://orion-docs.prefect.io/concepts/task-runners/
correct
s
Ok, so DaskTaskRunner is a completely independent matter, which only has to do with tasks within a flow
Yeah, that was a senior moment from me. I get it now haha
a
awesome 😄 haha
I actually thought that this had become a bit more self-explanatory, given that task runners = running tasks, flow runners = running flows. In Prefect 1.0 this was less obvious because executors were responsible for submitting task runs to the execution layer
s
Yeah I think that may be where the confusion came in - I am trying to refactor prefect 1 to 2 after all
Also, regarding Prefect 1 to 2: I'm guessing 2 is being developed with the intention of Prefect 1 eventually being deprecated? Do you know what the timescale is for this?
a
you can see that in the Orion docs FAQ - the plan is roughly one year after 2.0 comes out of beta afaik
s
Ok I see