Eric Feldman (10/18/2021, 11:23 AM)

Anna Geller
You can run a flow locally with flow.run() or by leveraging the prefect run CLI.
But when you want this flow to be scheduled and orchestrated by Prefect Cloud or Prefect Server, then you need to register it.
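For illustration, a minimal sketch of the local path, assuming Prefect 1.x (the flow and task names are invented):

from prefect import Flow, task

@task
def say_hello():
    print("hello from a local run")

with Flow("hello-local") as flow:
    say_hello()

if __name__ == "__main__":
    flow.run()  # executes immediately in-process; no registration involved

The same file could then also be run from a terminal with something like prefect run -p hello_local.py.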
Does this answer your question? 🙂

Eric Feldman (10/18/2021, 11:42 AM)

Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

shell = ShellTask(return_all=True)

@task
def build_command(script: str) -> str:
    return f"python {script}"  # formatted at runtime, once the parameter is resolved

with Flow("shell-flow") as flow:
    script = Parameter("script", default="your_script.py")
    shell(command=build_command(script))
You would need to register this flow, but then you can call any script from it by overriding the parameter value.
Alternatively, you could have a bunch of flows that you register all at once using the CLI: prefect register --project your_project -p /path/to/your/flows/
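Once registered, a run with a different parameter value can also be triggered programmatically; a minimal sketch using the Prefect 1.x Client (the flow ID is a placeholder you would look up after registration):

from prefect import Client

client = Client()

# trigger a run of the registered flow, overriding the "script" parameter
flow_run_id = client.create_flow_run(
    flow_id="<your-flow-id>",  # placeholder: shown in the UI after registration
    parameters={"script": "another_script.py"},
)
print(flow_run_id)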
You can also check out Orion - what you describe will be even easier to accomplish in Prefect in the future.

Eric Feldman (10/18/2021, 11:57 AM)

Kevin Kho

Eric Feldman (10/19/2021, 8:04 AM)
I'm using a DaskExecutor (with a local cluster running on threads), and when I run a flow it blocks the main thread.
When I do the same thing directly with Dask, it doesn't block the main thread (submit isn't blocking). Can it be done?
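For comparison, a minimal sketch of the plain Dask behavior described above (heavy is a stand-in function):

from dask.distributed import Client, fire_and_forget

def heavy(x):
    return x * 2

if __name__ == "__main__":
    client = Client()                  # local cluster
    future = client.submit(heavy, 21)  # returns immediately; not blocking
    fire_and_forget(future)            # optionally drop the handle entirely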
Eric Feldman (10/19/2021, 8:20 AM)
Flow.run is blocking. So I don't have any way to run a flow in a non-blocking way other than registering it?

Eric Feldman (10/19/2021, 1:59 PM)
Is there a FlowRunner that doesn't wait for the dask future?

Kevin Kho
You can run a flow without registering it using flow.run(), but you are right that for a scheduled flow run you need to register it for sure. There is agentless execution as seen here, but it still requires the Flow to be registered. flow.run() is not really meant for production. Have you seen the Orion announcement though? I think we will be moving in a direction that doesn't have the separation of registration and runtime.

Eric Feldman (10/19/2021, 2:27 PM)
I wrote a NonblockingFlowRunner that doesn't wait for the tasks (fire-and-forget style), but I still have some issues that I'm trying to fix.
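Lacking such a runner, one crude workaround (an unofficial sketch, not a Prefect API) is to push the blocking flow.run() onto a background thread:

import threading
from prefect import Flow, task

@task
def heavy():
    return sum(range(10_000_000))

with Flow("background-run") as flow:
    heavy()

# flow.run() blocks, so run it off the main thread; a workaround sketch,
# not an official non-blocking runner
runner = threading.Thread(target=flow.run, daemon=True)
runner.start()  # returns immediately while the flow runs in the background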
For now it seems that the solution for me is to use native Dask for non-registered tasks and Prefect for the others.

I would have registered the task and run it using the client CLI, but as far as I understand the hash stays the same even if the code changes - and I don't want to have multiple versions of the same API.

My scenario isn't really ETL like here; I want to have a Celery-like task and run the heavy work outside the server thread.

For example, I build the DAG on each request (let's say I have a download route that puts a file in S3). I might build all of the flows in the deployment phase, but I would like to use it like Celery: build a DAG at runtime and put it in the queue.
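A hedged sketch of that per-request pattern in Prefect 1.x (the route logic and names are invented; the task body is a stand-in):

from prefect import Flow, task

@task
def download_to_s3(url: str, bucket: str) -> None:
    # stand-in for the real "put a file in S3" logic
    print(f"would fetch {url} and upload to {bucket}")

def handle_download_request(url: str, bucket: str):
    # build a fresh DAG for this request, then execute it in-process
    with Flow("download-request") as flow:
        download_to_s3(url, bucket)
    return flow.run()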
Anna Geller

Eric Feldman (10/19/2021, 2:41 PM)
(I saw there is a run_task in FlowRunner), and I could build DAGs using this task.
The DAGs are dynamic, meaning the same route might result in different DAGs for different values; I only know the DAG at runtime. The current solution I have using Prefect is to create a DAG and register it on every call, but it isn't really working because it doesn't detect code changes, or it creates too many versions.

Anna Geller
◦ Prefect has a case statement that allows you to build conditional logic based on runtime-discoverable conditions. This documentation page has several examples showing how it can be used.
◦ Prefect has a Parameter task that allows you to build custom logic based on the runtime-discoverable value of those parameters.
Overall, registration only captures the flow structure. But how the flow behaves can vary based on runtime-discoverable information, e.g. in Parameters or conditional logic.
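A minimal sketch combining the two, assuming Prefect 1.x (task and parameter names are invented):

from prefect import Flow, Parameter, case, task

@task
def check(mode: str) -> bool:
    return mode == "fast"

@task
def fast_path():
    print("fast branch")

@task
def slow_path():
    print("slow branch")

with Flow("runtime-branching") as flow:
    mode = Parameter("mode", default="fast")
    cond = check(mode)
    with case(cond, True):
        fast_path()
    with case(cond, False):
        slow_path()

flow.run(parameters={"mode": "slow"})

The registered structure stays fixed; which branch actually executes is decided by the parameter value at run time.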
Anna Geller

Eric Feldman (10/19/2021, 4:18 PM)

Anna Geller

Eric Feldman (10/19/2021, 5:01 PM)
"Are you trying to move to Prefect from some other system?"
I'm trying to move to Prefect from Celery, yes. In Celery I'm submitting only one task (a general task that runs callables) - I'm not submitting the final flow. Celery isn't familiar with the business logic; it only runs callables outside of the main thread. So I'm always going to need to rebuild the DAG using that one task and push it to the queue so the Celery workers will execute it. If I could run a flow in the cluster without having to register... And when I submit a task to Celery (wrapping a method with celery.task happens once, in the first setup), it doesn't have the history of all of the previous versions.
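For context, a hedged sketch of the "one general task that runs callables" pattern described here (broker URL and names are illustrative):

from importlib import import_module
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")  # illustrative broker

@app.task
def run_callable(dotted_path: str, *args, **kwargs):
    # resolve "package.module.func" at run time, so workers always
    # execute whatever code is currently deployed
    module_path, func_name = dotted_path.rsplit(".", 1)
    func = getattr(import_module(module_path), func_name)
    return func(*args, **kwargs)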
"Is this something Dask distributed would be good for?"
My assumption is that it is - I had a good POC using the Dask client without Prefect. I think the main issue is that I have one server route that can create multiple flows, one flow for each user, and the tasks are different in those flows (but they are all built in the same way; it just depends on the family graph, for example). Those flows might run only one time, or they can run 100 times - and each time this route is called, I want it to use the current version of the code.
Kevin Kho
flow.run() will still use the attached DaskExecutor, so it should still be lightweight on your machine if you are pushing things to Dask.

The issue is that there is some dynamic code and you want Prefect to be able to change the execution without re-registering. This can be done, but I think it's quite tricky. What you need to do is use Docker and make the ENTRYPOINT clone a git repo and pip install it in editable mode; then your Flow that runs on top of that image will have the latest code. This way, the code can change because it's installed on the container at runtime. From there, you can do client.submit() like this:
import dask.dataframe as dd
from dask.distributed import worker_client
from prefect import task

@task
def calling_compute_in_a_task(filepath):
    # connect back to the Dask cluster from inside a task
    with worker_client():
        df = dd.read_csv(filepath)
        return df.describe().compute()

@task(checkpoint=False)
def using_checkpoint_false(filepath):
    # return a lazy Dask collection; checkpoint=False avoids serializing it
    with worker_client():
        return dd.read_csv(filepath)

@task
def compute_describe(df):
    with worker_client():
        return df.describe().compute()
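Building on the tasks above, a usage sketch wiring them into a flow on an existing cluster (the scheduler address and file path are placeholders):

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("dask-native-collections") as flow:
    df = using_checkpoint_false("s3://my-bucket/data/*.csv")  # placeholder path
    stats = compute_describe(df)

# run on an existing cluster so worker_client() can reach the scheduler
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))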
Eric Feldman (10/19/2021, 6:06 PM)
"why you can't start a new terminal and run it?"
I want the execution of the task to be detached from the k8s pod of the server. I think I explained myself poorly and my examples were not that good haha, but I have a better explanation. Basically I have 2 types of tasks in my system:
• background tasks - CPU-bound tasks that I don't want the server to handle
• pipeline tasks - a pipeline that can be run manually and on a recurring schedule
Right now both types of tasks are running in Celery, and the way we represent tasks is with DAGs (even the simple background tasks). We are facing a lot of issues with Celery on the pipeline tasks, so we want to try different solutions. Prefect sounds amazing as the pipeline-task solution, but for the background tasks I would need to change too many things and it isn't as straightforward as I would want. One solution that I had in mind is to run the background tasks directly on the Dask cluster, and use Prefect with the same cluster for the pipeline tasks.
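A hedged sketch of that proposed split, with both workloads sharing one Dask cluster (the address and functions are invented):

from dask.distributed import Client, fire_and_forget
from prefect import Flow, task
from prefect.executors import DaskExecutor

DASK_ADDRESS = "tcp://dask-scheduler:8786"  # illustrative

def cpu_bound_work(payload):
    return sum(payload)  # stand-in for the real CPU-bound job

# background tasks: submitted straight to Dask, detached from the server
client = Client(DASK_ADDRESS)
fire_and_forget(client.submit(cpu_bound_work, [1, 2, 3]))

# pipeline tasks: a Prefect flow executed on the same cluster
@task
def step():
    print("pipeline step")

with Flow("pipeline", executor=DaskExecutor(address=DASK_ADDRESS)) as flow:
    step()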
Kevin Kho