Jeremy Phelps
05/06/2021, 12:50 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 6:53 PM
Jeremy Phelps
05/06/2021, 6:55 PM
Jeremy Phelps
05/06/2021, 6:55 PM
Kevin Kho
ECSRun or KubernetesRun
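(For reference, a run config is attached to the flow before registration, roughly like this in Prefect 0.14+; the flow name and label below are made-up placeholders.)
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("example") as flow:
    pass

# Pick the run config matching the agent; UniversalRun is the default if none is set.
flow.run_config = KubernetesRun(labels=["k8s"])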
Jeremy Phelps
05/06/2021, 6:55 PM
Kevin Kho
UniversalRun, which is the default.
Jeremy Phelps
05/06/2021, 6:56 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 6:58 PM
# imports assumed (Prefect 0.14.x module paths):
import inspect
from prefect import Flow as PrefectFlow
from prefect.storage import Docker
from prefect.executors import DaskExecutor

class OurFlow(PrefectFlow):
    def __init__(self, name, **kwargs):
        super().__init__(name, storage=Docker(registry_url='gcr.io/pict-app',
                                              image_name=name,
                                              base_image='gcr.io/pict-app/our-prefect',
                                              # path_in_docker is a helper defined elsewhere in our codebase
                                              path=path_in_docker(inspect.stack()[1].filename),
                                              stored_as_script=True))
        self.executor = DaskExecutor(address='tcp://dask-scheduler:8786', debug=True)
All my flows are derived from it.
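(A minimal sketch of how a flow built on that subclass might be defined and registered; the task body and project name here are made up.)
from prefect import task

@task
def say_hello():
    print("hello")

with OurFlow("example-flow") as flow:
    say_hello()

flow.register(project_name="default")  # placeholder project name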
Jeremy Phelps
05/06/2021, 6:58 PM
Jeremy Phelps
05/06/2021, 6:59 PM
Jeremy Phelps
05/06/2021, 7:00 PM
from prefect import Flow as PrefectFlow
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 7:04 PM
The __init__ method of DaskExecutor takes no such argument.
Kevin Kho
Jeremy Phelps
05/06/2021, 7:07 PM
dask-scheduler, and the others run dask-worker with the scheduler server's address specified.
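(For reference, pointing the executor at a long-running scheduler like that looks roughly like this in Prefect 0.14+; the flow here is just a stand-in.)
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("example") as flow:
    pass

# Connect to the existing scheduler instead of spinning up a temporary cluster
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")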
Jeremy Phelps
05/06/2021, 7:09 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:10 PM
Jeremy Phelps
05/06/2021, 7:13 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:49 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 7:52 PM
Jeremy Phelps
05/06/2021, 7:53 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/06/2021, 7:56 PM
Jeremy Phelps
05/06/2021, 7:58 PM
Jeremy Phelps
05/06/2021, 7:59 PM
Jeremy Phelps
05/06/2021, 8:00 PM
prefect get flow-runs
inside Kubernetes.
Jeremy Phelps
05/06/2021, 8:01 PM
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
timeout=timeout
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 573, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/cli/execute.py", line 49, in flow_run
result = client.graphql(query)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 304, in graphql
retry_on_api_error=retry_on_api_error,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 220, in post
retry_on_api_error=retry_on_api_error,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 446, in _request
session=session, method=method, url=url, params=params, headers=headers
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 346, in _send_request
response = session.post(url, headers=headers, json=params, timeout=30)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 590, in post
return self.request('POST', url, data=data, json=json, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 504, in send
raise ConnectTimeout(e, request=request)
requests.exceptions.ConnectTimeout: HTTPConnectionPool(host='prefect-server', port=4200): Max retries exceeded with url: /graphql (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f474a174750>, 'Connection to prefect-server timed out. (connect timeout=30)'))
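(One quick check from a pod inside the cluster is to POST to the GraphQL endpoint directly; the host and port are taken from the traceback above, and any response at all shows the service is reachable.)
import requests

resp = requests.post(
    "http://prefect-server:4200/graphql",
    json={"query": "{ hello }"},  # a trivial query; a timeout here points to a networking problem, not a Prefect one
    timeout=10,
)
print(resp.status_code, resp.text)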
Jeremy Phelps
05/06/2021, 8:17 PM
[2021-05-06 14:33:38-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:13:47-0500] INFO - prefect.DaskExecutor | Stopping executor, waiting for 1 active tasks to complete
[2021-05-06 15:14:12-0500] ERROR - prefect.CloudFlowRunner | Heartbeat process died with exit code 1
Is there any way to tell which "active task" might be referenced here?
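(One way to see what the cluster still considers active is to ask the Dask scheduler directly; a sketch, assuming the scheduler address used earlier in the thread.)
from distributed import Client

client = Client("tcp://dask-scheduler:8786")
# Maps each worker address to the task keys it is currently running
print(client.processing())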
Jeremy Phelps
05/06/2021, 8:18 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:20 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:22 PM
Jeremy Phelps
05/06/2021, 8:22 PM
Jeremy Phelps
05/06/2021, 8:24 PM
Kevin Kho
export PREFECT__LOGGING__LEVEL=DEBUG
Jeremy Phelps
05/06/2021, 8:25 PM
Is prefect agent kubernetes start --log-level DEBUG not good enough?
Kevin Kho
Jeremy Phelps
05/06/2021, 8:27 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:32 PM
-e PREFECT__LOGGING__LEVEL=DEBUG to the command line.
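(For reference, the agent's --log-level flag only changes the agent's own logging; one way to get DEBUG logs from the flow run itself is to put the env var on the flow's run config, roughly like this.)
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("example") as flow:
    pass

# The env var is passed to the flow-run job, so the CloudFlowRunner logs at DEBUG
flow.run_config = KubernetesRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})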
Jeremy Phelps
05/06/2021, 8:35 PM
[2021-05-06 15:33:11-0500] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'demand-forecasting-delivery-scheduler'
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Using executor type DaskExecutor
[2021-05-06 15:33:11-0500] DEBUG - prefect.CloudFlowRunner | Flow 'demand-forecasting-delivery-scheduler': Handling state change from Scheduled to Running
[2021-05-06 15:33:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:41-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:33:56-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:11-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:26-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
[2021-05-06 15:34:42-0500] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
Jeremy Phelps
05/06/2021, 8:37 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:39 PM
demand_forecast_status = demand_forecast_for_stockwell.map(unmapped(forecasting_job), stockwell_ids)
Both parameters are return values of tasks that have completed.
Jeremy Phelps
05/06/2021, 8:52 PM
The stockwell_ids array had only one value in it (of type int), meaning only one task. Due to local memory constraints I had to keep it small.
Now I'm running it in Google Cloud, and trying to map over hundreds of integers. Perhaps this has something to do with why things are failing?
Kevin Kho
Jeremy Phelps
05/06/2021, 8:56 PM
Kevin Kho
Jeremy Phelps
05/06/2021, 8:57 PM
Jeremy Phelps
05/07/2021, 8:39 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/07/2021, 9:21 PM
Jeremy Phelps
05/07/2021, 9:22 PM
Kevin Kho
Jeremy Phelps
05/07/2021, 9:26 PM
Jeremy Phelps
05/07/2021, 9:29 PM
Jeremy Phelps
05/07/2021, 9:34 PM
Kevin Kho
Jeremy Phelps
05/07/2021, 9:43 PM
from collections import namedtuple

import prefect
from prefect import unmapped  # @task here is our custom decorator (see below)

@task
def stockwell_ids_for_metro(metro_id):
    return hundreds_of_integers()  # stand-in for the real query that returns hundreds of ids

@task
def create_job(tag):
    return namedtuple('JobId', ['tag'])(tag=tag)

@task
def hello_world(job, stockwell_id):
    prefect.context.get('logger').info('Called with {} and {}'.format(job, stockwell_id))
    return stockwell_id

with OurFlow('freezing-flow') as flow:
    forecasting_job = create_job('1')
    stockwell_ids = stockwell_ids_for_metro(1)
    # hello_world doesn't run.
    hello_world.map(unmapped(forecasting_job), stockwell_ids)
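(A quick way to sanity-check the repro is to run it locally with a LocalExecutor, bypassing the Dask cluster; if hello_world runs here but not on the cluster, the problem is on the Dask side. hundreds_of_integers() still needs to be stubbed out for this to run.)
from prefect.executors import LocalExecutor

state = flow.run(executor=LocalExecutor())  # overrides the DaskExecutor set by OurFlow
print(state)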
Jeremy Phelps
05/07/2021, 9:50 PM
Also, we use a custom @task decorator:
import prefect

class OurTask(prefect.tasks.core.function.FunctionTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_mapped = False

    def map(self, *args, **kwargs):
        mapped_task = super().map(*args, **kwargs)
        mapped_task.is_mapped = True
        return mapped_task

def task(fn=None, log_stdout=True, **task_init_kwargs):
    if fn is None:
        return lambda fn: OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
    return OurTask(fn=fn, log_stdout=log_stdout, **task_init_kwargs)
The purpose of this is to be able to dynamically tell if a given Task object represents a mapped task or not.
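(With that subclass, the flag can be read off the object returned by .map() inside the flow context; a small sketch reusing the repro flow above.)
with OurFlow('freezing-flow') as flow:
    forecasting_job = create_job('1')
    stockwell_ids = stockwell_ids_for_metro(1)
    mapped = hello_world.map(unmapped(forecasting_job), stockwell_ids)

print(getattr(mapped, 'is_mapped', False))  # True only for the mapped copy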
Jeremy Phelps
05/12/2021, 2:13 PM
Kevin Kho
Kevin Kho
Jeremy Phelps
05/12/2021, 2:48 PM
Kevin Kho
Zanie
Jeremy Phelps
05/12/2021, 3:03 PM
Zanie
Jeremy Phelps
05/12/2021, 3:08 PM
Zanie
Zanie