Kyle McChesney
08/09/2021, 4:52 PM
…run_config and storage attributes for each flow before registration.
Kyle McChesney
08/09/2021, 4:54 PM
''' registry.py '''
import os
from prefect.storage import S3
from prefect.run_configs import ECSRun

def main():
    storage_bucket = os.environ['PREFECT_FLOWS_STORAGE_BUCKET']
    image = os.environ['PREFECT_FLOWS_IMAGE']
    for flow in MY_FLOWS():  # MY_FLOWS is defined elsewhere in the module
        flow.storage = S3(storage_bucket)
        flow.run_config = ECSRun(image=image)
        flow.serialize()

if __name__ == '__main__':
    main()
Kyle McChesney
08/09/2021, 4:55 PM
…prefect build produces, but that seems rather brittle.
Kevin Kho
Kyle McChesney
08/09/2021, 5:16 PM
{
"name": "fake_demultiplex",
"type": "prefect.core.flow.Flow",
"schedule": null,
"parameters": [
{
"name": "csv_path",
"outputs": "typing.Any",
"slug": "csv_path",
"default": null,
"type": "prefect.core.parameter.Parameter",
"required": true,
"tags": [],
"__version__": "0.15.3"
}
],
"tasks": [
{
"retry_delay": null,
"name": "add_umi",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "add_umi-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {
"sample": {
"required": true,
"type": "typing.Any"
}
},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "consolidate",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "consolidate-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {
"sample": {
"required": true,
"type": "typing.Any"
}
},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "csv_path",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "csv_path",
"auto_generated": false,
"type": "prefect.core.parameter.Parameter",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "group",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "group-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {
"samples": {
"required": true,
"type": "typing.Any"
}
},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "identify",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "identify-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {
"sample": {
"required": true,
"type": "typing.Any"
}
},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "load_samples",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "load_samples-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {
"csv_path": {
"required": true,
"type": "typing.Any"
}
},
"__version__": "0.15.3"
},
{
"retry_delay": null,
"name": "startup",
"trigger": {
"fn": "prefect.triggers.all_successful",
"kwargs": {}
},
"cache_validator": {
"fn": "prefect.engine.cache_validators.never_use",
"kwargs": {}
},
"max_retries": 0,
"outputs": "typing.Any",
"slug": "startup-1",
"auto_generated": false,
"type": "prefect.tasks.core.function.FunctionTask",
"cache_key": null,
"timeout": null,
"tags": [],
"cache_for": null,
"skip_on_upstream_skip": true,
"inputs": {},
"__version__": "0.15.3"
}
],
"edges": [
{
"upstream_task": {
"slug": "add_umi-1",
"__version__": "0.15.3"
},
"downstream_task": {
"slug": "consolidate-1",
"__version__": "0.15.3"
},
"key": "sample",
"mapped": true,
"flattened": false,
"__version__": "0.15.3"
},
{
"upstream_task": {
"slug": "consolidate-1",
"__version__": "0.15.3"
},
"downstream_task": {
"slug": "identify-1",
"__version__": "0.15.3"
},
"key": "sample",
"mapped": true,
"flattened": false,
"__version__": "0.15.3"
},
{
"upstream_task": {
"slug": "csv_path",
"__version__": "0.15.3"
},
"downstream_task": {
"slug": "load_samples-1",
"__version__": "0.15.3"
},
"key": "csv_path",
"mapped": false,
"flattened": false,
"__version__": "0.15.3"
},
{
"upstream_task": {
"slug": "identify-1",
"__version__": "0.15.3"
},
"downstream_task": {
"slug": "group-1",
"__version__": "0.15.3"
},
"key": "samples",
"mapped": false,
"flattened": false,
"__version__": "0.15.3"
},
{
"upstream_task": {
"slug": "load_samples-1",
"__version__": "0.15.3"
},
"downstream_task": {
"slug": "add_umi-1",
"__version__": "0.15.3"
},
"key": "sample",
"mapped": true,
"flattened": false,
"__version__": "0.15.3"
}
],
"reference_tasks": [],
"environment": null,
"run_config": {
"task_definition": null,
"cpu": null,
"memory": null,
"task_definition_arn": null,
"execution_role_arn": null,
"task_role_arn": null,
"labels": [],
"env": null,
"run_task_kwargs": null,
"image": "IMAGE",
"task_definition_path": null,
"__version__": "0.15.3",
"type": "ECSRun"
},
"__version__": "0.15.3",
"storage": {
"secrets": [],
"key": null,
"client_options": null,
"flows": {},
"bucket": "BUCKET",
"stored_as_script": false,
"__version__": "0.15.3",
"type": "S3"
}
}
Kyle McChesney
08/09/2021, 5:17 PM
json.dumps(flow.serialize())
Kyle McChesney
08/09/2021, 5:21 PM
{'version': 1, 'flows': [....]}
where flows is just an array of JSON objects matching flow.serialize
Kyle McChesney
08/09/2021, 5:22 PM
Kevin Kho
…prefect build..? It feels like you can call the Python script in CI/CD and you can rotate the storage and run config in the script? Or am I missing something?
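A minimal sketch of the approach Kevin is describing, reusing the placeholder MY_FLOWS helper and environment variables from the earlier snippet; the project name is hypothetical:
import os
from prefect.run_configs import ECSRun
from prefect.storage import S3

def register_all():
    storage_bucket = os.environ['PREFECT_FLOWS_STORAGE_BUCKET']
    image = os.environ['PREFECT_FLOWS_IMAGE']
    for flow in MY_FLOWS():  # MY_FLOWS: placeholder from the snippet above
        # Rotate storage/run config per environment, then register directly;
        # register() builds the storage and sends the serialized flow to the backend.
        flow.storage = S3(storage_bucket)
        flow.run_config = ECSRun(image=image)
        flow.register(project_name='my-project')  # project name is a placeholder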
Kyle McChesney
08/09/2021, 5:53 PM
Kevin Kho
Kevin Kho
Kyle McChesney
08/09/2021, 6:39 PM
Kyle McChesney
08/09/2021, 6:39 PM
Kyle McChesney
08/09/2021, 6:40 PM
def main():
    configure_logging()
    LOGGER.info('Starting flow serialization process')
    storage_bucket = os.environ['PREFECT_FLOWS_STORAGE_BUCKET']
    image = os.environ['PREFECT_FLOWS_IMAGE']
    ci_commit_short_sha = os.environ['CI_COMMIT_SHORT_SHA']
    image_branch_tag = os.environ['IMAGE_BRANCH_TAG']
    LOGGER.info('Registry: %s', REGISTRY)
    LOGGER.info('Storage Bucket: %s', storage_bucket)
    LOGGER.info('Image: %s', image)
    LOGGER.info('commit SHA: %s', ci_commit_short_sha)
    LOGGER.info('branch tag: %s', image_branch_tag)
    s3_client = boto3.client('s3')
    for project, modules in REGISTRY.items():
        project_flows = []
        for module in modules:
            LOGGER.info('Processing %s in %s', module, project)
            flow = import_module(f'{ROOT}.{project}.{module}').flow
            flow.storage = S3(storage_bucket)
            flow.run_config = ECSRun(image=image)
            project_flows.append(flow.serialize())
        project_flow_payload = json.dumps({
            'version': 1,
            'flows': project_flows,
        })
        project_flow_key = UPLOAD_KEY.format(
            project=project,
            image_branch_tag=image_branch_tag,
            ci_commit_short_sha=ci_commit_short_sha,
        )
        LOGGER.info('Uploading %s flows to %s', len(project_flows), project_flow_key)
        s3_client.put_object(
            Body=project_flow_payload,
            Bucket=storage_bucket,
            Key=project_flow_key,
        )
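The registration half isn't shown in the thread. As a rough sketch only: assuming the payload above is later pulled back down and pushed through the create_flow mutation Kyle mentions further on, it might look something like the following (the input field names are assumptions, not confirmed API):
import json

import boto3
from prefect import Client

CREATE_FLOW_MUTATION = '''
mutation($input: create_flow_input!) {
    create_flow(input: $input) {
        id
    }
}
'''

def register_from_payload(bucket, key, project_id):
    # Hypothetical consumer of the JSON uploaded by main() above.
    body = boto3.client('s3').get_object(Bucket=bucket, Key=key)['Body'].read()
    payload = json.loads(body)
    client = Client()
    for serialized_flow in payload['flows']:
        # project_id / serialized_flow are assumed input fields; check the
        # backend's GraphQL schema before relying on them.
        client.graphql(
            CREATE_FLOW_MUTATION,
            variables={'input': {
                'project_id': project_id,
                'serialized_flow': serialized_flow,
            }},
        )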
Kyle McChesney
08/09/2021, 6:40 PM
…prefect build, so 🤞 the format doesn’t change too much
Kyle McChesney
08/09/2021, 7:29 PM
…flow.serialize in the JSON accurately. Maintaining parity with prefect build is not a requirement.
Kyle McChesney
08/10/2021, 2:58 PM
flow.serialize and prefect build do behave a bit differently. I am experiencing an issue where my flows are getting registered on the backend but not created in S3 storage. So the create_flow GraphQL call doesn’t fail, and the new version is created on the backend, but nothing is generated in the S3 storage bucket. I think I tracked it down to the storage block.
Here is what it looks like if I run `prefect build`:
"storage": {
"__version__": "0.15.3",
"bucket": "MY-BUCKET",
"client_options": null,
"flows": {
"MY_FLOW": "MY_FLOW/2021-08-10t14-52-35-372895-00-00"
},
"key": null,
"secrets": [],
"stored_as_script": false,
"type": "S3"
}
But if I just run `flow.serialize()`:
"storage": {
"__version__": "0.15.3",
"bucket": "MY-BUCKET",
"client_options": null,
"flows": {},
"key": null,
"secrets": [],
"stored_as_script": false,
"type": "S3"
}
Kyle McChesney
08/10/2021, 3:02 PM
If I run prefect build on this flow, I get the following logs:
>>= prefect build -p prefect_flows/flows/examples/MY_FLOW.py
Collecting flows...
Processing 'prefect_flows/flows/examples/MY_FLOW.py':
Building `S3` storage...
[2021-08-10 08:52:35-0600] INFO - prefect.S3 | Uploading MY_FLOW/2021-08-10t14-52-35-372895-00-00 to MY-BUCKET
Building 'MY_FLOW'... Done
Writing output to 'flows.json'
Which seems to indicate that it uploaded the flow to S3, even though I just called build and not register. I will note that no file was actually created on S3, but it does appear that some AWS API calls were made. I know this because it tried to hit our production environment’s bucket (the location of the storage for this test), and since I did not have the proper AWS profile configured, I got a permission denied error.
Kyle McChesney
08/10/2021, 3:12 PM
Kyle McChesney
08/10/2021, 3:17 PM
…build=True to my flow.serialize call does the trick.
Kevin Kho
…if it’s not stored_as_script, it gets uploaded, but if it’s stored_as_script, it doesn’t. We’re looking to unify that. Seems like you’re good now?
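A small illustration of the distinction Kevin is pointing at, with a placeholder bucket and key; the behavioral notes follow his description above rather than verified source:
from prefect.storage import S3

# Default (pickle-based) storage: build() pickles the flow and uploads it to the
# bucket, which is why prefect build / serialize(build=True) hit S3 above.
pickle_storage = S3(bucket='MY-BUCKET')

# Script-based storage: the flow file is expected to already live at `key`
# (or be pushed separately), so per Kevin's note nothing gets uploaded here.
script_storage = S3(bucket='MY-BUCKET', stored_as_script=True, key='flows/my_flow.py')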
Kyle McChesney
08/10/2021, 3:24 PM
…build=True to serialize, and the resulting keys are populated into the serialized JSON that I then use for registration.
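Spelled out against the main() loop above, the fix amounts to one argument (a sketch reusing the same names):
flow.storage = S3(storage_bucket)
flow.run_config = ECSRun(image=image)
# build=True builds the storage first (uploading the pickled flow for
# pickle-based S3 storage), so the serialized "flows" mapping gets populated.
project_flows.append(flow.serialize(build=True))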
Kevin Kho
Kyle McChesney
08/10/2021, 5:01 PM
Kyle McChesney
08/10/2021, 5:09 PM