# ask-community
s
Hi I'm currently trying to do some deployment testing, and I'm getting the following error: Specification in 'newflow.py', line 13 failed validation! You have not configured default storage on the server or set a storage to use for this deployment but this deployment is using a Universal flow runner which requires remote storage. I've currently got a file (newflow.py) containing:
Copy code
from prefect import flow

@flow
def hello_world(name="world"):
    print(f"Hello {name}!")

# Note: a deployed flow does not need a command to
# explicitly run the flow. The API handles this for you.
# hello_world()

from prefect.deployments import DeploymentSpec

DeploymentSpec(
    flow=hello_world,
    name="hello-world-daily",
)
and the error comes out when I run the following:
Copy code
prefect deployment create newflow.py
a
This makes sense - before you create your deployment, you need to configure remote storage. You can choose from S3, GCS, or Azure Blob. Here is the command you may use:
Copy code
prefect storage create --help
more on that here: https://orion-docs.prefect.io/concepts/storage/
v
I will try the 2.0 version a bit, though I'm having an issue: when setting up Google Cloud Storage, how do I assign a service account?
the documentation is unclear
it talks about a Service Account Info option, but without any code or CLI example
also the CLI isn't of much help; the --help output lacks detail
I have a Google service account key file as JSON, but I don't know how to configure it with the storage
a
use one of these options:
Copy code
# storage blocks live in prefect.blocks.storage in the 2.0 beta
from prefect.blocks.storage import FileStorageBlock, GoogleCloudStorageBlock
from prefect.deployments import DeploymentSpec

DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=FileStorageBlock(base_path="gcs://prefect-orion/flows"),
)

DeploymentSpec(
    name="dev",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(bucket="prefect-orion"),
)
assuming GOOGLE_APPLICATION_CREDENTIALS is a string with the path to your JSON service account file:
Copy code
import io
import json
import os

sa_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

with io.open(sa_file, "r", encoding="utf-8") as json_file:
    service_account_info = json.load(json_file)


DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-orion",
        service_account_info=service_account_info
    ),
)
you may also do:
Copy code
export GOOGLE_APPLICATION_CREDENTIALS=/Users/path/to/your/sa.json
and use GCS default storage
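if you want to sanity-check that the default credentials resolve this way, a quick test (assuming you have the google-auth package installed) could be:
Copy code
import google.auth

# picks up GOOGLE_APPLICATION_CREDENTIALS, among other credential sources
credentials, project_id = google.auth.default()
print(project_id)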
s
Hello again - once we have a series of deployments ready, is it possible to deploy without having to use the CLI? So for deployment.py, instead of doing 'prefect deployment create deployment.py', is there a way I can embed the code within to deploy? I suppose I am also curious how to do this for creating work-queues and agents. Thanks in advance!
v
ok thank you, I tried but failed. I'll stick with 1.0 for the time being and wait for the official release. Maybe for expert users the documentation is enough, but for a beginner like me the 1.0 doc is much better. 2.0 is too vague for me and lacks clear step-by-step instructions for setting up Google Cloud Storage. Right now it is just a single sentence with an obscure table listing settings without any code example. https://orion-docs.prefect.io/concepts/storage/#google-cloud-storage
a
@Sang Young Noh deployment from the CLI is essentially also an API call, therefore you could do it in some other way, but curious to hear why you would do it from anything other than the CLI? I believe most users will create deployments from some CI/CD system, which is why CLI commands provide the best UX so far
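for reference, creating e.g. a work queue from Python could look roughly like this - a sketch that assumes the beta client exposes get_client and create_work_queue, which is what the CLI command calls under the hood:
Copy code
import asyncio

from prefect.client import get_client


async def create_queue():
    async with get_client() as client:
        # assumption: create_work_queue mirrors `prefect work-queue create -t k8s`
        queue_id = await client.create_work_queue(name="k8s", tags=["k8s"])
        print(queue_id)


asyncio.run(create_queue())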
@Valentin Baert I can understand your frustration and I'm aware we need more examples, but curious - did you try the GCS DeploymentSpec examples I posted here? what error did you get when creating a deployment with GCS storage? I'm sure we can sort this out together, so this shouldn't be the only reason to stick to Prefect 1.0
v
I'm still trying; I managed to assemble documentation from multiple pages to get something working for a simple use case for me: deploy an agent on a GKE cluster, connect it to Prefect Orion Cloud, use GCS storage, and run a flow as a Kubernetes job
very hard to find the correct information because it is scattered everywhere and most of the time incomplete
now I manage to make the agent pick up the flow from a queue, but it seems to fail to start a pod
Copy code
api Starting...
api  ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
api | _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
api |  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
api |_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|
api Configure Prefect to communicate with the server with:
api     prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
api Check out the dashboard at http://0.0.0.0:4200
api INFO:     Started server process [9]
api INFO:     Waiting for application startup.
api INFO:     Application startup complete.
api INFO:     Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
agent Starting agent connected to https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0...
agent   ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
agent  | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
agent  |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
agent  |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
agent Agent started! Looking for work from queue 'kubernetes'...
agent 09:01:07.562 | INFO    | prefect.agent - Submitting flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent 09:01:07.699 | INFO    | prefect.flow_runner.kubernetes - RUNNING
agent 09:01:07.906 | INFO    | prefect.flow_runner.kubernetes - Flow run 'charming-gopher' has job settings = {'metadata': {'generateName': 'charming-gopher', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '1e14e8b2-11b5-432a-802b-fb8f57fd68b5', 'io.prefect.flow-run-name': 'charming-gopher', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'prefecthq/prefect:2.0b4-python3.8', 'command': ['python', '-m', 'prefect.engine', '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'}, {'name': 'PREFECT_API_KEY', 'value': 'pnu_xxxxx'}]}]}}, 'backoff_limit': 4}}
agent 09:01:07.934 | ERROR   | prefect.agent - Flow runner failed to submit flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent Traceback (most recent call last):
agent   File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 136, in submit_run
agent     await self.task_group.start(flow_runner.submit_flow_run, flow_run)
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 701, in start
agent     return await future
agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 827, in submit_flow_run
agent     job_name = await run_sync_in_worker_thread(self._create_and_start_job, flow_run)
agent   File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
agent     return await anyio.to_thread.run_sync(call, cancellable=True)
agent   File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
agent     return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
agent     return await future
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
agent     result = context.run(func, *args)
agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 1016, in _create_and_start_job
agent     job = self.batch_client.create_namespaced_job(self.namespace, job_settings)
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
agent     return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
agent     return self.api_client.call_api(
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
agent     return self.__call_api(resource_path, method,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
agent     response_data = self.request(
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
agent     return self.rest_client.POST(url,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
agent     return self.request("POST", url,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
agent     raise ApiException(http_resp=r)
agent kubernetes.client.exceptions.ApiException: (403)
agent Reason: Forbidden
agent HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cd7f296b-0444-4ac4-b4ec-dc88b9843534', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '27d5e504-aa65-49d9-8f50-ef3c4aa090aa', 'X-Kubernetes-Pf-Prioritylevel-Uid': '1894d569-9152-44eb-848e-bd2a893b4c22', 'Date': 'Wed, 18 May 2022 09:01:07 GMT', 'Content-Length': '313'})
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
a
nice work you got the GCS storage setup working! so KubernetesFlowRunner is the issue now? (just to make it clear for me)
v
current flow
Copy code
import io
import json
import os
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException
from prefect import flow, task, get_run_logger
from prefect.blocks.storage import GoogleCloudStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

class TopicConsumer:

    def __init__(self, topics, config, commits_every_n_messages=1):
        self.topics = topics
        self.config = config
        self.commits_every_n_messages = commits_every_n_messages
        self.consumer = Consumer(config)
        self.running = False

    def consume(self):
        try:
            self.consumer.subscribe(self.topics)
            print("Subscribed to topics: {}".format(self.topics))
            self.running = True
            msg_count = 0
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    yield msg
                    msg_count += 1
                    if msg_count % self.commits_every_n_messages == 0:
                        self.consumer.commit(asynchronous=False)
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()

    def shutdown(self):
        self.running = False

@task
def process_message(msg):
    logger = get_run_logger()
    logger.info("topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

@flow(name="prefect_2_kafka_kub")
def main():

    conf = {
        'bootstrap.servers': "xxxxx",
        'group.id': "prefect_poc_kafka",
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username' :'xxxxx',
        'sasl.password': 'xxxxxx'
    }

    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        conf
    )

    for msg in topic_consumer.consume():
        process_message(msg)

    topic_consumer.shutdown()


service_account_info = json.loads(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc",
        service_account_info=service_account_info
    ),
    flow_runner=KubernetesFlowRunner()
)

if __name__ == '__main__':
    main()
it seems so
here is my orion agent manifest
Copy code
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1  # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
          - name: PREFECT_API_URL
            value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
          - name: PREFECT_API_KEY
            value: 'xxxxxxxxxxx'
          - name: GOOGLE_APPLICATION_CREDENTIALS
            value: '{"type":"service_account",.....'

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: flow-runner
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flow-runner-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: Role
  name: flow-runner
  apiGroup: rbac.authorization.k8s.io
not sure what I'm missing since the generated manifest was working out of the box for prefect 1 but not here with prefect orion
a
I think you are very close - let's take it step by step
v
I think it's something related to the RBAC authorization, but I'm not very familiar with this kind of Kubernetes object
a
you don't need this manifest when using Cloud 2.0 - it is only required if you want to self-host Orion. For Cloud 2.0, your kubectl needs to point at the GKE cluster (or whichever K8s cluster you chose) on the VM from which you start an agent
yes you're 100% correct, it's missing service account info, which you can configure on the KubernetesFlowRunner using service_account_name
v
hmm, I'm a bit confused - I followed the doc at https://orion-docs.prefect.io/tutorials/kubernetes-flow-runner/ because I wanted to run flows on Kubernetes and connect the agent to Orion Cloud; isn't that what it is intended for?
a
I can try to replicate that later today on AWS EKS because I'm more familiar with this than GKE, but the setup should be the same - your kubectl points at your cluster, your prefect config points at your Cloud 2.0 PREFECT_API_URL (which you can test with the command prefect config view), then you create a work queue e.g.:
Copy code
prefect work-queue create -t k8s
and then your agent for this queue:
Copy code
prefect agent start k8s
v
yes I got to this point already
a
I can 100% understand your frustration, this docs page is purely for a simple all-in-one OSS deployment and not for using it with Cloud 2.0 - I can prioritize this for you
v
then from my desktop, using the prefect CLI connected to Cloud 2.0, I created and ran a deployment
Copy code
prefect deployment create src/flows/prefect_2_kafka_kub.py
prefect deployment run prefect_2_kafka_kub/gcs
the agent has picked up the resulting flow run
but fails with this error
Copy code
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
with prefect 1 I also got to this point but the flow ran properly, so I must be missing something that has changed in 2.0, but I don't know what
a
seems like a permission issue with the service account. Let me open an issue for myself to replicate this later this week or next and give you more help, since I can't dive deeper right now. I appreciate you brought this up - it's important we provide more examples for this (extremely common!) deployment pattern
v
ok thank you
v
does the orion agent read undocumented env variables?
because the 1.0 agent had a bunch of documented env vars
a
you can add env variables to every flow runner afaik
v
with 1.0 it had
with 2.0 there seems to be only PREFECT_API_URL and I can't find the other ones documented anywhere
a
at least every flow runner that is based on UniversalFlowRunner:
Copy code
class UniversalFlowRunner(FlowRunner):
    """
    The universal flow runner contains configuration options that can be used by any
    Prefect flow runner implementation.

    This flow runner cannot be used at runtime and should be converted into a subtype.

    Attributes:
        env: Environment variables to provide to the flow run
    """

    typename: Literal["universal"] = "universal"
    env: Dict[str, str] = Field(default_factory=dict)
env=dict(key="value")
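e.g. a sketch of passing env variables to the Kubernetes runner (the variable name here is just an example):
Copy code
from prefect.flow_runners import KubernetesFlowRunner

# env vars set here are provided to the flow run's container
flow_runner = KubernetesFlowRunner(env={"MY_EXTRA_VAR": "value"})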
v
so just wondering if I should try to pass, for example, a SERVICE_ACCOUNT_NAME variable?
a
sure you could, but there is a dedicated arg for that: service_account_name
Copy code
class KubernetesFlowRunner(UniversalFlowRunner):
    """
    Executes flow runs in a Kubernetes job.

    Requires a Kubernetes cluster to be connectable.

    Attributes:
        image: An optional string specifying the tag of a Docker image to use for the job.
        namespace: An optional string signifying the Kubernetes namespace to use.
        service_account_name: An optional string specifying which Kubernetes service account to use.
        labels: An optional dictionary of labels to add to the job.
        image_pull_policy: The Kubernetes image pull policy to use for job containers.
        restart_policy: The Kubernetes restart policy to use for jobs.
        stream_output: If set, stream output from the container to local standard output.
    """
v
Ok I have made a bit of progress
a
nice!
v
the error I had:
"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\""
It was because the default generated manifest seems to assume the default service account already has the permission to create jobs, but in my case it did not. The manifest generated with the Prefect 1.0 CLI had a more complete Kubernetes Role definition, so I simply took the RBAC generated by the 1.0 CLI and replaced the RBAC generated by the 2.0 CLI. After that the agent was able to deploy flows as Kubernetes pods.
But since I didn't want to use a "default" service account, I made a proper dedicated one named orion-agent by defining a Kubernetes ServiceAccount object and referencing the serviceAccount in the agent pod spec.
Also, after a bit of fiddling I got other RBAC errors: the agent, after starting a pod, needs to query the pods API, pods/log API and pods/status API, so I added those to the RBAC too.
the final manifest looks like
Copy code
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1  # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      serviceAccountName: orion-agent
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
          - name: PREFECT_API_URL
            value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
          - name: PREFECT_API_KEY
            value: 'xxxxxx'
          - name: GOOGLE_APPLICATION_CREDENTIALS
            value: '{"type":"service_account",'

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: orion-agent
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: orion-agent-rbac
  namespace: everysens
rules:
  - apiGroups:
      - batch
      - extensions
    resources:
      - jobs
    verbs:
      - '*'
  - apiGroups:
      - ''
    resources:
      - events
      - pods
      - pods/log
      - pods/status
    verbs:
      - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: orion-agent-rbac
  namespace: everysens
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: orion-agent-rbac
subjects:
  - kind: ServiceAccount
    name: orion-agent
now I got to the point where the agent picks up a flow and manages to start a pod to run the flow. However, in my flow I need to use a pip library which is not in the base Orion Docker image. With Prefect 1.0 I would be able to specify extra pip packages with https://docs.prefect.io/orchestration/flow_config/docker.html#installing-extra-dependencies-at-runtime, but it does not seem to be possible with 2.0.
a
nice! did it deploy a flow run as a K8s job, as it should, on GKE?
v
So I need to build a custom image based on 2.0 orion?
👍 1
a
or do you run it on a local K8s cluster?
v
yes as a k8s job on gke
👍 1
a
correct, extra pip packages not there yet
exactly, you can do a simple image like:
Copy code
FROM prefecthq/prefect:2.0b4-python3.8
COPY requirements.txt .
RUN pip install -r requirements.txt
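then push it to a registry your cluster can pull from and point the flow runner at it - e.g. (image name is a placeholder):
Copy code
from prefect.flow_runners import KubernetesFlowRunner

# reference the custom image in your DeploymentSpec's flow runner
flow_runner = KubernetesFlowRunner(image="your-registry/your-image:tag")  # placeholder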
v
ok, the extra pip setting seems to be a nice feature; I hope it will be implemented again for 2.0. I'll try to build a custom image for now
👍 1
That was the final piece - I got it working now
now I have the flow deployed as a GKE k8s pod which implements a long-running flow that listens to Kafka and starts tasks from there (hello-world tasks that just log the Kafka message for now)
the deployment spec looks like
Copy code
DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc"
    ),
    flow_runner=KubernetesFlowRunner(
        image="<http://gcr.io/everysens-integration/prefect-poc/development:latest|gcr.io/everysens-integration/prefect-poc/development:latest>",
        namespace="everysens",
        service_account_name="orion-agent"
    ),
)
and I can monitor the flow run with the cloud UI
This way of mixing streaming and workflows is really nice, I'm sure 2.0 will be a good product 🙂 A bit more work is needed to polish the doc, it was not really straightforward ^^
thank you for your patience, it is much more clear now
a
I was in meetings but wow, this is fantastic, well done! would you be open to writing up a quick README documenting the steps you took and sharing as a public GitHub gist or so? This looks amazing and would help many other users - only if you have time 🙏
@Valentin Baert
v
I would need to clean up a bit because I have some private stuff in the middle, but yes I can do that later this week
a
thank you! ❤️
v
@Anna Geller I read your article at https://www.prefect.io/blog/you-no-longer-need-two-separate-systems-for-batch-processing-and-streaming/ In my case the data is not coming from an API call but from a Kafka topic, which means it's continuously polling, which means the flow never ends. So I'm wondering: how do I stop the flow when I need to update the code to add a new feature, for example? I didn't find a "prefect deployment stop" command. I tried to manually delete the job in Kubernetes, but now I think I have created an inconsistency because the flow is not running anymore but still shows as running in the Cloud UI. How should I gracefully stop an infinitely running flow?
I don't know if I should have a @flow wrapping the Kafka infinite polling loop and then run a task for every individual message (but then how do I cleanly stop the infinite flow when I need to update the code?), or whether there is a better way to handle event streaming. I was under the impression Prefect 2.0 was bringing new answers to this, but I'm a bit lost here.
I have written up the complete steps I took for my POC: https://gitlab.com/idkw/prefect-orion-gke-poc - see the readme
❤️ 1
💯 1
🙏 2
a
This is such a common question that I'm gonna tackle it with high priority and build an example for this - you don't need a Prefect deployment at all for such a use case. You would have a Python application running e.g. as a Kubernetes service, and this service simply points at your PREFECT_API_URL for observability, visibility and orchestration. This way, stopping it is as simple as stopping your application. If you change the code, you can deploy a second application and take down the old one once you are ready, to ensure a continuous data stream with little to no downtime
Thank you so much for this! Is it OK if I share in #CL09KTZPX?
or you could share it there, it will be extremely helpful to many people!
v
Ok I didn't see your answer
So in the normal python app, I would have to create a flow object and call .run() on it, right?
a
not even that, you could add a main method:
Copy code
# orion_streaming_app.py
import time

# main is your flow-decorated function
if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)
then say create a Docker image:
Copy code
FROM prefecthq/prefect:2.0b5-python3.9

COPY orion_streaming_app.py .

CMD [ "python", "./orion_streaming_app.py" ]
and deploy it to GKE as a Kubernetes service
v
ok I think I was still mixing concepts from 1.0 and 2.0, so I thought I had to use a deployment
so in that case I don't need a DeploymentSpec anymore in the app script?
👍 1
a
that's understandable - for a streaming workflow, deployments add unnecessary overhead IMO because Prefect would need to:
• pull the flow code from a remote storage
• deploy the relevant infrastructure using the flow runner
already this would add so much latency that it would no longer be even close to "real-time", but doing it as a continuously running service would work well
Prefect 2.0 has this concept of incremental adoption - you only use the concepts that are needed when they are needed - e.g. if you are doing real-time streaming, you don't need scheduled deployments of code artifacts, so you can skip the DeploymentSpec and just point your script at your PREFECT_API_URL for orchestration and observability
v
Ok I think it finally "clicked" ^^ Now my script looks like:
Copy code
@flow(name="prefect_2_kafka")
def process_message(msg):
    logger = get_run_logger()
    logger.info("Received message topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

def main():

    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        .....
    )

    for msg in topic_consumer.consume():
        process_message(msg)

if __name__ == '__main__':
    main()
and I start it with:
Copy code
export PREFECT_API_URL=https://api-beta.prefect.io/api/...
export PREFECT_API_KEY=xxx
python prefect_2_kafka_kub.py
and I have the observability in prefect cloud
💯 1
so that would be a sort of template for streaming apps, and I would continue using deployments for other scheduled flows
a
exactly!
will your flow run immediately end if there is no message to consume?
I wonder if adding some time.sleep() may be helpful after the flow run to avoid having too many of them - especially given that you already loop over all messages
v
I'm not sure I follow you. In my case with Kafka, you can see in the full code sample https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub.py#L40 that I have an infinite while loop that uses a confluent Consumer object. This class provides a poll function which implements long polling (with a timeout parameter). When you call the poll() function, it blocks until there is a message or it reaches the timeout. If it finds a message, I yield it to the main function; if it reaches the timeout, it loops in the infinite loop and polls again. When the message is yielded to the main loop, that's when I start a @flow with the process_message function. After the message is consumed, it resumes the generator function, goes back to calling the poll function, and the cycle continues. So I don't understand where I would need a time.sleep() and for what purpose?
a
is this a never-ending flow run then?
this is what I would try to avoid - ideally, each loop iteration that picks up and does something with those messages is one flow run, triggered from a while loop as shown here:
Copy code
# orion_streaming_app.py
import time

# main is your flow-decorated function
if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)
this way you get fine-grained visibility into each run and the whole process is much more observable and orchestratable via Prefect
v
I have updated the POC to reflect the changes:
• https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub.py demonstrates a never-ending flow run which starts tasks. This one is using a deployment.
• https://gitlab.com/idkw/prefect-orion-gke-poc/-/blob/master/prefect_2_kafka_kub_no_deployment.py demonstrates a perpetual Python app running a Kafka consumer which starts a new flow for each Kafka message, without using a deployment.
with the second one, each flow run lives only for the duration of the process_message @flow (which is just a logger here but will actually contain several tasks IRL)
a
Nice, this looks interesting, but what I actually meant was more:
Copy code
# orion_streaming_app.py

if __name__ == "__main__":
    while True:
        main() # function decorated with flow
and then moving the while self.running out of the class into the if __name__ == "__main__": block - but this already looks good, nice work, and thanks for sharing!
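putting it all together, a rough end-to-end sketch of that pattern (placeholder Kafka config; the batch size is a made-up knob) could look like:
Copy code
# orion_streaming_app.py - each drained batch of messages is one short flow run
import time

from confluent_kafka import Consumer
from prefect import flow, task, get_run_logger

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder config
    "group.id": "prefect_poc_kafka",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["prefect-poc"])


@task
def process_message(msg):
    logger = get_run_logger()
    logger.info("key={} value={}".format(msg.key(), msg.value()))


@flow(name="prefect_2_kafka_batch")
def main(max_messages: int = 100):
    # drain up to max_messages, then return so this flow run completes
    for _ in range(max_messages):
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            break
        if not msg.error():
            process_message(msg)


if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)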