Bo
02/14/2022, 5:54 PM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Container.image should not be null or empty
I have defined the task as a YAML file (e.g. https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/s3_ecs_run_custom_task_definition.py) and that works fine, so I am not sure what I am doing wrong. Thanks!
Ben Welsh
02/14/2022, 8:37 PM
python_dependencies kwarg to the class to include open-source packages. But how do I get a private package from Google Artifact Registry included as well? Is there an established pattern for this?
David Serédi
02/14/2022, 8:57 PM
jack
02/15/2022, 12:25 AM
RUN pip install -r requirements.txt that requires network, it just hangs.
Владислав Богучаров
02/15/2022, 1:31 PM
Alvaro Durán Tovar
02/15/2022, 7:52 PM
RunNamespacedJob, is it possible to get the logs from the job and have them in the Prefect UI? I'm setting log_level, but that isn't working.
Lana Dann
02/15/2022, 8:18 PM
flow.run()? I’m trying to create an integration test now and it’s just stalling, so I wonder if it’s because the schedule attached to the flow is waiting until the actual start time.
Sharath Chandra
02/16/2022, 8:04 AM
spark-submit,
The issue I am facing is that when the Spark job fails, the Prefect task still shows it as a success.
I can see the pod status on k8s as Failed, with errors in the log.
How can we propagate the errors from the Spark jobs to the Prefect task?
The task is defined as:
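from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):
    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # run() reads master_uri/deploy_mode, and defaults_from_attrs
        # needs job_name/conf_kwargs to exist as attributes
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # _build_command is part of the original class but not shown in the message
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        # ShellTask.run raises a FAIL signal only if the command exits with a nonzero code
        return super().run(command=command)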
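The Spark job is:
def main(args):
    # init_spark() is a helper defined elsewhere (not shown in the message)
    spark_session, spark_logger, config_dict = init_spark()
    try:
        ...  # job logic here
    except Exception as err:
        spark_logger.error(err)
        raise  # re-raise so the driver process exits with a nonzero status
    finally:
        if spark_session is not None:
            spark_session.stop()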
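In Prefect 1.x, ShellTask marks the task as failed only when the shell command exits with a nonzero code, so a task that shows success suggests spark-submit itself exited with 0 even though the driver pod ended up Failed. Whether spark-submit's exit code reflects the driver's final status in Kubernetes cluster mode may depend on the Spark version and on `spark.kubernetes.submission.waitAppCompletion` (if it is false, spark-submit returns right after submitting). A minimal sketch of a stricter check in plain Python, not Prefect's implementation: run the command, then raise if the exit code is nonzero or if an optional failure marker appears in the output. The helper name `run_and_check` is hypothetical, and the marker string is an assumption you would have to match to what your spark-submit actually prints for a failed driver.

```python
import subprocess
import sys
from typing import List, Optional


def run_and_check(cmd: List[str], failure_marker: Optional[str] = None) -> str:
    """Run cmd and raise RuntimeError if it failed.

    "Failed" means a nonzero exit code or, optionally, failure_marker
    appearing anywhere in the combined stdout/stderr output.
    """
    proc = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    output = proc.stdout
    if proc.returncode != 0:
        # Include the tail of the output so the error is visible in the task logs
        raise RuntimeError(f"command exited with code {proc.returncode}:\n{output[-2000:]}")
    if failure_marker is not None and failure_marker in output:
        raise RuntimeError(f"found {failure_marker!r} in output:\n{output[-2000:]}")
    return output


if __name__ == "__main__":
    # A stand-in command; in practice this would be the spark-submit command list
    print(run_and_check([sys.executable, "-c", "print('spark job finished')"]))
```

Inside a Prefect 1.x task, an uncaught exception such as this RuntimeError marks the task run as Failed, so calling a check like this from the task's `run` method would surface the Spark failure in the UI.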
Dotan Asselmann
02/16/2022, 8:20 AM
Jovan Sakovic
02/16/2022, 2:34 PM
Michał
02/16/2022, 2:42 PM
jack
02/16/2022, 5:10 PM
Liam England
02/16/2022, 5:58 PM
Bruno Nunes
02/17/2022, 10:50 AM
Pedro Martins
02/17/2022, 6:23 PM
Daniel
02/17/2022, 9:49 PM
Mariusz Olszewski
02/18/2022, 10:43 AM
jcozar
02/20/2022, 1:49 PM
51ec0178-bd12-43c4-8b00-fa4ae61300ef was cancelled because of that automation. But that flow run does not exist in the dashboard, and a flow run was correctly executed at the expected time. Do you know what might be happening?
Thank you in advance!
Slava Shor
02/20/2022, 4:19 PM
DASK_DISTRIBUTED__COMM__COMPRESSION=zlib, and now we are getting a puzzling exception:
We didn’t use Conda explicitly at any stage, and yet we see something weird: all of the stack trace is from Python 3.10 except the last line, which is from Python 3.8.
Stéphan Taljaard
02/21/2022, 4:45 AM
Michał
02/21/2022, 6:56 AM
Mathijs Carlu
02/21/2022, 3:04 PM
get_run_logger().info('message'), as the print() function also does not seem to produce any logs.
madhav
02/21/2022, 4:56 PM
Владислав Богучаров
02/21/2022, 7:39 PM
Sen
02/22/2022, 2:07 PM
prefect agent local start -l 'On_Prem_MapTest' -a "http://MY-SERVER-IP:4200"
Please find the code for the flow below:
# Basic Imports
import psutil
import os
import requests

# Extracting the Prefect Server URL
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY-SERVER-IP:4200/graphql"

import prefect
from prefect import Flow, Parameter, task
from prefect.core.task import Task
from prefect.engine import signals
from prefect.engine.state import State
from prefect.environments import LocalEnvironment
from prefect.environments.storage import Docker, Azure
from prefect.engine.executors import LocalDaskExecutor, DaskExecutor


@task
def create_url_list():
    """
    Creates the list of Ubuntu old-release URLs to fetch
    """
    # Keep the release IDs as strings: as floats, 21.10 would turn into "21.1" in the URL
    url_ids = ["21.10", "21.04", "20.10", "20.04", "19.10", "19.04", "18.10", "18.04",
               "17.10", "17.04", "16.10", "16.04", "15.10", "15.04", "14.10", "14.04",
               "13.10", "13.04", "12.10", "12.04", "11.10", "11.04", "10.10", "10.04",
               "09.10", "09.04", "08.10", "08.04", "07.10", "07.04", "06.10", "06.06",
               "05.10", "05.04", "04.10", "06.10", "06.06", "05.10", "05.04", "04.10"]
    urls = []
    for url_id in url_ids:
        urls.append("http://old-releases.ubuntu.com/releases/" + url_id)
    return urls


@task
def retrieve_url(url):
    print(url)
    html = requests.get(url)
    if html.ok:
        # Return the size of the page body as a string
        return str(len(html.content))
    else:
        return None


def main():
    """Main Function"""
    with Flow(
        "On_Prem_MapTest",
    ) as flow:
        get_urls = create_url_list()
        # One retrieve_url task run per URL
        url_results = retrieve_url.map(get_urls)

    # Registering the flow with a local environment and a threaded Dask executor
    flow.environment = LocalEnvironment()
    # flow.storage = Local()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
    flow.register("SampleFlows", labels=["On_Prem_MapTest"])

    # The code below when uncommented works fine
    # final_state = flow.run()
    # final_state_2 = final_state.result[url_results]
    # print('\n'.join([f'{s.result}: {s}' for s in final_state_2.map_states[:5]]))


if __name__ == "__main__":
    main()
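One thing worth double-checking in create_url_list: if the release IDs are written as float literals, Python drops trailing and leading zeros when converting them to strings, so 21.10 becomes "21.1" and 09.10 becomes "9.1", which points the request at a release path that most likely does not exist. A small standalone illustration, assuming the same base URL as the flow above:

```python
BASE_URL = "http://old-releases.ubuntu.com/releases/"

# As float literals, the zeros are lost when the value is turned into a string
float_ids = [21.10, 09.10]
float_urls = [BASE_URL + str(x) for x in float_ids]
print(float_urls)

# As strings, the release IDs are used verbatim
string_ids = ["21.10", "09.10"]
string_urls = [BASE_URL + x for x in string_ids]
print(string_urls)
```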
The problem is that when I register the flow and run it through the agent from the Prefect UI, it fails with the message below:
Failed to retrieve task state with error: ClientError('400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n type "Mutation". Did you mean "get_or_create_task_run" or\n "get_or_create_mapped_task_run_children"?\n\nThe GraphQL query was:\n\n mutation {\n get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {\n id\n version\n serialized_state\n }\n }\n\nThe passed variables were:\n\n null\n')
Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 360, in _send_request
    response.raise_for_status()
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
    map_index=map_index,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 1331, in get_task_run_info
    result = self.graphql(mutation) # type: Any
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 303, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 219, in post
    retry_on_api_error=retry_on_api_error,
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 445, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _send_request
    raise ClientError(f"{exc}\n{graphql_msg}") from exc
prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://MY-SERVER-IP:4200/graphql

The following error messages were provided by the GraphQL server:

    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on
        type "Mutation". Did you mean "get_or_create_task_run" or
        "get_or_create_mapped_task_run_children"?

The GraphQL query was:

    mutation {
        get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {
            id
            version
            serialized_state
        }
    }

The passed variables were:

    null
But it works when I run it directly on the machine with flow.run() (the commented-out lines in the code above), without the agent.
I am not sure why this happens.
It would be great if someone could point me in the right direction to solve this issue.
I am also not sure how to iterate through the results of url_results once all the mapped tasks are done.
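In Prefect 1.x, passing the output of a mapped task to an ordinary (non-mapped) task acts as a reduce step: that task runs once, after the mapped children finish, and receives their results as a list. A minimal sketch of such a reduce function, written as plain Python so it can be tested on its own; in the flow it would be decorated with `@task` and called inside the `with Flow(...)` block as `summary = summarize_sizes(url_results)`. The name `summarize_sizes` and the summary fields are hypothetical.

```python
from typing import Dict, List, Optional


def summarize_sizes(results: List[Optional[str]]) -> Dict[str, int]:
    """Reduce step over the mapped retrieve_url results.

    Each result is the page size as a string, or None when the request failed.
    """
    sizes = [int(r) for r in results if r is not None]
    return {
        "ok": len(sizes),
        "failed": len(results) - len(sizes),
        "total_bytes": sum(sizes),
    }


print(summarize_sizes(["120", None, "80"]))  # {'ok': 2, 'failed': 1, 'total_bytes': 200}
```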
Thanks in advance,
Владислав Богучаров
02/22/2022, 4:43 PM
Devin Flake
02/23/2022, 12:22 AM
helm install --namespace arte-prefect --version 2022.01.25 --values prefect-values.yaml arte-prefect prefecthq/prefect-server
Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: unable to recognize "": no matches for kind "Ingress" in version "extensions/v1beta1"
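Kubernetes 1.22 stopped serving Ingress under `extensions/v1beta1` and `networking.k8s.io/v1beta1` (`networking.k8s.io/v1` replaces them), so this error usually means the chart version renders an Ingress with an apiVersion the cluster no longer serves. A rough sketch for locating such objects in the output of `helm template` run with the same chart version and values; it uses naive line matching instead of a YAML parser (only top-level `apiVersion:`/`kind:` keys and the first `name:` under `metadata:` are read), and the manifest in the example is hypothetical.

```python
from typing import List, Optional, Tuple

# Ingress apiVersions that Kubernetes 1.22+ no longer serves
REMOVED_INGRESS_API_VERSIONS = {"extensions/v1beta1", "networking.k8s.io/v1beta1"}


def find_removed_ingresses(rendered: str) -> List[Tuple[Optional[str], str]]:
    """Return (metadata.name, apiVersion) for each Ingress using a removed apiVersion."""
    hits = []
    for doc in rendered.split("\n---"):
        api_version = kind = name = None
        in_metadata = False
        for line in doc.splitlines():
            top_level = bool(line) and not line[0].isspace()
            if top_level:
                # Any top-level key ends the metadata block unless it is metadata itself
                in_metadata = line.startswith("metadata:")
                key, _, value = line.partition(":")
                if key == "apiVersion":
                    api_version = value.strip()
                elif key == "kind":
                    kind = value.strip()
            elif in_metadata and name is None and line.strip().startswith("name:"):
                name = line.split(":", 1)[1].strip()
        if kind == "Ingress" and api_version in REMOVED_INGRESS_API_VERSIONS:
            hits.append((name, api_version))
    return hits


rendered = """apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: demo-ui
spec:
  rules: []
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: demo-api
"""
print(find_removed_ingresses(rendered))  # [('demo-ui', 'extensions/v1beta1')]
```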
Zhibin Dai
02/24/2022, 12:49 AM
Scarlett King
02/24/2022, 1:06 PM
prefect agent kubernetes start --job-template job_template-nonprod.yaml to start the agent. We’re currently setting up our Prefect Server on AKS and deploying it via the Helm chart. Where can I store this file so that the agent picks it up when it starts? Also, the file contains credentials, so it also needs to be stored somewhere safe.
Bruno Nunes
02/24/2022, 1:38 PM
Bruno Nunes
02/24/2022, 1:38 PM
Anna Geller
02/24/2022, 1:47 PM
Bruno Nunes
02/24/2022, 3:01 PM
from prefect import Flow, Parameter

# RunSpreTask is a custom task class defined elsewhere (not shown in the message)

# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')

# Instantiate the flow
flow_init = Flow("Initialize")

# Initialize and run the tasks
initialize = RunSpreTask(name='Initialize', log_stdout=True)
flow_init.set_dependencies(task=initialize,
                           upstream_tasks=[],
                           keyword_tasks=dict(BASE_DT=BASE_DT,
                                              CYCLE_ID=CYCLE_ID,
                                              ENTITY_ID=ENTITY_ID,
                                              FA_ID=FA_ID,
                                              FA_PATH=FA_PATH,
                                              RUN_INSTANCE=RUN_INSTANCE,
                                              NODE_CODE='core_node_init.sas',
                                              RUN_OPTION='core_cfg.run_option',
                                              SYSTEM_OPTION='sys_cfg.run_option',
                                              FLOW_OPTION='core_res.flow_option',
                                              )
                           )

flow_data_enrichment = Flow("Data Enrichment")
filter_by_entity = RunSpreTask(name='Filter by Entity', log_stdout=True)
flow_data_enrichment.set_dependencies(task=filter_by_entity,
                                      upstream_tasks=[flow_init],
                                      keyword_tasks=dict(
                                          GROUP_FLG='N',
                                          NODE_CODE='core_node_filter_entity.sas',
                                          ENTITY_IN='core_lnd.entity',
                                          ENTITY_OUT='core_res.entity',
                                      )
                                      )
I'm getting an error because prefect.context.parameter can't be resolved in my RunSpreTask class.
Anna Geller
02/24/2022, 3:04 PM