# ask-marvin
Omar Rizwani: @Marvin I'm trying to run some @flows locally so I can debug them quicker... However I'm finding that any @flow function that's called from `_run_async` ends up being executed in the Cloud... Is there a way of getting asynchronous functions to execute locally via the Prefect engine?
```
# prefect version
Version:             2.20.7
API version:         0.8.4
Python version:      3.11.10
Git commit:          e0267b46
Built:               Thu, Sep 12, 2024 12:53 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud
```
Marvin: If you're finding that your asynchronous `@flow` functions are running in the cloud instead of locally, it seems you're encountering behavior that's not entirely documented. In Prefect 2.x, when you kick off a flow, Prefect's engine can manage both local and cloud execution. Here are a few pointers to help you run your flows locally:
1. Local environment configuration: Ensure that your Prefect profile is set to execute locally. This typically involves configuring your profile to not connect to Prefect Cloud. You may want to check your environment and how it invokes Prefect flows, since defaults might lead to cloud execution (a quick way to check is sketched at the end of this message).
2. Using `await` and `asyncio`: If you're using async capabilities, make sure your local setup can handle them. Sometimes the behavior in a cloud setup might differ due to environment specifics in handling async flows.
3. Local running tip: Instead of relying on Prefect Cloud infrastructure, you can use Prefect CLI commands to run flows locally. The `prefect execute flow-run` command is something you might find useful if your profile/environment is set up to run locally.
4. Documentation and resources: You can explore predefined methods in Prefect's documentation for configuring your flows to execute locally if you're not explicitly stating cloud usage.
Here are some resources I gathered that might have snippets more directly related to your version:
- Details about running flows locally and asynchronously may be found on the Prefect 2.x official pages: Prefect 2.x Flows
Nothing explicit popped up from GitHub issues, but if these pointers don't solve the problem, feel free to reach out to Prefect's support or community channels where similar issues might've been discussed more contextually. Remember, sometimes it's the small setting buried in configs that changes the execution pathway! And, as I like to say, "There's always a more unpleasant universe where everything works perfectly, except the debugging—that really sucks there!" 😉
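On the profile point in (1), a minimal check (just a sketch, not specific to your setup) is to print the `PREFECT_API_URL` setting your active profile resolves to; if it points at Prefect Cloud, anything orchestrated from that environment will report there:
```python
# Minimal sketch: see which API the active Prefect 2.x profile targets.
# An https://api.prefect.cloud/... URL means runs are orchestrated by Cloud;
# an empty value or http://127.0.0.1:4200/api means a local/ephemeral server.
from prefect.settings import PREFECT_API_URL

print(PREFECT_API_URL.value())
```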
Omar Rizwani: @Marvin I'm using the following command to execute my flows locally
```
$ python projects/dunstan/flows.py
```
Marvin: Running your flows using the command `python projects/dunstan/flows.py` suggests you're executing the flow script directly with Python. This should indeed execute them locally, assuming your environment is not set up to default to Prefect Cloud. Here are a few additional tips to ensure they run locally:
1. Check for cloud configuration: Ensure there are no configurations in your script that explicitly send runs to Prefect Cloud. This includes any calls that might configure the flow run context or environment to connect to a remote server.
2. Inspect environment variables: Look for any Prefect-specific variables, such as `PREFECT__CLOUD__API` or similar, which might redirect execution to the cloud.
3. Prefect version: Double-check your Prefect version (which you've given as 2.20.7). Some behaviors might subtly differ between releases, so checking compatibility and release notes can sometimes point to quirks or updates that affect execution.
4. Flow configuration: In your flow definitions, ensure that the decorators and metadata don't explicitly indicate cloud or remote execution settings. Check whether you're using the `flow.run()` method without any overrides that might specify cloud defaults.
If after trying these steps you still face the issue, there could be a hidden configuration that redirects async flows automatically. It's worth a deeper dive into the Prefect forums, or even opening an issue if none exists, as it might be a less common edge case that others have encountered or documented outside of typical FAQs.
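Since you're launching the script directly, the usual pattern for an async flow is a plain `asyncio.run(...)` entry point, which keeps everything in the local process until something like `run_deployment` hands work off elsewhere. A minimal sketch (the flow name here is hypothetical, not your actual flow):
```python
# Minimal sketch of a local entry point for an async Prefect 2.x flow.
# Calling the flow function directly runs it in this process, so its logs
# and prints stay in this terminal.
import asyncio

from prefect import flow


@flow(log_prints=True)
async def example_flow(n: int) -> int:
    print(f"running locally with n={n}")
    return n * 2


if __name__ == "__main__":
    print(asyncio.run(example_flow(21)))
```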
Omar Rizwani: @Marvin Would the following command need to be overridden to avoid cloud runs?
```python
flow_run = await run_deployment(name=config.deployment_name, parameters={"tokens": token_batch, **config.deployment_params}, timeout=config.timeout)
```
Marvin: The `run_deployment` function is specifically used to trigger deployments defined within Prefect, and these are typically configured to run in a particular environment, whether locally or in the cloud. When you use `run_deployment`, it relies on the deployment settings, which might include cloud execution. To override or avoid running this in the cloud, consider the following:
1. Check deployment configuration: Ensure the deployment for `config.deployment_name` is set to run locally. Inspect the deployment settings in your Prefect UI or configuration files to see where it is intended to run.
2. Local execution context: Instead of using `run_deployment`, if possible, directly call the flow function in your script. This ensures it runs within the same local Python process and environment. For example, if `run_deployment` is typically used to run flows that are packaged and deployed separately, calling the flow function directly might look like this (a fuller sketch with a local/remote switch follows at the end of this message):
```python
flow_result = await my_flow_function(token_batch, **config.deployment_params)
```
3. Override execution setting per flow: If Prefect provides an option to specify run contexts or overrides in your deployment configuration or directly in the script, ensure it doesn't default to the cloud.
4. Debug logging: Consider logging where the function decides its execution context; sometimes adding debug logging can expose unexpected behavior or the path the code follows.
If avoiding the cloud for specific operations is imperative and the above doesn't provide a solution, documenting this behavior in an issue on Prefect's GitHub could also provide more targeted guidance from the community or the maintainers. Remember, sometimes these runtime decisions can be as stubborn as a mule, refusing to budge from default settings unless explicitly told otherwise!
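Following up on point 2, here is a rough sketch of a debug switch; `process_tokens` is a hypothetical stand-in for the flow behind your deployment, and the environment variable is just an illustration:
```python
# Sketch: run the child flow in-process when RUN_LOCAL is set, otherwise go
# through its deployment as before. Note the return types differ: a direct
# call returns the flow's result, while run_deployment returns a FlowRun.
import os

from prefect import flow
from prefect.deployments import run_deployment


@flow(log_prints=True)
async def process_tokens(tokens, **params):
    # Hypothetical stand-in for the deployed child flow.
    print(f"processing {len(tokens)} tokens locally")


async def dispatch_batch(config, token_batch):
    if os.getenv("RUN_LOCAL"):
        # Direct call: executes as a subflow in this process, so logs stay local.
        return await process_tokens(tokens=token_batch, **config.deployment_params)
    # Deployment call: executes wherever the deployment's infrastructure runs it.
    return await run_deployment(
        name=config.deployment_name,
        parameters={"tokens": token_batch, **config.deployment_params},
        timeout=config.timeout,
    )
```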
Nate: hi @Omar Rizwani - hmm this doesn't sound right
> However I'm finding that any @flow function that's called from `_run_async` ends up being executed in the Cloud...
can you explain what you're seeing? this should not be the case
Omar Rizwani: Hi @Nate I've got to shoot off now but I can elaborate more tomorrow... My suspicion is that my Prefect buddy has nested asynchronous calls, which is what's causing the perceived mixture of local/cloud run results & logging... I'll dig into it more tomoz & give you some more insights... (Thanks for getting back to me...X)
Nate: 👍
Omar Rizwani: Hi @Marvin Trying to run flows locally that update a dataset from a DWH source... Everything logs locally until the flow reaches an `await run_deployment(...)` call...
```python
# Imports implied by the snippet (Prefect 2.x paths); helpers such as
# update_dataset_metadata and dataset_update_complete are defined elsewhere.
import anyio

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment


@flow(name="update-dataset", log_prints=True, on_completion=[dataset_update_complete])
async def update_dataset(logger, dataset, **query_params):
    # Local logs fine here...
    tokens = update_dataset_metadata(logger, dataset.name, dataset.table_config, dataset.query_config, **query_params)
    flow_run_ids = []
    tokens_per_job = 5000

    for config in dataset.data_deployments:
        for i in range(0, len(tokens), tokens_per_job):
            token_batch = tokens[i : i + tokens_per_job]
            # Here logs disappear locally...
            flow_run = await run_deployment(
                name=config.deployment_name,
                parameters={"tokens": token_batch, **config.deployment_params},
                timeout=config.timeout,
            )
            flow_run_ids.append(flow_run.id)

    async with get_client() as client:
        while True:
            await anyio.sleep(5)
            # Here logs disappear locally...
            flow_runs = [await client.read_flow_run(fr_id) for fr_id in flow_run_ids]
            states = [flow_run.state for flow_run in flow_runs]
            if all(state.is_final() for state in states):
                break

    # Events trigger "on_completion" hook, logs aren't seen locally...
    return {"dataset_name": dataset.name}
```
Marvin: Your message was too long by 286 tokens - please shorten it and try again.
Marvin: It appears that your issue with logs disappearing when you use `await run_deployment` might stem from how Prefect orchestrates executions when using deployments. Here's how you might address the situation:
1. Local execution context: When `run_deployment` is executed, it follows the deployment configuration, which usually defines whether it runs locally or in the cloud. If the deployment defaults to cloud execution, the logs will naturally appear there and not locally.
2. Consider direct flow execution: For local debugging purposes, if possible, circumvent the deployment execution by directly calling the flow functions with appropriate parameters. This ensures the execution and its logs remain within the local context.
3. Deployment configuration: Inspect your deployment setup to check whether it automatically directs executions towards a Prefect server or cloud environment. The Prefect UI or configuration files can show where the executions are intended to run based on the deployment settings.
4. Logs and local setup: Prefect defaults might capture logs at different verbosity levels depending on the deployment settings. Check that your logger or environment variables aren't filtering them out.
5. Using `serve` for local execution: If you need to execute multiple flows or deployments locally for debugging, consider using the `serve` method or similar approaches that ensure local execution and logging. This is mentioned in the Prefect tutorials; find more in the Prefect 2.x documentation (a rough sketch follows at the end of this message).
By directly executing function calls or adjusting execution contexts in your script, you should be able to see the desired logs locally. If confusion persists or this behavior seems unintended, following up with the Prefect community or documented issues for your specific version might offer additional solutions or confirmations.
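On the `serve` idea in (5), a rough sketch (the flow and deployment names here are hypothetical, and the serving process has to stay running while the parent flow triggers runs):
```python
# Sketch: serve a flow from the local process so that run_deployment calls
# targeting "process-tokens/local-debug" are picked up and executed here,
# with their logs visible in this terminal.
from prefect import flow


@flow(log_prints=True)
async def process_tokens(tokens: list, **params):
    print(f"processing {len(tokens)} tokens")


if __name__ == "__main__":
    # Registers the deployment and blocks, polling for runs to execute locally.
    process_tokens.serve(name="local-debug")
```
A parent flow calling `await run_deployment(name="process-tokens/local-debug", ...)` against the same API should then have its child runs executed by this serving process rather than by remote infrastructure.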