Jeremy Phelps
05/06/2021, 12:50 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 6:53 PM
Jeremy Phelps
05/06/2021, 6:55 PM
Jeremy Phelps
05/06/2021, 6:55 PM
Kevin Kho
ECSRun or KubernetesRun
Jeremy Phelps
05/06/2021, 6:55 PM
Kevin Kho
UniversalRun which is the default.
Jeremy Phelps
05/06/2021, 6:56 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 6:58 PM
class OurFlow(PrefectFlow):
    def __init__(self, name, **kwargs):
        super().__init__(name, storage=Docker(registry_url='gcr.io/pict-app',
                                              image_name=name,
                                              base_image='gcr.io/pict-app/our-prefect',
                                              path=path_in_docker(inspect.stack()[1].filename),
                                              stored_as_script=True))
        self.executor = DaskExecutor(address='tcp://dask-scheduler:8786', debug=True)
All my flows are derived from it.
Jeremy Phelps
05/06/2021, 6:58 PM
Jeremy Phelps
05/06/2021, 6:59 PM
Jeremy Phelps
05/06/2021, 7:00 PM
from prefect import Flow as PrefectFlow
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 7:04 PM
__init__ method of DaskExecutor takes no such argument.
Kevin Kho
Jeremy Phelps
05/06/2021, 7:07 PM
dask-scheduler and the others run dask-worker with the scheduler server's address specified.
Jeremy Phelps
05/06/2021, 7:09 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:10 PM
Jeremy Phelps
05/06/2021, 7:13 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:49 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:52 PM
Jeremy Phelps
05/06/2021, 7:53 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 7:56 PM
Jeremy Phelps
05/06/2021, 7:58 PM
Jeremy Phelps
05/06/2021, 7:59 PM
Jeremy Phelps
05/06/2021, 8:00 PM
prefect get flow-runs inside Kubernetes.
Jeremy Phelps
05/06/2021, 8:01 PM
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 573, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/cli/execute.py", line 49, in flow_run
    result = client.graphql(query)
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 304, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 220, in post
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 446, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 346, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 504, in send
    raise ConnectTimeout(e, request=request)
requests.exceptions.ConnectTimeout: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))
Jeremy Phelps
05/06/2021, 8:17 PM
[2021-05-06 14:33:38-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:13:47-0500] INFO - prefect.DaskExecutor | Stopping executor, waiting for 1 active tasks to complete
[2021-05-06 15:14:12-0500] ERROR - prefect.CloudFlowRunner | Heartbeat process died with exit code 1
Is there any way to tell which "active task" might be referenced here?
Jeremy Phelps
05/06/2021, 8:18 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:20 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:22 PM
Jeremy Phelps
05/06/2021, 8:22 PM
Jeremy Phelps
05/06/2021, 8:24 PM
Kevin Kho
export PREFECT__LOGGING__LEVEL=DEBUG
Jeremy Phelps
05/06/2021, 8:25 PM
prefect agent kubernetes start --log-level DEBUG not good enough?
Kevin Kho
Jeremy Phelps
05/06/2021, 8:27 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:32 PM
-e PREFECT__LOGGING__LEVEL=DEBUG to the command line.
Jeremy Phelps
05/06/2021, 8:35 PM
[2021-05-06 15:33:11-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Using executor type DaskExecutor
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Flow 'demand-forecasting-delivery-scheduler': Handling state change from Scheduled to Running
[2021-05-06 15:33:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:41-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:56-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:11-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:42-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
Jeremy Phelps
05/06/2021, 8:37 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:39 PM
demand_forecast_status = demand_forecast_for_stockwell.map(unmapped(forecasting_job), stockwell_ids)
Both parameters are return values of tasks that have completed.
Jeremy Phelps
05/06/2021, 8:52 PM
stockwell_ids array had only one value in it (of type int), meaning only one task. Due to local memory constraints I had to keep it small.
Now I'm running it in Google Cloud, and trying to map over hundreds of integers. Perhaps this has something to do with why things are failing?
Kevin Kho
Jeremy Phelps
05/06/2021, 8:56 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:57 PM
Jeremy Phelps
05/07/2021, 8:39 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/07/2021, 9:21 PM
Jeremy Phelps
05/07/2021, 9:22 PM
Kevin Kho
Jeremy Phelps
05/07/2021, 9:26 PM
Jeremy Phelps
05/07/2021, 9:29 PM
Jeremy Phelps
05/07/2021, 9:34 PM
Kevin Kho
Jeremy Phelps
05/07/2021, 9:43 PM
@task
def stockwell_ids_for_metro(metro_id):
    return hundreds_of_integers()
from collections import namedtuple
@task
def create_job(tag):
    return namedtuple('JobId', ['tag'])(tag=tag)
@task
def hello_world(job, stockwell_id):
    # Log the arguments this task received.
    prefect.context.get('logger').info('Called with {} and {}'.format(job, stockwell_id))
    return stockwell_id
with OurFlow('freezing-flow') as flow:
    forecasting_job = create_job('1')
    stockwell_ids = stockwell_ids_for_metro(1)
    # hello_world doesn't run.
    hello_world.map(unmapped(forecasting_job), stockwell_ids)
Jeremy Phelps
05/07/2021, 9:50 PM
@task decorator:
class OurTask(prefect.tasks.core.function.FunctionTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_mapped = False
    def map(self, *args, **kwargs):
        mapped_task = super().map(*args, **kwargs)
        mapped_task.is_mapped = True
        return mapped_task
def task(fn=None, log_stdout=True, **task_init_kwargs):
    if fn is None:
        return lambda fn: OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
    # Forward log_stdout for the bare @task form as well.
    return OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
The purpose of this is to be able to dynamically tell if a given Task object represents a mapped task or not.
Jeremy Phelps
05/12/2021, 2:13 PM
Kevin Kho
Kevin Kho
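A minimal sketch of the decorator pattern from the custom `task` wrapper above, assuming a hypothetical stand-in base class (`StandInTask`) in place of Prefect's `FunctionTask` so that it runs without Prefect installed. The stand-in's `map` simply returns a shallow copy, which is an assumption made for illustration and not a description of Prefect's internals. The sketch shows the `is_mapped` flag being set only on the object returned by `map`, and `log_stdout` being forwarded in both the bare `@task` and the parameterized `@task(...)` forms.

```python
import copy


class StandInTask:
    """Hypothetical stand-in for a task base class (not Prefect's FunctionTask)."""

    def __init__(self, fn, log_stdout=False, **options):
        self.fn = fn
        self.log_stdout = log_stdout
        self.options = options

    def map(self, *args, **kwargs):
        # Assumption for illustration: mapping yields a separate copy of the task.
        return copy.copy(self)


class OurTask(StandInTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_mapped = False

    def map(self, *args, **kwargs):
        mapped_task = super().map(*args, **kwargs)
        # Only the returned copy is flagged; the original task stays unflagged.
        mapped_task.is_mapped = True
        return mapped_task


def task(fn=None, log_stdout=True, **task_init_kwargs):
    """Decorator usable both as @task and as @task(...)."""
    if fn is None:
        # Called with arguments: return a decorator that wraps the function later.
        return lambda fn: OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
    # Called bare: wrap the function immediately, with the same defaults.
    return OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)


@task
def plain(x):
    return x


@task(log_stdout=False)
def quiet(x):
    return x


mapped = plain.map([1, 2, 3])
```

With this shape, `plain.is_mapped` stays `False` while `mapped.is_mapped` is `True`, so code inspecting a task object can distinguish the two.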
Jeremy Phelps
05/12/2021, 2:48 PM
Kevin Kho
Zanie
Jeremy Phelps
05/12/2021, 3:03 PM
Zanie
Jeremy Phelps
05/12/2021, 3:08 PM
Zanie
Zanie