Ovo Ojameruaye
01/03/2022, 6:35 AMOvo Ojameruaye
01/03/2022, 6:36 AMProcess PID 924 returned non-zero exit code 2!
is returnedOvo Ojameruaye
01/03/2022, 6:42 AMOvo Ojameruaye
01/03/2022, 7:49 AM[2022-01-02 23:05:25-0800] DEBUG - prefect.CloudTaskRunner | Task 'Upload Order Shipping to Azure': Handling state change from Pending to Running
[2022-01-02 23:05:25-0800] DEBUG - prefect.CloudTaskRunner | Task 'Upload Order Shipping to Azure': Calling task.run() method...
[2022-01-03 07:05:27,024] DEBUG - agent | Querying for ready flow runs...
[2022-01-03 07:05:27,066] DEBUG - agent | No ready flow runs found.
[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
[2022-01-02 23:05:32-0800] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2022-01-03 07:05:35,077] DEBUG - agent | Querying for ready flow runs...
[2022-01-03 07:05:35,106] DEBUG - agent | No ready flow runs found.
The task:
@task(name="Upload Order Shipping to Azure", state_handlers=[send_email_on_failure])
def upload():
upload_shipping = importlib.import_module("some_local_module")
upload_shipping.main()
upload_shipping.main()
uses the argparse library to accept arguments with a default set. It looks like on the backend, other arguments are passed that prefect server uses to run the task which causes the fail. Please how can I solve this?Anna Geller
from prefect.run_configs import UniversalRun
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
Ovo Ojameruaye
01/03/2022, 5:08 PM[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
A different argument is passed in addition to run-date. I guess this is needed for prefect backend somehow.Anna Geller
Ovo Ojameruaye
01/03/2022, 5:33 PMAnna Geller
BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
print(f"This is base dir: {BASE_DIR}")
DATA_PIPELINE_DIR = os.path.join(BASE_DIR, 'data-pipeline')
logger = prefect.context.get("logger")
if you move it into tasks, this should workAnna Geller
logger = prefect.context.get("logger")
Ovo Ojameruaye
01/04/2022, 3:01 PMproject_name = "proj_name"
flow_name = "Argparse test"
labels = ["WinDev"]
WORKFLOW_DIR = "C:\\Users\\xxxx\\Documents\\Develop\\workflow_management"
PROJ = "proj_dir"
@task(name="Change Directory", )
def change_dir():
BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f"Current dir: {os.getcwd()}")
os.chdir(BASE_DIR)
<http://logger.info|logger.info>(f"New dir: {os.getcwd()}")
@task(name="Upload Order Shipping to Azure", )
def upload_data():
BASE_DIR = os.path.join(WORKFLOW_DIR, PROJ)
DATA_PIPELINE_DIR = os.path.join(BASE_DIR, 'data-pipeline')
logger = prefect.context.get("logger")
sys.path.append(DATA_PIPELINE_DIR)
upload_shipping = importlib.import_module("t1")
upload_shipping.main()
<http://logger.info|logger.info>("Order Shipping Data Uploaded")
def prefect_flow():
with Flow(flow_name, ) as flow:
dir_root = change_dir()
upload = upload_data()
upload.set_upstream(dir_root)
return flow
if __name__ == "__main__":
logger = prefect.context.get("logger")
flow = prefect_flow()
# flow.run()
try:
client = Client()
client.create_project(project_name=project_name)
except prefect.utilities.exceptions.ClientError as e:
<http://logger.info|logger.info>("Project already exists")
flow.run_config=LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
flow.executor = LocalExecutor()
flow.register(project_name=project_name,
labels=labels).
Anna Geller
Ovo Ojameruaye
01/04/2022, 4:27 PM[2022-01-03 07:05:27,068] DEBUG - agent | Sleeping flow run poller for 8.0 seconds...
usage: __main__.py [-h] [--run-date RUN_DATE]
__main__.py: error: unrecognized arguments: execute flow-run
What I suspect is because I use the argparse package, it might be causing some sort of conflict. For example when you have a python script that accepts arguments and you pass in an invalid argument, it throws an error similar to what I see on the agent logs. The module I call only expect a run-date argument but also gets a execute flow-run. Looks like prefect adds an additional argument on the backend. I might be way off 🙂 .Anna Geller
Anna Geller
Ovo Ojameruaye
01/05/2022, 3:30 AMOvo Ojameruaye
01/05/2022, 5:52 AMAnna Geller