# ask-community
o
Hi everyone, I am having issues with one task in my flow and I'm not sure why. I see this in the logs when the task runs. Other tasks run fine, and I have tried a different flow on the same agent with this error. I have also tried running the flow as a process with flow.run(), and it completes successfully.
On the terminal where the agent is running
Process PID 924 returned non-zero exit code 2!
is returned
Running the logger on debug, it seems to get stuck at checking the flow state
Setting the agent log-level to DEBUG shows:
[2022-01-02 23:05:25-0800] DEBUG - prefect.CloudTaskRunner | Task 'Upload Order Shipping to Azure': Handling state change from Pending to Running
[2022-01-02 23:05:25-0800] DEBUG - prefect.CloudTaskRunner | Task 'Upload Order Shipping to Azure': Calling task.run() method...
[2022-01-03 07:05:27,024] DEBUG - agent | Querying for ready flow runs...
[2022-01-03 07:05:27,066] DEBUG - agent | No ready flow runs found.
[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
[2022-01-02 23:05:32-0800] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2022-01-03 07:05:35,077] DEBUG - agent | Querying for ready flow runs...
[2022-01-03 07:05:35,106] DEBUG - agent | No ready flow runs found.
The task:
@task(name="Upload Order Shipping to Azure", state_handlers=[send_email_on_failure])
def upload():
    upload_shipping = importlib.import_module("some_local_module")
    upload_shipping.main()
upload_shipping.main() uses the argparse library to accept arguments, with a default set. It looks like, on the backend, Prefect Server passes other arguments to run the task, and these cause the failure. How can I solve this?
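The symptom can be reproduced without Prefect. This is a minimal sketch, assuming the module's parser looks roughly like the one below. The `build_parser` and `parse_like_flow_run` names and the `--run-date` default are illustrative; only the `--run-date` flag, the `__main__.py` program name, and the `execute flow-run` arguments come from the logs. When `parse_args()` is called without an explicit list, argparse reads `sys.argv[1:]`, so it sees whatever arguments the hosting process was started with.

```python
import argparse
import contextlib
import io


def build_parser():
    # Hypothetical stand-in for the parser inside the imported module.
    parser = argparse.ArgumentParser(prog="__main__.py")
    parser.add_argument("--run-date", default="2022-01-02")
    return parser


def parse_like_flow_run(argv):
    """Parse argv as the module would; return (exit_code, stderr_text)."""
    stderr = io.StringIO()
    try:
        with contextlib.redirect_stderr(stderr):
            build_parser().parse_args(argv)
    except SystemExit as exc:
        # argparse reports bad arguments by printing usage and exiting.
        return exc.code, stderr.getvalue()
    return 0, stderr.getvalue()


# Feed the parser the arguments that appear in the agent log.
code, message = parse_like_flow_run(["execute", "flow-run"])
print(code)
print(message)
```

argparse exits with status 2 on unrecognized arguments, which would be consistent with the "non-zero exit code 2" reported by the agent.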
a
Your task lost heartbeat and therefore Prefect can no longer handle its state changes. Is this a long-running task? Do you have some unclosed database connection in this task? Do you have enough memory on the agent to complete the task? Prefect Flows have heartbeats that signal to Prefect Cloud that your Flow is alive. If Prefect didn’t have heartbeats, Flows that lose communication and die would permanently be shown as Running in the UI. Most of the time, we have seen “no heartbeat detected” to be a sign of memory issues (out of memory error). Prefect version 0.15.4 has additional logging that propagates the real error in the event this error happens. We have also seen this error happen with long-running jobs. To mitigate the issue, you can configure heartbeats to be threads instead of processes. This has proven to be more stable for a lot of users. The documentation for that is here.
from prefect.run_configs import UniversalRun
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
o
Hi @Anna Geller, thanks. I am running Prefect Server. It is not a long-running process, and I am quite sure it's not a memory error either (the flow completes locally when I use Prefect Core's flow.run()). I added a few more details and updates in the first 3 comments. The task that fails uses the argparse library to accept arguments (--run-date). When I run the agent with log-level debug, it shows:
[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
A different argument is passed in addition to run-date. I guess this is needed for prefect backend somehow.
a
you mentioned you run it with argparse but I only see importing and executing a module. Can you share your Flow or try to build a small example I could use to reproduce the issue? btw if you want to call a shell script, you can use a ShellTask
o
@Anna Geller argparse is used inside the module. I am migrating our workflows to prefect and didn't write the code so can't really refactor it
This is a dummy file but it should replicate the behaviour
a
The problem is that you run Python code globally outside of tasks:
BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
print(f"This is base dir: {BASE_DIR}")
DATA_PIPELINE_DIR = os.path.join(BASE_DIR, 'data-pipeline')
logger = prefect.context.get("logger")
if you move it into tasks, this should work
also the logger - you need to call this line in each task separately because the context is available in tasks, not in a flow:
logger = prefect.context.get("logger")
o
@Anna Geller Thank you. Is there an article that explains why we can't run Python code outside tasks? Can we run it when we define a flow? I updated the flow but still get the same error. Were you able to reproduce the error?
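import importlib
import os
import sys

import prefect
from prefect import Client, Flow, task
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun

project_name = "proj_name"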
flow_name = "Argparse test"
labels = ["WinDev"]

WORKFLOW_DIR = "C:\\Users\\xxxx\\Documents\\Develop\\workflow_management"
PROJ = "proj_dir"


@task(name="Change Directory", )
def change_dir():
    BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
    logger = prefect.context.get("logger")
    logger.info(f"Current dir: {os.getcwd()}")
    os.chdir(BASE_DIR)
    logger.info(f"New dir: {os.getcwd()}")

    
@task(name="Upload Order Shipping to Azure", )
def upload_data():
    BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
    DATA_PIPELINE_DIR = os.path.join(BASE_DIR, 'data-pipeline')
    logger = prefect.context.get("logger")
    sys.path.append(DATA_PIPELINE_DIR)
    upload_shipping = importlib.import_module("t1")
    upload_shipping.main()
    logger.info("Order Shipping Data Uploaded")

def prefect_flow():
    with Flow(flow_name, ) as flow:
        dir_root = change_dir()     
        upload = upload_data()
        
        upload.set_upstream(dir_root)
    return flow


if __name__ == "__main__":
    logger = prefect.context.get("logger")
    flow = prefect_flow()
#    flow.run()
    try:
        client = Client()
        client.create_project(project_name=project_name)
    except prefect.utilities.exceptions.ClientError as e:
        logger.info("Project already exists")

    flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = LocalExecutor()
    flow.register(project_name=project_name, labels=labels)
a
Yes, it’s documented here that Flows are containers for tasks. It will be easier in Orion. You don’t define the storage in your flow - I assume you use local pickle storage (default with this registration method)? What is your agent - is it a local agent on the same machine from which you register your flow? So the problem is that the task “Upload Order Shipping to Azure” is losing heartbeat, correct? it could be some connectivity issue with Azure, I saw an open issue when Azure was dropping connection to remote hosts - the user solved it by moving the agent to the same host
o
Yes, you are right, I am using the default local storage and a local agent on the same machine I register the flow from. I am able to run the flow with Prefect Core using flow.run(), so I don't think the problem is a dropped connection. I also have other flows that connect to Azure with no issues. "So the problem is that the task “Upload Order Shipping to Azure” is losing heartbeat, correct?" -- Yes, this shows up in the UI logs, but the following shows up in the agent logs when the flow is run from the UI:
[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
What I suspect is that, because I use the argparse package, it might be causing some sort of conflict. For example, when you have a Python script that accepts arguments and you pass in an invalid argument, it throws an error similar to what I see in the agent logs. The module I call only expects a run-date argument but also gets execute flow-run. It looks like Prefect adds additional arguments on the backend. I might be way off 🙂.
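If that suspicion is right and the module cannot be refactored, one possible workaround is to hide the process arguments from argparse while the module's `main()` runs. This is a minimal sketch under that assumption: `patched_argv` and `legacy_main` are hypothetical names, and the simulated `sys.argv` only mirrors what the agent log suggests, not a confirmed Prefect internal.

```python
import argparse
import contextlib
import sys


@contextlib.contextmanager
def patched_argv(*args):
    """Temporarily replace sys.argv[1:] with args, restoring it afterwards."""
    original = sys.argv
    sys.argv = [original[0] if original else "prog", *args]
    try:
        yield
    finally:
        sys.argv = original


def legacy_main():
    # Hypothetical stand-in for the module's main(), which cannot be edited.
    parser = argparse.ArgumentParser()
    parser.add_argument("--run-date", default="today")
    return parser.parse_args().run_date


# Simulate the arguments seen in the agent log (an assumption for the demo).
sys.argv = ["__main__.py", "execute", "flow-run"]

with patched_argv():
    # argparse now sees no extra arguments and falls back to the default.
    run_date = legacy_main()
print(run_date)
```

Inside the task, the call to `upload_shipping.main()` would go inside the `with patched_argv():` block. Note that `sys.argv` is process-wide state, so this sketch is not safe if several tasks run in threads of the same process.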
a
I wouldn’t want to prematurely jump to any conclusions. Can you answer these questions so that we can identify the root cause?
1. You said you run Server - what is your Prefect version on Server?
2. What is your agent’s Prefect version?
3. What is your Prefect version from the environment from which you register your flow?
4. Are all components (Server, agent, registration) running from your local machine or from a remote server?
5. Did you try putting the t1 module into your normal flow file to see whether this might be just an import issue? (you can literally copy paste the code to your flow just to check)
6. You could optionally try making t1 an installable module (you need just setup.py) that you can install on your agent and import this way
Also: do other flows work well? I suspect this is an import/packaging issue with this t1 module since you said running locally works, and on the agent it probably doesn’t because the agent doesn’t have this code as dependency
o
Hi @Anna Geller, sorry for the late response. All other flows run well. I have had problems with importing packages in the past, and the error is quite clear. I import the modules the same way I do in every other flow I have, without issues.
1. You said you run Server - what is your Prefect version on Server? - 0.14.0
2. What is your agent’s Prefect version? - 0.15.7
3. What is your Prefect version from the environment from which you register your flow? - 0.15.7
4. Are all components (Server, agent, registration) running from your local machine or from a remote server? - They are all running on remote servers. Prefect Server is running on one remote server (let's call this A), and the agent and registration are running on a different server (B). Not sure if this makes any difference, but remote server A is Linux and B is Windows.
5. Did you try putting the t1 module into your normal flow file to see whether this might be just an import issue? (you can literally copy paste the code to your flow just to check) - Same error. I copied the function and pasted it into the task, then called it.
6. You could optionally try making t1 an installable module (you need just setup.py) that you can install on your agent and import this way - Same error.
I removed the argparse parser from the function and it ran without any errors.
I ran the python script using a shellTask rather than importing it as a module and this worked as well
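A likely reason the shell approach works is that the script then runs in its own process, with its own argument list, so argparse never sees the arguments of the flow-run process. This is a minimal sketch of that isolation, using the standard library's `subprocess` in place of Prefect's ShellTask; the inline `SCRIPT` is a hypothetical stand-in for the real script.

```python
import subprocess
import sys

# Hypothetical stand-in for the script; it only accepts --run-date.
SCRIPT = """
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--run-date", default="today")
print(parser.parse_args().run_date)
"""

# The child process receives only the arguments listed here, regardless of
# what the parent process was started with.
result = subprocess.run(
    [sys.executable, "-c", SCRIPT, "--run-date", "2022-01-02"],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())
```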
a
Nice work! In general, your Server version should be >= version you use for agent and registration, otherwise you may hit some API endpoints that don't exist in Server because they were added later