Robert Kowalski

12/22/2021, 2:46 PM
Hi, I have a problem with a flow: sometimes it executes correctly without any errors in the logs, but the same flow run on another day/time never ends. I stop the flow after e.g. 2 days of execution time (a correct run takes ~3h), and if I rerun the same flow once again, every task in the flow finishes correctly. I use the Docker agent and the GitLab registry. In the agent logs I found this error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 188, in interrupt_if_cancelling
    flow_run_info = self.client.get_flow_run_info(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1564, in get_flow_run_info
    raise ClientError('Flow run ID not found: "{}"'.format(flow_run_id))
prefect.exceptions.ClientError: Flow run ID not found: "0695cb92-7995-43b1-abf7-6500eb7e9fc0"
The flow freezes on one task; this task inserts data into InfluxDB. I have two instances of this task with two different database configs, and the two tasks execute at the same time. One instance executes correctly, the second never ends. Does anybody have an idea what might be causing this log error or why the task is not ending?
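import os
from pathlib import Path

import prefect
from dask.distributed import LocalCluster
from prefect import Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.executors import DaskExecutor
from prefect.storage import Docker

# load_config, parent_dir and the task classes (AlgoseekDataProvider, AdjustmentsFilter,
# AdjustmentsSplitter, FactorsCalculator, FactorsInserter, InfluxConfig) are project code not shown here.
config = load_config(config_path=Path(f'{parent_dir}/config.yaml'))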
defaults = config.get('defaults')
flow_name = 'adjustment_creator'

storage = Docker(
    env_vars={
        "PYTHONPATH": "$PYTHONPATH:/pipeline",
        "PACKAGES_REGISTRY_USER": os.environ.get("PACKAGES_REGISTRY_USER"),
        "PACKAGES_REGISTRY_TOKEN": os.environ.get("PACKAGES_REGISTRY_TOKEN"),
    },
    files={f'{parent_dir}': '/pipeline'},
    image_tag=os.environ.get('IMAGE_TAG'),
    image_name=flow_name,
    registry_url="gitlab.algopolis.ai:4567/algopolis/dataengine",
    stored_as_script=True,
    path='/pipeline/flow.py',
    extra_dockerfile_commands=[
        'WORKDIR /pipeline',
        'RUN pip install poetry',
        'RUN poetry config virtualenvs.create false',
        'RUN poetry config http-basic.influx_wrapper $PACKAGES_REGISTRY_USER $PACKAGES_REGISTRY_TOKEN',
        'RUN poetry install --no-dev'
    ]
)

algoseek_data_provider = AlgoseekDataProvider()
data_filter = AdjustmentsFilter()
data_splitter = AdjustmentsSplitter()
factors_calculator = FactorsCalculator()
influx_db_config = InfluxConfig(**config.get('databases').get('influxdb'))
influx_db_dev_config = InfluxConfig(**config.get('databases').get('influxdb-dev'))

factors_inserter = FactorsInserter(influx_db_config=influx_db_config)
factors_inserter_dev = FactorsInserter(influx_db_config=influx_db_dev_config, name='factors_inserter_dev')


def dynamic_executor():
    return LocalCluster(n_workers=prefect.context.parameters["workers_quantity"])


with Flow(name=flow_name,
          executor=DaskExecutor(cluster_class=dynamic_executor),
          storage=storage,
          result=LocalResult()) as flow:
    workers_quantity = Parameter("workers_quantity", default=int(defaults.get('workers_quantity')))
    flow.add_task(workers_quantity)
    ticker_filter = Parameter('ticker_filter', default='all')
    algoseek_data_path = Parameter('algoseek_data_path', default=defaults.get('algoseek_data_root_path'))

    algoseek_data = algoseek_data_provider.bind(algoseek_data_root_path=algoseek_data_path)
    filtered_data = data_filter.bind(adjustments=algoseek_data,
                                     tickers_filter=ticker_filter)
    split_data = data_splitter.bind(adjustments=filtered_data, max_groups_quantity=workers_quantity)
    tickers_factors = factors_calculator.map(grouped_adjustments=split_data)

    inserter = factors_inserter.map(tickers_factors=tickers_factors)
    inserter_dev = factors_inserter_dev.map(tickers_factors=tickers_factors)
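A hedged sketch, not from the thread: one way to make a stalled write fail loudly instead of hanging for days is to put a deadline on it. Prefect 1.x tasks accept a `timeout` argument (in seconds) on `prefect.Task`, which would be the framework-level option if `FactorsInserter` forwards keyword arguments to its base class (an assumption; its code is not shown). The plain-Python version below only illustrates the idea with the standard library; `write_points` is a hypothetical stand-in for the InfluxDB write, simulated with `time.sleep`.

```python
import concurrent.futures
import time


def write_points(db_name, delay_s):
    """Hypothetical stand-in for the InfluxDB write; the sleep simulates a network call that may stall."""
    time.sleep(delay_s)
    return f"{db_name}: ok"


def run_with_deadline(fn, *args, timeout_s):
    """Run fn(*args) in a worker thread and raise TimeoutError if it takes longer than timeout_s.

    Python cannot kill the worker thread: a stalled call keeps running in the background,
    but the caller gets control back and can fail (or retry) instead of blocking forever.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"{fn.__name__}{args} did not finish within {timeout_s}s") from None
    finally:
        # Do not block on a stalled worker; let it finish on its own.
        pool.shutdown(wait=False)


if __name__ == "__main__":
    print(run_with_deadline(write_points, "influxdb", 0.01, timeout_s=1.0))
    try:
        run_with_deadline(write_points, "influxdb-dev", 0.5, timeout_s=0.1)
    except TimeoutError as exc:
        print("stalled:", exc)
```

With a deadline in place, the second inserter would surface as a failed task in the logs rather than an endless run, which at least narrows down where it stalls.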

Anna Geller

12/22/2021, 2:52 PM
Is it possible that someone deleted the flow mid-run? So far I've only seen this error when it was used with a Kubernetes agent.
What is also interesting is that you use LocalResult with a Docker agent. I'm not sure whether those results will be persisted; do you use a bind mount?
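A hedged sketch of the persistence concern: inside a container, files written to a path that is not on a bind mount live in that container's writable layer, so they are not visible to the next flow run's container and are gone once the container is removed. The helper below walks up from a results directory to the mount point that contains it; if that mount point is `/`, the results only exist on the container's own filesystem. It assumes the Prefect 1.x `LocalResult` default directory of `~/.prefect/results` (override it if you pass `dir=` to `LocalResult`), and it is only a heuristic that means something when run inside the flow's container. Mounting a host path, for example via the Prefect 1.x Docker agent's `--volume` option, is what would make this check pass.

```python
import os
from pathlib import Path

# Assumption: Prefect 1.x LocalResult default location; change it if your
# flow passes dir=... to LocalResult.
DEFAULT_RESULTS_DIR = Path(os.path.expanduser("~/.prefect/results"))


def containing_mount(path) -> Path:
    """Return the mount point that path (or its nearest existing ancestor) lives on."""
    p = Path(path).resolve()
    # The results directory may not exist yet; start from the closest existing ancestor.
    while not p.exists():
        p = p.parent
    # Walk upward until os.path.ismount reports a mount point; "/" always is one.
    while not os.path.ismount(p):
        p = p.parent
    return p


def results_would_persist(results_dir=DEFAULT_RESULTS_DIR) -> bool:
    """Heuristic: True if results_dir sits on a mount other than the container's root filesystem."""
    return containing_mount(results_dir) != Path("/")


if __name__ == "__main__":
    mount = containing_mount(DEFAULT_RESULTS_DIR)
    print(f"{DEFAULT_RESULTS_DIR} is on mount {mount}; persists: {results_would_persist()}")
```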

Robert Kowalski

12/23/2021, 8:42 AM
I will keep watching these Docker containers; yesterday and today the flows executed correctly. Yes, I use LocalResult and I don't use any mounts in Docker.