• Gary

    2 years ago
Hi folks, we are evaluating Prefect to handle our data flows. So far everything has been great for our needs. Our only concern is that we are not sure whether it is okay to generate a massive number of mapped tasks. One of our scenarios is to use Prefect to crawl, calculate, and store financial data (e.g., financial statements and daily trading data of US stocks). Here is a simplified example:
from prefect import task, Flow

@task
def get_stock_id_list():
    # Query the stock id list from the database.
    return query_stock_id_list_from_db()

@task
def crawl_stock_data_from_external_website(stock_id):
    # Crawler-related work for a single stock id.
    return crawl_stock_data(stock_id)

@task
def perform_calculation(crawled_data):
    # Some calculation with pandas; return the result so it can be consumed downstream.
    return perform_some_calculation(crawled_data)


with Flow('Example flow') as flow:
    # The stock id list holds about ten thousand ids.
    stock_id_list = get_stock_id_list()

    crawled_data_list = crawl_stock_data_from_external_website.map(stock_id_list)

    perform_calculation.map(crawled_data_list)
Is it okay for Prefect to generate about 10,000 to 50,000 mapped tasks within the above flow without any problems? Another scenario is to generate about 1 million tasks within a flow: we query a user id list from our database and perform some user-behavior-analysis calculations, with one user mapped to one task. Is that okay, or is there a better way to do this?
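For reference, if tens of thousands of per-item tasks prove too heavy, one common workaround is to map over batches of ids instead of individual ids. A minimal sketch of that idea (the batch size is an arbitrary assumption, and the crawl/calculate helpers are the placeholders from the question, not Prefect APIs):

from prefect import task, Flow

@task
def make_batches(stock_id_list, batch_size=500):
    # ~10,000 ids become ~20 mapped tasks instead of ~10,000.
    return [stock_id_list[i:i + batch_size]
            for i in range(0, len(stock_id_list), batch_size)]

@task
def crawl_and_calculate_batch(stock_id_batch):
    # Process a whole batch inside a single mapped task.
    return [perform_some_calculation(crawl_stock_data(stock_id))
            for stock_id in stock_id_batch]

with Flow('Batched example flow') as flow:
    stock_id_list = get_stock_id_list()
    batches = make_batches(stock_id_list)
    crawl_and_calculate_batch.map(batches)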
10 replies
• simone

    2 years ago
Hi, I am trying to port our image processing pipeline to Prefect. The analysis we run relies on a couple of configuration YAML files that are loaded by each process when the analysis runs in parallel. I was just wondering whether I should keep this approach or add the parameters to
prefect.context
and avoid the file loading. Is there any reason why I should not have the config data added to the context? Thanks a lot!
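For reference, a minimal sketch of the context approach in the 1.x-style API (the file name and config keys are assumptions): load the YAML once, run the flow inside a prefect.context block, and read it back with prefect.context.get inside tasks.

import yaml
import prefect
from prefect import task, Flow

@task
def analyze_image(path):
    # Tasks read the config from context instead of re-loading the file.
    config = prefect.context.get("analysis_config", {})
    print(f"analyzing {path} with threshold {config.get('threshold')}")

with Flow("image-pipeline") as flow:
    analyze_image("img_001.tif")

# Load the YAML once, then make it visible to every task via context.
with open("analysis_config.yaml") as f:
    analysis_config = yaml.safe_load(f)

with prefect.context(analysis_config=analysis_config):
    flow.run()

Note that context injected this way only applies to a local flow.run(); runs launched through an agent build their context on the backend, so the file-loading approach may still be needed there.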
2 replies
• Arlo Bryer

    2 years ago
    Hi everyone - could somebody tell me if there's a way to ensure that a flow doesn't start if the previous run of the flow hasn't yet finished?
• Arlo Bryer

    2 years ago
i.e. for a scheduled flow (say every two hours): if the run that started at t0 is still running when the t1 run (2h later) should kick off, the t1 run is delayed until t0 has finished, with the delay then propagating to t2, t3, ..., tn?
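One pattern for this (a sketch, not a built-in feature; the GraphQL shape assumes a Prefect Server/Cloud backend) is a guard task at the start of the flow that checks for other Running runs of the same flow and retries itself until they finish:

import pendulum
import prefect
from prefect import task, Client
from prefect.engine.signals import RETRY

@task
def block_if_previous_run_active():
    # Look up other runs of this flow that are still Running.
    result = Client().graphql(
        """
        query($flow_id: uuid, $this_run: uuid) {
          flow_run(where: {
            flow_id: {_eq: $flow_id},
            state: {_eq: "Running"},
            id: {_neq: $this_run}
          }) { id }
        }
        """,
        variables={
            "flow_id": prefect.context.get("flow_id"),
            "this_run": prefect.context.get("flow_run_id"),
        },
    )
    if result.data.flow_run:
        # Check again in a minute; everything downstream waits behind this task.
        raise RETRY(start_time=pendulum.now("utc").add(minutes=1))

Every other task in the flow would then set this guard as an upstream dependency. Prefect Cloud also offers flow-run concurrency limits on labels, which achieve the same thing without custom code, if available on your plan.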
2 replies
• Iain Dillingham

    2 years ago
    Hi everyone. I have a mapped task that returns a list of objects; actually it returns a list of lists of objects, because it's mapped. I'd like to flatten the list of lists, before passing the list of objects to another task. I have written my own flatten task, but was wondering whether I'd missed something obvious: an idiom, a built-in function, etc. I'd appreciate the community's thoughts 🙂
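For reference, a hand-rolled flatten task is only a couple of lines, and newer Prefect releases also ship a built-in prefect.flatten for exactly this fan-in, depending on your version. A minimal sketch of the DIY version (all task names are illustrative):

from prefect import task, Flow

@task
def numbers():
    return [1, 2, 3]

@task
def duplicate(n):
    # Mapped: each call returns a list, so the mapped result is a list of lists.
    return [n, n]

@task
def flatten_list(nested):
    # Fan the list of lists back into a single flat list.
    return [item for sublist in nested for item in sublist]

@task
def consume(flat):
    print(flat)  # [1, 1, 2, 2, 3, 3]

with Flow("flatten-example") as flow:
    nested = duplicate.map(numbers())
    consume(flatten_list(nested))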
3 replies
• Alfie

    2 years ago
Hi Team, a question about the functional API: can I manually specify a task's upstream dependencies? I want a task that just waits for a few seconds and is upstream of a task that executes some actions.
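In the 1.x-style functional API this can be done with the upstream_tasks keyword when calling a task inside a Flow block (task names and the sleep duration here are illustrative); a minimal sketch:

import time
from prefect import task, Flow

@task
def wait_a_bit():
    # Pure delay; no data is passed downstream.
    time.sleep(5)

@task
def do_actions():
    print("performing actions")

with Flow("manual-upstream") as flow:
    delay = wait_a_bit()
    # No data flows between the tasks; upstream_tasks adds a pure state dependency.
    do_actions(upstream_tasks=[delay])

Equivalently, task.set_upstream() can declare the same dependency imperatively.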
6 replies
• Alfie

    2 years ago
    thanks
• Avi A

    2 years ago
Hey, I’m using
LocalDaskExecutor
. It has been working well for a while, but in the last couple of days I’ve been getting the following errors in the middle of running a long flow. I don’t know what changed, but it seems the process is unable to spawn a new thread or something. I reset the server to make sure no stale processes or threads were left on the system, but I’m still getting these.
• Avi A

    2 years ago
    OpenBLAS blas_thread_init: pthread_create failed for thread 29 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
    OpenBLAS blas_thread_init: pthread_create failed for thread 30 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
    OpenBLAS blas_thread_init: pthread_create failed for thread 31 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
        _run_code(code, mod_globals, init_globals,
      File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
        from prefect.cli import cli
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
        import prefect.triggers
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
        from prefect.engine import signals
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 125, in _main
        prepare(preparation_data)
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 236, in prepare
        _fixup_main_from_path(data['init_main_from_path'])
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
        main_content = runpy.run_path(main_path,
      File "/usr/lib/python3.8/runpy.py", line 262, in run_path
        return _run_module_code(code, init_globals, run_name,
      File "/usr/lib/python3.8/runpy.py", line 95, in _run_module_code
        _run_code(code, mod_globals, init_globals,
      File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
        from prefect.cli import cli
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
        import prefect.triggers
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
        from prefect.engine import signals
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/__init__.py", line 8, in <module>
        from prefect.engine.flow_runner import FlowRunner
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 13, in <module>
        from prefect.core import Edge, Flow, Task
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/__init__.py", line 4, in <module>
        from prefect.core.flow import Flow
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/flow.py", line 40, in <module>
        from prefect.environments import Environment
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/__init__.py", line 1, in <module>
        from prefect.environments.execution import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/__init__.py", line 8, in <module>
        from prefect.environments.execution.dask import DaskKubernetesEnvironment
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/__init__.py", line 2, in <module>
        from prefect.environments.execution.dask.cloud_provider import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/cloud_provider.py", line 5, in <module>
        from distributed.deploy.cluster import Cluster
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/__init__.py", line 3, in <module>
        from .actor import Actor, ActorFuture
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/actor.py", line 6, in <module>
        from .client import Future, default_client
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/client.py", line 43, in <module>
        from .batched import BatchedSend
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/batched.py", line 8, in <module>
        from .core import CommClosedError
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/core.py", line 18, in <module>
        from .comm import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/__init__.py", line 1, in <module>
        from .addressing import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/addressing.py", line 5, in <module>
        from ..utils import get_ip_interface
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 97, in <module>
        mp_context = _initialize_mp_context()
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 88, in _initialize_mp_context
        importlib.import_module(pkg)
      File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/__init__.py", line 142, in <module>
        from . import core
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/__init__.py", line 24, in <module>
        from . import multiarray
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/multiarray.py", line 14, in <module>
        from . import overrides
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/overrides.py", line 7, in <module>
        from numpy.core._multiarray_umath import (
    KeyboardInterrupt
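One common mitigation for OpenBLAS pthread_create failures like these is to cap the BLAS thread pools before numpy is imported, so each spawned worker doesn't try to start 32 threads of its own; a sketch, assuming the flow is launched from this script (the import path matches the 0.x-era layout shown in the traceback, and the worker count is an assumption):

import os

# Must run before numpy/scipy are imported anywhere in the process.
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("OMP_NUM_THREADS", "1")

from prefect.engine.executors import LocalDaskExecutor

# Fewer workers also keeps the total thread count under RLIMIT_NPROC.
executor = LocalDaskExecutor(scheduler="threads", num_workers=8)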
• Avi A

    2 years ago
I’ve tried using
DaskExecutor
in the past, but it was even worse: every now and then the execution froze and I stopped receiving logs, so I set it aside for the time being. Running on one strong machine is enough for me, and I don’t need a cluster yet.