  • Scott Moreland

    1 year ago
    Hi all! I'm looking at using Prefect to manage ETL processes executed by PySpark in a CDSW/Hadoop environment. I'd like to subclass the Result class, much like what has already been done for Google Cloud and AWS, so that each task can read/write to Hive storage by toggling the Prefect cache variable. That is, loading a cached task would simply read the Spark DataFrame created by that task via sql_context.read.table. Are there any references for this?
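    A rough sketch of what such a subclass could look like, assuming Prefect 0.x's prefect.engine.result.Result interface (read/write/exists) and an existing SparkSession; the HiveResult name, the database argument, and the table-naming scheme are purely illustrative:

    from prefect.engine.result import Result

    class HiveResult(Result):
        """Store task results as Hive tables rather than files."""

        def __init__(self, spark, database="default", **kwargs):
            self.spark = spark
            self.database = database
            super().__init__(**kwargs)

        def write(self, value, **kwargs):
            new = self.format(**kwargs)  # render the location template for this task run
            new.value = value
            value.write.mode("overwrite").saveAsTable(f"{self.database}.{new.location}")
            return new

        def read(self, location):
            new = self.copy()
            new.location = location
            new.value = self.spark.read.table(f"{self.database}.{location}")
            return new

        def exists(self, location, **kwargs):
            return location in [t.name for t in self.spark.catalog.listTables(self.database)]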
    15 replies
  • Joël Luijmes

    1 year ago
    If I use a Kubernetes agent to run my flows, what would be the recommended way to execute non-Python code? Spawn off new Kubernetes jobs in the cluster, or are there other ideas on this?
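    If the non-Python step can run inside the flow-run container itself, Prefect's ShellTask is one option; spawning a dedicated Kubernetes Job makes more sense when the step needs its own image or resources. A minimal sketch (the binary path is made up, and the executable has to be present in the flow-run image):

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    run_binary = ShellTask(name="run-binary")

    with Flow("non-python-step") as flow:
        # executes the command in the flow-run container's shell
        output = run_binary(command="/opt/tools/transform --input /data/in.csv")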
    10 replies
  • Dolor Oculus

    1 year ago
    I'm registering a flow on a locally hosted server and would like the extra loggers functionality to be in effect when the scheduled flow runs. It looks like setting the PREFECT__LOGGING__EXTRA_LOGGERS environment variable (as per https://docs.prefect.io/core/concepts/logging.html#extra-loggers) during flow registration isn't sufficient -- I'm not seeing my app's logging statements in the web UI. Is there a way to get this environment variable to be in effect when the app runs?
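    For what it's worth, one pattern that should work is attaching the variable to the flow's run config so it is set in the flow-run environment rather than only at registration time. A sketch, assuming a local agent and with "my_app" as a placeholder logger name:

    from prefect.run_configs import LocalRun

    flow.run_config = LocalRun(
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": '["my_app"]'}
    )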
    3 replies
  • Ben Davison

    1 year ago
    Hiya, trying out the DaskKubernetesEnvironment and am running into a weird issue.
    5 replies
  • tkanas

    1 year ago
    Hi, I was wondering what people do when coming across parts of objects that are not serializable by Cloudpickle when trying to save some result objects to GCS. The result objects are heavily nested so it is difficult to identify which parts of them are not pickleable. Is it considered poor practice to try to send the whole result object to GCS? Or is there some way around it?
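    One way to narrow this down is a small helper that recursively tries cloudpickle on an object's attributes and items and reports which subtrees fail. A rough debugging sketch (not Prefect API, and it doesn't guard against reference cycles):

    import cloudpickle

    def report_unpicklable(obj, path="result"):
        try:
            cloudpickle.dumps(obj)
            return  # this subtree pickles fine
        except Exception as exc:
            print(f"{path}: {type(obj).__name__} failed to pickle ({exc})")
        # recurse into children to locate the offending leaves
        if isinstance(obj, dict):
            children = obj.items()
        elif isinstance(obj, (list, tuple, set)):
            children = enumerate(obj)
        elif hasattr(obj, "__dict__"):
            children = vars(obj).items()
        else:
            return
        for key, child in children:
            report_unpicklable(child, f"{path}.{key}")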
    4 replies
  • Saulius Beinorius

    1 year ago
    Hi, maybe I missed the relevant documentation, but what exactly does a Python module need for the CLI to be able to register a flow? Is it a module-level variable called flow?
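    As far as I understand, the register CLI imports the file and picks up Flow objects defined at module level, so the variable does not have to be named flow (the exact prefect register flags vary between versions). A minimal module that should be discoverable:

    # flows/etl.py
    from prefect import Flow, task

    @task
    def say_hello():
        print("hello")

    with Flow("etl") as my_flow:  # any module-level Flow object is picked up
        say_hello()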
    3 replies
  • Riley Hun

    1 year ago
    Hi everyone, I'm using the newly announced k8s helm chart, which is awesome by the way! I'm trying to add basic authentication to the Apollo server to protect it behind an nginx controller on Kubernetes, but now the UI reports "server cannot be reached". Could anyone please offer any insight on this? It would be most appreciated! The other alternative I could look into is making the Apollo server an internal load balancer, which I'll try next.
    6 replies
  • Michelle Wu

    1 year ago
    Hey guys, I have Prefect Server and the Dask scheduler running on machine A and a Dask worker on machine B (machine B has no Prefect or Docker). When I tried to send a job from machine A to machine B to execute, using the task tag described here: https://docs.prefect.io/api/latest/engine/executors.html#executor, this error occurred on machine B:
    distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
    Traceback (most recent call last):
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/worker.py", line 3268, in loads_function
        result = cache_loads[bytes_object]
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 1573, in __getitem__
        value = super().__getitem__(key)
      File "/anaconda3_a/envs/py37/lib/python3.7/collections/__init__.py", line 1027, in __getitem__
        raise KeyError(key)
    KeyError: b'\x80\x04\x950\x00\x00\x00\x00\x00\x00\x00\x8c\x1dprefect.engine.executors.dask\x94\x8c\n_maybe_run\x94\x93\x94.'
    ...
    ModuleNotFoundError: No module named 'prefect'
    I figured I had to install Prefect on machine B for the task to run remotely, so I installed Prefect and reran the task; then a new error occurred on machine B:
    [2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
    Traceback (most recent call last):
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    ...
    [2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    I am really confused by “ConnectionError” and “Unable to write logs to Prefect Cloud”. Do you guys have any idea why this might be happening? Must I install Docker (on machine B) as well for the task to run?
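    For context on the ConnectionError: the log shows the task runner on machine B trying to reach the Prefect API at localhost:4200, which only exists on machine A, so the client on machine B needs to be pointed at machine A's endpoint. A sketch of the setup described, where the scheduler address is a placeholder and PREFECT__CLOUD__API is an assumption about the relevant client setting in this 0.x version:

    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment

    # the flow run connects to the existing Dask scheduler on machine A
    flow.environment = LocalEnvironment(
        executor=DaskExecutor(address="tcp://<machine-a-ip>:8786")
    )

    # and the dask-worker process on machine B would be started with the API
    # endpoint pointing at machine A rather than the default localhost:4200, e.g.
    #   PREFECT__CLOUD__API=http://<machine-a-ip>:4200 dask-worker tcp://<machine-a-ip>:8786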
    3 replies
  • Saulius Beinorius

    1 year ago
    Hey, I'm (yet another) person who would like a more flexible logging configuration in Prefect. I was a bit surprised that Prefect does not allow specifying logging via a logging.config.dictConfig()-style format, especially since Dask supports it (https://docs.dask.org/en/latest/debugging.html#logs). If you could point me in the right direction to where the logging setup happens (I think I found the agent's logging setup, but I'm not sure about task logging -- I assume that's separate for each execution environment?), I would be willing to work on a PR to allow specifying logging via a dict format. For reference, uvicorn's server has options similar to Prefect's but also allows passing an entire logging configuration.
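    For concreteness, the kind of configuration being asked for is the standard library dict schema, e.g. (a minimal example; "my_app" is a placeholder logger name):

    import logging.config

    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "plain": {"format": "%(asctime)s | %(levelname)s | %(name)s | %(message)s"},
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "plain"},
        },
        "loggers": {
            "my_app": {"handlers": ["console"], "level": "DEBUG"},
        },
    })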
    2 replies
  • Florian L

    1 year ago
    Hi, I was wondering what the best practice is for cancelling a flow run from one of our tasks? Currently I'm raising signal.ENDRUN(state.Cancelled()), which puts my task in the right state, but the following tasks are still executed, meaning the all_successful trigger is still true. Any idea what I'm doing wrong?
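    For reference, this is the pattern being described, as a sketch of raising ENDRUN with a Cancelled state from inside a task; whether downstream triggers treat Cancelled as a failure is exactly the open question here:

    from prefect import task
    from prefect.engine import signals
    from prefect.engine.state import Cancelled

    @task
    def maybe_stop(flag):
        if flag:
            # ends this task run immediately with a Cancelled final state
            raise signals.ENDRUN(state=Cancelled("stopping the rest of the run"))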
    1 reply