haf
10/21/2021, 9:58 PM
Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1798, in get_task_run_info
    result = self.graphql(mutation)  # type: Any
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
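The error above says the backend received an empty string where a UUID was required. A minimal sketch of a client-side guard that fails early with a clearer message might look like the following. The function name `require_task_run_id` is hypothetical and this is not Prefect's own code; it only illustrates why `""` cannot be parsed as a UUID.

```python
import uuid
from typing import Optional


def require_task_run_id(task_id: Optional[str]) -> str:
    """Return task_id unchanged if it parses as a UUID, else raise ValueError.

    Hypothetical helper for illustration only.
    """
    if not task_id:
        # An empty or missing id is the case reported in the error above.
        raise ValueError("task id is missing or empty")
    # uuid.UUID raises ValueError for strings that are not valid UUIDs.
    uuid.UUID(task_id)
    return task_id
```

Checking the id before the request is sent turns an opaque server-side `INTERNAL_SERVER_ERROR` into a local error that names the missing value.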
Zanie
Zanie
haf
10/21/2021, 10:04 PM
haf
10/21/2021, 10:04 PM
0.15.6
haf
10/21/2021, 10:04 PM
FROM prefecthq/prefect:0.15.6-python3.8
Zanie
haf
10/21/2021, 10:06 PM
haf
10/21/2021, 10:06 PM
Zanie
haf
10/21/2021, 10:07 PM
Zanie
haf
10/21/2021, 10:12 PM
haf
10/21/2021, 10:13 PM
Zanie
Zanie
Zanie
Zanie
haf
10/21/2021, 10:43 PM
haf
10/21/2021, 10:43 PM
haf
10/21/2021, 10:44 PM
Zanie
haf
10/21/2021, 10:46 PM
pg_db = Parameter("LOGARY_PG_DB", default=getenv("LOGARY_PG_DB", default="analytics"))

@task(nout=2)
def fetch_model(dsn_params: DSNParams) -> Tuple[str, ModelDTO]:
    m = _fetch_model(dsn_params)
    return m.id, m

with Flow(
    "run_mmm",
    state_handlers=[print_state_callback],
) as flow:
    # https://deepnote.com/project/Media-Mix-Model-5xns-00xTG6nRlUK1f9DfA/%2Ftest_preprocessing.ipynb
    #
    ########### CONFIG #######
    dsn_params = lambda name: build_dsn_params(
        user=pg_user,
        password=pg_password,
        host=pg_host,
        port=pg_port,
        dbname=pg_db,
        sslmode=pg_sslmode,
        application_name=f"mmm/{name}",
    )
    model_id, model = fetch_model(dsn_params("fetch_model"))
haf
10/21/2021, 10:47 PM
haf
10/21/2021, 10:48 PM
haf
10/21/2021, 10:49 PM
Zanie
prefect run -p or flow.run? Does it work with the cloud local agent?
haf
10/21/2021, 10:49 PM
Zanie
haf
10/21/2021, 10:50 PM
Zanie
prefect run --name "run_mmm" --execute to do an agentless run (that still interacts with cloud)
Zach Angell
haf
10/22/2021, 9:52 AM
haf
10/22/2021, 10:15 AM
haf
10/22/2021, 10:40 AM
haf
10/22/2021, 10:40 AM
haf
10/22/2021, 10:40 AM
haf
10/22/2021, 10:41 AM
haf
10/22/2021, 10:41 AM
haf
10/22/2021, 10:41 AM
haf
10/22/2021, 10:51 AM
haf
10/22/2021, 11:04 AM
agent.py file looks on disk: you'll find all of that discussion further up.
haf
10/22/2021, 11:04 AM
volumeMounts:
  - mountPath: /opt/job_template.yaml
    name: prefect-agent-conf
    subPath: job_template.yaml
  - mountPath: /usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py
    name: prefect-agent-conf
    subPath: kubernetes_agent.py
haf
10/22/2021, 11:04 AM
haf
10/22/2021, 11:09 AM
Anna Geller
prefect register --project project_name -p /path/to/flows/that/need/registration
haf
10/22/2021, 11:17 AM
Anna Geller
Anna Geller
haf
10/22/2021, 11:27 AM
haf
10/22/2021, 11:27 AM
Joël Luijmes
10/22/2021, 11:27 AM
Joël Luijmes
10/22/2021, 11:28 AM
Anna Geller
haf
10/22/2021, 11:31 AM
Joël Luijmes
10/22/2021, 11:31 AM
haf
10/22/2021, 11:32 AM
haf
10/22/2021, 11:32 AM
Anna Geller
haf
10/22/2021, 11:33 AM
Joël Luijmes
10/22/2021, 11:34 AM
haf
10/22/2021, 11:35 AM
haf
10/22/2021, 12:00 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-agent
spec:
  replicas: 1
  template:
    spec:
      serviceAccountName: prefect-agent
      containers:
        - name: agent
          args:
            - prefect agent kubernetes start --job-template /opt/job_template.yaml
          command: [ "/bin/bash", "-c" ]
          env:
            - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
              valueFrom:
                secretKeyRef:
                  name: prefect-agent
                  key: prefect-cloud-token
            - name: PREFECT__CLOUD__API
              value: https://api.prefect.io
            - name: NAMESPACE
              value: flows
            - name: IMAGE_PULL_SECRETS
              value: ''
            - name: PREFECT__CLOUD__AGENT__LABELS
              value: '["dev"]'
            - name: JOB_MEM_REQUEST
              value: ''
            - name: JOB_MEM_LIMIT
              value: ''
            - name: JOB_CPU_REQUEST
              value: ''
            - name: JOB_CPU_LIMIT
              value: ''
            - name: IMAGE_PULL_POLICY
              value: IfNotPresent
            - name: SERVICE_ACCOUNT_NAME
              value: prefect-agent
            - name: PREFECT__BACKEND
              value: cloud
            - name: PREFECT__CLOUD__AGENT__AGENT_ADDRESS
              value: http://:8080
            # - name: PREFECT__LOGGING__LEVEL
            #   value: DEBUG
            - name: PREFECT__CLOUD__AGENT__LEVEL
              value: DEBUG
          image: prefecthq/prefect:0.15.4-python3.8
          livenessProbe:
            failureThreshold: 2
            httpGet:
              path: /api/health
              port: 8080
            initialDelaySeconds: 40
            periodSeconds: 40
          resources:
            requests:
              cpu: 200m
              memory: 40Mi
            limits:
              cpu: 1000m
              memory: 1024Mi
          volumeMounts:
            - mountPath: /opt/job_template.yaml
              name: prefect-agent-conf
              subPath: job_template.yaml
            - mountPath: /usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py
              name: prefect-agent-conf
              subPath: kubernetes_agent.py
      volumes:
        - name: prefect-agent-conf
          configMap:
            name: prefect-agent
Zanie
Zanie
Zanie
haf
10/22/2021, 3:32 PM
haf
10/22/2021, 3:32 PM
haf
10/22/2021, 3:32 PM
haf
10/22/2021, 3:32 PM
haf
10/22/2021, 3:32 PM
haf
10/22/2021, 3:32 PM
Zanie
haf
10/22/2021, 3:33 PM
haf
10/22/2021, 3:33 PM
Zanie
Zanie
Joël Luijmes
10/22/2021, 7:18 PM
haf
10/24/2021, 7:30 AM
haf
10/24/2021, 7:31 AM
Anna Geller
git clone --branch task-run-info-missing-id https://github.com/PrefectHQ/prefect.git
pip install ./prefect
not 100% sure, but in poetry this could be:
prefect = { git = "https://github.com/PrefectHQ/prefect.git", branch = "task-run-info-missing-id" }
haf
10/24/2021, 10:50 AM
Anna Geller
pip install -U git+https://github.com/PrefectHQ/prefect@task-run-info-missing-id#egg=prefect
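After installing from a branch, it can help to confirm which build is actually active inside the image, since the thread mentions both 0.15.4 and 0.15.6 images. A minimal sketch using only the standard library follows; the helper name `installed_version` is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version string of a package, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        # The package is not installed in this environment.
        return None


if __name__ == "__main__":
    # Prints the version string pip recorded for prefect, or None.
    print(installed_version("prefect"))
```

Running this in both the agent container and the flow container shows whether the two environments report the same version.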
haf
10/24/2021, 11:14 AM
haf
10/24/2021, 11:14 AM
haf
10/24/2021, 11:14 AM
haf
10/24/2021, 11:15 AM
haf
10/24/2021, 11:19 AM
haf
10/24/2021, 11:20 AM
haf
10/24/2021, 11:20 AM
haf
10/24/2021, 11:21 AM
haf
10/24/2021, 12:52 PM
haf
10/24/2021, 1:07 PM
haf
10/25/2021, 8:51 PM
[2021-10-25 20:50:18+0000] ERROR - prefect.CloudTaskRunner | Failed to retrieve task state with error: ValueError('`task_id` missing from task run context')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 158, in initialize_run
    raise ValueError("`task_id` missing from task run context")
ValueError: `task_id` missing from task run context
haf
10/25/2021, 8:52 PM
Task slug(s) missing from the current flow missing from the flow stored in the Prefect backend: {'LOGARY_PG_SSLMODE', 'LOGARY_PG_PASSWORD-1', 'LOGARY_PG_HOST', 'LOGARY_PG_USER-1'}
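The slugs in this message come in two shapes: plain names and names with `-1` appended. Later in the thread the `-1` ones are identified as secrets and the others as parameters. The sketch below is a simplified model of that naming, assuming parameters keep their name as the slug while other tasks get a per-name counter suffix. It is not Prefect's implementation, and `model_slugs` is a hypothetical name.

```python
from collections import Counter
from typing import List, Tuple


def model_slugs(tasks: List[Tuple[str, str]]) -> List[str]:
    """Model task slugs from (kind, name) pairs.

    kind is "parameter" or "task". Assumption for illustration: a parameter's
    slug is its name; any other task gets "<name>-<n>", counted per name.
    """
    counters: Counter = Counter()
    slugs = []
    for kind, name in tasks:
        if kind == "parameter":
            slugs.append(name)
        else:
            counters[name] += 1
            slugs.append(f"{name}-{counters[name]}")
    return slugs


# The same logical value yields a different slug depending on its task type.
print(model_slugs([("parameter", "LOGARY_PG_HOST"), ("task", "LOGARY_PG_PASSWORD")]))
```

Under this model, switching a value between a parameter and a secret task changes its slug, which is one way the flow being executed and the flow stored in the backend could end up with different slug sets if the flow is not registered again.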
Zanie
Zanie
haf
10/25/2021, 8:54 PM
haf
10/25/2021, 8:55 PM
Zanie
haf
10/25/2021, 8:56 PM
from typing import Optional

from prefect.client.secrets import Secret as _Secret
from prefect.core.task import Task
from prefect.engine.results.secret_result import SecretResult


class LogarySecret(Task):
    """
    Prefect Secrets Task. This task retrieves the underlying secret through
    the Prefect Secrets API (which has the ability to toggle between local vs. Cloud secrets).

    Args:
        - name (str, optional): The name of the underlying secret
        - **kwargs (Any, optional): additional keyword arguments to pass to the Task constructor

    Raises:
        - ValueError: if a `result` keyword is passed
    """

    secret_name: Optional[str]
    default: Optional[str]

    def __init__(self, name=None, default=None, **kwargs):
        if kwargs.get("result"):
            raise ValueError("Result types for Secrets are not configurable.")
        kwargs["checkpoint"] = False
        self.secret_name = name
        self.default = default
        super().__init__(name=name, **kwargs)
        self.result = SecretResult(secret_task=self)

    def run(self, name: str = None):
        """
        The run method for Secret Tasks. This method actually retrieves and returns the
        underlying secret value using the `Secret.get()` method. Note that this method first
        checks context for the secret value, and if not found either raises an error or queries
        Prefect Cloud, depending on whether `config.cloud.use_local_secrets` is `True` or
        `False`.

        Args:
            - name (str, optional): the name of the underlying Secret to retrieve. Defaults
                to the name provided at initialization.

        Returns:
            - Any: the underlying value of the Prefect Secret
        """
        if name is None:
            name = self.secret_name
        if name is None:
            raise ValueError("A secret name must be provided.")
        _s = _Secret(name)
        if not _s.exists() and self.default is not None:
            return self.default
        return _s.get()
Zanie
build_dsn_params?
haf
10/26/2021, 8:56 AM
haf
10/26/2021, 10:59 AM
Zanie
haf
10/26/2021, 5:11 PM
Zanie
-1 appended are secrets, the others are parameters.
Zanie
haf
10/26/2021, 5:41 PM
Zanie
Zanie
haf
10/26/2021, 5:51 PM
flow.register
haf
10/26/2021, 5:52 PM
Zanie
haf
10/27/2021, 7:38 PM
Zanie
haf
10/27/2021, 7:44 PM
Joël Luijmes
10/28/2021, 9:16 AM
haf
10/28/2021, 1:13 PM
haf
10/28/2021, 1:14 PM
haf
10/28/2021, 3:02 PM
Zanie
Kevin Kho
Joël Luijmes
10/28/2021, 5:00 PM
Zanie