Igor Adamski

    10 months ago
    Hi! I’m seeing some very weird behaviour. I’m running a flow where I first map a task, then collect all the mapped runs and bind them into the next task, something like this:
    data_downloader = DataDownloader(db_config=db_config)
    downloaded_data = data_downloader.map(tickers_and_dates=grouped_list,
                                          columns=unmapped(columns))

    save_data = SaveData(db_config=db_config,
                         override_exists_files=True)
    save_data.bind(data=downloaded_data,
                   table_name='tmp_events')
    Some children of the mapped task start running again after completion. As a result, the SaveData task starts running before all the mapped children of the DataDownloader task have finished. See the screenshots attached in the thread; it would be great if someone could offer any guidance on this. On the second screenshot, the top bar is the DataDownloader bar, and you can see that the SaveData bar (just below) starts before the DataDownloader one is finished.
    Anna Geller

    10 months ago
    @Igor Adamski can you share the entire flow so that I can try to reproduce the issue? Which agent do you use? Do you run it on Kubernetes? If you can share your run configuration, that will also be useful for troubleshooting. At first glance, DataDownloader and SaveData look like imperative API tasks; those need to be instantiated before they can be used inside the with Flow() as flow: context manager. Since your flow was killed by the Zombie Killer: is this a long-running job?
    Another thing worth checking: I see you pass db_config to multiple tasks. Can you check whether your database connections get closed? Otherwise this can also lead to unintended behaviour such as zombie processes.
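A minimal sketch of the connection check suggested above, assuming a task opens its own connection per run. The real Factset and Polygon clients are not shown in this thread, so the standard-library sqlite3 module stands in for them, and `save_rows` and the table layout are hypothetical:

```python
import sqlite3
from contextlib import closing


def save_rows(db_path, tickers):
    """Insert tickers into a hypothetical tmp_events table and return the row count."""
    # closing() calls conn.close() on exit, even if an insert raises,
    # so no connection is left open when the task ends.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS tmp_events (ticker TEXT)")
        conn.executemany(
            "INSERT INTO tmp_events (ticker) VALUES (?)",
            [(ticker,) for ticker in tickers],
        )
        conn.commit()
        (count,) = conn.execute("SELECT COUNT(*) FROM tmp_events").fetchone()
    return count
```

The same pattern applies to most DB-API style clients: open the connection inside the task's run method and close it there, rather than sharing one open connection across tasks.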
    Igor Adamski

    10 months ago
    ok the entire flow is here:
    import os
    import sys
    from pathlib import Path
    
    from prefect import Flow, unmapped, Parameter
    from prefect.engine.results import LocalResult
    from prefect.executors import DaskExecutor
    from prefect.storage import Docker
    
    sys.path.append(os.path.dirname(__file__))
    
    from src.tools import load_config
    from src.tasks.data_downloader import DataDownloader
    from src.tasks.prepare_date_ranges import PrepareDateRanges
    from src.tasks.save_data import SaveData
    from src.tasks.tickers_group_creator import TickersGroupCreator
    from src.tasks.factset_columns_provider import FactsetColumnsProvider
    
    from db.polygon_db.polygon_client import PolygonDbConfig
    from db.factset_db.factset_client import FactsetDbConfig
    
    from src.tasks.all_tickers_provider import AllTickersProvider
    from src.schedulers import GetFactsetDataScheduler
    
    from prefect.run_configs import DockerRun
    
    parent_dir = os.path.dirname(os.path.realpath(__file__))
    
    config = load_config(config_path=Path(f'{parent_dir}/config.yaml'))
    
    flow_name = 'factset_events_tmp'
    
    storage = Docker(
        env_vars={
            "PYTHONPATH": "$PYTHONPATH:/pipeline",
            "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
            "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN")
        },
        files={
            f'{parent_dir}': '/pipeline',
            f'{parent_dir}/pyproject.toml': 'pyproject.toml',
            f'{parent_dir}/poetry.lock': 'poetry.lock'
        },
        image_tag=os.environ.get('IMAGE_TAG'),
        image_name=flow_name,
        registry_url="gitlab.algopolis.ai:4567/algopolis/dataengine",
        stored_as_script=True,
        path='/pipeline/flow.py',
        #ignore_healthchecks=True,
        extra_dockerfile_commands=[
            'RUN pip install poetry',
            'RUN poetry config virtualenvs.create false',
            'RUN poetry config http-basic.factset_connector $REGISTRY_USER $REGISTRY_TOKEN',
            'RUN echo $REGISTRY_USER',
            'RUN poetry install --no-dev'
        ]
    )
    
    with Flow(
            name=flow_name,
            schedule=GetFactsetDataScheduler(),
            executor=DaskExecutor(cluster_kwargs={"n_workers": 10, "threads_per_worker": 1}),
            storage=storage,
            result=LocalResult(),
            run_config=DockerRun(
                host_config={
                    'network_mode': 'host'
                }
            ),
    ) as flow:
        polygon_db_config = PolygonDbConfig(**config.get('polygon').get('database'))
        factset_db_config = FactsetDbConfig(**config.get('factset').get('database'))
    
        tickers = AllTickersProvider(polygon_db_config=polygon_db_config)
    
        ranges = PrepareDateRanges()
        ranges.bind(all_tickers=tickers)
    
        grouped_tickers = TickersGroupCreator(group_size=20)
        grouped_tickers.bind(tickers=tickers,
                             dates=ranges)
    
        columns = FactsetColumnsProvider(factset_db_config=factset_db_config)
    
        data_downloader = DataDownloader(factset_db_config=factset_db_config)
        downloaded_data = data_downloader.map(tickers_and_dates=grouped_tickers,
                                              columns=unmapped(columns)
                                              )
    
        save_data = SaveData(factset_db_config=factset_db_config,
                             override_exists_files=True)
        save_data.bind(data=downloaded_data,
                       table_name='tmp_factset_events')
    We use a Docker agent, not Kubernetes.
    Anna Geller

    10 months ago
    Here is a tiny example of how those imperative API tasks are normally initialized and called:
    from prefect import Flow, unmapped


    # Instantiate the task classes once, outside the flow.
    data_downloader = DataDownloader(db_config=db_config)
    save_data = SaveData(db_config=db_config, override_exists_files=True)


    with Flow("igor") as flow:
        # .map() creates one child run per element of grouped_list.
        downloaded_data = data_downloader.map(
            tickers_and_dates=grouped_list, columns=unmapped(columns)
        )
        # Calling the instance inside the flow binds its inputs.
        save_data(data=downloaded_data, table_name="tmp_events")
    You need to initialize the task class, and then you can call it within the flow constructor like a normal function.
    Igor Adamski

    10 months ago
    What kind of errors do you expect in the logs? One more thing I did not mention: if we limit the number of tickers we load data for (from ~1700 to 500), the whole flow works fine and everything gets saved to the DB. The save_data task is spawned after all of download_data has finished, and none of the child processes try to start running again once finished. Maybe that’s a clue?
    Anna Geller

    10 months ago
    If you rewrite it a bit so that all classes are initialized before they are called in the flow, this should work. But can you confirm that DataDownloader, SaveData, FactsetColumnsProvider, TickersGroupCreator, and all the other classes you reference in your flow inherit from the Prefect Task class? They need to be Prefect tasks, otherwise Prefect cannot run them as part of a Flow. This page explains that and compares it with the Functional API: https://docs.prefect.io/core/task_library/overview.html#task-library-in-action
    I think it would be useful to visualize the Flow before you run it. Can you try flow.visualize() and confirm that the Flow and the computational graph (order of tasks) look as intended?
    Igor Adamski

    10 months ago
    yes everything looks as intended, like you see here
    All tasks inherit from the Prefect Task class, so I don’t think that is the issue. The fact that increasing the number of tickers breaks the flow suggests that maybe the issue is around size?
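To make the size difference concrete, here is a rough sketch of the grouping step with group_size=20. `group_tickers` is a hypothetical stand-in for TickersGroupCreator, whose implementation is not shown in this thread, and the ticker counts are the approximate figures mentioned above:

```python
def group_tickers(tickers, group_size=20):
    """Split a list of tickers into consecutive groups of at most group_size."""
    return [
        tickers[start:start + group_size]
        for start in range(0, len(tickers), group_size)
    ]


# Each group becomes one mapped child of the download task.
full_run = group_tickers([f"T{i}" for i in range(1700)])
small_run = group_tickers([f"T{i}" for i in range(500)])
print(len(full_run), len(small_run))  # 85 25
```

Because the mapped results are bound to a single downstream task, the data from all of those children is collected into one list, so the memory needed at that point grows with the number of tickers.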
    Anna Geller

    10 months ago
    So the main issue here is the “No heartbeat detected” message. It can be caused by many things, but 95% of cases are due to out-of-memory errors. Can you confirm that your container has enough memory to perform this computation? Maybe you could increase the memory in your Docker settings?
    Also, since your flow is a long-running job (I see over 2 h runtime), it can make sense to change the heartbeat mode to thread; see the env variable PREFECT__CLOUD__HEARTBEAT_MODE added here:
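    storage = Docker(
        env_vars={
            "PYTHONPATH": "$PYTHONPATH:/pipeline",
            "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
            "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN"),
            # Run heartbeats in a thread instead of a separate process.
            "PREFECT__CLOUD__HEARTBEAT_MODE": "thread",
        },
        # ... remaining Docker storage arguments as in the original flow
    )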
    More on that here: https://docs.prefect.io/orchestration/concepts/flows.html#flow-settings
    And here’s how Kevin explained it in another Slack thread: Prefect has heartbeats which check if your Flow is alive. If Prefect didn’t have heartbeats, flows that lose communication and die would permanently be shown as Running in the UI. 95% of the time, we have seen “no heartbeat detected” as a result of running out of memory. But we have also seen it happen with long-running jobs. We haven’t had a reproducible example yet from the community (we’d love to get one). If you are confident the task will succeed, you can make it a subflow and then turn off heartbeats for that subflow. We also rolled out a recent change you can try, where you can configure heartbeats to be threads instead of processes. The documentation for that is here. This was our most recent effort around those.
    Igor Adamski

    10 months ago
    Thanks a lot Anna! I’ll rewrite the flow as you proposed, change the heartbeat configuration, and get back to you.
    Hi @Anna Geller, unfortunately this has not helped. After analysing the problem further, we think this is a memory issue. However, Prefect’s behaviour is a bit unexpected: we would expect an OOM problem to be clearly reported, and instead we get this weird behaviour with the save_data task starting before it should. Maybe you can look into it in a future release. Let me know how I can help further with details about this special case.
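Until memory problems are surfaced more clearly, one way to get some visibility is to record peak memory yourself around the heavy step. The sketch below is only an illustration using the standard-library tracemalloc module; `run_with_peak_memory` is a hypothetical helper, and tracemalloc only sees allocations made through Python's allocator, so memory used by native libraries or by the container as a whole may be higher:

```python
import tracemalloc


def run_with_peak_memory(func, *args, **kwargs):
    """Run func and return (result, peak_bytes) for Python allocations made while it ran."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        # get_traced_memory() returns (current, peak) in bytes.
        _, peak_bytes = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak_bytes


# Example: a list of one million elements needs several megabytes.
data, peak_bytes = run_with_peak_memory(lambda: [0] * 1_000_000)
print(f"peak Python allocations: {peak_bytes / 1e6:.1f} MB")
```

Logging this value from inside the download and save tasks would show whether memory grows with the number of tickers before the container is killed.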
    Anna Geller

    10 months ago
    @Igor Adamski thx for getting back - did you also try adding this env variable?
    "PREFECT__CLOUD__HEARTBEAT_MODE": "thread"
    Overall, Prefect has this privacy barrier so that your code runs on your infrastructure. OOM errors are difficult because they are infrastructure-specific. Have you tried something to eliminate the OOM error, like allocating more resources to the flow?
    @Igor Adamski here is an example showing how to allocate more memory on DockerRun: https://github.com/kvnkho/demos/blob/main/prefect/host_config.py
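As a rough sketch of what that could look like for the flow in this thread: the helper below only builds the dictionary, and `build_host_config` and the "8g" value are hypothetical. It assumes that the host_config passed to DockerRun is forwarded to the Docker SDK's host configuration, where mem_limit is the option for a container memory limit; check the linked example for the exact form.

```python
def build_host_config(memory_limit="8g"):
    """Return a host_config dict that keeps host networking and adds a memory limit."""
    return {
        "network_mode": "host",      # already used by the flow in this thread
        "mem_limit": memory_limit,   # assumed Docker SDK option name
    }


host_config = build_host_config("8g")
print(host_config)

# Intended usage inside the flow definition (requires Prefect 1.x):
# from prefect.run_configs import DockerRun
# run_config = DockerRun(host_config=host_config)
```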
    Igor Adamski

    10 months ago
    Yes, we tried that and it did not work. I think we can get around the memory issues somehow, but it would be useful to see these in Prefect’s logs.