Omar Rizwani
11/18/2024, 4:00 PM
I'm finding that any @flow function that's called from _run_async ends up being executed in the Cloud...
Is there a way of getting asynchronous functions to execute locally via the Prefect engine?
```
# prefect version
Version: 2.20.7
API version: 0.8.4
Python version: 3.11.10
Git commit: e0267b46
Built: Thu, Sep 12, 2024 12:53 PM
OS/Arch: linux/x86_64
Profile: default
Server type: cloud
```
Marvin
11/18/2024, 4:00 PM
If your @flow functions are running in the cloud instead of locally, it seems you're encountering behavior that's not entirely documented. In Prefect 2.x, when you kick off a flow, Prefect's engine can manage both local and cloud execution. Here are a few pointers to help you run your flows locally:
1. Local Environment Configuration: Ensure that your Prefect profile is set to execute locally. This typically involves configuring your profile not to connect to Prefect Cloud. Check your environment and how it invokes Prefect flows, since defaults might lead to cloud execution.
2. Using await and asyncio: If you're using async capabilities, make sure your local setup can handle them (see the sketch after this list). Behavior in a cloud setup can differ due to environment specifics in handling async flows.
3. Local running tip: Instead of relying on Prefect Cloud infrastructure, you can run flows locally by invoking the flow function directly in a script, provided your profile/environment is set up to run locally.
4. Documentation and Resources: Explore Prefect's documentation for ways to configure your flows to execute locally when you're not explicitly opting into cloud usage.
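For instance, here's a minimal sketch of running an async flow entirely in the local process (the flow name and body are placeholders, not from your project):
```python
import asyncio

from prefect import flow


@flow(log_prints=True)
async def my_local_flow():
    # The body runs in this Python process; print() is captured as a flow-run log.
    print("hello from a locally executed flow")


if __name__ == "__main__":
    # Calling the decorated async function returns a coroutine; driving it with
    # asyncio runs the flow in-process rather than via a deployment.
    asyncio.run(my_local_flow())
```
Note that with a Cloud profile the run is still tracked in Cloud (its metadata and logs are reported there), but the code itself executes on your machine.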
Here are some resources I gathered that might have snippets more directly related to your version:
- Details about running flows locally and asynchronously may be found on Prefect 2.x official pages: Prefect 2.x Flows
Nothing explicit popped up from GitHub issues, but if these pointers don’t solve the problem, feel free to reach out to Prefect’s support or community channels where similar issues might’ve been discussed more contextually.
Remember, sometimes it's the small setting buried in configs that changes the execution pathway! And, as I like to say, "There's always a more unpleasant universe where everything works perfectly, except the debugging—that really sucks there!" 😉
Omar Rizwani
11/18/2024, 4:26 PM
$ python projects/dunstan/flows.py
Marvin
11/18/2024, 4:26 PM
Running python projects/dunstan/flows.py suggests you're executing the flow script directly with Python. This should indeed execute the flows locally, assuming your environment is set up not to default to Prefect Cloud.
Here are a few additional tips to ensure they run locally:
1. Check for Cloud Configuration: Ensure there are no configurations in your script that explicitly send runs to Prefect Cloud. This includes any calls that might configure the flow run context or environment to connect to a remote server.
2. Inspect Environment Variables: Look for Prefect-specific variables, such as PREFECT_API_URL, which might point execution at a remote API (see the quick check after this list).
3. Prefect Version: Double-check your Prefect version (which you've given as 2.20.7). Some behaviors subtly differ between releases, so checking the release notes can sometimes point to quirks or updates that affect execution.
4. Flow Configuration: In your flow definitions, ensure that the decorators and metadata don't explicitly indicate cloud or remote execution settings, and check that the way you invoke the flow doesn't include overrides that specify cloud defaults.
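As a quick check for point 2, you can print the API URL the current process has resolved; the same information is shown by the prefect config view CLI command:
```python
# Print the API URL this process will use for orchestration calls.
# PREFECT_API_URL is resolved from your active profile and environment;
# an https://api.prefect.cloud/... value means runs are tracked in Cloud.
from prefect.settings import PREFECT_API_URL

print(PREFECT_API_URL.value())
```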
If after trying these steps you still face the issue, there could be a hidden configuration that redirects async flows automatically. It's worth a deeper dive into the Prefect forums, or even opening an issue if none exists, as it might be a less common edge case that others have encountered or documented outside of typical FAQs.
Omar Rizwani
11/18/2024, 4:31 PM
```python
flow_run = await run_deployment(name=config.deployment_name, parameters={"tokens": token_batch, **config.deployment_params}, timeout=config.timeout)
```
Marvin
11/18/2024, 4:32 PM
The run_deployment function is specifically used to trigger deployments defined within Prefect, and these are typically configured to run in a particular environment, whether locally or in the cloud. When you use run_deployment, it relies on the deployment settings, which might include cloud execution.
To override or avoid running this in the cloud, consider the following:
1. Check Deployment Configuration: Ensure the deployment for config.deployment_name is set to run locally. Inspect the deployment settings in your Prefect UI or configuration files to see where it is intended to run.
2. Local Execution Context: Instead of using run_deployment, if possible, directly call the flow function in your script. This ensures it runs within the same local Python process and environment (a fuller sketch follows this list). For example, where run_deployment is used to run flows that are packaged and deployed separately, calling the flow function directly might look like this:
```python
flow_result = await my_flow_function(token_batch, **config.deployment_params)
```
3. Override Execution Setting per Flow: If Prefect provides an option to specify run contexts or overrides in your deployment configuration or directly in the script, ensure it doesn't default to the cloud.
4. Debug Logging: Log where the function decides its execution context; adding debug logging can expose unexpected behavior or the path the code follows.
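Here's a short sketch contrasting the two calls from point 2 (the flow and deployment names are hypothetical):
```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow(log_prints=True)
async def my_flow(tokens: list):
    print(f"processing {len(tokens)} tokens")


async def main():
    # Runs wherever the deployment's infrastructure lives (worker, work pool,
    # cloud run job, ...); its logs land on that remote flow run.
    remote_run = await run_deployment(name="my-flow/my-deployment")
    print(f"remote flow run: {remote_run.id}")

    # Runs in THIS process as a flow run; its logs stream to your terminal.
    await my_flow(tokens=[])


asyncio.run(main())
```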
If avoiding the cloud for specific operations is imperative and the above doesn't provide a solution, documenting this behavior in an issue on Prefect's GitHub could also provide more targeted guidance from the community or the maintainers.
Remember, sometimes these runtime decisions can be as stubborn as a mule, refusing to budge from default settings unless explicitly told otherwise!
Nate
11/18/2024, 4:56 PM
> However I'm finding that any @flow function that's called from _run_async ends up being executed in the Cloud...
can you explain what you're seeing? this should not be the case
Omar Rizwani
11/18/2024, 5:00 PM
Prefect buddy has nested asynchronous calls which is what's causing the perceived mixture of local/cloud run results & logging...
I'll dig into it more tomoz & give you some more insights...
(Thanks for getting back to me...X)
Nate
11/18/2024, 5:01 PM
Omar Rizwani
11/19/2024, 10:23 AM
Hi! Let me give you a clearer overview of what's happening for once you're back online... 💻 I'm trying to run some Prefect @flows locally that update a dataset with data & metadata from a DWH source... 🧱
Everything logs locally (as hoped) until the flow reaches an await run_deployment command...
@flow(name="update-dataset", log_prints=True, on_completion=[dataset_update_complete],)
async def update_dataset(logger, dataset, **query_params):
# Local logs fine here...
tokens = update_dataset_metadata(logger, dataset.name, dataset.table_config, dataset.query_config, **query_params)
flow_run_ids = []
tokens_per_job = 5000
for config in dataset.data_deployments:
for i in range(0, len(tokens), tokens_per_job):
token_batch = tokens[i : i + tokens_per_job]
# Here logs disappear locally...
flow_run = await run_deployment(name=config.deployment_name, parameters={
"tokens": token_batch, **config.deployment_params}, timeout=config.timeout)
flow_run_ids.append(flow_run.id)
async with get_client() as client:
while True:
await anyio.sleep(5)
# Here logs disappear locally...
flow_runs = [await client.read_flow_run(fr_id) for fr_id in flow_run_ids]
states = [flow_run.state for flow_run in flow_runs]
if all([state.is_final() for state in states]):
break
# Events trigger "on_completion" hook, logs aren't seen locally...
return {"dataset_name": dataset.name}
Marvin
11/20/2024, 10:22 AM
The behavior you're describing with await run_deployment might stem from how Prefect orchestrates executions when using deployments. Here's how you might address the situation:
1. Local Execution Context: When run_deployment is executed, it follows the deployment configuration, which usually defines whether it runs locally or in the cloud. If the deployment defaults to cloud execution, the logs will naturally appear there and not locally.
2. Consider Direct Flow Execution: For local debugging purposes, if possible, circumvent the deployment execution by directly calling the flow functions with appropriate parameters. This keeps the execution and its logs within the local context.
3. Deployment Configuration: Inspect your deployment setup to check whether it automatically directs executions towards a Prefect server or cloud environment. The Prefect UI or configuration files show where executions are intended to run based on the deployment settings.
4. Logs and Local Setup: Prefect defaults might capture logs at different verbosity levels depending on the deployment settings. Check that your logger or environment variables aren't filtering them out.
5. Using serve for Local Execution: If you need to execute multiple flows or deployments locally for debugging, consider using the serve method or similar approaches that ensure local execution and logging (see the sketch below). This was mentioned in Prefect tutorials: find more in the Prefect 2.x documentation.
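Here's a minimal sketch of that serve option, with a placeholder flow body standing in for yours:
```python
from prefect import flow


@flow(log_prints=True)
async def update_dataset(tokens: list | None = None):
    print(f"updating dataset with {len(tokens or [])} tokens")


if __name__ == "__main__":
    # Serve the flow from this process: runs triggered against this deployment
    # execute here, in your terminal, so their logs stay local.
    update_dataset.serve(name="update-dataset-local")
```
Once it's serving, prefect deployment run 'update-dataset/update-dataset-local' triggers a run whose logs appear in the serving terminal.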
By directly executing function calls or adjusting execution contexts in your script, you should be able to see the desired logs locally. If confusion persists or this behavior seems unintended, following up with the Prefect community or documented issues for your specific version might offer additional solutions or confirmations.