Igor Adamski  11/05/2021, 10:11 AM

data_downloader = DataDownloader(db_config=db_config)
downloaded_data = data_downloader.map(tickers_and_dates=grouped_list,
                                      columns=unmapped(columns))

save_data = SaveData(db_config=db_config,
                     override_exists_files=True)
save_data.bind(data=downloaded_data,
               table_name='tmp_events')
I'm getting very weird behaviour where some children of the mapped task start running again after completion. As a result, the SaveData task starts running before all the mapped children of the DataDownloader task have finished. See the screenshots attached in the thread - it would be great if someone could offer any guidance on this. On the second screenshot, the top bar is the DataDownloader bar, and you can see that the SaveData bar (just below) starts before the DataDownloader one is finished.
Igor Adamski  11/05/2021, 10:12 AM
(screenshots attached in the thread)

Anna Geller
Are your tasks defined within the with Flow() as flow: constructor? Since your flow was killed by the Zombie killer - is this a long-running job?
Igor Adamski  11/05/2021, 10:37 AM

import os
import sys
from pathlib import Path
from prefect import Flow, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.executors import DaskExecutor
from prefect.storage import Docker

sys.path.append(os.path.dirname(__file__))

from src.tools import load_config
from src.tasks.data_downloader import DataDownloader
from src.tasks.prepare_date_ranges import PrepareDateRanges
from src.tasks.save_data import SaveData
from src.tasks.tickers_group_creator import TickersGroupCreator
from src.tasks.factset_columns_provider import FactsetColumnsProvider
from db.polygon_db.polygon_client import PolygonDbConfig
from db.factset_db.factset_client import FactsetDbConfig
from src.tasks.all_tickers_provider import AllTickersProvider
from src.schedulers import GetFactsetDataScheduler
from prefect.run_configs import DockerRun

parent_dir = os.path.dirname(os.path.realpath(__file__))
config = load_config(config_path=Path(f'{parent_dir}/config.yaml'))

flow_name = 'factset_events_tmp'

storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
        "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN")
    },
    files={
        f'{parent_dir}': '/pipeline',
        f'{parent_dir}/pyproject.toml': 'pyproject.toml',
        f'{parent_dir}/poetry.lock': 'poetry.lock'
    },
    image_tag=os.environ.get('IMAGE_TAG'),
    image_name=flow_name,
    registry_url="gitlab.algopolis.ai:4567/algopolis/dataengine",
    stored_as_script=True,
    path='/pipeline/flow.py',
    # ignore_healthchecks=True,
    extra_dockerfile_commands=[
        'RUN pip install poetry',
        'RUN poetry config virtualenvs.create false',
        'RUN poetry config http-basic.factset_connector $REGISTRY_USER $REGISTRY_TOKEN',
        'RUN echo $REGISTRY_USER',
        'RUN poetry install --no-dev'
    ]
)

with Flow(
    name=flow_name,
    schedule=GetFactsetDataScheduler(),
    executor=DaskExecutor(cluster_kwargs={"n_workers": 10, "threads_per_worker": 1}),
    storage=storage,
    result=LocalResult(),
    run_config=DockerRun(
        host_config={
            'network_mode': 'host'
        }
    ),
) as flow:
    polygon_db_config = PolygonDbConfig(**config.get('polygon').get('database'))
    factset_db_config = FactsetDbConfig(**config.get('factset').get('database'))

    tickers = AllTickersProvider(polygon_db_config=polygon_db_config)

    ranges = PrepareDateRanges()
    ranges.bind(all_tickers=tickers)

    grouped_tickers = TickersGroupCreator(group_size=20)
    grouped_tickers.bind(tickers=tickers,
                         dates=ranges)

    columns = FactsetColumnsProvider(factset_db_config=factset_db_config)

    data_downloader = DataDownloader(factset_db_config=factset_db_config)
    downloaded_data = data_downloader.map(tickers_and_dates=grouped_tickers,
                                          columns=unmapped(columns))

    save_data = SaveData(factset_db_config=factset_db_config,
                         override_exists_files=True)
    save_data.bind(data=downloaded_data,
                   table_name='tmp_factset_events')
Anna Geller

from prefect import Flow, unmapped

data_downloader = DataDownloader(db_config=db_config)
save_data = SaveData(db_config=db_config, override_exists_files=True)

with Flow("igor") as flow:
    download_task = data_downloader()
    downloaded_data = download_task.map(
        tickers_and_dates=grouped_list, columns=unmapped(columns)
    )
    save_data = save_data()
    save_data.bind(data=downloaded_data, table_name="tmp_events")

You need to initialize the task class, and then you can call it within the flow constructor like a normal function.
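To make that pattern concrete, here is a minimal, self-contained sketch of the imperative Task-class API in Prefect 1.x. The DataDownloader and SaveData classes below are toy stand-ins for illustration, not the real classes from the flow above:

from prefect import Flow, Task, unmapped

# Toy stand-ins, for illustration only
class DataDownloader(Task):
    def run(self, tickers_and_dates, columns):
        # pretend to download one group's worth of data
        return {"group": tickers_and_dates, "columns": columns}

class SaveData(Task):
    def run(self, data, table_name):
        # `data` is the list of all mapped children's results
        print(f"saving {len(data)} chunks to {table_name}")

# initialize the task classes once, outside the flow...
data_downloader = DataDownloader()
save_data = SaveData()

with Flow("pattern-example") as flow:
    # ...then call them inside the flow constructor like normal functions
    downloaded = data_downloader.map(
        tickers_and_dates=[["AAPL"], ["MSFT"]],
        columns=unmapped(["open", "close"]),
    )
    save_data(data=downloaded, table_name="tmp_events")

flow.run()  # runs the two mapped children, then the save task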
Igor Adamski  11/05/2021, 10:45 AM
The save_data task is spawned after all of download_data is finished, and none of the child processes try to start running again once finished - maybe that's a clue?
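For reference, one way to reproduce that comparison is to run the same flow in-process with a serial executor and watch the task ordering. A sketch, assuming flow is the Flow object from the script above (Prefect 1.x flow.run() accepts an executor override):

from prefect.executors import LocalExecutor

# Run the flow once, in-process, with a serial executor;
# run_on_schedule=False skips the attached schedule for a one-off run.
state = flow.run(run_on_schedule=False, executor=LocalExecutor())
print(state)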
Anna Geller
To stop the Zombie killer from killing your flow runs, you can switch the heartbeat mode to thread, e.g. in your Docker storage env_vars:
storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "REGISTRY_USER": os.environ.get("REGISTRY_USER"),
        "REGISTRY_TOKEN": os.environ.get("REGISTRY_TOKEN"),
        "PREFECT__CLOUD__HEARTBEAT_MODE": "thread"
    },
    # ... rest of the Docker storage config unchanged

More on that here: https://docs.prefect.io/orchestration/concepts/flows.html#flow-settings
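As an alternative to baking the variable into the storage image, a sketch of setting the same env var on the run config instead (DockerRun in Prefect 1.x accepts an env argument):

from prefect.run_configs import DockerRun

# Same heartbeat setting, applied per flow run rather than per image
run_config = DockerRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
    host_config={"network_mode": "host"},
)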
Anna Geller
"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"
Overall, Prefect has this privacy barrier so that your code runs on your infrastructure. OOM errors are difficult because it’s an infrastructure-specific issue. Have you tried something to eliminate the OOM error like allocating more resources to the flow?Anna Geller
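One way to allocate more resources - a sketch only, since the right numbers depend on the workload - is through the cluster kwargs that DaskExecutor forwards to distributed.LocalCluster, which accepts a per-worker memory_limit:

from prefect.executors import DaskExecutor

# Illustrative values: fewer workers, each with an explicit memory budget
executor = DaskExecutor(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 1,
        "memory_limit": "8GiB",  # per worker
    }
)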