Hi there Is there a way to use dynamic flows with ...
# prefect-community
m
Hi there! Is there a way to use dynamic flows with the Prefect Server (Cloud or on-premise)? The registration step is sometimes more of a hindrance than a help. By dynamic flow I mean that the tasks are generated at runtime. Mapping does not work for this, because the tasks have dependencies between them. I already asked here, but nobody answered: https://prefect-community.slack.com/archives/C014Z8DPDSR/p1612986542265800 Thanks a lot for any help!
a
Hey @Michael Hadorn I believe there is a way to run a Python function/method as a task. Let me check
maybe something like this will work for your use case
prefect.utilities.tasks.as_task(x, flow=None)
https://github.com/PrefectHQ/prefect/blob/b438af1436f6baa3f941600557309648e838f111/src/prefect/utilities/tasks.py#L233
m
Thanks a lot for your answer. I'm not sure you understood my question correctly... I do have a flow with dynamic tasks. But when I register this flow with the backend, it complains at runtime that the registered flow does not match the current flow. (I use the GitHub storage so that the flow is rebuilt on each run.) What I would like:
• the flow is scheduled in the backend
• the backend builds the flow
• and then runs this freshly built flow
a
Ok, now I got it, thanks for the clarification. I don’t think Prefect is currently able to handle this case out-of-the-box. I mean, tasks have to be already defined when registering a Flow. What you can try to do is to build your flow dynamically and THEN register the new flow to Prefect Server/Cloud
m
Thanks. Sorry for not being clear before. ☺️ Yes, I tried a separate flow that only registers the other flow (it can even use the same Docker image). This works in general. But then I need a way to concatenate the two flows, and as far as I can see this is also not possible. Anyhow, I think this does not really solve my problem. Why is this registration so important? In my case, I only see unnecessary constraints. If the flow is stored as a file, then I think it should be able to re-register itself whenever there is a change. I would be interested in why this is caught with the error message:
KeyError: 'Task slug XX not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.'
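To illustrate where an error like this can come from, here is a toy model in plain Python. It is not Prefect's implementation; it only assumes that the backend remembers the task slugs from registration time and that, at run time, each remembered slug is looked up in the flow that was just rebuilt from storage. `resolve_task_runs` and the example slugs are hypothetical.

```python
def resolve_task_runs(registered_slugs, current_flow_tasks):
    """Map each slug known from registration to a task in the rebuilt flow.

    Toy model only: `registered_slugs` stands in for what the backend stored,
    `current_flow_tasks` for the slug -> task mapping of the rebuilt flow.
    """
    resolved = {}
    for slug in registered_slugs:
        try:
            resolved[slug] = current_flow_tasks[slug]
        except KeyError:
            raise KeyError(
                f"Task slug {slug} not found in the current Flow"
            ) from None
    return resolved


# The flow was registered with two tasks, but the rebuilt flow renamed one.
registered = ["extract-1", "load-1"]
rebuilt = {"extract-1": "<extract task>", "load_v2-1": "<load_v2 task>"}

try:
    resolve_task_runs(registered, rebuilt)
except KeyError as err:
    print("lookup failed:", err)
```

In this model, any task that is generated differently at run time than at registration time breaks the lookup, which matches the behaviour described above.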
a
You don't necessarily need to concat the dynamic flow with the flow that registers it though. You could provide a schedule for the dynamic flow that is, say, a few minutes from now(), so the dynamic flow will run shortly after registration.
m
Big thanks for your answer as well. Yes, that's true, but if the registration breaks for any reason, it would be nice if the other job (with the outdated schema) did not start. So is it correct that there are:
• no dynamic flows
• no concatenation of flows (managed in the GUI)
?
a
If you use one of the schedules that only schedule for a particular date/time rather than a repeat, then if a future registration fails, the old flow version will not run. As for "no dynamic flows": it sounds like you have succeeded in using dynamic flows since you say the two flow approach "is working in general"? The concat of flows looks like it can only be done if you define the flows first then construct a flow of flows, so it wouldn't work with your dynamic flow idea (unless someone figures out a trick we're missing). But the one-off schedule method should be good enough.
m
Ok. Thanks again - it's really awesome how active this community and the Prefect team are! 🙂 Yes, with two flows (register & run) it's working. Can anyone explain why this registration is needed? What is the benefit of having this information?
a
This is what the docs say:
Flows can be registered with the Prefect API for scheduling and execution, as well as management of run histories, logs, and other important metrics.
https://docs.prefect.io/orchestration/concepts/flows.html
In my understanding it means that prefect cloud/server knows your flow's DAG and other metadata, so it can allow for nifty things like restarting a flow from the middle where a task failed, or recording run times for each task so you can figure out what needs to be optimised.
m
Yes, that's really cool stuff, but it would be nice to have a kind of flow that can do this at runtime (and store this information per run, not globally). Anyhow, big thanks for your support! 🙌