• Tom Klein

    Tom Klein

    2 months ago
    Hey 🙋 is it somehow possible to do something in between a map and a reduce? 🤔 E.g., let’s say I have the following pattern:
    1. a huge input CSV (let’s say 200K rows)
    2. that I then split into a ton of tiny CSVs (let’s say 1000 rows each)
    3. then some work is done on each one, and a new output CSV is generated for each tiny CSV
    4. afterwards I want to combine the outputs, but in “slightly bigger batches”, i.e. in groups of 5 or 10, with each group turned into a “medium-sized” CSV (with 5K or 10K rows) which is then uploaded to S3
    Obviously, I don’t want the last step (4) that combines them to wait for ALL the tiny CSVs to be processed (so I don’t want it to be a simple reduce), just for enough of them (e.g. 5 or 10) that can be grouped and uploaded. Does that make any sense?
    Tom Klein
    Anna Geller
    12 replies
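The "in between a map and a reduce" pattern from the question above can be sketched in plain Python. This is only an illustration using the standard library's `concurrent.futures`, not Prefect's API; `process_chunk`, `batched_results`, and `combine` are hypothetical names, and the per-chunk work is a placeholder. The idea is to hand over a batch as soon as enough outputs have finished, instead of waiting for all of them.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterator, List


def process_chunk(rows: List[str]) -> List[str]:
    """Hypothetical stand-in for the work done on one tiny CSV."""
    return [row.upper() for row in rows]


def batched_results(
    chunks: List[List[str]], batch_size: int, max_workers: int = 4
) -> Iterator[List[List[str]]]:
    """Yield groups of `batch_size` finished outputs as soon as they are ready."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_chunk, chunk) for chunk in chunks]
        batch: List[List[str]] = []
        # as_completed yields each future when it finishes, in completion order
        for future in as_completed(futures):
            batch.append(future.result())
            if len(batch) == batch_size:
                yield batch
                batch = []
        # Hand over the leftover outputs that did not fill a whole batch
        if batch:
            yield batch


def combine(batch: List[List[str]]) -> List[str]:
    """Concatenate the rows of several small outputs into one medium-sized one."""
    return [row for output in batch for row in output]


if __name__ == "__main__":
    tiny_csvs = [[f"row-{i}-{j}" for j in range(3)] for i in range(12)]
    for group in batched_results(tiny_csvs, batch_size=5):
        medium_csv = combine(group)
        # An upload to S3 would go here; this sketch only reports the size
        print(len(medium_csv), "rows ready for upload")
```

Because the generator yields while other chunks are still being processed, each medium-sized batch can be uploaded without waiting for the whole input to finish. The batches follow completion order, not input order.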
  • Enrique

    Enrique

    2 months ago
    Hi all! 🙂 I have a question (maybe it's stupid but here it goes xD). I'm working with Prefect in a Kubernetes environment, which means that the agent and the jobs are running in the cluster. The cluster has 2 CPUs. When I try to run three parallel tasks, only two of them run, and the third one starts only when one of them ends. Is that the expected behavior, or am I doing something wrong? Thanks a lot for your tool, Prefect rocks! 😎
    Enrique
    Anna Geller
    7 replies
  • j

    Jason Thomas

    2 months ago
    Hi all, version 2.0b8. I’m playing with flow retries and got a result I didn’t expect. I’ll post code in the thread, but here’s what’s happening:
    - flow has one retry
    - I call flow, passing in a list of 4 unprocessed ‘files’
    - retry 0:
      - flow filters the list to remove any processed files, leaving 4 files to process
      - flow iterates over the filtered list, calling task on each unprocessed file
      - task processes each file
      - on one file, task raises an error
      - flow continues to run, processing 3 out of 4 files
      - flow finishes in state AwaitingRetry
    - retry 1:
      - flow filters the list, leaving 1 file to process
      - flow iterates over the filtered list, but does not call task
      - flow finishes in state Completed
    I’m guessing task is not called on the retry because the result has been cached and there is no reason to try again. That’s fine. However, I expected the final state of the task to be Failed.
    Is this behaving as intended?
    j
    Anna Geller
    15 replies
  • Tom Klein

    Tom Klein

    2 months ago
    Hi again 😄 I just got the error "No heartbeat detected from the remote task; marking the run as failed". This happened for tasks being run in parallel via the Dask KubeCluster. Looking at our (internal) logs, it seems like all of them died simultaneously, 20+ minutes into their run. There’s a total of 58 tasks being run, with 4 workers. The first 4 tasks took approx. 1 hour to run and completed successfully, and then the next 4 that were run all seem to have failed simultaneously. Any idea / help?
    Tom Klein
    Anna Geller
    18 replies
  • Shivam Bhatia

    Shivam Bhatia

    2 months ago
    Hi, I am trying to figure out how to run flows in a Docker container (Prefect 2.0b8). After the flow run is complete I get an error and my agent stops running (error log in the thread). How can I fix this? Any help would be appreciated.
    Shivam Bhatia
    2 replies
  • ibrahem

    ibrahem

    2 months ago
    Hi ... I'm having an issue with the local agent. It had been working fine for days, and suddenly, after restarting the Prefect server, the agent won't run! When I do prefect local agent start it seems to run okay in the terminal (agent registration successful), but after a while I get this error: requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPConnectionPool(host='localhost', port=4200): Read timed out. (read timeout=15)")) Appreciate your help.
    ibrahem
    Octopus
    3 replies
  • Kevin Grismore

    Kevin Grismore

    2 months ago
    I'm trying to create a deployment for Prefect Cloud 2.0 for a flow that imports a local package named tutorial that's in the flow's directory, and when I run
    prefect deployment create deployment.py
    I get
    ModuleNotFoundError: No module named 'tutorial'
    Kevin Grismore
    Noam Cohen
    +1
    9 replies
  • Josh Paulin

    Josh Paulin

    2 months ago
    If I have a conditional check in my flow that's something like
    with case(cond, True):
        val1_if_true = action1_if_true()
        val2_if_true = action2_if_true()
    
    
    with case(cond, False):
        val_if_false = action_if_false()
    What (if anything) do I pass to the merge function?
    Josh Paulin
    Anna Geller
    6 replies
  • Sabir Ali

    Sabir Ali

    2 months ago
    I am trying to connect with the Elasticsearch client but am getting an exception:
    Traceback (most recent call last):
      File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
        print(client.info())
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
        return api(*args, **kwargs)
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
        return self.perform_request(  # type: ignore[return-value]
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
        raise UnsupportedProductError(
    elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
    I used the following command to install the Elasticsearch client:
    (venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch      
    Collecting elasticsearch
      Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
    Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
    Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
    Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
    Installing collected packages: elasticsearch
    Successfully installed elasticsearch-8.3.1
    WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
    You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
    (venv) sabirali@Sabirs-MacBook-Pro ETL %
    Sabir Ali
    Anna Geller
    13 replies
  • d

    Divya

    2 months ago
    Hello all, I am trying to connect to Prefect Server but I am getting the error message below. Can you please help me figure out the issue or the step that I am missing? The steps taken so far:
    1. Installed prefect using the pip command.
    2. Created a flow and ran it successfully (screenshot attached).
    3. Executed the below commands to start the server:
    prefect backend server
    prefect server start --detach --use-volume --expose
    Thank you! Best, Divya
    d
    nicholas
    5 replies