# ask-community
k
Wondering if anyone can help with the following use case. I have a project that contains a number of flows, which I would like to register to an internal backend using a JSON format. But I also need to run the build in a CI/CD system (which includes multiple environments), which means that I need to configure the `run_config` and `storage` attributes for each flow before registration.
Something roughly like:
```python
# registry.py
import os

from prefect.storage import S3
from prefect.run_configs import ECSRun


def main():
    storage_bucket = os.environ['PREFECT_FLOWS_STORAGE_BUCKET']
    image = os.environ['PREFECT_FLOWS_IMAGE']

    for flow in MY_FLOWS():
        flow.storage = S3(storage_bucket)
        flow.run_config = ECSRun(image=image)
        flow.serialize()

if __name__ == '__main__':
    main()
```
But this just produces the serialized flows; I need to combine them into the flows.json file somehow. I could manually construct a JSON file that matches the structure that `prefect build` produces, but that seems rather brittle.
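Concretely, the manual version would be something like this (a sketch, assuming the `{'version': 1, 'flows': [...]}` wrapper format that comes up below):
```python
# Sketch: wrap each flow's serialize() output in the same top-level
# structure that `prefect build` writes to flows.json.
import json


def write_flows_json(flows, path='flows.json'):
    payload = {
        'version': 1,
        'flows': [flow.serialize() for flow in flows],
    }
    with open(path, 'w') as fh:
        json.dump(payload, fh)
```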
k
Hey @Kyle McChesney, what does your JSON look like?
k
```json
{
  "name": "fake_demultiplex",
  "type": "prefect.core.flow.Flow",
  "schedule": null,
  "parameters": [
    {
      "name": "csv_path",
      "outputs": "typing.Any",
      "slug": "csv_path",
      "default": null,
      "type": "prefect.core.parameter.Parameter",
      "required": true,
      "tags": [],
      "__version__": "0.15.3"
    }
  ],
  "tasks": [
    {
      "retry_delay": null,
      "name": "add_umi",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "add_umi-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {
        "sample": {
          "required": true,
          "type": "typing.Any"
        }
      },
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "consolidate",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "consolidate-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {
        "sample": {
          "required": true,
          "type": "typing.Any"
        }
      },
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "csv_path",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "csv_path",
      "auto_generated": false,
      "type": "prefect.core.parameter.Parameter",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {},
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "group",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "group-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {
        "samples": {
          "required": true,
          "type": "typing.Any"
        }
      },
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "identify",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "identify-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {
        "sample": {
          "required": true,
          "type": "typing.Any"
        }
      },
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "load_samples",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "load_samples-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {
        "csv_path": {
          "required": true,
          "type": "typing.Any"
        }
      },
      "__version__": "0.15.3"
    },
    {
      "retry_delay": null,
      "name": "startup",
      "trigger": {
        "fn": "prefect.triggers.all_successful",
        "kwargs": {}
      },
      "cache_validator": {
        "fn": "prefect.engine.cache_validators.never_use",
        "kwargs": {}
      },
      "max_retries": 0,
      "outputs": "typing.Any",
      "slug": "startup-1",
      "auto_generated": false,
      "type": "prefect.tasks.core.function.FunctionTask",
      "cache_key": null,
      "timeout": null,
      "tags": [],
      "cache_for": null,
      "skip_on_upstream_skip": true,
      "inputs": {},
      "__version__": "0.15.3"
    }
  ],
  "edges": [
    {
      "upstream_task": {
        "slug": "add_umi-1",
        "__version__": "0.15.3"
      },
      "downstream_task": {
        "slug": "consolidate-1",
        "__version__": "0.15.3"
      },
      "key": "sample",
      "mapped": true,
      "flattened": false,
      "__version__": "0.15.3"
    },
    {
      "upstream_task": {
        "slug": "consolidate-1",
        "__version__": "0.15.3"
      },
      "downstream_task": {
        "slug": "identify-1",
        "__version__": "0.15.3"
      },
      "key": "sample",
      "mapped": true,
      "flattened": false,
      "__version__": "0.15.3"
    },
    {
      "upstream_task": {
        "slug": "csv_path",
        "__version__": "0.15.3"
      },
      "downstream_task": {
        "slug": "load_samples-1",
        "__version__": "0.15.3"
      },
      "key": "csv_path",
      "mapped": false,
      "flattened": false,
      "__version__": "0.15.3"
    },
    {
      "upstream_task": {
        "slug": "identify-1",
        "__version__": "0.15.3"
      },
      "downstream_task": {
        "slug": "group-1",
        "__version__": "0.15.3"
      },
      "key": "samples",
      "mapped": false,
      "flattened": false,
      "__version__": "0.15.3"
    },
    {
      "upstream_task": {
        "slug": "load_samples-1",
        "__version__": "0.15.3"
      },
      "downstream_task": {
        "slug": "add_umi-1",
        "__version__": "0.15.3"
      },
      "key": "sample",
      "mapped": true,
      "flattened": false,
      "__version__": "0.15.3"
    }
  ],
  "reference_tasks": [],
  "environment": null,
  "run_config": {
    "task_definition": null,
    "cpu": null,
    "memory": null,
    "task_definition_arn": null,
    "execution_role_arn": null,
    "task_role_arn": null,
    "labels": [],
    "env": null,
    "run_task_kwargs": null,
    "image": "IMAGE",
    "task_definition_path": null,
    "__version__": "0.15.3",
    "type": "ECSRun"
  },
  "__version__": "0.15.3",
  "storage": {
    "secrets": [],
    "key": null,
    "client_options": null,
    "flows": {},
    "bucket": "BUCKET",
    "stored_as_script": false,
    "__version__": "0.15.3",
    "type": "S3"
  }
}
```
This is `json.dumps(flow.serialize())`.
Seems like the JSON output that `prefect build` writes is more or less just `{'version': 1, 'flows': [...]}`, where flows is just an array of JSON objects matching `flow.serialize()`, but I suppose if that changes internally to Prefect it would break.
k
I am a bit confused why the approach is to replicate `prefect build`? It feels like you can call the Python script in CI/CD and rotate the storage and run config in the script? Or am I missing something?
k
Ah yes, the final issue has to do with networking. Our server is behind a VPN which is not accessible from our CI/CD system. I am hoping to generate the config in JSON format via CI/CD and then upload it to S3. A Lambda will be triggered by the upload, grab the JSON file, and do the registration (it's running in the VPC and can talk to the server). We could do everything in the Lambda, but I was hoping to keep stuff a bit more isolated and keep the Lambda "dumb", so to speak.
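For context, the Lambda side could be as simple as this (a minimal sketch, assuming the standard S3 event shape; `register_serialized_flow` is a hypothetical helper for the actual registration call):
```python
# Sketch of the "dumb" Lambda: pull the flows JSON that triggered the
# event from S3 and hand each serialized flow off for registration.
import json

import boto3

s3 = boto3.client('s3')


def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']  # may need URL-unquoting for special characters
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        payload = json.loads(body)
        for serialized_flow in payload['flows']:
            register_serialized_flow(serialized_flow)  # hypothetical helper
```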
k
Oh wow! Then yes, I think you need to be updating the JSON. Maybe you can use this function to edit the Flow in the Lambda?
Seems cumbersome though, because you'd need to install Prefect on the Lambda. Do you already have it?
k
No, I was hoping to just use GraphQL to do the registration.
The Lambda is a generic CI helper Lambda for tasks like this.
```python
# Assumed context: LOGGER, REGISTRY, ROOT, UPLOAD_KEY, and
# configure_logging() are defined elsewhere in this module.
import json
import os
from importlib import import_module

import boto3
from prefect.run_configs import ECSRun
from prefect.storage import S3


def main():
    configure_logging()
    LOGGER.info('Starting flow serialization process')

    storage_bucket = os.environ['PREFECT_FLOWS_STORAGE_BUCKET']
    image = os.environ['PREFECT_FLOWS_IMAGE']
    ci_commit_short_sha = os.environ['CI_COMMIT_SHORT_SHA']
    image_branch_tag = os.environ['IMAGE_BRANCH_TAG']

    LOGGER.info('Registry: %s', REGISTRY)
    LOGGER.info('Storage Bucket: %s', storage_bucket)
    LOGGER.info('Image: %s', image)
    LOGGER.info('commit SHA: %s', ci_commit_short_sha)
    LOGGER.info('branch tag: %s', image_branch_tag)

    s3_client = boto3.client('s3')

    for project, modules in REGISTRY.items():

        project_flows = []

        for module in modules:
            LOGGER.info('Processing %s in %s', module, project)
            flow = import_module(f'{ROOT}.{project}.{module}').flow
            flow.storage = S3(storage_bucket)
            flow.run_config = ECSRun(image=image)
            project_flows.append(flow.serialize())

        project_flow_payload = json.dumps({
            'version': 1,
            'flows': project_flows,
        })
        project_flow_key = UPLOAD_KEY.format(
            project=project,
            image_branch_tag=image_branch_tag,
            ci_commit_short_sha=ci_commit_short_sha,
        )

        LOGGER.info('Uploading %s flows to %s', len(project_flows), project_flow_key)

        s3_client.put_object(
            Body=project_flow_payload,
            Bucket=storage_bucket,
            Key=project_flow_key,
        )
```
This sample produces a JSON file that matches the output produced by `prefect build`, so 🤞 the format doesn't change too much.
I am working through this example a bit more, and it seems like I will be doing a `create_flow` mutation from the Lambda for each flow, which means I just need to store the `flow.serialize()` output in the JSON accurately. Maintaining parity with `prefect build` is not a requirement.
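Roughly one POST per flow, something like this (a sketch; the `create_flow` mutation name comes from this thread, but its exact input shape here is an assumption, so verify against the server's schema):
```python
# Sketch: register one serialized flow via a raw GraphQL call. The
# create_flow input shape is an assumption based on the Prefect Server
# schema; verify against your backend before relying on it.
import requests

CREATE_FLOW_MUTATION = """
mutation($input: create_flow_input!) {
  create_flow(input: $input) {
    id
  }
}
"""


def register_serialized_flow(api_url, project_id, serialized_flow):
    response = requests.post(
        api_url,
        json={
            'query': CREATE_FLOW_MUTATION,
            'variables': {
                'input': {
                    'project_id': project_id,
                    'serialized_flow': serialized_flow,
                },
            },
        },
    )
    response.raise_for_status()
    result = response.json()
    if result.get('errors'):
        raise RuntimeError(result['errors'])
    return result['data']['create_flow']['id']
```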
👍 1
Okay, following up on this @Kevin Kho, I think I have to retract my previous statement: `flow.serialize()` and `prefect build` do behave a bit differently. I am experiencing an issue where my flows are getting registered on the backend but not created in S3 storage. The `create_flow` GraphQL call doesn't fail, and the new version is created on the backend, but nothing is generated in the S3 storage bucket. I think I tracked it down to the `storage` block. Here is what it looks like if I run `prefect build`:
```json
"storage": {
    "__version__": "0.15.3",
    "bucket": "MY-BUCKET",
    "client_options": null,
    "flows": {
        "MY_FLOW": "MY_FLOW/2021-08-10t14-52-35-372895-00-00"
    },
    "key": null,
    "secrets": [],
    "stored_as_script": false,
    "type": "S3"
}
```
But if I just run `flow.serialize()`:
```json
"storage": {
    "__version__": "0.15.3",
    "bucket": "MY-BUCKET",
    "client_options": null,
    "flows": {},
    "key": null,
    "secrets": [],
    "stored_as_script": false,
    "type": "S3"
}
```
Also, another sort of weird behavior I noticed. When I run `prefect build` on this flow, I get the following logs:
```
>>= prefect build -p prefect_flows/flows/examples/MY_FLOW.py 
Collecting flows...
Processing 'prefect_flows/flows/examples/MY_FLOW.py':
  Building `S3` storage...
[2021-08-10 08:52:35-0600] INFO - prefect.S3 | Uploading MY_FLOW/2021-08-10t14-52-35-372895-00-00 to MY-BUCKET
  Building 'MY_FLOW'... Done
Writing output to 'flows.json'
```
Which seems to indicate that it uploaded the flow to S3, even though I just called build and not register. I will note that no file was actually created on S3, but it does appear that some AWS API calls were made. I know this because it tried to hit our production environment's bucket (the location of the storage for this test), and since I did not have the proper AWS profile configured, I got a permission denied error.
Thinking through the semantics of this, I think it makes sense. The GraphQL call to create the flow is not going to automatically upload stuff to S3. Not sure why I didn't think that through.
Seems like passing `build=True` to my `flow.serialize()` call does the trick.
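i.e. the one-line change in the registry script (`Flow.serialize` takes a `build` flag):
```python
# build=True builds the storage first (uploading the flow to S3), so the
# serialized output includes the storage "flows" keys needed at runtime.
project_flows.append(flow.serialize(build=True))
```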
k
So actually the team was chatting about this today. If your flow is not `stored_as_script`, it gets uploaded, but if it's `stored_as_script`, it doesn't. We're looking to unify that. Seems like you're good now?
k
I think so. I still need to test it end to end, but my manual test seemed to work. The flow is uploaded to S3 when I pass `build=True` to `serialize()`, and the resulting keys are populated into the serialized JSON that I then use for registration.
k
Gotcha
k
This worked. Sorry for the delay, GitHub was down.
👍 1
@Matan Drory
🙌 1