Sang Young Noh
05/03/2022, 8:23 AM
Anna Geller
prefect storage create --help
more on that here: https://orion-docs.prefect.io/concepts/storage/
Valentin Baert
05/17/2022, 2:21 PM
Valentin Baert
05/17/2022, 2:21 PM
Valentin Baert
05/17/2022, 2:21 PM
Valentin Baert
05/17/2022, 2:21 PM
Valentin Baert
05/17/2022, 2:22 PM
Valentin Baert
05/17/2022, 2:22 PM
Anna Geller
DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=FileStorageBlock(base_path="gcs://prefect-orion/flows"),
)
DeploymentSpec(
    name="dev",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(bucket="prefect-orion"),
)
Assuming GOOGLE_APPLICATION_CREDENTIALS is a string with the path to your JSON service account file:
import io
import json
import os

sa_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
with io.open(sa_file, "r", encoding="utf-8") as json_file:
    service_account_info = json.load(json_file)

DeploymentSpec(
    name="gcs",
    flow=hello_flow,
    tags=["local"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-orion",
        service_account_info=service_account_info,
    ),
)
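Two conventions for GOOGLE_APPLICATION_CREDENTIALS appear in this thread: a path to the key file, and the JSON key content itself. A minimal sketch of a loader that accepts either is shown here. The helper name load_service_account_info is hypothetical and is not part of Prefect.

```python
import json
import os
import tempfile


def load_service_account_info(value: str) -> dict:
    """Return service account info from a file path or an inline JSON string.

    Hypothetical helper for illustration only; not a Prefect API.
    """
    stripped = value.strip()
    if stripped.startswith("{"):
        # The value is the JSON document itself.
        return json.loads(stripped)
    # Otherwise treat the value as a path to the JSON key file.
    with open(stripped, "r", encoding="utf-8") as json_file:
        return json.load(json_file)


if __name__ == "__main__":
    # Inline JSON content.
    print(load_service_account_info('{"type": "service_account"}'))

    # Path to a key file (a throwaway file stands in for the real key).
    with tempfile.TemporaryDirectory() as tmp:
        key_path = os.path.join(tmp, "sa.json")
        with open(key_path, "w", encoding="utf-8") as fh:
            json.dump({"type": "service_account"}, fh)
        print(load_service_account_info(key_path))
```

The resulting dictionary could then be passed as service_account_info, as in the DeploymentSpec above.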
Anna Geller
export GOOGLE_APPLICATION_CREDENTIALS=/Users/path/to/your/sa.json
and use GCS default storage
Sang Young Noh
05/17/2022, 2:42 PM
Valentin Baert
05/17/2022, 3:26 PM
Anna Geller
Anna Geller
DeploymentSpec examples I posted here? What error did you get when creating a deployment with GCS storage? I'm sure we can sort this out together, so this shouldn't be the only reason to stick to Prefect 1.0.
Valentin Baert
05/18/2022, 11:40 AM
Valentin Baert
05/18/2022, 11:40 AM
Valentin Baert
05/18/2022, 11:42 AM
Valentin Baert
05/18/2022, 11:42 AM
api Starting...
api ___ ___ ___ ___ ___ ___ _____ ___ ___ ___ ___ _ _
api | _ \ _ \ __| __| __/ __|_ _| / _ \| _ \_ _/ _ \| \| |
api | _/ / _|| _|| _| (__ | | | (_) | /| | (_) | .` |
api |_| |_|_\___|_| |___\___| |_| \___/|_|_\___\___/|_|\_|
api Configure Prefect to communicate with the server with:
api prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
api Check out the dashboard at http://0.0.0.0:4200
api INFO: Started server process [9]
api INFO: Waiting for application startup.
api INFO: Application startup complete.
api INFO: Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
agent Starting agent connected to https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0...
agent ___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
agent | _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
agent | _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
agent |_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
agent Agent started! Looking for work from queue 'kubernetes'...
agent 09:01:07.562 | INFO | prefect.agent - Submitting flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent 09:01:07.699 | INFO | prefect.flow_runner.kubernetes - RUNNING
agent 09:01:07.906 | INFO | prefect.flow_runner.kubernetes - Flow run 'charming-gopher' has job settings = {'metadata': {'generateName': 'charming-gopher', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '1e14e8b2-11b5-432a-802b-fb8f57fd68b5', 'io.prefect.flow-run-name': 'charming-gopher', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'prefecthq/prefect:2.0b4-python3.8', 'command': ['python', '-m', 'prefect.engine', '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'], 'env': [{'name': 'PREFECT_API_URL', 'value': '<https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0>'}, {'name': 'PREFECT_API_KEY', 'value': 'pnu_iyYoMBQYyonrYPzQNGvCwI3nDsst4Q4erhiy'}]}]}}, 'backoff_limit': 4}}
agent 09:01:07.934 | ERROR | prefect.agent - Flow runner failed to submit flow run '1e14e8b2-11b5-432a-802b-fb8f57fd68b5'
agent Traceback (most recent call last):
agent File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 136, in submit_run
agent await self.task_group.start(flow_runner.submit_flow_run, flow_run)
agent File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 701, in start
agent return await future
agent File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 827, in submit_flow_run
agent job_name = await run_sync_in_worker_thread(self._create_and_start_job, flow_run)
agent File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
agent return await anyio.to_thread.run_sync(call, cancellable=True)
agent File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
agent return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
agent File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
agent return await future
agent File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
agent result = context.run(func, *args)
agent File "/usr/local/lib/python3.8/site-packages/prefect/flow_runners.py", line 1016, in _create_and_start_job
agent job = self.batch_client.create_namespaced_job(self.namespace, job_settings)
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
agent return self.create_namespaced_job_with_http_info(namespace, body, **kwargs) # noqa: E501
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
agent return self.api_client.call_api(
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
agent return self.__call_api(resource_path, method,
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
agent response_data = self.request(
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
agent return self.rest_client.POST(url,
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
agent return self.request("POST", url,
agent File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
agent raise ApiException(http_resp=r)
agent kubernetes.client.exceptions.ApiException: (403)
agent Reason: Forbidden
agent HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cd7f296b-0444-4ac4-b4ec-dc88b9843534', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '27d5e504-aa65-49d9-8f50-ef3c4aa090aa', 'X-Kubernetes-Pf-Prioritylevel-Uid': '1894d569-9152-44eb-848e-bd2a893b4c22', 'Date': 'Wed, 18 May 2022 09:01:07 GMT', 'Content-Length': '313'})
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
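The Forbidden message in the response body names the identity that was refused and the namespace it tried to act in, which is usually the quickest way to see which service account and namespace the RBAC rules need to cover. A small sketch that pulls those fields out of such a body follows. The function name denied_identity is hypothetical, and the regular expression only targets the message wording seen in this log.

```python
import json
import re

# Pattern for the wording of the Forbidden message shown in the log above.
_FORBIDDEN = re.compile(
    r'User "system:serviceaccount:(?P<sa_namespace>[^:"]+):(?P<sa_name>[^"]+)" '
    r'cannot (?P<verb>\w+) resource "(?P<resource>[^"]+)" '
    r'in API group "(?P<api_group>[^"]*)" '
    r'in the namespace "(?P<target_namespace>[^"]+)"'
)


def denied_identity(status_body: str) -> dict:
    """Extract who was denied, and where, from a Kubernetes Status JSON body."""
    status = json.loads(status_body)
    message = status.get("message", "")
    match = _FORBIDDEN.search(message)
    if match is None:
        raise ValueError("unrecognised Forbidden message: " + message)
    return match.groupdict()


if __name__ == "__main__":
    # Body rebuilt from the log line above (only the fields used here).
    body = json.dumps({
        "kind": "Status",
        "status": "Failure",
        "message": (
            'jobs.batch is forbidden: User "system:serviceaccount:everysens:default" '
            'cannot create resource "jobs" in API group "batch" '
            'in the namespace "default"'
        ),
        "reason": "Forbidden",
        "code": 403,
    })
    print(denied_identity(body))
```

For the log above this reports the default service account in the everysens namespace being refused job creation in the default namespace.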
Anna Geller
Valentin Baert
05/18/2022, 11:44 AM
Valentin Baert
05/18/2022, 11:44 AM
import io
import json
import os
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException
from prefect import flow, task, get_run_logger
from prefect.blocks.storage import GoogleCloudStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
class TopicConsumer:
    def __init__(self, topics, config, commits_every_n_messages=1):
        self.topics = topics
        self.config = config
        self.commits_every_n_messages = commits_every_n_messages
        self.consumer = Consumer(config)
        self.running = False

    def consume(self):
        try:
            self.consumer.subscribe(self.topics)
            print("Subscribed to topics: {}".format(self.topics))
            self.running = True
            msg_count = 0
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    yield msg
                    msg_count += 1
                    if msg_count % self.commits_every_n_messages == 0:
                        self.consumer.commit(asynchronous=False)
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()

    def shutdown(self):
        self.running = False
@task
def process_message(msg):
    logger = get_run_logger()
    logger.info("topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

@flow(name="prefect_2_kafka_kub")
def main():
    conf = {
        'bootstrap.servers': "xxxxx",
        'group.id': "prefect_poc_kafka",
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': 'xxxxx',
        'sasl.password': 'xxxxxx'
    }
    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        conf
    )
    for msg in topic_consumer.consume():
        process_message(msg)
    topic_consumer.shutdown()

# Here GOOGLE_APPLICATION_CREDENTIALS holds the JSON key content itself, not a file path
service_account_info = json.loads(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc",
        service_account_info=service_account_info
    ),
    flow_runner=KubernetesFlowRunner()
)

if __name__ == '__main__':
    main()
Valentin Baert
05/18/2022, 11:45 AM
Valentin Baert
05/18/2022, 11:45 AM
Valentin Baert
05/18/2022, 11:46 AM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1 # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
        - name: PREFECT_API_URL
          value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
        - name: PREFECT_API_KEY
          value: 'xxxxxxxxxxx'
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: '{"type":"service_account",.....'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: flow-runner
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/status"]
  verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flow-runner-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: Role
  name: flow-runner
  apiGroup: rbac.authorization.k8s.io
Valentin Baert
05/18/2022, 11:46 AM
Anna Geller
Valentin Baert
05/18/2022, 11:48 AM
Anna Geller
Anna Geller
service_account_name
Valentin Baert
05/18/2022, 11:52 AM
Anna Geller
prefect config view
), then you create a work queue e.g.:
prefect work-queue create -t k8s
and then your agent for this queue:
prefect agent start k8s
Valentin Baert
05/18/2022, 11:54 AM
Anna Geller
Valentin Baert
05/18/2022, 11:54 AM
prefect deployment create src/flows/prefect_2_kafka_kub.py
prefect deployment run prefect_2_kafka_kub/gcs
Valentin Baert
05/18/2022, 11:54 AM
Valentin Baert
05/18/2022, 11:54 AM
Valentin Baert
05/18/2022, 11:55 AM
agent HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:everysens:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
Valentin Baert
05/18/2022, 11:56 AM
Anna Geller
Valentin Baert
05/18/2022, 11:58 AM
Anna Geller
Valentin Baert
05/18/2022, 12:06 PM
Valentin Baert
05/18/2022, 12:07 PM
Anna Geller
Valentin Baert
05/18/2022, 12:07 PM
Valentin Baert
05/18/2022, 12:08 PM
Anna Geller
class UniversalFlowRunner(FlowRunner):
    """
    The universal flow runner contains configuration options that can be used by any
    Prefect flow runner implementation.

    This flow runner cannot be used at runtime and should be converted into a subtype.

    Attributes:
        env: Environment variables to provide to the flow run
    """

    typename: Literal["universal"] = "universal"
    env: Dict[str, str] = Field(default_factory=dict)
Anna Geller
env=dict(key="value")
Valentin Baert
05/18/2022, 12:08 PM
Anna Geller
Anna Geller
class KubernetesFlowRunner(UniversalFlowRunner):
    """
    Executes flow runs in a Kubernetes job.

    Requires a Kubernetes cluster to be connectable.

    Attributes:
        image: An optional string specifying the tag of a Docker image to use for the job.
        namespace: An optional string signifying the Kubernetes namespace to use.
        service_account_name: An optional string specifying which Kubernetes service account to use.
        labels: An optional dictionary of labels to add to the job.
        image_pull_policy: The Kubernetes image pull policy to use for job containers.
        restart_policy: The Kubernetes restart policy to use for jobs.
        stream_output: If set, stream output from the container to local standard output.
    """
Valentin Baert
05/18/2022, 12:35 PM
Anna Geller
Valentin Baert
05/18/2022, 12:39 PM
Valentin Baert
05/18/2022, 12:40 PM
Valentin Baert
05/18/2022, 12:40 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orion
spec:
  selector:
    matchLabels:
      app: orion
  replicas: 1 # We're using SQLite, so we should only run 1 pod
  template:
    metadata:
      labels:
        app: orion
    spec:
      serviceAccountName: orion-agent
      containers:
      - name: api
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"]
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 4200
      - name: agent
        image: prefecthq/prefect:2.0b4-python3.8
        command: ["prefect", "agent", "start", "kubernetes"]
        imagePullPolicy: "IfNotPresent"
        env:
        - name: PREFECT_API_URL
          value: 'https://api-beta.prefect.io/api/accounts/8f6464fb-3d6b-4fc0-8d89-107cafbf6a23/workspaces/77a268cd-2363-43dd-9de1-cc2afde567e0'
        - name: PREFECT_API_KEY
          value: 'xxxxxx'
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: '{"type":"service_account",'
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: orion-agent
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: orion-agent-rbac
  namespace: everysens
rules:
- apiGroups:
  - batch
  - extensions
  resources:
  - jobs
  verbs:
  - '*'
- apiGroups:
  - ''
  resources:
  - events
  - pods
  - pods/log
  - pods/status
  verbs:
  - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: orion-agent-rbac
  namespace: everysens
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: orion-agent-rbac
subjects:
- kind: ServiceAccount
  name: orion-agent
Valentin Baert
05/18/2022, 12:41 PM
Anna Geller
Valentin Baert
05/18/2022, 12:42 PM
Anna Geller
Valentin Baert
05/18/2022, 12:42 PM
Anna Geller
Anna Geller
FROM prefecthq/prefect:2.0b4-python3.8
COPY requirements.txt .
RUN pip install -r requirements.txt
Valentin Baert
05/18/2022, 12:44 PM
Valentin Baert
05/18/2022, 1:02 PM
Valentin Baert
05/18/2022, 1:03 PM
Valentin Baert
05/18/2022, 1:03 PM
Valentin Baert
05/18/2022, 1:06 PM
Valentin Baert
05/18/2022, 1:06 PM
DeploymentSpec(
    name="gcs",
    flow=main,
    tags=["kubernetes"],
    flow_storage=GoogleCloudStorageBlock(
        bucket="prefect-poc"
    ),
    flow_runner=KubernetesFlowRunner(
        image="gcr.io/everysens-integration/prefect-poc/development:latest",
        namespace="everysens",
        service_account_name="orion-agent"
    ),
)
Valentin Baert
05/18/2022, 1:07 PM
Valentin Baert
05/18/2022, 1:07 PM
Valentin Baert
05/18/2022, 1:09 PM
Valentin Baert
05/18/2022, 1:17 PM
Anna Geller
Anna Geller
Valentin Baert
05/18/2022, 3:08 PM
Anna Geller
Valentin Baert
05/19/2022, 8:45 AM
Valentin Baert
05/19/2022, 9:26 AM
Valentin Baert
05/19/2022, 9:59 AM
Anna Geller
PREFECT_API_URL for observability, visibility and orchestration
This way, stopping it is as simple as stopping your application. If you change the code for this, you can deploy a second application and take down the old one once you are ready, which ensures a continuous data stream with little to no downtime.
Anna Geller
Anna Geller
Valentin Baert
05/19/2022, 12:03 PM
Valentin Baert
05/19/2022, 12:03 PM
Anna Geller
# orion_streaming_app.py
import time

if __name__ == "__main__":
    while True:
        main()  # main() is assumed to be the flow function defined in this file
        time.sleep(1)
Anna Geller
FROM prefecthq/prefect:2.0b5-python3.9
COPY orion_streaming_app.py .
CMD [ "python", "./orion_streaming_app.py" ]
Anna Geller
Valentin Baert
05/19/2022, 12:14 PM
Valentin Baert
05/19/2022, 12:15 PM
Anna Geller
Anna Geller
DeploymentSpec and just point your script at your PREFECT_API_URL for orchestration and observability
Valentin Baert
05/19/2022, 12:39 PM
@flow(name="prefect_2_kafka")
def process_message(msg):
    logger = get_run_logger()
    logger.info("Received message topic={} partition={} offset={} key={} value={}".format(
        msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

def main():
    topic_consumer = TopicConsumer(
        ["prefect-poc"],
        .....
    )
    for msg in topic_consumer.consume():
        process_message(msg)

if __name__ == '__main__':
    main()
and I start it with:
export PREFECT_API_URL=https://api-beta.prefect.io/api/...
export PREFECT_API_KEY=xxx
python prefect_2_kafka_kub.py
and I have the observability in Prefect Cloud
Valentin Baert
05/19/2022, 12:40 PM
Anna Geller
Anna Geller
Anna Geller
Valentin Baert
05/19/2022, 12:49 PM
yield it to the main function
If it reaches the timeout, it loops in the infinite loop and polls again
When the message is yielded to the main loop, that's when I start a @flow with the process_message function
After the message is consumed, it resumes the generator function, goes back to calling the poll function, and the cycle continues
So I don't understand where I would need a time.sleep() and for what purpose?
Anna Geller
Anna Geller
# orion_streaming_app.py
import time

if __name__ == "__main__":
    while True:
        main()
        time.sleep(1)
this way you get fine-grained visibility into each run and the whole process is much more observable and orchestratable via Prefect
Valentin Baert
05/19/2022, 2:06 PM
Valentin Baert
05/19/2022, 2:07 PM
@flow (which is just a logger here but will actually contain several tasks IRL)
Anna Geller
# orion_streaming_app.py
if __name__ == "__main__":
    while True:
        main()  # function decorated with flow
and then moving the while self.running out of the class into the if __name__ == "__main__": block
but this already looks good, nice work, and thanks for sharing!
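One way to picture the suggestion of moving the polling loop out of the consumer class is the sketch below. It uses stand-ins so it runs without Kafka or Prefect. FakeConsumer, run and max_polls are hypothetical names, and in the real application process_message would carry the @flow decorator and the loop would be unbounded.

```python
from collections import deque
from typing import Callable, Deque, List, Optional


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer: poll() returns a message or None."""

    def __init__(self, messages: List[str]) -> None:
        self._messages: Deque[str] = deque(messages)

    def poll(self, timeout: float = 1.0) -> Optional[str]:
        # A real consumer would block for up to `timeout` seconds here.
        return self._messages.popleft() if self._messages else None


def process_message(msg: str) -> str:
    # In the real application this function would be decorated with @flow.
    return msg.upper()


def run(consumer: FakeConsumer, handler: Callable[[str], str], max_polls: int) -> List[str]:
    """Poll in a top-level loop and hand each received message to the handler."""
    results = []
    for _ in range(max_polls):  # bounded so the sketch terminates
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # nothing arrived within the timeout; poll again
        results.append(handler(msg))
    return results


if __name__ == "__main__":
    print(run(FakeConsumer(["a", "b"]), process_message, max_polls=4))
```

Because poll() already waits up to its timeout when no message is available, the loop in this sketch does not need an extra time.sleep().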