# ask-community
Igor Adamski
Hi! I’m coming across some very weird behaviour. I’m running a flow where I first map a task and then collect all the mapped runs and bind them into the next task, something like this:
Copy code
data_downloader = DataDownloader(db_config=db_config)
downloaded_data = data_downloader.map(tickers_and_dates=grouped_list,
                                      columns=unmapped(columns))

save_data = SaveData(db_config=db_config,
                     override_exists_files=True)
save_data.bind(data=downloaded_data,
               table_name='tmp_events')
I’m getting very weird behaviour where some children of the mapped task start running again after completion. As a result, the SaveData task starts running before all the mapped children of the DataDownloader task have finished. See the screenshots attached in the thread; it would be great if someone could offer any guidance on this. On the second screenshot, the top bar is the DataDownloader bar and you can see that the SaveData bar (just below) starts before the DataDownloader one is finished.
Anna Geller
@Igor Adamski can you share the entire flow so that I can try to reproduce the issue? Which agent do you use - do you run it on Kubernetes? If you can share your run configuration, it will also be useful for troubleshooting. At first glance, DataDownloader and SaveData look like imperative API tasks - those would need to be instantiated first before they can be used in the with Flow() as flow: constructor. Since your flow was killed by the Zombie killer - is this a long-running job?
Another issue worth checking: I see you pass db_config to multiple tasks. Can you check whether your database connection gets closed? Otherwise this can also lead to unintended behaviour such as zombie processes.
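For illustration, here is a minimal sketch of keeping the connection scoped to a single task run - psycopg2 and the dict-style db_config are only assumptions standing in for whatever client your config objects actually wrap:
Copy code
from contextlib import closing

import psycopg2  # assumption: a Postgres driver; swap in whatever client you actually use
from prefect import Task


class SaveData(Task):
    def __init__(self, db_config, override_exists_files=False, **kwargs):
        self.db_config = db_config
        self.override_exists_files = override_exists_files
        super().__init__(**kwargs)

    def run(self, data, table_name):
        # open the connection inside run() and close it before the task
        # finishes, so no connection outlives the task process
        with closing(psycopg2.connect(**self.db_config)) as conn:
            with conn.cursor() as cur:
                ...  # write `data` into `table_name` here
            conn.commit()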
Igor Adamski
ok the entire flow is here:
Copy code
import os
import sys
from pathlib import Path

from prefect import Flow, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.executors import DaskExecutor
from prefect.storage import Docker

sys.path.append(os.path.dirname(__file__))

from src.tools import load_config
from src.tasks.data_downloader import DataDownloader
from src.tasks.prepare_date_ranges import PrepareDateRanges
from src.tasks.save_data import SaveData
from src.tasks.tickers_group_creator import TickersGroupCreator
from src.tasks.factset_columns_provider import FactsetColumnsProvider

from db.polygon_db.polygon_client import PolygonDbConfig
from db.factset_db.factset_client import FactsetDbConfig

from src.tasks.all_tickers_provider import AllTickersProvider
from src.schedulers import GetFactsetDataScheduler

from prefect.run_configs import DockerRun

parent_dir = os.path.dirname(os.path.realpath(__file__))

config = load_config(config_path=Path(f'{parent_dir}/config.yaml'))

flow_name = 'factset_events_tmp'

storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
        "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN")
    },
    files={
        f'{parent_dir}': '/pipeline',
        f'{parent_dir}/pyproject.toml': 'pyproject.toml',
        f'{parent_dir}/poetry.lock': 'poetry.lock'
    },
    image_tag=os.environ.get('IMAGE_TAG'),
    image_name=flow_name,
    registry_url="gitlab.algopolis.ai:4567/algopolis/dataengine",
    stored_as_script=True,
    path='/pipeline/flow.py',
    #ignore_healthchecks=True,
    extra_dockerfile_commands=[
        'RUN pip install poetry',
        'RUN poetry config virtualenvs.create false',
        'RUN poetry config http-basic.factset_connector $REGISTRY_USER $REGISTRY_TOKEN',
        'RUN echo $REGISTRY_USER',
        'RUN poetry install --no-dev'
    ]
)

with Flow(
        name=flow_name,
        schedule=GetFactsetDataScheduler(),
        executor=DaskExecutor(cluster_kwargs={"n_workers": 10, "threads_per_worker": 1}),
        storage=storage,
        result=LocalResult(),
        run_config=DockerRun(
            host_config={
                'network_mode': 'host'
            }
        ),
) as flow:
    polygon_db_config = PolygonDbConfig(**config.get('polygon').get('database'))
    factset_db_config = FactsetDbConfig(**config.get('factset').get('database'))

    tickers = AllTickersProvider(polygon_db_config=polygon_db_config)

    ranges = PrepareDateRanges()
    ranges.bind(all_tickers=tickers)

    grouped_tickers = TickersGroupCreator(group_size=20)
    grouped_tickers.bind(tickers=tickers,
                         dates=ranges)

    columns = FactsetColumnsProvider(factset_db_config=factset_db_config)

    data_downloader = DataDownloader(factset_db_config=factset_db_config)
    downloaded_data = data_downloader.map(tickers_and_dates=grouped_tickers,
                                          columns=unmapped(columns)
                                          )

    save_data = SaveData(factset_db_config=factset_db_config,
                         override_exists_files=True)
    save_data.bind(data=downloaded_data,
                   table_name='tmp_factset_events')
we use a DockerAgent, not Kubernetes
👍 1
Anna Geller
Here is a tiny example of how those imperative API tasks are normally initialized and called:
Copy code
from prefect import Flow, unmapped


# instantiate the Task classes once, outside the flow constructor
data_downloader = DataDownloader(db_config=db_config)
save_data = SaveData(db_config=db_config, override_exists_files=True)


with Flow("igor") as flow:
    # inside the flow, the instantiated tasks are mapped / called like functions
    downloaded_data = data_downloader.map(
        tickers_and_dates=grouped_list, columns=unmapped(columns)
    )
    saved = save_data(data=downloaded_data, table_name="tmp_events")
You need to instantiate the task class first, and then you can call it within the flow constructor like a normal function.
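For comparison, a sketch of the same wiring with the functional API (@task decorator) - grouped_list, columns and the function bodies are placeholders here:
Copy code
from prefect import task, Flow, unmapped


@task
def download_data(tickers_and_dates, columns):
    ...  # fetch the data for one group of tickers


@task
def save_data(data, table_name):
    ...  # write the downloaded data into the given table


with Flow("igor-functional") as flow:
    # grouped_list and columns are assumed to come from upstream tasks
    downloaded_data = download_data.map(
        tickers_and_dates=grouped_list, columns=unmapped(columns)
    )
    save_data(data=downloaded_data, table_name="tmp_events")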
Igor Adamski
what kind of errors do you expect in the logs? One more thing I did not mention: if we limit the number of tickers we are loading the data for (from ~1700 to 500), the whole flow works fine and everything gets saved to the DB - the save_data task is spawned only after all of download_data is finished, and none of the child processes try to start running again once finished - maybe that’s a clue?
Anna Geller
If you rewrite it a bit so that all classes are instantiated first before they are called in a flow, this should work. But can you confirm that DataDownloader, SaveData, FactsetColumnsProvider, TickersGroupCreator, and all other classes you reference in your flow inherit from the Prefect Task class? They need to be Prefect tasks, otherwise Prefect cannot run them as part of a Flow. This page explains that and compares it with the Functional API: https://docs.prefect.io/core/task_library/overview.html#task-library-in-action
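For reference, a minimal sketch of what such a class is expected to look like - the constructor arguments and the run() body are illustrative only:
Copy code
from prefect import Task


class DataDownloader(Task):
    # constructor arguments are stored on the instance; extra kwargs
    # (name, tags, retry settings, ...) are forwarded to the Task base class
    def __init__(self, factset_db_config, **kwargs):
        self.factset_db_config = factset_db_config
        super().__init__(**kwargs)

    # run() receives the inputs bound/mapped in the flow and returns the result
    def run(self, tickers_and_dates, columns):
        ...  # download and return the data for one group of tickers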
I think what could be useful for you is to visualize the Flow before you run it. Can you try flow.visualize() and confirm that the flow’s computational graph (order of tasks) looks as intended?
Igor Adamski
yes everything looks as intended, like you see here
all tasks inherit from the Prefect Task class, so I don’t think that is the issue - the fact that increasing the number of tickers breaks the flow suggests that maybe the issue is related to data size?
Anna Geller
So the main issue here is the “No heartbeat detected” error. It can be caused by many things, but 95% of cases are due to out-of-memory errors. Can you confirm that your container has enough memory to perform this computation? Maybe you could increase the memory in your Docker settings?
Also, since your flow is a long-running job (I see over 2 h of runtime), it can make sense to change the heartbeat mode to thread - see the PREFECT__CLOUD__HEARTBEAT_MODE env variable added below:
Copy code
storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
        "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN"),
        "PREFECT__CLOUD__HEARTBEAT_MODE": "thread"
    },
    # ... keep the rest of your Docker storage arguments unchanged
)
More on that here: https://docs.prefect.io/orchestration/concepts/flows.html#flow-settings
And here’s how Kevin explained it in another Slack thread: Prefect has heartbeats which check whether your flow is alive. If Prefect didn’t have heartbeats, flows that lose communication and die would permanently be shown as Running in the UI. 95% of the time, we have seen “no heartbeat detected” as a result of running out of memory, but we have also seen it happen with long-running jobs. We haven’t had a reproducible example from the community yet (we’d love to get one). If you are confident the task will succeed, you can make it a subflow and then turn off heartbeats for that subflow. We also rolled out a recent change you can try where you can configure heartbeats to be threads instead of processes. The documentation for that is here. This was our most recent effort around those.
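For completeness, the same variable can also be set per flow run through the run config’s env argument instead of baking it into the storage image - a sketch, assuming the rest of your flow definition stays the same:
Copy code
from prefect.run_configs import DockerRun

# run-config env vars are injected into the flow-run container, so the
# heartbeat mode can be switched without rebuilding the storage image
flow.run_config = DockerRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
    host_config={"network_mode": "host"},
)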
Igor Adamski
thanks a lot Anna - I’ll rewrite the Flow as you proposed and change the heartbeat configuration and will get back to you
🙌 1
Hi @Anna Geller, unfortunately this has not helped. After analysing the problem further, we think this is a memory issue - however, Prefect’s behaviour is a bit unexpected, because we would expect an OOM problem to be clearly reported, and instead we get this weird behaviour with the save_data task starting before it should have. Maybe you can look into it in some future releases - let me know how I can help further by providing details about this special case.
Anna Geller
@Igor Adamski thanks for getting back - did you also try adding this env variable?
Copy code
"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"
Overall, Prefect has this privacy barrier so that your code runs on your infrastructure. OOM errors are difficult because they are an infrastructure-specific issue. Have you tried anything to eliminate the OOM error, like allocating more resources to the flow?
@Igor Adamski here is an example showing how to allocate more memory on `DockerRun`: https://github.com/kvnkho/demos/blob/main/prefect/host_config.py
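A sketch of what that could look like in your flow, assuming the host_config dict is forwarded to docker-py’s create_host_config - the 8g values are placeholders, not a recommendation:
Copy code
from prefect.run_configs import DockerRun

# a sketch only: mem_limit / memswap_limit raise the container's memory
# ceiling; keep the network_mode and heartbeat settings you already use
flow.run_config = DockerRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
    host_config={
        "network_mode": "host",
        "mem_limit": "8g",
        "memswap_limit": "8g",
    },
)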
Igor Adamski
yes we tried that and it did not work - I think we can get around the memory issues somehow, but it would be useful to see them surfaced in Prefect’s logs
👍 1