Riley Hun
11/17/2020, 12:56 AM
daskdev/dask dockerfile
# Imports inferred from usage below (Prefect 0.13-era module paths).
import argparse
import importlib
import json
import os
import subprocess

from distributed.security import Security
from prefect.engine.executors import DaskExecutor
from prefect.environments.storage import Docker

parser = argparse.ArgumentParser()
parser.add_argument(
    '--flow', type=str, required=True,
    help='Module path which contains the flow.'
)
parser.add_argument(
    '--project', type=str, required=True,
    help='Project name in which to register the flow.'
)
parser.add_argument(
    '--image_name', type=str, required=True,
    help='Name of the image which is associated with the storage.'
)
parser.add_argument(
    '--image_tag', type=str, required=True,
    help='Tag of the image which is associated with the storage.'
)
parser.add_argument(
    '--env_file_path', type=str,
    help='JSON file containing environment variables to be stored in the image.'
)
parser.add_argument(
    '--python_dependencies', type=str,
    help='Comma-separated list of Python packages to pip install in the image.'
)
parser.add_argument(
    '--gcp_project_id', type=str, required=True,
    help='GCP project name.'
)
parser.add_argument(
    '--release', type=str, required=True,
    help='Name of the GKE cluster.'
)
parser.add_argument(
    '--dask_scheduler_address', type=str, required=True,
    help='Scheduler address of the Dask cluster.'
)
parser.add_argument(
    '--tls_ca', type=str, required=True,
    help='File path of the TLS certificate file for authorized access to the Dask cluster.'
)
parser.add_argument(
    '--tls_key', type=str, required=True,
    help='File path of the TLS key file for authorized access to the Dask cluster.'
)
parser.add_argument(
    '--dockerfile', type=str, required=True,
    help='Path to the Dockerfile for building the flow container.'
)
args = parser.parse_args()
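# A hypothetical invocation, for reference (the script name and all values
# below are illustrative, not taken from this thread):
#   python register_flows.py \
#       --flow flows.etl --project my-project \
#       --image_name prefect-flows --image_tag 0.1.0 \
#       --gcp_project_id my-gcp-project --release dask \
#       --dask_scheduler_address tls://dask-scheduler:8786 \
#       --tls_ca certs/ca.pem --tls_key certs/key.pem \
#       --dockerfile deployment/Dockerfile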
# get env vars
envs = {}
if args.env_file_path:
    with open(args.env_file_path) as f:
        envs = json.load(f)

# get python dependencies
# to do: list all packages in values.yaml as well
python_dependencies = [
    "google-cloud-storage",
    "google-cloud-logging",
    "hvac",
    "google-api-core",
    "snowflake-sqlalchemy",
    "snowflake-connector-python",
    "SQLAlchemy",
    "requests",
    "ravenpackapi",
    "pandas",
    "prefect",
    "lz4",
    "fsspec",
]
if args.python_dependencies:
    # extend (not append) so each package becomes its own list entry
    python_dependencies.extend(args.python_dependencies.split(','))

flows = [importlib.import_module(i).flow for i in args.flow.split(',')]
print(f'Listing flows: {flows}')
storage = Docker(
    image_name=args.image_name,
    image_tag=args.image_tag,
    dockerfile=args.dockerfile,
    registry_url=f'gcr.io/{args.gcp_project_id}/',
    env_vars=envs,
    python_dependencies=python_dependencies
)
# distributed.security.Security for TLS access to the Dask cluster;
# note that the CA file is also passed as the client certificate here.
security = Security(
    tls_ca_file=args.tls_ca,
    tls_client_cert=args.tls_ca,
    tls_client_key=args.tls_key,
    require_encryption=True
)
for flow in flows:
    flow.storage = storage
    flow.environment.executor = DaskExecutor(
        address=args.dask_scheduler_address,
        client_kwargs={'security': security}
    )
    path = storage.add_flow(flow)
    print(f'Storing flow {flow.name} at {path} in the image.')
storage = storage.build()
# run shell script for updating dask workers w/ prefect flow image
my_env = dict(os.environ)
my_env['RELEASE'] = args.release
my_env['GCP_PROJECT_ID'] = args.gcp_project_id
my_env['IMAGE_NAME'] = args.image_name
my_env['IMAGE_TAG'] = args.image_tag
# note: Popen does not wait for setup_dask.sh to finish
subprocess.Popen(['./deployment/setup_dask.sh'], env=my_env)

for flow in flows:
    flow.register(project_name=args.project, build=False)
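Before registering, a quick connectivity check against the Dask cluster can rule out TLS or address problems. A sketch using dask.distributed directly (the file paths and scheduler address are placeholders, not values from this thread):

from distributed import Client
from distributed.security import Security

security = Security(
    tls_ca_file='certs/ca.pem',          # placeholder paths
    tls_client_cert='certs/ca.pem',
    tls_client_key='certs/key.pem',
    require_encryption=True,
)
client = Client('tls://dask-scheduler:8786', security=security)  # placeholder address
# compares package versions across client, scheduler, and workers;
# raises if they are mismatched
print(client.get_versions(check=True))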
# Imports inferred from usage below.
import random
from datetime import datetime, timedelta

import prefect
from prefect import task, Flow
from prefect.schedules import IntervalSchedule


@task
def extract():
    """Get a list of data"""
    logger = prefect.context.get("logger")
    logger.info('An info message')
    return random.sample(range(1, 100), 5)
@task
def transform(data):
    """Multiply the input by 10"""
    logger = prefect.context.get("logger")
    logger.info('Another info message')
    return [i * 10 for i in data]
@task
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))


schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=60),
    interval=timedelta(minutes=5),
)

with Flow('ETL') as flow:
    e = extract()
    t = transform(e)
    l = load(t)

flow.schedule = schedule
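For a quick local check before registering, something like this exercises the flow once (a sketch; run_on_schedule=False tells Prefect to ignore the interval schedule for this run):

if __name__ == '__main__':
    state = flow.run(run_on_schedule=False)  # single local run, skipping the schedule
    assert state.is_successful()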
Kyle Moon-Wright
11/17/2020, 1:19 AM
- setting show_flow_logs=True if using flow.run_agent(), or --show-flow-logs if starting the agent from the CLI
- setting PREFECT__LOGGING__LEVEL="DEBUG" to get more verbose logging output from your flow run
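In code, those two suggestions might look like the following (a sketch; assumes `flow` is the Flow object built earlier):

# set PREFECT__LOGGING__LEVEL="DEBUG" in the agent's environment before it starts,
# or pass --show-flow-logs if the agent is launched from the CLI instead
flow.run_agent(show_flow_logs=True)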
Riley Hun
11/17/2020, 2:04 AM
[2020-11-17 07:27:13+0000] INFO - prefect.CloudTaskRunner | Task 'extract': Starting task run...
[2020-11-17 07:27:13+0000] ERROR - prefect.CloudTaskRunner | Task 'extract': Unexpected error while running task: AttributeError("'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?")
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 273, in run
self.set_task_run_name(task_inputs=task_inputs)
File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 342, in set_task_run_name
task_run_name = self.task.task_run_name
File "/opt/conda/lib/python3.7/site-packages/prefect/tasks/core/function.py", line 69, in __getattr__
f"'FunctionTask' object has no attribute {k}."
AttributeError: 'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task?
I tried looking up this error on the thread but couldn't find anything that could help.
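Not confirmed in this thread, but an AttributeError like this often indicates a Prefect version mismatch between the environment that registered the flow and the image or workers executing it (task_run_name is a newer attribute). A cheap first check is to compare versions on all sides:

import prefect
# run this in the registering environment, inside the flow image,
# and on the Dask workers; the outputs should match
print(prefect.__version__)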
Kyle Moon-Wright
11/17/2020, 4:42 PM
Riley Hun
11/17/2020, 9:03 PM
[2020-11-17 21:02:12+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
No errors in the Kubernetes logs.
Kyle Moon-Wright
11/17/2020, 11:16 PM
Riley Hun
11/17/2020, 11:35 PM
Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql
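host.docker.internal only resolves from containers on a local Docker host, so port 4200 (the default Prefect Server GraphQL/Apollo endpoint) is unreachable from pods in GKE. One likely fix, sketched under the assumption that a Prefect Server is running and reachable inside the cluster, is to ship a reachable API address with the flow image:

# Merge the API address into the env vars handed to Docker storage in the
# registration script above (the hostname is a placeholder assumption,
# not a value from this thread):
envs['PREFECT__CLOUD__API'] = 'http://prefect-apollo.prefect.svc.cluster.local:4200'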