# ask-community
e
Hi 🙂 Can I run a Flow on an agent without registering it first?
a
For local development, you can absolutely run your flow without registering it, either by calling `flow.run()` or by using the `prefect run` CLI. But when you want this flow to be scheduled and orchestrated by Prefect Cloud or Prefect Server, you need to register it. Does this answer your question? 🙂
e
I want to run a flow manually without registering it, because it might never run again, so I don't really want to register it. But I also don't want the flow to run on my local server; I want it to run on the agent so the server doesn't use too many resources. I might run it on a Dask cluster instead, to get the same effect of keeping heavy tasks off my server.
a
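@Eric Feldman If you want to run an arbitrary script without registering it, you could currently do it as a shell task. Let's say you use a local agent and you want to run the script on this agent without needing any visibility into what the script is doing:
```python
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

shell = ShellTask(return_all=True)

@task
def build_command(script):
    # Format the command at runtime, once the parameter value is known;
    # an f-string at build time would format the Parameter object itself.
    return f"python {script}"

# Name the flow object `flow` so it doesn't shadow the imported Flow class
with Flow("shell-flow") as flow:
    script = Parameter("script", default="your_script.py")
    shell(command=build_command(script))
```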
You would need to register this flow, but then you can call any script from it by overriding the parameter value. Alternatively, you could have a bunch of flows and register them all at once using the CLI:
```shell
prefect register --project your_project -p /path/to/your/flows/
```
You can also check out Orion; what you describe will be even easier to accomplish in future versions of Prefect.
e
Using Orion is too premature for me, so I don't think that's the solution. Otherwise, I think running it on a Dask cluster is the way to go. Thanks!
k
Yeah, I believe Anna is right that you would need to register anything to run it on an agent.
e
I tried using a `DaskExecutor` (with a local cluster running on threads), and when I run a flow it blocks the main thread. When I do the same thing directly with Dask, it doesn't block the main thread (`submit` isn't blocking). Can this be done?
I see now that `Flow.run` is blocking. So there's no way to run a flow in a non-blocking way other than registering it? Would it make sense to have an implementation of `FlowRunner` that doesn't wait for the Dask future?
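The blocking vs. non-blocking difference described above can be illustrated with the standard library alone. This is a minimal sketch, assuming `concurrent.futures` as a stand-in for a Dask cluster: `submit` hands back a future immediately (as Dask's `Client.submit` does), whereas a `run`-style call waits for the result before returning. The names `heavy_task`, `run_blocking`, and `run_non_blocking` are hypothetical.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def heavy_task(release: threading.Event, x: int) -> int:
    """Stand-in for a CPU-bound job; it waits until `release` is set."""
    release.wait()
    return x * x


def run_blocking(pool: ThreadPoolExecutor, release: threading.Event, x: int) -> int:
    # Behaves like flow.run(): the caller waits until the work is finished.
    return pool.submit(heavy_task, release, x).result()


def run_non_blocking(pool: ThreadPoolExecutor, release: threading.Event, x: int) -> Future:
    # Behaves like a fire-and-forget submit: a future comes back immediately.
    return pool.submit(heavy_task, release, x)


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        gate = threading.Event()
        fut = run_non_blocking(pool, gate, 7)
        print(fut.done())    # False: the caller was not blocked
        gate.set()           # let the job finish
        print(fut.result())  # 49
        print(run_blocking(pool, gate, 3))  # 9, returned only after the job ran
```

In Prefect 1.x there is no built-in non-blocking counterpart to `flow.run()`, which is why the replies keep pointing back to registering the flow and creating a flow run.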
k
It can't be done with `flow.run()`, but you are right: it needs a scheduled flow run, and for that you need to register the flow. There is agentless execution as seen here, but it still requires the Flow to be registered. `flow.run()` is not really meant for production. Have you seen the Orion announcement, though? I think we will be moving in a direction that doesn't separate registration and runtime.
e
Yes, I saw the announcement, but it's still in alpha, so I don't really want to use it yet.

I started writing a `NonblockingFlowRunner` that doesn't wait for the tasks (fire-and-forget), but I still have some issues that I'm trying to fix. For now, it seems the solution for me is to use native Dask for non-registered tasks and Prefect for the rest.

I could register it and run it using the client or CLI, but as far as I understand, the hash is the same even if the code changes, and I don't want to have multiple versions of the same API.

My scenario isn't really ETL-like. I want a Celery-like task that runs the heavy work outside the server thread. For example, I build the DAG on each request (let's say I have a download route that puts a file in S3). I might build all of the flows in the deployment phase, but I would like to use it like Celery: build a DAG at runtime and put it in the queue.
a
@Eric Feldman Can you tell us more about the problem you try to solve? Once we have a clear understanding of the problem, then it's easier to move forward. Perhaps you can share an anonymized mini-version of your current solution, as well. We could then find out together what's the best way to tackle it in Prefect.
e
I have a server, and I want its CPU-bound tasks to run somewhere else. Until now I used Celery for that, and now I'm trying to see if we can move to Prefect. In Celery I had to "register" only one task that takes a callable and parameters (kind of like `run_task` in `FlowRunner`), and I could build DAGs using this task. The DAGs are dynamic, meaning the same route might produce different DAGs for different values; I only know the DAG at runtime. My current solution with Prefect is to create a DAG and register it on every call, but it isn't really working, because it either doesn't detect code changes or creates too many versions.
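The Celery pattern described above (one generic task that executes any callable, plus a DAG assembled per request) can be sketched with the standard library. This is a minimal sketch, assuming Python 3.9+ for `graphlib.TopologicalSorter`; `run_callable`, `build_dag`, and `execute_dag` are hypothetical names, and execution here is sequential in one process rather than on remote workers.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Tuple

# A node is (callable, args, names of upstream nodes whose results are appended to args).
Node = Tuple[Callable[..., Any], tuple, List[str]]


def run_callable(func: Callable[..., Any], *args: Any) -> Any:
    """The single generic 'registered' task: it only knows how to call a function."""
    return func(*args)


def execute_dag(dag: Dict[str, Node]) -> Dict[str, Any]:
    # Map each node to its upstream dependencies and walk them in topological order.
    graph = {name: set(deps) for name, (_, _, deps) in dag.items()}
    results: Dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        func, args, deps = dag[name]
        upstream = tuple(results[d] for d in deps)
        results[name] = run_callable(func, *args, *upstream)
    return results


def build_dag(user_id: int, n_steps: int) -> Dict[str, Node]:
    # The shape of the DAG depends on request data that is only known at runtime.
    dag: Dict[str, Node] = {"load": (lambda uid: uid * 10, (user_id,), [])}
    prev = "load"
    for i in range(n_steps):
        name = f"step{i}"
        dag[name] = (lambda value: value + 1, (), [prev])
        prev = name
    return dag


if __name__ == "__main__":
    print(execute_dag(build_dag(user_id=4, n_steps=3))["step2"])  # 43
```

Nothing about the DAG is declared ahead of time here; only `run_callable` is fixed, which is the property that makes this pattern awkward to map onto per-flow registration.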
a
Awesome, thanks for sharing @Eric Feldman! Let's unpack this and see how we can do it in the best way in Prefect:
• When it comes to Celery, it distributes tasks from the queue across workers based on worker queues.
  ◦ In Prefect, you could accomplish the same by assigning labels to your flows and agents.
  ◦ Flows will then be picked up by agents with matching labels.
• When it comes to building conditional, dynamic, and runtime-discoverable logic:
  ◦ Prefect has a `case` statement that allows you to build conditional logic based on runtime-discoverable conditions. This documentation page has several examples showing how it can be used.
  ◦ Prefect has a `Parameter` task that allows you to build custom logic based on the runtime-discoverable values of those parameters.

Overall, registration only matters for registering the flow structure. How the flow behaves can vary based on runtime-discoverable information, e.g. in Parameters or conditional logic.
Additionally, if you use script-based storage, your task code can change without re-registering, as long as your flow structure doesn't change.
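The distinction between flow structure and task code can be made concrete with a toy fingerprint. This is a hypothetical illustration, not Prefect's actual hashing (Prefect 1.x offers `flow.serialized_hash()` for use as a registration idempotency key, but its exact inputs are not reproduced here): a fingerprint built only from task names and dependency edges stays the same when a task's body changes, and changes only when a task or edge is added or removed. `structure_fingerprint` and the task functions are made-up names.

```python
import hashlib
import json
from typing import Callable, Dict, List, Tuple


def structure_fingerprint(tasks: Dict[str, Callable], edges: List[Tuple[str, str]]) -> str:
    """Hash only the DAG shape: task names and dependency edges, not function bodies."""
    payload = {"tasks": sorted(tasks), "edges": sorted(edges)}
    return hashlib.sha256(json.dumps(payload).encode()).hexdigest()


def extract_v1(x):
    return x + 1


def extract_v2(x):
    # New business logic, same task name and same position in the DAG.
    return x * 100


def load(x):
    return x


edges = [("extract", "load")]
fp_old = structure_fingerprint({"extract": extract_v1, "load": load}, edges)
fp_new = structure_fingerprint({"extract": extract_v2, "load": load}, edges)
fp_bigger = structure_fingerprint(
    {"extract": extract_v2, "load": load, "notify": load},
    edges + [("load", "notify")],
)

print(fp_old == fp_new)     # True: a code-only change keeps the fingerprint
print(fp_old == fp_bigger)  # False: a new task changes the structure
```

The flip side matters for per-request flows: if the shape differs per user, a structure-based key differs on nearly every call.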
e
• Yeah, I know about the agents, but that still doesn't solve the issue that the flow needs to be registered (and the underlying issue that I'll have too many versions).
• It's not really a `case`: let's say parent1 has 5 children and 2 grandchildren, so the DAG will have 8 tasks, while parent2 has only 2 children and no grandchildren, so the DAG will have 3 tasks. I need one task per person, and conditions don't really help me here.

Regarding script-based storage: since the flow is built at runtime based on the person data in my DB, I don't understand how it can work for me.

I might be trying to use Prefect for something it shouldn't be used for. It's not an ETL that runs multiple times; it's tasks that can't run on the main thread because they are CPU-bound.
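The family example shows why the task count is data-dependent. A minimal sketch, assuming the family tree arrives from the database as nested dicts (the tree shape and the names `family_to_dag`, `child1`, `grandchild1`, etc. are hypothetical): one task per person and one edge from each parent to each child.

```python
from typing import Dict, List, Tuple

# Hypothetical tree shape: {person: {child: {grandchild: {}}}}
Tree = Dict[str, "Tree"]


def family_to_dag(root: str, tree: Tree) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return (tasks, edges) with one task per person and parent -> child edges."""
    tasks = [root]
    edges: List[Tuple[str, str]] = []
    for child, subtree in tree.items():
        edges.append((root, child))
        child_tasks, child_edges = family_to_dag(child, subtree)
        tasks += child_tasks
        edges += child_edges
    return tasks, edges


parent1 = {
    "child1": {"grandchild1": {}, "grandchild2": {}},
    "child2": {}, "child3": {}, "child4": {}, "child5": {},
}
parent2 = {"child_a": {}, "child_b": {}}

tasks1, _ = family_to_dag("parent1", parent1)
tasks2, _ = family_to_dag("parent2", parent2)
print(len(tasks1), len(tasks2))  # 8 3
```

Because both the tasks and the edges differ per family, a `case` branch over a fixed set of tasks cannot express this; the DAG itself has to change.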
a
Thanks for explaining a bit more, @Eric Feldman. Are you trying to move to Prefect from some other system, e.g. from Celery/Airflow/some message queue? I think the registering and versioning are what's confusing you. You can think of registration as letting Prefect know what to do, how to do it, and where to do it. When you submit your tasks to Celery or a message queue, you also need to communicate this information to the system in some way. Flow registration is the equivalent:
• you let Prefect know which tasks to run: your business logic defined in the flow,
• in which order: the dependencies between tasks,
• where your flow is (storage) and how to run your tasks (executor, e.g. `DaskExecutor`).

But you're right, it's possible that Prefect is not the best tool for the job in your use case. I still don't know enough about the problem you're trying to solve to determine that. Perhaps you can share more about it; you mentioned it's a CPU-bound task. Is this something Dask distributed would be good for? If so, Prefect mapping would be an option worth considering. And when it comes to parent-child flows, you could have a look at Running dependent flows and this blog post.
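The mapping suggestion works because the flow's structure stays fixed while the number of task runs follows the data. As a standard-library analogy only (not Prefect's API; in Prefect 1.x the real call would be `some_task.map(items)` inside a flow), one mapped step per generation could look like this. `process_person` and the generation lists are hypothetical, and threads are only a stand-in: for genuinely CPU-bound work a process pool or a Dask cluster would be used.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def process_person(name: str) -> str:
    # Placeholder for the per-person work.
    return name.upper()


def run_generations(generations: List[List[str]]) -> List[List[str]]:
    """One fixed 'mapped' step per generation; each step fans out over however many people it has."""
    results = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for generation in generations:  # parents first, then children, then grandchildren
            results.append(list(pool.map(process_person, generation)))
    return results


family1 = [["parent1"], ["c1", "c2", "c3", "c4", "c5"], ["g1", "g2"]]
family2 = [["parent2"], ["c1", "c2"], []]
print([len(step) for step in run_generations(family1)])  # [1, 5, 2]
print([len(step) for step in run_generations(family2)])  # [1, 2, 0]
```

The three mapped steps would be registered once and only the input lists change per call. The trade-off is that per-person dependencies (a grandchild waiting on its specific parent) become per-generation barriers.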
e
> Are you trying to move to Prefect from some other system?
Yes, I'm trying to move to Prefect from Celery. In Celery I'm submitting only one task (a general task that runs callables); I'm not submitting the final flow. Celery isn't familiar with the business logic; it only runs callables outside of the main thread. I'm always going to need to rebuild the DAG using that one task and push it to the queue so the Celery workers execute it. If only I could run a flow on the cluster without having to register it. And when I submit a task to Celery (wrapping a method with `celery.task`, which happens once during setup), it doesn't keep a history of all the previous versions.
> Is this something Dask distributed would be good for?
My assumption is that it is; I had a good POC using the Dask client without Prefect. I think the main issue is that I have one server route that can create multiple flows, one flow for each user, and the tasks differ between those flows (but they are all built the same way; it just depends on the family graph, for example). Those flows might run only once, or they might run 100 times, and each time this route is called, I want it to use the current version of the code.
k
Gotcha, OK. I think what we are converging on here is that Prefect and Celery aren't that analogous. Airflow can use Celery as a task queue to parallelize tasks; similarly, Prefect uses Dask. You are also finding success using the Dask client without Prefect, so at this point I think the DAG requirement is restricting you. That makes sense, because Dask and Celery are optimized to be task queues, while Prefect is designed to be a batch scheduler first. These limitations are why we are moving away from the DAG requirement in Prefect 2.0. You also mentioned versioning as an issue, and we agree, which is why, moving forward, versions will be provided by users in Prefect 2.0.

At this point, though, I am also wondering if Prefect is the wrong tool for this. But let's see what we can do in current Prefect. There are two things here. The first is running ad hoc in an async way, where you send the work to a different thread (without registering). The second is the difficulty with registration, which is the one I have seen workarounds for before (explained below).

For the ad hoc execution on a different thread, maybe it's a stupid question, but why can't you start a new terminal and run it there? Using `flow.run()` will still use the attached `DaskExecutor`, so it should still be lightweight on your machine if you are pushing the work to Dask.

The registration issue is that there is some dynamic code, and you want Prefect to pick up changes in execution without re-registering. This can be done, but I think it's quite tricky. What you need to do is use Docker and make the `ENTRYPOINT` clone a git repo and pip install it in editable mode; the Flow that runs on top of that image will then have the latest code. This way, the code can change, because it's installed on the container at runtime. From there, you can do `client.submit()` like this:
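```python
import dask.dataframe as dd
from dask.distributed import worker_client
from prefect import task


@task
def calling_compute_in_a_task(filepath):
    # Run the Dask computation from inside the task, on the cluster the task runs on
    with worker_client():
        df = dd.read_csv(filepath)
        return df.describe().compute()


@task(checkpoint=False)
def using_checkpoint_false(filepath):
    # Return the lazy Dask DataFrame; checkpoint=False skips persisting the task result
    with worker_client():
        return dd.read_csv(filepath)


@task
def compute_describe(df):
    # Trigger the computation on the lazy DataFrame passed in from the upstream task
    with worker_client():
        return df.describe().compute()
```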
e
> why you can’t start a new terminal and run it?
I want the execution of the task to be detached from the server's k8s pod. I think I explained myself poorly and my examples weren't that good, haha, but I have a better explanation. Basically, I have 2 types of tasks in my system:
• background tasks: CPU-bound tasks that I don't want the server to handle
• pipeline tasks: a pipeline that can be run manually and on a recurring schedule
Right now both types of tasks run in Celery, and tasks are represented as DAGs (even the simple background tasks). We are facing a lot of issues with Celery for the pipeline tasks, so we want to try different solutions. Prefect sounds amazing for the pipeline tasks, but for the background tasks I'd need to change too many things, and it isn't as straightforward as I would want. One solution I have in mind is to run the background tasks directly on the Dask cluster, and the pipeline tasks with Prefect on the same cluster.
k
Yeah I think we’re in agreement here 🙂 Though hopefully in the future Orion will enable those background tasks