Sang Young Noh
05/03/2022, 8:23 AMAnna Geller
prefect storage create --help
Valentin Baert
05/17/2022, 2:21 PMValentin Baert
05/17/2022, 2:21 PMValentin Baert
05/17/2022, 2:21 PMValentin Baert
05/17/2022, 2:21 PMValentin Baert
05/17/2022, 2:22 PMValentin Baert
05/17/2022, 2:22 PMAnna Geller
DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=FileStorageBlock(base_path="gcs://prefect-orion/flows"),
)
DeploymentSpec(
    name="dev",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(bucket="prefect-orion"),
)
import io
import json
import os
# GOOGLE_APPLICATION_CREDENTIALS holds the path to the service account key file
sa_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
with io.open(sa_file, "r", encoding="utf-8") as json_file:
    service_account_info = json.load(json_file)
DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-orion",
        service_account_info=service_account_info
    ),
)
Anna Geller
export GOOGLE_APPLICATION_CREDENTIALS=/Users/path/to/your/sa.json
Sang Young Noh
05/17/2022, 2:42 PMValentin Baert
05/17/2022, 3:26 PMAnna Geller
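The snippets in this thread use `GOOGLE_APPLICATION_CREDENTIALS` in two ways: as a path to the key file (the `export` above) and, later in the Kubernetes manifest, as the JSON content itself. A minimal sketch of a loader that tolerates both forms follows. `load_service_account_info` is a hypothetical helper name, not part of Prefect or the Google client libraries.

```python
import json
import os
from typing import Any, Dict, Optional


def load_service_account_info(raw: Optional[str] = None) -> Dict[str, Any]:
    """Return the service account key as a dict.

    Hypothetical helper: `raw` may be a path to a key file or the key's
    JSON content. When omitted, GOOGLE_APPLICATION_CREDENTIALS is read.
    """
    if raw is None:
        raw = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not raw:
        raise ValueError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    text = raw.strip()
    if text.startswith("{"):
        # The value is the JSON document itself.
        return json.loads(text)
    # Otherwise treat the value as a path to the key file.
    with open(text, "r", encoding="utf-8") as json_file:
        return json.load(json_file)
```

The result could then be passed as `service_account_info=load_service_account_info()` in the `DeploymentSpec` shown above.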
Anna Geller
DeploymentSpec
Valentin Baert
05/18/2022, 11:40 AMValentin Baert
05/18/2022, 11:40 AMValentin Baert
05/18/2022, 11:42 AMValentin Baert
05/18/2022, 11:42 AM
api Starting...
api  ___ ___ ___ ___ ___ ___ _____    ___  ___ ___ ___  _  _
api | _ \ _ \ __| __| __/ __|_   _|  / _ \| _ \_ _/ _ \| \| |
api |  _/   / _|| _|| _| (__  | |   | (_) |   /| | (_) | .` |
api |_| |_|_\___|_| |___\___| |_|    \___/|_|_\___\___/|_|\_|
api Configure Prefect to communicate with the server with:
api     prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
api Check out the dashboard at http://0.0.0.0:4200
api INFO:     Started server process [9]
api INFO:     Waiting for application startup.
api INFO:     Application startup complete.
api INFO:     Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
agent Starting agent connected to https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0...
agent   ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
agent  | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
agent  |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
agent  |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
agent Agent started! Looking for work from queue 'kubernetes'...
agent 09:01:07.562 | INFO    | prefect.agent - Submitting flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent 09:01:07.699 | INFO    | prefect.flow_runner.kubernetes - RUNNING
agent 09:01:07.906 | INFO    | prefect.flow_runner.kubernetes - Flow run 'charming-gopher' has job settings = {'metadata': {'generateName': 'charming-gopher', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '1e14e8b2-11b5-432a-802b-fb8f57fd68b5', 'io.prefect.flow-run-name': 'charming-gopher', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'prefecthq/prefect:2.0b4-python3.8', 'command': ['python', '-m', 'prefect.engine', '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'}, {'name': 'PREFECT_API_KEY', 'value': 'pnu_iyYoMBQYyonrYPzQNGvCwI3nDsst4Q4erhiy'}]}]}}, 'backoff_limit': 4}}
agent 09:01:07.934 | ERROR   | prefect.agent - Flow runner failed to submit flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent Traceback (most recent call last):
agent   File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 136, in submit_run
agent     await self.task_group.start(flow_runner.submit_flow_run, flow_run)
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 701, in start
agent     return await future
agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 827, in submit_flow_run
agent     job_name = await run_sync_in_worker_thread(self._create_and_start_job, flow_run)
agent   File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
agent     return await anyio.to_thread.run_sync(call, cancellable=True)
agent   File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
agent     return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
agent     return await future
agent   File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
agent     result = context.run(func, *args)
agent   File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 1016, in _create_and_start_job
agent     job = self.batch_client.create_namespaced_job(self.namespace, job_settings)
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
agent     return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
agent     return self.api_client.call_api(
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
agent     return self.__call_api(resource_path, method,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
agent     response_data = self.request(
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
agent     return self.rest_client.POST(url,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
agent     return self.request("POST", url,
agent   File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
agent     raise ApiException(http_resp=r)
agent kubernetes.client.exceptions.ApiException: (403)
agent Reason: Forbidden
agent HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cd7f296b-0444-4ac4-b4ec-dc88b9843534', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '27d5e504-aa65-49d9-8f50-ef3c4aa090aa', 'X-Kubernetes-Pf-Prioritylevel-Uid': '1894d569-9152-44eb-848e-bd2a893b4c22', 'Date': 'Wed, 18 May 2022 09:01:07 GMT', 'Content-Length': '313'})
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
Anna Geller
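The 403 body in the log above already says who was denied and where. A small sketch that pulls those fields out of the Kubernetes `Status` message is shown below. The regular expression is an assumption based on the wording of this one message, not a documented format.

```python
import json
import re

# Response body copied from the agent log above.
body = r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}'

status = json.loads(body)

# Pattern derived from this message only (an assumption, not a stable contract).
pattern = re.compile(
    r'User "system:serviceaccount:(?P<sa_namespace>[^:]+):(?P<sa_name>[^"]+)" '
    r'cannot (?P<verb>\w+) resource "(?P<resource>[^"]+)" '
    r'in API group "(?P<group>[^"]*)" in the namespace "(?P<target_namespace>[^"]+)"'
)
match = pattern.search(status["message"])
denied = match.groupdict() if match else {}

for key, value in denied.items():
    print(f"{key}: {value}")
```

Here the caller is service account `default` in namespace `everysens`, and the job was to be created in namespace `default`. A RoleBinding only grants access if its subject is that exact namespace and name pair, and a Role only applies inside its own namespace.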
Valentin Baert
05/18/2022, 11:44 AMValentin Baert
05/18/2022, 11:44 AM
import io
import json
import os
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException
from prefect import flow, task, get_run_logger
from prefect.blocks.storage import GoogleCloudStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
class TopicConsumer:
    def __init__(self, topics, config, commits_every_n_messages=1):
        self.topics = topics
        self.config = config
        self.commits_every_n_messages = commits_every_n_messages
        self.consumer = Consumer(config)
        self.running = False
    def consume(self):
        try:
            self.consumer.subscribe(self.topics)
            print("Subscribed to topics: {}".format(self.topics))
            self.running = True
            msg_count = 0
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    yield msg
                    msg_count += 1
                    if msg_count % self.commits_every_n_messages == 0:
                        self.consumer.commit(asynchronous=False)
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()
    def shutdown(self):
        self.running = False
@task
def process_message(msg):
    logger = get_run_logger()
    logger.info("topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))
@flow(name="prefect_2_kafka_kub")
def main():
    conf = {
        'bootstrap.servers': "xxxxx",
        'group.id': "prefect_poc_kafka",
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username' :'xxxxx',
        'sasl.password': 'xxxxxx'
    }
    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        conf
    )
    for msg in topic_consumer.consume():
        process_message(msg)
    topic_consumer.shutdown()
service_account_info = json.loads(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))
DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc",
        service_account_info=service_account_info
    ),
    flow_runner=KubernetesFlowRunner()
)
if __name__ == '__main__':
    main()
Valentin Baert
05/18/2022, 11:45 AMValentin Baert
05/18/2022, 11:45 AMValentin Baert
05/18/2022, 11:46 AM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1  # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
          - name: PREFECT_API_URL
            value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
          - name: PREFECT_API_KEY
            value: 'xxxxxxxxxxx'
          - name: GOOGLE_APPLICATION_CREDENTIALS
            value: '{"type":"service_account",.....'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: flow-runner
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flow-runner-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: Role
  name: flow-runner
  apiGroup: rbac.authorization.k8s.io
Valentin Baert
05/18/2022, 11:46 AMAnna Geller
Valentin Baert
05/18/2022, 11:48 AMAnna Geller
Anna Geller
service_account_name
Valentin Baert
05/18/2022, 11:52 AMAnna Geller
prefect config view
prefect work-queue create -t k8s
prefect agent start k8s
Valentin Baert
05/18/2022, 11:54 AMAnna Geller
Valentin Baert
05/18/2022, 11:54 AM
prefect deployment create src/flows/prefect_2_kafka_kub.py
prefect deployment run prefect_2_kafka_kub/gcs
Valentin Baert
05/18/2022, 11:54 AMValentin Baert
05/18/2022, 11:54 AMValentin Baert
05/18/2022, 11:55 AM
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
Valentin Baert
05/18/2022, 11:56 AMAnna Geller
Valentin Baert
05/18/2022, 11:58 AMAnna Geller
Valentin Baert
05/18/2022, 12:06 PMValentin Baert
05/18/2022, 12:07 PMAnna Geller
Valentin Baert
05/18/2022, 12:07 PMValentin Baert
05/18/2022, 12:08 PMAnna Geller
class UniversalFlowRunner(FlowRunner):
    """
    The universal flow runner contains configuration options that can be used by any
    Prefect flow runner implementation.
    This flow runner cannot be used at runtime and should be converted into a subtype.
    Attributes:
        env: Environment variables to provide to the flow run
    """
    typename: Literal["universal"] = "universal"
    env: Dict[str, str] = Field(default_factory=dict)
Anna Geller
env=dict(key="value")
Valentin Baert
05/18/2022, 12:08 PMAnna Geller
Anna Geller
class KubernetesFlowRunner(UniversalFlowRunner):
    """
    Executes flow runs in a Kubernetes job.
    Requires a Kubernetes cluster to be connectable.
    Attributes:
        image: An optional string specifying the tag of a Docker image to use for the job.
        namespace: An optional string signifying the Kubernetes namespace to use.
        service_account_name: An optional string specifying which Kubernetes service account to use.
        labels: An optional dictionary of labels to add to the job.
        image_pull_policy: The Kubernetes image pull policy to use for job containers.
        restart_policy: The Kubernetes restart policy to use for jobs.
        stream_output: If set, stream output from the container to local standard output.
    """
Valentin Baert
05/18/2022, 12:35 PMAnna Geller
Valentin Baert
05/18/2022, 12:39 PMValentin Baert
05/18/2022, 12:40 PMValentin Baert
05/18/2022, 12:40 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1  # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      serviceAccountName: orion-agent
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
          - name: PREFECT_API_URL
            value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
          - name: PREFECT_API_KEY
            value: 'xxxxxx'
          - name: GOOGLE_APPLICATION_CREDENTIALS
            value: '{"type":"service_account",'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: orion-agent
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: orion-agent-rbac
  namespace: everysens
rules:
  - apiGroups:
      - batch
      - extensions
    resources:
      - jobs
    verbs:
      - '*'
  - apiGroups:
      - ''
    resources:
      - events
      - pods
      - pods/log
      - pods/status
    verbs:
      - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: orion-agent-rbac
  namespace: everysens
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: orion-agent-rbac
subjects:
  - kind: ServiceAccount
    name: orion-agent
Valentin Baert
05/18/2022, 12:41 PMAnna Geller
Valentin Baert
05/18/2022, 12:42 PMAnna Geller
Valentin Baert
05/18/2022, 12:42 PMAnna Geller
Anna Geller
FROM prefecthq/prefect:2.0b4-python3.8
COPY requirements.txt .
RUN pip install -r requirements.txt
Valentin Baert
05/18/2022, 12:44 PMValentin Baert
05/18/2022, 1:02 PMValentin Baert
05/18/2022, 1:03 PMValentin Baert
05/18/2022, 1:03 PMValentin Baert
05/18/2022, 1:06 PMValentin Baert
05/18/2022, 1:06 PM
DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc"
    ),
    flow_runner=KubernetesFlowRunner(
        image="gcr.io/everysens-integration/prefect-poc/development:latest",
        namespace="everysens",
        service_account_name="orion-agent"
    ),
)
Valentin Baert
05/18/2022, 1:07 PMValentin Baert
05/18/2022, 1:07 PMValentin Baert
05/18/2022, 1:09 PMValentin Baert
05/18/2022, 1:17 PMAnna Geller
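To make the effect of `namespace` and `service_account_name` in the `DeploymentSpec` above concrete, here is a rough sketch of how those options could map onto job settings shaped like the ones printed in the agent log earlier. `build_job_settings` is a hypothetical helper and not Prefect's implementation. The one assumption beyond the log is that the service account lands in the pod spec's `serviceAccountName` field, which is the standard Kubernetes name for it.

```python
from typing import Any, Dict, Optional


def build_job_settings(
    flow_run_id: str,
    flow_run_name: str,
    image: str,
    namespace: str = "default",
    service_account_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Illustrative only: mirrors the shape of the job settings in the agent log."""
    pod_spec: Dict[str, Any] = {
        "restartPolicy": "Never",
        "containers": [
            {
                "name": "job",
                "image": image,
                "command": ["python", "-m", "prefect.engine", flow_run_id],
            }
        ],
    }
    if service_account_name is not None:
        # Assumption: the option is passed through as the pod's serviceAccountName.
        pod_spec["serviceAccountName"] = service_account_name
    return {
        "metadata": {
            "generateName": flow_run_name,
            "namespace": namespace,
            "labels": {
                "io.prefect.flow-run-id": flow_run_id,
                "io.prefect.flow-run-name": flow_run_name,
                "app": "orion",
            },
        },
        "spec": {"template": {"spec": pod_spec}, "backoff_limit": 4},
    }


settings = build_job_settings(
    flow_run_id="1e14e8b2-11b5-432a-802b-fb8f57fd68b5",
    flow_run_name="charming-gopher",
    image="gcr.io/everysens-integration/prefect-poc/development:latest",
    namespace="everysens",
    service_account_name="orion-agent",
)
print(settings["metadata"]["namespace"])
print(settings["spec"]["template"]["spec"]["serviceAccountName"])
```

With no options set, the sketch falls back to namespace `default` and no explicit service account, which matches the `'namespace': 'default'` seen in the failing run's job settings.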
Anna Geller
Valentin Baert
05/18/2022, 3:08 PMAnna Geller
Valentin Baert
05/19/2022, 8:45 AMValentin Baert
05/19/2022, 9:26 AMValentin Baert
05/19/2022, 9:59 AMAnna Geller
PREFECT_API_URL
Anna Geller
Anna Geller
Valentin Baert
05/19/2022, 12:03 PMValentin Baert
05/19/2022, 12:03 PMAnna Geller
# orion_streaming_app.py
import time  # needed for time.sleep below
if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)
Anna Geller
FROM prefecthq/prefect:2.0b5-python3.9
COPY orion_streaming_app.py .
CMD [ "python", "./orion_streaming_app.py" ]
Anna Geller
Valentin Baert
05/19/2022, 12:14 PMValentin Baert
05/19/2022, 12:15 PMAnna Geller
Anna Geller
DeploymentSpec
PREFECT_API_URL
Valentin Baert
05/19/2022, 12:39 PM
@flow(name="prefect_2_kafka")
def process_message(msg):
    logger = get_run_logger()
    logger.info("Received message topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))
def main():
    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        .....
    )
    for msg in topic_consumer.consume():
        process_message(msg)
if __name__ == '__main__':
    main()
Valentin Baert
05/19/2022, 12:40 PMAnna Geller
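The per-message pattern above relies on a generator that yields each message and commits offsets every n messages. Its ordering can be checked without a Kafka broker. The sketch below uses plain strings in place of Kafka messages and a callback in place of `consumer.commit`; `consume` here is a simplified stand-in for `TopicConsumer.consume`, not the real class.

```python
from typing import Callable, Iterable, Iterator, List


def consume(
    messages: Iterable[str],
    commit: Callable[[], None],
    commits_every_n_messages: int = 1,
) -> Iterator[str]:
    """Yield each message, then call commit() after every n-th one."""
    msg_count = 0
    for msg in messages:
        yield msg
        # Execution resumes here only when the caller asks for the next
        # message, so the caller has already finished handling `msg`.
        msg_count += 1
        if msg_count % commits_every_n_messages == 0:
            commit()


events: List[str] = []


def process_message(msg: str) -> None:
    # Stand-in for the flow-decorated process_message above.
    events.append(f"processed {msg}")


for msg in consume(["m1", "m2", "m3"], lambda: events.append("commit"), 2):
    process_message(msg)

print(events)
```

Because the code after `yield` runs only once the loop body has returned, a commit never precedes the processing of the message it covers. If `process_message` raises, the generator is abandoned before that commit happens.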
Anna Geller
Anna Geller
Valentin Baert
05/19/2022, 12:49 PM
yield
process_message
Anna Geller
Anna Geller
# orion_streaming_app.py
import time  # needed for time.sleep below
if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)
Valentin Baert
05/19/2022, 2:06 PMValentin Baert
05/19/2022, 2:07 PM
@flow
Anna Geller
# orion_streaming_app.py
if __name__ == "__main__":
    while True:
        main()  # function decorated with flow
if __name__ == "__main__":