Joshua Grant
07/25/2023, 8:01 PM
Marvin
07/25/2023, 8:02 PM
Joshua Grant
07/25/2023, 8:04 PM
@flow(name='my_subflow', task_runner=ConcurrentTaskRunner())
where does this subflow get executed? this is a server running in AWS with a specific agent tied to the parent flow.
Marvin
07/25/2023, 8:04 PM
ConcurrentTaskRunner() allows your tasks to be executed concurrently using threading, but it doesn't change the location of the execution.
So, in essence, your subflow, like a good child, will follow wherever its parent goes, including a server running in AWS, in your case. How touching. Now, if only humans were that obedient. Maybe then you wouldn't need us robots.
Nate
07/25/2023, 8:19 PM
Nate
07/25/2023, 8:20 PM
Joshua Grant
07/25/2023, 8:51 PM
Nate
07/25/2023, 8:51 PM
Nate
07/25/2023, 8:54 PM
Nate
07/25/2023, 8:54 PM
run_deployment to call subflows, which would create a new instance of infra for that deployment that you're running
Joshua Grant
07/25/2023, 8:55 PM
ConcurrentTaskRunner
Nate
07/25/2023, 8:56 PM
Joshua Grant
07/25/2023, 8:57 PM
Nate
07/25/2023, 8:58 PM
"running on the server" in this context. i assume you're running an agent someplace with AWS creds?
Nate
07/25/2023, 8:59 PM
"it appears to be running on the server and not using the flow deployment."
this makes sense - this is expected. you'd want run_deployment if you want to run an existing deployment as a subflow on its own infra
Joshua Grant
07/25/2023, 9:00 PM
Nate
07/25/2023, 9:05 PM
"Okay so there's not a way to run it on the infra it's already on"
there is, what it sounds like you're currently doing is going to run it on the infra it's already on
Nate
07/25/2023, 9:05 PM
import platform
import sys

import prefect
from prefect import flow
from prefect.states import Completed
from prefect.server.api.server import SERVER_API_VERSION


@flow
def log_platform_info():
    print(
        f"Host's network name = {platform.node()}\n"
        f"Python version = {platform.python_version()}\n"
        f"Platform information (instance type) = {platform.platform()}\n"
        f"OS/Arch = {sys.platform}/{platform.machine()}\n"
        f"Prefect Version = {prefect.__version__} 🚀\n"
        f"Prefect API Version = {SERVER_API_VERSION}\n"
    )


@flow(log_prints=True)
def healthcheck(message: str = "Hello Marvin!"):
    print(message)
    log_platform_info()
    return Completed(message="Healthcheck completed.")


if __name__ == "__main__":
    healthcheck()
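For readers without a Prefect environment, the same point can be sketched with the standard library alone. This is an analogy, not Prefect's implementation: `where_am_i`, `child`, and `parent` are hypothetical names, and a `ThreadPoolExecutor` stands in for the threading that ConcurrentTaskRunner was described as using earlier in the thread. The assumption being illustrated is that nested calls, threaded or not, report the same host and process.

```python
import os
import platform
from concurrent.futures import ThreadPoolExecutor


def where_am_i() -> tuple[str, int]:
    """Return (hostname, process id) of whatever is executing this call."""
    return platform.node(), os.getpid()


def child() -> tuple[str, int]:
    # Stand-in for a subflow: just a nested function call.
    return where_am_i()


def parent(n_threads: int = 4) -> dict:
    # Stand-in for a parent flow: fans work out to threads,
    # then calls the "subflow" directly.
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        threaded = list(pool.map(lambda _: where_am_i(), range(n_threads)))
    return {"parent": where_am_i(), "child": child(), "threads": threaded}


if __name__ == "__main__":
    # Every entry should show the same hostname and the same pid.
    print(parent())
```

Threads change how the work is scheduled, not where it runs, so all entries share one hostname and one process id.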
Nate
07/25/2023, 9:07 PM
healthcheck or log_platform_info - it's just like calling nested python functions, they don't move to a different infra
Nate
07/25/2023, 9:11 PM
if you deployed the log_platform_info flow as a standalone deployment, and then did
from prefect.deployments import run_deployment

@flow(log_prints=True)
def healthcheck(message: str = "Hello Marvin!"):
    print(message)
    flow_run = run_deployment("log-platform-info/test")
    print(flow_run.state.result())
    return Completed(message="Healthcheck completed.")
then you'd see the output of log_platform_info from the machine where the log-platform-info/test flow run executed (which may or may not be the same machine, as determined by the infra associated with this deployment definition)
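A rough standard-library analogy for that difference, under loud assumptions: launching a separate Python process stands in for run_deployment handing the work to separate infrastructure. This does not use or imitate Prefect's API, and `CHILD_CODE` and `report_from_child` are hypothetical names. The parent only sees what the child reports back, and the child has its own process id.

```python
import json
import os
import subprocess
import sys

# Code the child process runs: report where it is executing.
CHILD_CODE = (
    "import json, os, platform; "
    "print(json.dumps({'node': platform.node(), 'pid': os.getpid()}))"
)


def report_from_child() -> dict:
    """Run CHILD_CODE in a fresh interpreter and parse what it prints."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(completed.stdout)


if __name__ == "__main__":
    child_info = report_from_child()
    print("parent pid:", os.getpid())
    print("child reported:", child_info)
```

In this sketch the child happens to share the machine with the parent; with a real deployment, the host is whatever that deployment's infrastructure provides.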