Sang Young Noh

    Sang Young Noh

    4 months ago
    Hi, I'm currently doing some deployment testing and I'm getting the following error:
    Specification in 'newflow.py', line 13 failed validation! You have not configured default storage on the server or set a storage to use for this deployment but this deployment is using a Universal flow runner which requires remote storage.
    I currently have a file, newflow.py, containing:
    from prefect import flow
    
    @flow
    def hello_world(name="world"):
        print(f"Hello {name}!")
    
    # Note: a deployed flow does not need a command to
    # explicitly run the flow. The API handles this for you.
    # hello_world()
    
    from prefect.deployments import DeploymentSpec
    
    DeploymentSpec(
        flow=hello_world,
        name="hello-world-daily",
    )
    and the error appears when I run the following:
    prefect deployment create newflow.py
    Anna Geller

    Anna Geller

    4 months ago
    This makes sense - before you create your deployment, you need to configure remote storage. You can choose from S3, GCS, Azure blob. Here is the command you may use:
    prefect storage create --help
    more on that here: https://orion-docs.prefect.io/concepts/storage/
    Valentin Baert

    Valentin Baert

    4 months ago
    I'll give the 2.0 version a try, though I'm having an issue: when setting up Google Cloud Storage, how do I assign a service account?
    the documentation is unclear
    it mentions a Service Account Info option but without any code or CLI example
    the CLI isn't any help either, the --help output lacks detail
    I have a Google service account key file as JSON but I don't know how to configure it with the storage
    Anna Geller

    Anna Geller

    4 months ago
    use one of those options:
    DeploymentSpec(
        name="gcs",
        flow=hello_flow,
        tags=["local"],
        flow_storage=FileStorageBlock(base_path="gcs://prefect-orion/flows"),
    )
    
    DeploymentSpec(
        name="dev",
        flow=hello_flow,
        tags=["local"],
        flow_storage=GoogleCloudStorageBlock(bucket="prefect-orion"),
    )
    assuming GOOGLE_APPLICATION_CREDENTIALS is a string with the path to your JSON service account file:
    import io
    import json
    import os
    
    sa_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    
    with io.open(sa_file, "r", encoding="utf-8") as json_file:
        service_account_info = json.load(json_file)
    
    
    DeploymentSpec(
        name="gcs",
        flow=hello_flow,
        tags=["local"],
        flow_storage=GoogleCloudStorageBlock(
            bucket="prefect-orion",
            service_account_info=service_account_info
        ),
    )
    you may also do:
    export GOOGLE_APPLICATION_CREDENTIALS=/Users/path/to/your/sa.json
    and use GCS default storage
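A minimal sketch, assuming GOOGLE_APPLICATION_CREDENTIALS may hold either a path to the key file (as in the snippet above) or the key file's JSON content itself. The helper name load_service_account_info is hypothetical and not part of Prefect; it only prepares the dict that is passed as service_account_info.

```python
import json


def load_service_account_info(value: str) -> dict:
    """Return service account info from a JSON string or a path to a JSON key file.

    Hypothetical helper for illustration; it is not part of Prefect.
    """
    value = value.strip()
    if value.startswith("{"):
        # The variable holds the key file's JSON content directly
        return json.loads(value)
    # Otherwise treat the value as a filesystem path to the key file
    with open(value, "r", encoding="utf-8") as json_file:
        return json.load(json_file)


# Demo with inline JSON (placeholder values, not a real key)
info = load_service_account_info('{"type": "service_account", "project_id": "demo"}')
print(info["type"])
```

The resulting dict could then be passed as service_account_info=... to GoogleCloudStorageBlock, as in the DeploymentSpec shown above.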
    Sang Young Noh

    Sang Young Noh

    4 months ago
    Hello again - once we have a series of deployments ready, is it possible to deploy without using the CLI? So instead of running 'prefect deployment create deployment.py', is there a way to embed code in deployment.py itself to create the deployments? I'm also curious how to do this for creating work queues and agents. Thanks in advance!
    Valentin Baert

    Valentin Baert

    4 months ago
    ok thank you, I tried but failed. I'll stick with 1.0 for the time being and wait for the official release. Maybe for expert users the documentation is enough, but for a beginner like me the 1.0 docs are much better. 2.0 is too vague for me and lacks clear step-by-step instructions for setting up Google Cloud Storage. Right now it is just a single sentence with an obscure table listing settings, without any code example: https://orion-docs.prefect.io/concepts/storage/#google-cloud-storage
    Anna Geller

    Anna Geller

    4 months ago
    @Sang Young Noh creating a deployment from the CLI is essentially also an API call, so you could do it some other way, but I'm curious to hear why you would do it from anything other than the CLI. I believe most users will create deployments from some CI/CD system, which is why CLI commands provide the best UX so far
    @Valentin Baert I can understand your frustration and I'm aware we need more examples, but I'm curious - did you try the GCS DeploymentSpec examples I posted here? What error did you get when creating a deployment with GCS storage? I'm sure we can sort this out together, so this shouldn't be the only reason to stick with Prefect 1.0
    Valentin Baert

    Valentin Baert

    4 months ago
    I'm still trying. I managed to piece together documentation from multiple pages to get something working for what is, to me, a simple use case: deploy an agent on a GKE cluster, connect it to Prefect Orion Cloud, use GCS storage, and run a flow as a Kubernetes job
    it's very hard to find the correct information because it is scattered everywhere and most of the time incomplete
    now the agent picks up the flow from a queue, but it seems to fail to start a pod
    api Starting...
    api  ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
    api | _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
    api |  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
    api |_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|
    api Configure Prefect to communicate with the server with:
    api     prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
    api Check out the dashboard at http://0.0.0.0:4200
    api INFO:     Started server process [9[]
    api INFO:     Waiting for application startup.
    api INFO:     Application startup complete.
    api INFO:     Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
    agent Starting agent connected to https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0...
    agent   ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
    agent  | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
    agent  |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
    agent  |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
    agent Agent started! Looking for work from queue 'kubernetes'...
    agent 09:01:07.562 | INFO    | prefect.agent - Submitting flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
    agent 09:01:07.699 | INFO    | prefect.flow_runner.kubernetes - RUNNING
    agent 09:01:07.906 | INFO    | prefect.flow_runner.kubernetes - Flow run 'charming-gopher' has job settings = {'metadata': {'generateName': 'charming-gopher', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '1e14e8b2-11b5-432a-802b-fb8f57fd68b5', 'io.prefect.flow-run-name': 'charming-gopher', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'prefecthq/prefect:2.0b4-python3.8', 'command': ['python', '-m', 'prefect.engine', '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'}, {'name': 'PREFECT_API_KEY', 'value': 'pnu_iyYoMBQYyonrYPzQNGvCwI3nDsst4Q4erhiy'}]}]}}, 'backoff_limit': 4}}
    agent 09:01:07.934 | ERROR   | prefect.agent - Flow runner failed to submit flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
    agent Traceback (most recent call last):
    agent   File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 136, in submit_run
    agent     await self.task_group.start(flow_runner.submit_flow_run, flow_run)
    agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 701, in start
    agent     return await future
    agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 827, in submit_flow_run
    agent     job_name = await run_sync_in_worker_thread(self._create_and_start_job, flow_run)
    agent   File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    agent     return await anyio.to_thread.run_sync(call, cancellable=True)
    agent   File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    agent     return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
    agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    agent     return await future
    agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    agent     result = context.run(func, *args)
    agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 1016, in _create_and_start_job
    agent     job = self.batch_client.create_namespaced_job(self.namespace, job_settings)
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    agent     return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
    agent     return self.api_client.call_api(
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    agent     return self.__call_api(resource_path, method,
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    agent     response_data = self.request(
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    agent     return self.rest_client.POST(url,
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    agent     return self.request("POST", url,
    agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    agent     raise ApiException(http_resp=r)
    agent kubernetes.client.exceptions.ApiException: (403)
    agent Reason: Forbidden
    agent HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cd7f296b-0444-4ac4-b4ec-dc88b9843534', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '27d5e504-aa65-49d9-8f50-ef3c4aa090aa', 'X-Kubernetes-Pf-Prioritylevel-Uid': '1894d569-9152-44eb-848e-bd2a893b4c22', 'Date': 'Wed, 18 May 2022 09:01:07 GMT', 'Content-Length': '313'})
    agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
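The 403 body above already names both the identity that was denied and the namespace it was acting on. A small sketch of pulling those fields out, assuming only that Kubernetes service account user names have the form system:serviceaccount:<namespace>:<name>. The function describe_forbidden is a hypothetical helper written for this note.

```python
import json
import re

# Response body copied from the agent log above
BODY = r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}'


def describe_forbidden(body: str) -> dict:
    """Extract who was denied what from a Kubernetes 403 Status body (hypothetical helper)."""
    status = json.loads(body)
    message = status["message"]
    user = re.search(r'User "([^"]+)"', message).group(1)
    verb = re.search(r"cannot (\w+) resource", message).group(1)
    target_namespace = re.search(r'in the namespace "([^"]+)"', message).group(1)
    # Service account user names look like system:serviceaccount:<namespace>:<name>
    _, _, sa_namespace, sa_name = user.split(":")
    return {
        "service_account": sa_name,
        "service_account_namespace": sa_namespace,
        "verb": verb,
        "resource": status["details"]["kind"],
        "api_group": status["details"]["group"],
        "target_namespace": target_namespace,
    }


info = describe_forbidden(BODY)
print(info)
```

Read this way, the message suggests the agent was running as the default service account of the everysens namespace while trying to create a job in the default namespace, so any Role and RoleBinding would need to grant that specific account the create verb on jobs in the target namespace.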
    Anna Geller

    Anna Geller

    4 months ago
    nice work you got the GCS storage setup working! so KubernetesFlowRunner is the issue now? (just to make it clear for me)
    Valentin Baert

    Valentin Baert

    4 months ago
    current flow
    import io
    import json
    import os
    import sys
    
    from confluent_kafka import Consumer, KafkaError, KafkaException
    from prefect import flow, task, get_run_logger
    from prefect.blocks.storage import GoogleCloudStorageBlock
    from prefect.deployments import DeploymentSpec
    from prefect.flow_runners import KubernetesFlowRunner
    
    class TopicConsumer:
    
        def __init__(self, topics, config, commits_every_n_messages=1):
            self.topics = topics
            self.config = config
            self.commits_every_n_messages = commits_every_n_messages
            self.consumer = Consumer(config)
            self.running = False
    
        def consume(self):
            try:
                self.consumer.subscribe(self.topics)
                print("Subscribed to topics: {}".format(self.topics))
                self.running = True
                msg_count = 0
                while self.running:
                    msg = self.consumer.poll(timeout=1.0)
                    if msg is None:
                        continue
    
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                             (msg.topic(), msg.partition(), msg.offset()))
                        elif msg.error():
                            raise KafkaException(msg.error())
                    else:
                        yield msg
                        msg_count += 1
                        if msg_count % self.commits_every_n_messages == 0:
                            self.consumer.commit(asynchronous=False)
            finally:
                # Close down consumer to commit final offsets.
                self.consumer.close()
    
        def shutdown(self):
            self.running = False
    
    @task
    def process_message(msg):
        logger = get_run_logger()
        logger.info("topic={} partition={} offset={} key={} value={}".format(
            msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))
    
    @flow(name="prefect_2_kafka_kub")
    def main():
    
        conf = {
            'bootstrap.servers': "xxxxx",
            'group.id': "prefect_poc_kafka",
            'auto.offset.reset': 'earliest',
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username' :'xxxxx',
            'sasl.password': 'xxxxxx'
        }
    
        topic_consumer = TopicConsumer(
            ["prefect-poc"],
            conf
        )
    
        for msg in topic_consumer.consume():
            process_message(msg)
    
        topic_consumer.shutdown()
    
    
    service_account_info = json.loads(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))
    
    DeploymentSpec(
        name="gcs",
        flow=main,
        tags=["kubernetes"],
        flow_storage=GoogleCloudStorageBlock(
            bucket="prefect-poc",
            service_account_info=service_account_info
        ),
        flow_runner=KubernetesFlowRunner()
    )
    
    if __name__ == '__main__':
        main()
    it seems so
    here is my orion agent manifest
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: orion
    spec:
      selector:
        matchLabels:
          app: orion
      replicas: 1  # We're using SQLite, so we should only run 1 pod
      template:
        metadata:
          labels:
            app: orion
        spec:
          containers:
          - name: api
            image: prefecthq/prefect:2.0b4-python3.8
            command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
            imagePullPolicy: "IfNotPresent"
            ports:
            - containerPort: 4200
          - name: agent
            image: prefecthq/prefect:2.0b4-python3.8
            command: ["prefect", "agent", "start", "kubernetes"]
            imagePullPolicy: "IfNotPresent"
            env:
              - name: PREFECT_API_URL
                value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
              - name: PREFECT_API_KEY
                value: 'xxxxxxxxxxx'
              - name: GOOGLE_APPLICATION_CREDENTIALS
                value: '{"type":"service_account",.....'
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      namespace: default
      name: flow-runner
    rules:
    - apiGroups: [""]
      resources: ["pods", "pods/log", "pods/status"]
      verbs: ["get", "watch", "list"]
    - apiGroups: ["batch"]
      resources: ["jobs"]
      verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: flow-runner-role-binding
      namespace: default
    subjects:
    - kind: ServiceAccount
      name: default
      namespace: default
    roleRef:
      kind: Role
      name: flow-runner
      apiGroup: rbac.authorization.k8s.io
    not sure what I'm missing since the generated manifest was working out of the box for prefect 1 but not here with prefect orion
    Anna Geller

    Anna Geller

    4 months ago
    I think you are very close - let's take it step by step
    Valentin Baert

    Valentin Baert

    4 months ago
    I think it's something related to the RBAC authorization, but I'm not very familiar with this kind of Kubernetes object
    Anna Geller

    Anna Geller

    4 months ago
    you don't need this manifest when using Cloud 2.0 - it is only required if you want to self-host Orion. For Cloud 2.0, kubectl needs to point at the GKE cluster (or whichever K8s cluster you chose) on the VM from which you start the agent
    yes, you're 100% correct, it's missing the service account info, which you can configure on the KubernetesFlowRunner using service_account_name
    Valentin Baert

    Valentin Baert

    4 months ago
    hmm I'm a bit confused I followed the doc at https://orion-docs.prefect.io/tutorials/kubernetes-flow-runner/ because I wanted to run flows on kubernetes and connect the agent to orion cloud, isn't it what it is intended for ?
    Anna Geller

    Anna Geller

    4 months ago
    I can try to replicate that later today on AWS EKS because I'm more familiar with it than GKE, but the setup should be the same - your kubectl points at your cluster, your prefect config points at your Cloud 2.0 PREFECT_API_URL (which you can check with the command prefect config view), then you create a work queue, e.g.:
    prefect work-queue create -t k8s
    and then your agent for this queue:
    prefect agent start k8s
    Valentin Baert

    Valentin Baert

    4 months ago
    yes I got to this point already
    Anna Geller

    Anna Geller

    4 months ago
    I can 100% understand your frustration, this docs page is purely for a simple all-in OSS deployment and not for using it with Cloud 2.0 - I can prioritize this for you
    Valentin Baert

    Valentin Baert

    4 months ago
    then from my desktop using prefect CLI connected to Cloud 2.0 I have created and ran a deployment
    prefect deployment create src/flows/prefect_2_kafka_kub.py
    prefect deployment run prefect_2_kafka_kub/gcs
    the agent has picked up the resulting flow needed to be run
    but fails with this error
    agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
    with prefect 1 I also got to this point but the flow ran properly so I must be missing something that have changed for 2.0 but I don't know what
    Anna Geller

    Anna Geller

    4 months ago
    seems like a permission issue with the service account. Let me open an issue for myself to replicate this later this week or next week and give you more help, since I can't dive deeper now. I appreciate that you brought this up, and it's important that we provide more examples for this (extremely common!) deployment pattern
    Valentin Baert

    Valentin Baert

    4 months ago
    ok thank you
    Valentin Baert

    Valentin Baert

    4 months ago
    does the Orion agent read undocumented env variables?
    because the 1.0 agent had a bunch of documented env vars
    Anna Geller

    Anna Geller

    4 months ago
    you can add env variables to every flow runner afaik
    Valentin Baert

    Valentin Baert

    4 months ago
    with 1.0 it had
    with 2.0 there seems to be only PREFECT_API_URL and I can't find the other ones documented anywhere
    Anna Geller

    Anna Geller

    4 months ago
    at least every one that is based on UniversalFlowRunner:
    class UniversalFlowRunner(FlowRunner):
        """
        The universal flow runner contains configuration options that can be used by any
        Prefect flow runner implementation.
    
        This flow runner cannot be used at runtime and should be converted into a subtype.
    
        Attributes:
            env: Environment variables to provide to the flow run
        """
    
        typename: Literal["universal"] = "universal"
        env: Dict[str, str] = Field(default_factory=dict)
    env=dict(key="value")
    Valentin Baert

    Valentin Baert

    4 months ago
    so just wondering if I should try to pass for example a SERVICE_ACCOUNT_NAME variable ?
    Anna Geller

    Anna Geller

    4 months ago
    sure you could, but there is a dedicated arg for that: service_account_name
    class KubernetesFlowRunner(UniversalFlowRunner):
        """
        Executes flow runs in a Kubernetes job.
    
        Requires a Kubernetes cluster to be connectable.
    
        Attributes:
            image: An optional string specifying the tag of a Docker image to use for the job.
            namespace: An optional string signifying the Kubernetes namespace to use.
            service_account_name: An optional string specifying which Kubernetes service account to use.
            labels: An optional dictionary of labels to add to the job.
            image_pull_policy: The Kubernetes image pull policy to use for job containers.
            restart_policy: The Kubernetes restart policy to use for jobs.
            stream_output: If set, stream output from the container to local standard output.
        """
    Valentin Baert

    Valentin Baert

    4 months ago
    Ok I have made a bit of progress
    Anna Geller

    Anna Geller

    4 months ago
    nice!
    Valentin Baert

    Valentin Baert

    4 months ago
    the error I had:
    agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
    It happened because the default generated manifest seems to assume that the default service account already has permission to create jobs, but in my case it did not.
    The manifest generated with the Prefect 1.0 CLI had a more complete Kubernetes Role definition, so I simply replaced the RBAC generated by the 2.0 CLI with the RBAC generated by the 1.0 CLI. After that, the agent was able to deploy flows as Kubernetes pods.
    But since I didn't want to use the "default" service account, I made a proper dedicated one named orion-agent by defining a Kubernetes ServiceAccount object and referencing it in the agent pod spec.
    Also, after a bit of fiddling, I got other RBAC errors: after starting a pod, the agent needs to query the pods, pods/log, and pods/status APIs, so I added them to the RBAC too.
    the final manifest looks like
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: orion
    spec:
      selector:
        matchLabels:
          app: orion
      replicas: 1  # We're using SQLite, so we should only run 1 pod
      template:
        metadata:
          labels:
            app: orion
        spec:
          serviceAccountName: orion-agent
          containers:
          - name: api
            image: prefecthq/prefect:2.0b4-python3.8
            command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
            imagePullPolicy: "IfNotPresent"
            ports:
            - containerPort: 4200
          - name: agent
            image: prefecthq/prefect:2.0b4-python3.8
            command: ["prefect", "agent", "start", "kubernetes"]
            imagePullPolicy: "IfNotPresent"
            env:
              - name: PREFECT_API_URL
                value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
              - name: PREFECT_API_KEY
                value: 'xxxxxx'
              - name: GOOGLE_APPLICATION_CREDENTIALS
                value: '{"type":"service_account",'
    
    ---
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: orion-agent
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: orion-agent-rbac
      namespace: everysens
    rules:
      - apiGroups:
          - batch
          - extensions
        resources:
          - jobs
        verbs:
          - '*'
      - apiGroups:
          - ''
        resources:
          - events
          - pods
          - pods/log
          - pods/status
        verbs:
          - '*'
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: orion-agent-rbac
      namespace: everysens
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: orion-agent-rbac
    subjects:
      - kind: ServiceAccount
        name: orion-agent
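To make the Role above easier to reason about, here is a simplified sketch of how its rules can be checked against a request. It mirrors the two rules from the manifest and handles only the "*" wildcard; the function is_allowed is a hypothetical illustration, not how the Kubernetes API server is implemented.

```python
# Rules mirroring the Role in the manifest above
RULES = [
    {"apiGroups": ["batch", "extensions"], "resources": ["jobs"], "verbs": ["*"]},
    {
        "apiGroups": [""],
        "resources": ["events", "pods", "pods/log", "pods/status"],
        "verbs": ["*"],
    },
]


def is_allowed(rules, api_group, resource, verb):
    """Simplified RBAC check: True if any rule covers the group, resource and verb.

    Handles the "*" wildcard only; resourceNames and nonResourceURLs,
    which real Kubernetes RBAC also supports, are ignored here.
    """

    def covers(allowed, wanted):
        return "*" in allowed or wanted in allowed

    return any(
        covers(rule["apiGroups"], api_group)
        and covers(rule["resources"], resource)
        and covers(rule["verbs"], verb)
        for rule in rules
    )


# The agent creates jobs, then reads pods, their logs and their status
print(is_allowed(RULES, "batch", "jobs", "create"))
print(is_allowed(RULES, "", "pods/log", "get"))
print(is_allowed(RULES, "", "secrets", "get"))
```

Note that a Role only applies in its own namespace, and the RoleBinding must name the service account the agent pod actually runs as.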
    now I've got to the point where the agent picks up a flow and manages to start a pod to run it. However, my flow needs a Python pip library that is not in the base Orion Docker image. With Prefect 1.0 I could specify extra pip packages with https://docs.prefect.io/orchestration/flow_config/docker.html#installing-extra-dependencies-at-runtime but that does not seem to be possible with 2.0
    Anna Geller

    Anna Geller

    4 months ago
    nice! did it deploy a flow run as K8s job as it should on GKE?
    Valentin Baert

    Valentin Baert

    4 months ago
    So I need to build a custom image based on 2.0 orion ?
    Anna Geller

    Anna Geller

    4 months ago
    or do you run it on a local K8s cluster?
    Valentin Baert

    Valentin Baert

    4 months ago
    yes as a k8s job on gke
    Anna Geller

    Anna Geller

    4 months ago
    correct, extra pip packages not there yet
    exactly, you can do a simple image like:
    FROM prefecthq/prefect:2.0b4-python3.8
    COPY requirements.txt .
    RUN pip install -r requirements.txt
    Valentin Baert

    Valentin Baert

    4 months ago
    ok, the extra pip setting seems to be a nice feature, I hope it will be implemented again for 2.0. I'll try to build a custom image for now
    It was the final piece, I got it working now
    now I have the flow deployed as a GKE k8s pod, which implements a long-running flow that listens to Kafka and starts tasks from there (hello world tasks that just log the Kafka message for now)
    the deployment spec looks like
    DeploymentSpec(
        name="gcs",
        flow=main,
        tags=["kubernetes"],
        flow_storage=GoogleCloudStorageBlock(
            bucket="prefect-poc"
        ),
        flow_runner=KubernetesFlowRunner(
            image="gcr.io/everysens-integration/prefect-poc/development:latest",
            namespace="everysens",
            service_account_name="orion-agent"
        ),
    )
    and I can monitor the flow run with the cloud UI
    This way of mixing streaming and workflows is really nice, I'm sure 2.0 will be a good product 🙂 A bit more work is needed to polish the docs, it was not really straightforward ^^
    thank you for your patience, it is much more clear now
    Anna Geller

    Anna Geller

    4 months ago
    I was in meetings but wow, this is fantastic, well done! Would you be open to writing up a quick README documenting the steps you took and sharing it as a public GitHub gist or so? This looks amazing and would help many other users - only if you have time 🙏
    @Valentin Baert
    Valentin Baert

    Valentin Baert

    4 months ago
    I would need to cleanup a bit because I have some private stuff in the middle but yes I can do that later this week
    Anna Geller

    Anna Geller

    4 months ago
    thank you! ❤️
    Valentin Baert

    Valentin Baert

    4 months ago
    @Anna Geller I read your article at https://www.prefect.io/blog/you-no-longer-need-two-separate-systems-for-batch-processing-and-streaming/ In my case the data is not coming from an API call but from a Kafka topic, which means it's continuously polling, which means the flow never ends. So I'm wondering, how do I stop the flow when I need to update the code, to add a new feature for example? I didn't find a "prefect deployment stop" command. I tried to manually delete the job in Kubernetes, but now I think I have created an inconsistency, because the flow is not running anymore but still shows as running in the Cloud UI. How should I gracefully stop an infinitely running flow?
    I don't know if I should have a @flow wrapping the infinite Kafka polling loop and then run a task for every individual message (but then how do I cleanly stop the infinite flow when I need to update the code?) or whether there is a better way to handle event streaming. I was under the impression Prefect 2.0 was bringing new answers to this, but I'm a bit lost here.
    I have written up the complete steps I took for my POC: https://gitlab.com/idkw/prefect-orion-gke-poc See the readme
    Anna Geller

    Anna Geller

    4 months ago
    This is such a common question that I'm going to prioritize building an example for it. You don't need a Prefect deployment at all for such a use case - you would have a Python application running e.g. as a Kubernetes service, and this service simply points at your PREFECT_API_URL for observability, visibility and orchestration. This way, stopping it is as simple as stopping your application. If you change the code, you can deploy a second application and take down the old one once you are ready, to ensure a continuous data stream with little to no downtime
    Thank you so much for this! Is it OK if I share it in #show-us-what-you-got?
    or you could share it there, it will be extremely helpful to many people!
    Valentin Baert

    Valentin Baert

    4 months ago
    Ok I didn't see your answer
    So in the normal python app, I would have to create a flow object and call .run on it right ?
    Anna Geller

    Anna Geller

    4 months ago
    not even that, you could add a main method:
    # orion_streaming_app.py
    import time
    
    # main() is assumed to be your @flow-decorated function, defined above in this file
    
    if __name__ == "__main__":
        while True:
            main()
            time.sleep(1)
    then say create a Docker image:
    FROM prefecthq/prefect:2.0b5-python3.9
    
    COPY orion_streaming_app.py .
    
    CMD [ "python", "./orion_streaming_app.py" ]
    and deploy it to GKE as a Kubernetes service
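Since stopping such a service means stopping the process, and Kubernetes sends SIGTERM to a pod's containers when the pod is terminated, a loop like the one above can be made to exit cleanly between iterations. A minimal sketch using only the standard library; GracefulLoop and step are hypothetical names, and step stands in for one call to the flow.

```python
import signal


class GracefulLoop:
    """Run a step repeatedly until a stop is requested (hypothetical helper)."""

    def __init__(self):
        self.running = True

    def stop(self, signum=None, frame=None):
        # Usable both as a signal handler and as a direct call
        self.running = False

    def install(self):
        # Kubernetes sends SIGTERM on pod termination; Ctrl+C sends SIGINT
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

    def run(self, step):
        count = 0
        while self.running:
            step()
            count += 1
        return count


loop = GracefulLoop()
processed = []


def step():
    # Stand-in for one iteration, e.g. a call to the flow function
    processed.append(len(processed))
    if len(processed) == 3:
        loop.stop()  # simulate SIGTERM arriving during the third iteration


# In a real service, call loop.install() before loop.run(...)
count = loop.run(step)
print(count)
```

The current iteration always finishes before the loop exits, so a message that is being processed is not cut off halfway, provided the iteration completes within the pod's termination grace period.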
    Valentin Baert

    Valentin Baert

    4 months ago
    ok I think I was still mixing concepts from 1.0 and 2.0, so I thought I had to use a deployment
    so in that case I don't need a DeploymentSpec anymore in the app script?
    Anna Geller

    Anna Geller

    4 months ago
    that's understandable - for a streaming workflow, deployments add unnecessary overhead IMO because Prefect would need to:
    • pull the flow code from remote storage
    • deploy the relevant infrastructure using the flow runner
    already this would add so much latency that it would no longer be even close to "real-time", but running it as a continuously running service would work well
    Prefect 2.0 has this concept of incremental adoption - you only use the concepts that are needed when they are needed - e.g. if you are doing real-time streaming, you don't need scheduled deployments of code artifacts, so you can skip the DeploymentSpec and just point your script at your PREFECT_API_URL for orchestration and observability
    Valentin Baert

    Valentin Baert

    4 months ago
    Ok I think it finally "clicked" ^^ Now my script looks like :
    @flow(name="prefect_2_kafka")
    def process_message(msg):
        logger = get_run_logger()
        logger.info("Received message topic={} partition={} offset={} key={} value={}".format(
            msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))
    
    def main():
    
        topic_consumer = TopicConsumer(
            ["prefect-poc"],
            .....
        )
    
        for msg in topic_consumer.consume():
            process_message(msg)
    
    if __name__ == '__main__':
        main()
    and I start it with :
    export PREFECT_API_URL=https://api-beta.prefect.io/api/...
    export PREFECT_API_KEY=xxx
    python prefect_2_kafka_kub.py
    and I have the observability in prefect cloud
    so that would be a sort of template for streaming app and I would continue using deployment for other scheduled flows
    Anna Geller

    Anna Geller

    4 months ago
    exactly!
    will your flow run immediately end if there is no message to consume?
    I wonder if adding some time.sleep() may be helpful after the flow run to avoid having too many of them - especially given that you already loop over all messages
    Valentin Baert

    Valentin Baert

    4 months ago
    I'm not sure I follow you. In my case with Kafka, you can see in the full code sample https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub.py#L40 that I have an infinite while loop that uses a confluent Consumer object. This class provides a poll function which implements long polling (with a timeout parameter). When you call poll(), it blocks until there is a message or the timeout is reached. If it finds a message, I yield it to the main function. If it reaches the timeout, it goes around the infinite loop and polls again. When the message is yielded to the main loop, that's when I start a @flow with the process_message function. After the message is consumed, the generator function resumes and calls poll again, and the cycle continues. So I don't understand where I would need a time.sleep(), and for what purpose?
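The long-polling generator described in this message can be sketched roughly as follows. This is a minimal illustration, not the code from the linked repository: `FakeConsumer` is a hypothetical stand-in built on the standard library's `queue.Queue` so the example runs without Kafka, and `max_idle_polls` is an assumed parameter added only so the demo terminates. In the real app, `process_message` would be the `@flow`-decorated function.

```python
import queue
from typing import Iterator, Optional


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer with a blocking poll()."""

    def __init__(self) -> None:
        self._messages: "queue.Queue[str]" = queue.Queue()

    def produce(self, value: str) -> None:
        self._messages.put(value)

    def poll(self, timeout: float) -> Optional[str]:
        # Blocks until a message arrives or the timeout expires.
        try:
            return self._messages.get(timeout=timeout)
        except queue.Empty:
            return None  # timeout reached without a message


def consume(consumer: FakeConsumer, timeout: float = 0.05,
            max_idle_polls: Optional[int] = None) -> Iterator[str]:
    """Yield messages as they arrive; poll again after every timeout."""
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            idle += 1  # nothing arrived; go around the loop and poll again
            continue
        idle = 0
        yield msg      # hand the message to the caller, resume here afterwards


def process_message(msg: str) -> str:
    # Placeholder for the flow-decorated function in the real app.
    return msg.upper()


if __name__ == "__main__":
    consumer = FakeConsumer()
    for value in ("a", "b"):
        consumer.produce(value)
    for msg in consume(consumer, max_idle_polls=2):
        print(process_message(msg))
```

Because `poll()` already blocks for up to `timeout` seconds, the loop does not spin when the topic is empty, which is why no extra `time.sleep()` is needed in this design.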
    Anna Geller

    Anna Geller

    4 months ago
    is this a never-ending flow run then?
    this is what I would try to avoid - ideally, each loop iteration that picks up and processes those messages is one flow run, triggered from a while loop as shown here:
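    # orion_streaming_app.py
    import time
    
    # main() is the flow-decorated function defined in this app
    if __name__ == "__main__":
        while True:
            main()         # each call is one flow run
            time.sleep(1)  # short pause between runs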
    this way you get fine-grained visibility into each run and the whole process is much more observable and orchestratable via Prefect
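To make the "one flow run per loop iteration" idea concrete, here is a small sketch that runs without Prefect installed. `fake_flow` is a hypothetical stand-in for Prefect's `@flow` decorator that only records one log entry per call, `pending` is an assumed in-memory message list, and `max_iterations` exists only so the demo stops. In a real app you would use `from prefect import flow` and loop indefinitely.

```python
import time
from typing import Callable, List, Optional

run_log: List[str] = []  # stand-in for the per-run records you would see in the UI


def fake_flow(fn: Callable[[], int]) -> Callable[[], int]:
    """Hypothetical stand-in for prefect's @flow: one log entry per call."""
    def wrapper() -> int:
        handled = fn()
        run_log.append(f"{fn.__name__}: handled {handled} message(s)")
        return handled
    return wrapper


pending: List[str] = ["m1", "m2", "m3"]  # assumed source of messages


@fake_flow
def main() -> int:
    """Drain whatever is pending right now, then return so the run ends."""
    handled = 0
    while pending:
        pending.pop(0)
        handled += 1
    return handled


def run_loop(max_iterations: Optional[int] = None, pause: float = 1.0) -> None:
    """Trigger one short-lived run per iteration, pausing in between."""
    done = 0
    while max_iterations is None or done < max_iterations:
        main()
        time.sleep(pause)
        done += 1


if __name__ == "__main__":
    run_loop(max_iterations=2, pause=0.1)
    print(run_log)
```

Each call to `main()` starts and finishes on its own, so every iteration shows up as a separate, short run instead of one run that never ends. The pause keeps the number of empty runs down when there is nothing to process.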
    Valentin Baert

    Valentin Baert

    4 months ago
    I have updated the poc to reflect the changes:
    • https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub.py demonstrates a never-ending flow run which starts tasks. This one is using a deployment.
    • https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub_no_deployment.py demonstrates a perpetual python app running a kafka consumer which starts a new flow for each kafka message without using a deployment
    with the second one, each flow run lives only for the duration of the process_message @flow (which is just a logger here but will actually contain several tasks IRL)
    Anna Geller

    Anna Geller

    4 months ago
    Nice, this looks interesting, but what I actually meant was more:
    # orion_streaming_app.py
    
    if __name__ == "__main__":
        while True:
            main() # function decorated with flow
    and then moving the while self.running out of the class into the if __name__ == "__main__": block. But this already looks good, nice work, and thanks for sharing!
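One possible shape for that refactor, as a rough sketch: the consumer class only knows how to fetch a single message, and the loop plus its stop condition live at the top level. `MessageSource`, `poll_once`, and the `threading.Event` stop flag are all hypothetical names chosen for illustration. They are not taken from the linked repository, and `main` would carry the `@flow` decorator in the real app.

```python
import threading
from typing import List, Optional


class MessageSource:
    """Hypothetical consumer wrapper that has no loop of its own."""

    def __init__(self, messages: List[str]) -> None:
        self._messages = list(messages)

    def poll_once(self) -> Optional[str]:
        # Return the next message, or None when nothing is available.
        return self._messages.pop(0) if self._messages else None


def main(source: MessageSource, handled: List[str]) -> bool:
    """One unit of work; this is the function that would be a flow."""
    msg = source.poll_once()
    if msg is None:
        return False
    handled.append(msg)
    return True


def run(source: MessageSource, stop: threading.Event, handled: List[str]) -> None:
    """The loop that used to live inside the class as `while self.running`."""
    while not stop.is_set():
        if not main(source, handled):
            stop.set()  # demo only: a real service would keep polling


if __name__ == "__main__":
    seen: List[str] = []
    run(MessageSource(["a", "b"]), threading.Event(), seen)
    print(seen)
```

Keeping the loop outside the class means the stop condition is controlled in one place, and each call to `main` stays small enough to be observed as its own run.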