Martin
09/20/2021, 9:59 PM
Shaoyi Zhang
09/21/2021, 12:10 AM
Error creating: pods "prefect-job-138dbd9d-v6rq5" is forbidden: failed quota: xxxxx-request-quota: must specify requests.cpu,requests.memory
although I specified the resource requests and limits with:
env:
  .....
  - name: JOB_MEM_REQUEST
    value: "2048Mi"
  - name: JOB_MEM_LIMIT
    value: "4096Mi"
  - name: JOB_CPU_REQUEST
    value: "100m"
  - name: JOB_CPU_LIMIT
    value: "200m"
This is the Prefect source code, if I understand correctly, and I'm using prefect:0.15.5-python3.6. Any suggestions?
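I'm not sure why the JOB_* env vars aren't being picked up here, but one workaround, assuming Prefect 0.15.x with a KubernetesRun run config, is to set the resources per flow; a minimal sketch reusing the values above:

from prefect.run_configs import KubernetesRun

# resource values copied from the env vars above
flow.run_config = KubernetesRun(
    cpu_request="100m",
    cpu_limit="200m",
    memory_request="2048Mi",
    memory_limit="4096Mi",
)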
Anh Nguyen
09/21/2021, 2:11 AM
Lucas Beck
09/21/2021, 8:07 AM
I run n different jobs in a k8s cluster to perform some computationally heavy tasks. Sometimes when I cancel the flow, or it fails for whatever reason, I then need to manually clean up the n jobs in the k8s cluster. To avoid this manual work, I am trying to create a state handler to deal with that. The handler works for the Failed state, but not for cancelling. I have tried both Cancelling and Cancelled. For illustration purposes, here is an example:
import time
import uuid

import prefect
from prefect import Flow, Parameter, Task
from prefect.executors.dask import LocalDaskExecutor
from prefect.utilities.edges import unmapped


def clean_up(task, old_state, new_state):
    logger = prefect.context.get("logger")
    task_full_name = prefect.context.get("task_full_name")
    if isinstance(new_state, prefect.engine.state.Failed):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelled):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )
    elif isinstance(new_state, prefect.engine.state.Cancelling):
        logger.info(
            f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
        )


class DoSomeComputation(Task):
    def __init__(self, **kwargs):
        self.run_id = str(uuid.uuid4())[:8]
        super().__init__(**kwargs)

    def run(self, input, fail=True):
        logger = prefect.context.get("logger")
        logger.info(f"Started some computation task with input {input}")
        for i in range(0, 200):
            time.sleep(1)
        if fail:
            raise Exception("Something terrible happened")
        logger.info("Successfully completed some computation task")


if __name__ == "__main__":
    project_name = "test-flows"
    flow_name = "test-cleanup"
    with Flow(flow_name) as flow:
        fail = Parameter("fail")
        do_some_comp = DoSomeComputation(
            state_handlers=[clean_up], task_run_name="{task_name}-job-name={input}"
        )
        inputs = ["input1", "input2", "input3"]
        do_some_comp.map(inputs, unmapped(fail))
    flow.executor = LocalDaskExecutor(num_workers=20)
    flow.register(project_name=project_name)
The "cleaning up job..." log never gets triggered when I cancel the job. Anyone has an ideia or also experienced this?
PS: I am using the KubernetesRun
flow configAndrey Nikonov
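One hedged idea, since the backend marks the flow run itself Cancelled: attach the cleanup at the flow level as well. A minimal sketch, assuming flow-level handlers fire for your cancellation path (unverified), with the actual job deletion left hypothetical:

import prefect
from prefect import Flow
from prefect.engine.state import Cancelled, Failed

def flow_clean_up(flow, old_state, new_state):
    # flow-level handlers receive (flow, old_state, new_state)
    if isinstance(new_state, (Cancelled, Failed)):
        # hypothetical cleanup: delete the n k8s jobs here, e.g. via the
        # kubernetes Python client, keyed on your run ids
        prefect.context.get("logger").info(f"cleaning up due to {new_state}")
    return new_state

with Flow("test-cleanup", state_handlers=[flow_clean_up]) as flow:
    ...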
Andrey Nikonov
09/21/2021, 9:05 AM
query DMS {
  flow(
    where: {
      name: {_eq: "Daily metrics summary"}, archived: {_eq: false}
    }
  ) {
    id, version,
    tasks {
      id, name,
      task_runs(order_by: {start_time: desc}, limit: 1) {
        start_time, end_time
      },
      # Here I'm getting error (see below)
      timeout
    }
  }
}
Ruslan Aliev
09/21/2021, 12:25 PM
@task(name="Download video", log_stdout=True)
def download_video(task_params, destination_dir='./data'):
    remote_file_path = f"{task_params['storage']['source']}{task_params['video']['filepath']}"
    destination_dir = Path(destination_dir)
    destination_dir.mkdir(exist_ok=True)
    destination_path = f"{destination_dir / Path(remote_file_path).name}"
    file_download(remote_file_path, destination_path)

@task(name="Fetch previous result", log_stdout=True)
def fetch_prev_result(task):
    prev_result = task['prev_result']
    db_name = prev_result['db_name']
    collection_name = prev_result['collection_name']
    out_path = f"{db_name}_{collection_name}.csv"
    mongo_fetch(mongo_client, db_name, collection_name, out_path=out_path)

# flow-level wiring (inside the Flow context)
is_detection = check_equal(task_type, 1)
ifelse(is_detection, download_video(task_params, destination_dir='./videos'), fetch_prev_result(task))

command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                command=command).set_upstream([download_video, fetch_prev_result])
started = start_container(container_id=container_id)
maz
09/21/2021, 4:52 PM
Max Kureykin
09/21/2021, 5:13 PM
Daniel Saxton
09/21/2021, 5:17 PM
Danny Vilela
09/21/2021, 5:51 PM
I have a state handler that notifies me whenever a Task is retrying. It tells me which Task failed, when it'll retry, how many times this task has retried, etc. I think I'd really rather just have that same state handler on the top-level Flow to avoid setting that state handler on each task. I noticed the signatures for Task state handlers are different than those for Flow state handlers; is there an easy way to have a Flow state handler that can access Task-level information? For example, given a Flow that's changing states, can we readily access the specific Task in question?
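One pattern that may help, assuming Prefect 0.15.x: a Flow state handler has the signature (flow, old_state, new_state), and for a finished flow state, new_state.result is (to my understanding) a dict mapping each Task object to its final State, which exposes task-level detail. A minimal sketch:

import prefect
from prefect import Flow
from prefect.engine.state import Failed

def flow_state_handler(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    if isinstance(new_state, Failed) and isinstance(new_state.result, dict):
        # new_state.result maps Task objects to their final States
        for task, task_state in new_state.result.items():
            if task_state.is_failed():
                logger.warning(f"{task.name} finished in state {task_state}")
    return new_state

flow = Flow("example", state_handlers=[flow_state_handler])

The caveat is that a flow handler only fires on flow-state changes, so per-task retry notifications would still need task-level handlers.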
Hui Zheng
09/22/2021, 12:43 AM
- prefect.Docker | Pushing image to the registry...
Does it happen to other people?
Abhas P
09/22/2021, 1:10 AM
Max Kureykin
09/22/2021, 9:05 AM
Nadav Nuni
09/22/2021, 1:21 PM
Can mapping work with a LocalExecutor? Or is there something else I need to do? The docs here state that I need a DaskExecutor for mapping to work… but I'm not sure which mapping, and I'm also not sure I understand why…
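For what it's worth, my understanding on 0.15.x (hedged): .map() itself works on any executor, including LocalExecutor, but the mapped children then run one after another; parallelism is what needs a Dask-based executor. A minimal sketch:

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def add_one(x):
    return x + 1

with Flow("mapping-sketch") as flow:
    add_one.map([1, 2, 3, 4])

# with the default LocalExecutor the four children run serially;
# a Dask-based executor runs them in parallel
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
flow.run()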
Svyat
09/22/2021, 3:14 PM
run_config parameters in ECSRun class
Lukáš Pravda
09/22/2021, 4:22 PM
I can document each task with the autofunction directive (tedious and not ideal). I can achieve similar behaviour when I define the task using a class and a run() method (that would require a lot of changes in the code base, so it is not ideal either), and I've also come across a plugin for celery…
So my question is: is there anything else I can do with the present version of prefect to set up sphinx documentation using automodule? If so, could you point me in the right direction/provide me with a code snippet? If not, do you plan on making something like a prefect-sphinx plugin that would allow this functionality? Thanks in advance for any comment/suggestion!
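One workaround sketch, not an official prefect-sphinx integration (the names below are illustrative): keep the plain function importable so automodule renders its docstring, and build the Task separately:

from prefect import task

def transform(df):
    """Transform the input dataframe.

    This docstring is what automodule will render.
    """
    return df

# build the Task object under a different name so the module still
# exposes a plain, documentable function
transform_task = task(name="transform")(transform)

Inside a Flow you would then call transform_task rather than transform.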
Jeremy Phelps
09/22/2021, 5:21 PM
Is there a way to call the GraphQL API using the requests library? I need to cancel a bunch of flow runs, but the Web interface is too cumbersome and the CLI tool doesn't support cancellation.
I notice that the browser has a token that only lasts a few minutes.
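A sketch with requests, assuming Prefect Cloud: an API key (Team settings / API Keys or Service Accounts) is long-lived, unlike the short-lived browser token, and cancel_flow_run is the mutation the Python Client uses; the exact shape below is my best recollection, so verify it in the Interactive API tab:

import requests

API_KEY = "<your-api-key>"
flow_run_ids = ["<run-id-1>", "<run-id-2>"]  # hypothetical ids

for run_id in flow_run_ids:
    # cancel one flow run per request
    mutation = (
        'mutation { cancel_flow_run(input: { flow_run_id: "%s" }) { state } }'
        % run_id
    )
    resp = requests.post(
        "https://api.prefect.io",
        json={"query": mutation},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    resp.raise_for_status()
    print(resp.json())

Alternatively, prefect.Client has a cancel_flow_run(flow_run_id=...) method that wraps the same call and handles auth.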
Anatoly Alekseev
09/23/2021, 12:51 AM
Anh Nguyen
09/23/2021, 2:40 AM
Sandip Viradiya
09/23/2021, 6:48 AM
STORAGE = Bitbucket(
    project="PrefectDemoECS",  # name of project
    repo="prefect",  # name of repo in project
    path="demo.py",  # location of flow file in repo
    workspace="sandipviradiya",
    cloud_username_secret="bb_username",
    cloud_app_password_secret="bb_app_password",
)
It is showing me a 404 error, as shown in the attached image.
I checked the code https://github.com/PrefectHQ/prefect/blob/a2025983352d7e152da88358c3e578f3ce73778a/src/prefect/storage/bitbucket.py#L29 and it seems it is not selecting the Bitbucket Cloud code path, only attempting the Bitbucket Server fetch.
Any guidance will be really helpful.
Sandip Viradiya
09/23/2021, 6:52 AM
Milly gupta
09/23/2021, 8:24 AM
John Lee
09/23/2021, 10:12 AM
import os
import tempfile
from pathlib import Path

import google.auth

# GOOGLE_APPLICATION_CREDENTIALS here holds the credentials JSON *content*,
# but google.auth expects a file path, so write the content to a temp file
# and repoint the env var at it (delete=False keeps the path usable)
creds = Path(tempfile.NamedTemporaryFile(delete=False).name)
creds.write_text(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(creds)
google.auth.default()
Ruslan Aliev
09/23/2021, 12:36 PM
I'm using a local agent and trying to bind a local folder to a folder inside the docker container using volumes, but it looks like it doesn't work, or I'm doing it wrong 👀:
container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
container_id = create_container(image_name=image_name,
                                container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                # command=command,
                                command=['ls'],
                                volumes=['data:/home/data',
                                         'temp:/home/temp'],
                                ).set_upstream(input_file_path)
started = start_container(container_id=container_id)
status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
logs = get_container_logs(container_id=container_id, upstream_tasks=[status_code])
log([logs])
The log([logs]) output shows that the data and temp folders are missing.
What am I missing here?
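A hunch, hedged: Docker parses data:/home/data as a named volume called "data", not as a bind mount of the local ./data folder; bind mounts need an absolute host path on the left-hand side. A minimal docker-py sketch of the distinction (alpine is just an illustrative image):

import os
import docker  # docker-py, which the Prefect docker tasks wrap

client = docker.from_env()

# absolute host paths make these bind mounts instead of named volumes
volumes = [
    f"{os.path.abspath('data')}:/home/data",
    f"{os.path.abspath('temp')}:/home/temp",
]

container = client.containers.create("alpine", command="ls /home", volumes=volumes)
container.start()
container.wait()
print(container.logs().decode())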
Barbara Abi Khoriati
09/23/2021, 12:38 PM
Issam Assafi
09/23/2021, 12:38 PM
Steve s
09/23/2021, 1:03 PM
Dmitry Kuleshov
09/23/2021, 1:18 PM
Anze Kravanja
09/23/2021, 2:49 PM