https://prefect.io logo
k

kade

03/16/2023, 2:22 PM
Hi there. I want to use GCP Google Compute Engine as prefect agent and run a flow through prefect cloud api. I have deployed my code to a gcs bucket. My code is all working fine locally. However, when I run prefect agent start -p mypool on the VM, the flow fails as follows.
Copy code
14:14:30.142 | ERROR   | prefect.infrastructure.process - Process 'psi6-vanden-sector' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
The above error is not a memory issue (memory did not spike when I monitored it), nor did I manually terminate it. One peculiarity is that retrying this failed flow results in a new error message:
Copy code
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
14:22:03.283 | ERROR   | Flow run 'psi6-vanden-sector' - Finished in state Failed('Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.\n')
1
If I don’t deploy my code and run it in the VM like
python etl.py
, it works fine. What is the problem, I have been suffering for days because of this.
whole error code:
Copy code
Agent started! Looking for work from work pool 'gh-agent-pool'...
14:12:32.114 | INFO    | prefect.agent - Submitting flow run 'd7146be9-3824-4c04-bb0d-71b88bfbccf5'
14:12:33.068 | INFO    | prefect.infrastructure.process - Opening process 'psi6-vanden-sector'...
14:12:33.297 | INFO    | prefect.agent - Completed submission of flow run 'd7146be9-3824-4c04-bb0d-71b88bfbccf5'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
14:12:37.062 | INFO    | Flow run 'psi6-vanden-sector' - Downloading flow code from storage at '/home/kade/gh-prefect'
14:13:02.706 | INFO    | Flow run 'psi6-vanden-sector' - Created task run 'extract_data_from_web-0' for task 'extract_data_from_web'
14:13:02.707 | INFO    | Flow run 'psi6-vanden-sector' - Executing 'extract_data_from_web-0' immediately...
14:14:30.142 | ERROR   | prefect.infrastructure.process - Process 'psi6-vanden-sector' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
14:14:30.518 | INFO    | prefect.agent - Reported flow run 'd7146be9-3824-4c04-bb0d-71b88bfbccf5' as crashed: Flow run infrastructure exited with non-zero status code -9.
14:21:27.854 | INFO    | prefect.agent - Submitting flow run 'd7146be9-3824-4c04-bb0d-71b88bfbccf5'
14:21:28.885 | INFO    | prefect.infrastructure.process - Opening process 'psi6-vanden-sector'...
14:21:29.172 | INFO    | prefect.agent - Completed submission of flow run 'd7146be9-3824-4c04-bb0d-71b88bfbccf5'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
14:21:36.018 | INFO    | Flow run 'psi6-vanden-sector' - Downloading flow code from storage at '/home/kade/gh-prefect'
14:22:01.874 | INFO    | Flow run 'psi6-vanden-sector' - Created task run 'extract_data_from_web-0' for task 'extract_data_from_web'
14:22:01.876 | INFO    | Flow run 'psi6-vanden-sector' - Executing 'extract_data_from_web-0' immediately...
14:22:02.970 | WARNING | Task run 'extract_data_from_web-0' - Task run 'edc91b43-2ef6-4de9-b78c-70825dc38d35' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
14:22:02.972 | ERROR   | Flow run 'psi6-vanden-sector' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/engine.py", line 665, in orchestrate_flow_run
    result = await run_sync(flow_call)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmppa8w_lpdprefect/etl/etl_web_to_gcs.py", line 81, in etl_web_to_gcs
    extract_data_from_web(year, month, day, hour)
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/tasks.py", line 469, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/engine.py", line 965, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
           ^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/engine.py", line 1114, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/kade/gh-prefect/venv/lib/python3.11/site-packages/prefect/states.py", line 103, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
14:22:03.283 | ERROR   | Flow run 'psi6-vanden-sector' - Finished in state Failed('Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.\n')
execution_date: 2023-03-16-13
14:22:05.093 | INFO    | prefect.infrastructure.process - Process 'psi6-vanden-sector' exited cleanly.
r

Ryan Peden

03/16/2023, 3:20 PM
Is there anything in your task code that might not work properly when running in a subproccess? It looks like something is happening in the task causing abrupt termination of the process running the flow. It happened so quickly that the task disappeared while its state was still
RUNNING
, as indicated by this error message that appeared when Prefect tried to re-run the flow:
Copy code
14:22:02.970 | WARNING | Task run 'extract_data_from_web-0' - Task run 'edc91b43-2ef6-4de9-b78c-70825dc38d35' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
We have an update going out today allow the RUNNING to RUNNING transition, but I don't think it will solve the underlying problem here if the task keeps terminating the flow subprocess. It's hard to diagnose more without knowing what's happening in the
extract_data_from_web
task. I've seen occasional issues when launching a headless browser tool like Puppeteer from a subprocess; are you using anything like that to extract web data?
k

kade

03/17/2023, 12:34 AM
hi @Ryan Peden. Thanks for response. I didn’t use the headless tool. I just using request url to download. here is my code `etract_data_from_web`:
Copy code
@task(
    retries=3,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def extract_data_from_web(
    year: int, month: int, day: int, hour: int
) -> None:
    headers = {"User-Agent": "Mozilla/5.0"}
    # hour is 0-23
    url = f"<https://data.gharchive.org/{year}-{month:02d}-{day:02d}-{hour}.json.gz>"
    print(f"Extracting data from {url}...")
    req = request.Request(url, headers=headers)
    response = request.urlopen(req)

    data = gzip.decompress(response.read()).decode()

    dicts = data.strip().split("\n")

    data_list = []
    for d in dicts:
        # remove payload key in dict
        d = json.loads(d)
        d.pop("payload")
        data_list.append(d)

    file_name = f"data/{year}/{month:02d}/{day:02d}/{hour:02d}.json.gz"
    with gzip.open(file_name, "wt", encoding="utf-8") as f:
        json.dump(data_list, f)
2 Views