Michael Reynolds

    2 months ago
    hello, i am hacking away at something and i had a question about prefect 2.0 that i cannot seem to find an answer to.
    Anna Geller

    2 months ago
    Hi @Michael Reynolds, next time could you please state the title/main problem in the main message and move the details into the thread rather than posting separate messages? Check previous threads to see what this looks like. Moving your messages here:
    "i believe that i am having a problem running a streaming flow with orion because the engine is trying to checkpoint that task and it has an unserializable object. Or at least, that is what the docs from pre-orion versions of prefect suggest."
    "i am wondering if orion still has this notion of checkpointing tasks? and more importantly... how can i configure this? i cannot find anything relevant when searching the orion branch for the keyword checkpoint"
    "for reference, this is the error i get when running the app. it is strange because the app runs fine a few times but eventually this error pops up and crashes the flow"
    23:29:26.313 | ERROR   | Flow run 'mustard-quetzal' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 521, in orchestrate_flow_run
        flow_call
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in run_sync_in_interruptible_worker_thread
        cancellable=True,
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
        await coro
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
        func, *args, cancellable=cancellable, limiter=limiter
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/myapp/main.py", line 35, in run_pipeline
        messages = poll_kafka( conf[ 'kafka' ] ).result()
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 210, in result
        self._result, timeout=timeout, raise_on_failure=raise_on_failure
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 221, in sync
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
        return self.__get_result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
        raise self._exception
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 220, in _result
        return final_state.result(raise_on_failure=raise_on_failure)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/states.py", line 136, in result
        raise data
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/task_runners.py", line 314, in _run_and_store_result
        self._results[run_key] = await run_fn(**run_kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 799, in begin_task_run
        client=client,
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 905, in orchestrate_task_run
        result, serializer="cloudpickle"
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/states.py", line 130, in return_value_to_state
        return Completed(data=DataDocument.encode(serializer, result))
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/data.py", line 42, in encode
        blob = lookup_serializer(encoding).dumps(data, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/serializers.py", line 59, in dumps
        data_bytes = cloudpickle.dumps(data)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
        return Pickler.dump(self, obj)
    TypeError: can't pickle cimpl.Message objects
    Could you try moving the cimpl.Message objects into the task that needs them, without passing them between tasks? checkpoint=False would be a good approach, but it doesn't exist in 2.0 yet.
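A minimal sketch of that suggestion, under stated assumptions: the messages are assumed to come from the confluent-kafka client, whose Message objects expose topic(), partition(), offset(), key(), value() and error(). FakeMessage and to_records are hypothetical names used only for illustration; FakeMessage stands in for cimpl.Message so the example runs without Kafka or Prefect installed. The idea is to copy the plain fields out inside the polling task and return only those, so that the cloudpickle step seen at the bottom of the traceback has nothing unpicklable to encode.

```python
import pickle
from typing import Any, Dict, Iterable, List


class FakeMessage:
    """Hypothetical stand-in for cimpl.Message: readable, but not picklable."""

    def __init__(self, topic: str, partition: int, offset: int,
                 key: bytes, value: bytes) -> None:
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._key = key
        self._value = value

    def topic(self) -> str:
        return self._topic

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset

    def key(self) -> bytes:
        return self._key

    def value(self) -> bytes:
        return self._value

    def error(self) -> None:
        # Assumption: None means the message carries no error.
        return None

    def __reduce__(self):
        # Mimic the failure from the traceback when pickling is attempted.
        raise TypeError("can't pickle FakeMessage objects")


def to_records(messages: Iterable[Any]) -> List[Dict[str, Any]]:
    """Copy the fields downstream tasks need into plain, picklable dicts.

    Messages that report an error are skipped in this sketch.
    """
    return [
        {
            "topic": m.topic(),
            "partition": m.partition(),
            "offset": m.offset(),
            "key": m.key(),
            "value": m.value(),
        }
        for m in messages
        if m.error() is None
    ]


# Inside the polling task, return to_records(...) instead of the raw messages.
example_records = to_records([FakeMessage("events", 0, 7, b"k", b"v")])
```

In the flow from the traceback, this would mean having poll_kafka return the result of a conversion like to_records rather than the raw message objects. Whether the listed fields are enough depends on what the downstream tasks actually use.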
    Michael Reynolds

    2 months ago
    @Anna Geller sorry i didn't know what the decorum for posting was
    i'll do that in the future
    okay interesting so checkpoint=False is the correct way to go. is there an issue for adding that support that i could follow?
    Anna Geller

    2 months ago
    Yes there is! Follow the #announcements channel and the release-notes tag on Discourse