Robert Kowalski
12/22/2021, 2:46 PM
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 188, in interrupt_if_cancelling
    flow_run_info = self.client.get_flow_run_info(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1564, in get_flow_run_info
    raise ClientError('Flow run ID not found: "{}"'.format(flow_run_id))
prefect.exceptions.ClientError: Flow run ID not found: "0695cb92-7995-43b1-abf7-6500eb7e9fc0"
```
The flow freezes on one task; this task inserts data into InfluxDB. I have two instances of this task with two different database configs, and both execute at the same time. One instance completes correctly; the second never ends. Does anybody have an idea what might be causing this log error, or why the task is not ending? Here is the flow code:
```python
config = load_config(config_path=Path(f'{parent_dir}/config.yaml'))
defaults = config.get('defaults')
flow_name = 'adjustment_creator'
storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "PACKAGES_REGISTRY_USER": os.environ.get("PACKAGES_REGISTRY_USER"),
        "PACKAGES_REGISTRY_TOKEN": os.environ.get("PACKAGES_REGISTRY_TOKEN"),
    },
    files={f'{parent_dir}': '/pipeline'},
    image_tag=os.environ.get('IMAGE_TAG'),
    image_name=flow_name,
    registry_url="gitlab.algopolis.ai:4567/algopolis/dataengine",
    stored_as_script=True,
    path='/pipeline/flow.py',
    extra_dockerfile_commands=[
        'WORKDIR /pipeline',
        'RUN pip install poetry',
        'RUN poetry config virtualenvs.create false',
        'RUN poetry config http-basic.influx_wrapper $PACKAGES_REGISTRY_USER $PACKAGES_REGISTRY_TOKEN',
        'RUN poetry install --no-dev'
    ]
)
algoseek_data_provider = AlgoseekDataProvider()
data_filter = AdjustmentsFilter()
data_splitter = AdjustmentsSplitter()
factors_calculator = FactorsCalculator()
influx_db_config = InfluxConfig(**config.get('databases').get('influxdb'))
influx_db_dev_config = InfluxConfig(**config.get('databases').get('influxdb-dev'))
factors_inserter = FactorsInserter(influx_db_config=influx_db_config)
factors_inserter_dev = FactorsInserter(influx_db_config=influx_db_dev_config, name='factors_inserter_dev')
def dynamic_executor():
    return LocalCluster(n_workers=prefect.context.parameters["workers_quantity"])
with Flow(name=flow_name,
          executor=DaskExecutor(cluster_class=dynamic_executor),
          storage=storage,
          result=LocalResult()) as flow:
    workers_quantity = Parameter("workers_quantity", default=int(defaults.get('workers_quantity')))
    flow.add_task(workers_quantity)
    ticker_filter = Parameter('ticker_filter', default='all')
    algoseek_data_path = Parameter('algoseek_data_path', default=defaults.get('algoseek_data_root_path'))
    algoseek_data = algoseek_data_provider.bind(algoseek_data_root_path=algoseek_data_path)
    filtered_data = data_filter.bind(adjustments=algoseek_data,
                                     tickers_filter=ticker_filter)
    split_data = data_splitter.bind(adjustments=filtered_data, max_groups_quantity=workers_quantity)
    tickers_factors = factors_calculator.map(grouped_adjustments=split_data)
    inserter = factors_inserter.map(tickers_factors=tickers_factors)
    inserter_dev = factors_inserter_dev.map(tickers_factors=tickers_factors)
```
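I guess I can replay the lookup from the traceback to confirm whether the run ID really is missing on the backend. A minimal sketch using the same client call the runner uses (the run ID is the one from my log):

```python
from prefect import Client

client = Client()
# Same lookup CloudFlowRunner performs in interrupt_if_cancelling; this
# raises ClientError if the backend no longer knows about the flow run.
info = client.get_flow_run_info("0695cb92-7995-43b1-abf7-6500eb7e9fc0")
print(info.state)
```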
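One idea I'm considering: giving the inserter a timeout plus retries, so a hung insert fails loudly instead of blocking the flow run forever. A rough sketch, assuming FactorsInserter subclasses prefect.Task and forwards extra kwargs to it (the numbers are placeholders):

```python
from datetime import timedelta

factors_inserter_dev = FactorsInserter(
    influx_db_config=influx_db_dev_config,
    name='factors_inserter_dev',
    timeout=600,                       # seconds; fail the task instead of hanging
    max_retries=2,                     # retry a failed or timed-out insert
    retry_delay=timedelta(minutes=1),
)
```

Would that at least turn the hang into a visible failure?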
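I also read that long-running mapped tasks can starve the subprocess heartbeat in Prefect 1.x, which can leave a run looking stuck. If that applies here (an assumption on my part, not something I've confirmed), switching heartbeats to thread mode via the run config might be worth a try:

```python
from prefect.run_configs import DockerRun

# Assumption: thread-mode heartbeats hold up better on busy Dask workers
# than the default subprocess heartbeat; "off" would disable them entirely.
flow.run_config = DockerRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
```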
Anna Geller
12/22/2021, 2:52 PM

Robert Kowalski
12/23/2021, 8:42 AM