Nicolas Gastaldi
12/12/2022, 7:24 PM

Scott Chamberlain
12/12/2022, 8:32 PM
Is there a prefect deployment update? Or do you just nuke your deployment.yaml file and rerun prefect deployment build, then prefect deployment apply foo-deployment.yaml? Maybe it depends on where storage is located? I'm using GCS storage.

Tomas Moreno
12/12/2022, 10:58 PM

Nikhil Jain
12/12/2022, 11:22 PM

Dan Marling
12/12/2022, 11:55 PM

Ashley Felber
12/12/2022, 11:59 PM
if __name__ == "__main__":
    extract_load_events('20221030', '20221030', 'https://us1.api.clevertap.com/1/events.json', 'https://us1.api.clevertap.com/1/events.json?cursor=')
Scott Chamberlain
12/13/2022, 12:01 AM
For those running prefect agent … on a VM as a service, does anyone have a setup for injecting secrets into the supervisord.conf file for the Prefect URL and API key? A solution with chamber would be ideal, but I'm curious about any solution that works.

Andy Yeung
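An aside on the supervisord question above: one common pattern is to have supervisord exec the agent through chamber, so the secrets land in the process environment at startup rather than in the config file itself. A sketch, assuming a chamber service named prefect that holds PREFECT_API_URL and PREFECT_API_KEY (all names below are placeholders, not a tested setup):

```ini
[program:prefect-agent]
; chamber reads secrets from the "prefect" service and execs the agent
; with PREFECT_API_URL / PREFECT_API_KEY exported in its environment
command=chamber exec prefect -- prefect agent start -q default
autostart=true
autorestart=true
stdout_logfile=/var/log/prefect-agent.out.log
stderr_logfile=/var/log/prefect-agent.err.log
```

The config file then contains no secret material, only the name of the chamber service to read from.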
12/13/2022, 1:22 AM

Deepanshu Aggarwal
12/13/2022, 9:12 AM

Andy Yeung
12/13/2022, 9:35 AM
“… asyncio.gather, calling submit is required for asynchronous execution on the ConcurrentTaskRunner.”

Mark Nuttall-Smith
12/13/2022, 9:56 AM
dbt_cli_profile = DbtCliProfile(
    name="dbt_dwh",
    target=DEFAULT_BLOCK,
    target_configs=PostgresTargetConfigs(
        credentials=DatabaseCredentials.load(DEFAULT_BLOCK), schema="public"
    ),
)
dbt_cli_profile.save(name="dbt_dwh")

TypeError: Type is not JSON serializable: URL

Has anyone else seen this?

redsquare
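An aside on the TypeError above: JSON serializers raise exactly this when a config value is a URL wrapper object rather than a plain string, and coercing such values to str before saving is the usual workaround. A minimal stand-in reproduction of the failure mode (the URL class below is hypothetical, not SQLAlchemy's or Pydantic's, and this is not Prefect's actual serialization path):

```python
import json

# Hypothetical stand-in for a URL wrapper type, e.g. what a database
# credentials object may hold internally instead of a plain string.
class URL:
    def __init__(self, value):
        self.value = value

    def __str__(self):
        return self.value

config = {"connection_url": URL("postgresql://localhost:5432/dwh")}

try:
    json.dumps(config)
except TypeError as exc:
    # json.dumps cannot handle the wrapper object
    print(exc)

# Workaround: coerce non-primitive values to strings before serializing
serializable = {k: str(v) for k, v in config.items()}
print(json.dumps(serializable))
```

If the block you are saving holds such an object, passing the connection URL as a plain string is the simplest fix.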
12/13/2022, 1:46 PM
/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
Charalampos Malathounis
12/13/2022, 1:50 PM
Task 'TaskPreprocessing': Exception encountered during task execution!
Traceback (most recent call last):
  File "my-file.py", line 105, in run_job
    job.RunNamespacedJob(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/kubernetes/job.py", line 730, in run
    job = api_client_job.read_namespaced_job_status(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1393, in read_namespaced_job_status
    return self.read_namespaced_job_status_with_http_info(name, namespace, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1480, in read_namespaced_job_status_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 239, in GET
    return self.request("GET", url,
  File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '<Audit-Id>', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '<X-Kubernetes-Pf-Flowschema-Uid>', 'X-Kubernetes-Pf-Prioritylevel-Uid': '<X-Kubernetes-Pf-Prioritylevel-Uid>', 'Date': 'Tue, 13 Dec 2022 11:02:40 GMT', 'Content-Length': '150'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"rpc error: code = Unavailable desc = transport is closing","code":500}
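An aside on the 500 above: "transport is closing" is typically a transient, server-side gRPC failure inside the apiserver rather than a bug in the calling code, so a bounded retry around the status read is often enough. A generic sketch of that pattern — in real code you would catch kubernetes.client.exceptions.ApiException and retry only on 5xx; the flaky_read stub below merely simulates the transient failure:

```python
import time

def with_retries(fn, attempts=3, delay=0.01, retry_on=(RuntimeError,)):
    # Retry fn up to `attempts` times on the given exception types,
    # sleeping `delay` seconds between tries and re-raising on the last.
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)

# Stub standing in for read_namespaced_job_status: fails twice with the
# transient error, then succeeds.
calls = {"n": 0}
def flaky_read():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rpc error: code = Unavailable desc = transport is closing")
    return {"status": "Succeeded"}

print(with_retries(flaky_read))  # {'status': 'Succeeded'}
```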
Miel Vander Sande
12/13/2022, 3:10 PM

Ashley Felber
12/13/2022, 3:19 PM

Fred Birchall
12/13/2022, 4:04 PM
Is there a way to .submit a task and use one of the parameters as the task name? I know we can currently do my_task.submit(name="My Task") and this will show up as my_task-a1b2c3-0 in the UI, but what I would like to do is:

from prefect import flow, task

@task(name_from_param="name")
def my_task(name):
    pass

@flow(name="My Flow")
def my_flow():
    my_task("Hello")
    my_task(name="World")

Then in the Prefect UI I will see:

Running Flow: My Flow
Running Task: Hello-a1b2c3-0
Running Task: World-a1b2c3-1

A bit of background for context: we run all of our tasks using either AWS Lambda or dbt. For Lambda, I've built a central function (decorated as a task) with the signature run_lambda(lambda_name, payload={}), and most users call it like run_lambda.submit("MyLambdaFunction"), but then in the logs all the task names are run_lambda-[ascii]-0, which is annoying when things go pop, as you have to click into the task run where I have added logger.info("Lambda name: %s", lambda_name) for traceability. For all of my personal flows I follow the convention run_lambda.with_options(name="Lambda MyLambdaFunction").submit("MyLambdaFunction"), which gives me the desired task names in the UI, but getting everyone else to follow suit is another matter. I've attempted many solutions to dynamically name a task, mainly focused on custom decorators, but they have all failed in one way or another… So I wanted to see if the Prefect community has any ideas or solutions! Thanks

Thomas Opsomer
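An aside on the question above: the convention Fred describes (derive the display name from the first argument so call sites don't have to remember with_options) can be sketched as a plain Python wrapper. The helper below is hypothetical, not a Prefect API; in real code the wrapper would call task.with_options(name=display_name).submit(...) instead of printing:

```python
import functools

def name_from_first_arg(fn):
    # Hypothetical helper: derive a display name from the first positional
    # argument so every call site gets a readable name automatically.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        display_name = args[0] if args else fn.__name__
        wrapper.last_display_name = display_name
        print(f"Running Task: {display_name}")
        return fn(*args, **kwargs)
    return wrapper

@name_from_first_arg
def run_lambda(lambda_name, payload=None):
    # Stand-in for the central Lambda-invoking task from the question
    return {"lambda": lambda_name, "payload": payload or {}}

result = run_lambda("MyLambdaFunction")
print(result["lambda"])  # MyLambdaFunction
```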
12/13/2022, 5:19 PM

Slackbot
12/13/2022, 5:35 PM

Josh
12/13/2022, 5:35 PM

Jason
12/13/2022, 5:40 PM

Claire Herdeman
12/13/2022, 5:46 PM

Ashley Felber
12/13/2022, 5:54 PM

Ashley Felber
12/13/2022, 7:21 PM

Senthil M.K.
12/13/2022, 8:14 PM

Kalise Richmond
12/13/2022, 8:27 PM

Kalise Richmond
12/13/2022, 8:39 PM

Scott Chamberlain
12/13/2022, 9:40 PM
What is the equivalent of create_flow_run in Prefect 1.0 (https://docs-v1.prefect.io/api/latest/tasks/prefect.html#create-flow-run)? Trying to translate to Prefect 2, and this method create_flow_run is somewhat confusing - a task that creates a flow within a flow?

Parwez Noori
12/14/2022, 3:14 AM

Edmund Tian
12/14/2022, 4:49 AM

Young Ho Shin
12/14/2022, 5:41 AM
I thought PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=999 run_flow.py would do the job, but this seems to turn off the logs on the web UI as well as the console.

Bianca Hoch
12/14/2022, 8:50 PM
You can place a custom logging.yml in ~/.prefect/ . For reference, the default config looks like this:

# Prefect logging config file.
#
# Any item in this file can be overridden with an environment variable:
# `PREFECT_LOGGING_[PATH]_[TO]_[KEY]=VALUE`
#
# Templated values can be used to insert values from the Prefect settings at runtime.

version: 1
disable_existing_loggers: False

formatters:
    simple:
        format: "%(asctime)s.%(msecs)03d | %(message)s"
        datefmt: "%H:%M:%S"

    standard:
        (): prefect.logging.formatters.PrefectFormatter
        format: "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(name)s - %(message)s"
        flow_run_fmt: "%(asctime)s.%(msecs)03d | %(levelname)-7s | Flow run %(flow_run_name)r - %(message)s"
        task_run_fmt: "%(asctime)s.%(msecs)03d | %(levelname)-7s | Task run %(task_run_name)r - %(message)s"
        datefmt: "%H:%M:%S"

    json:
        class: prefect.logging.formatters.JsonFormatter
        format: "default"

# filters:
#     Define any custom filters to drop records containing
#     sensitive information
#     my_filter:
#         class: your_module.FilterClass

handlers:
    # The handlers we define here will output all logs they receive by default,
    # but we include the `level` so it can be overridden by environment
    orion:
        level: 0
        class: prefect.logging.handlers.OrionHandler

loggers:
    prefect:
        level: "${PREFECT_LOGGING_LEVEL}"

    prefect.extra:
        level: "${PREFECT_LOGGING_LEVEL}"
        handlers: [orion]

    prefect.flow_runs:
        level: NOTSET
        handlers: [orion]

    prefect.task_runs:
        level: NOTSET
        handlers: [orion]

    prefect.orion:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"

    prefect.client:
        level: "${PREFECT_LOGGING_LEVEL}"

    prefect.infrastructure:
        level: "${PREFECT_LOGGING_LEVEL}"

    uvicorn:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"

    fastapi:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"
from prefect import flow, task, get_run_logger

@task
def test_task():
    logger = get_run_logger()
    logger.info("abc")

@flow
def test_logging():
    test_task()

if __name__ == "__main__":
    test_logging()

# Deployment commands to enter in the terminal:
# prefect deployment build ./disable_console_log.py:test_logging -n test-logging -q logging-queue
# prefect deployment apply ./test_logging-deployment.yaml
Young Ho Shin
12/15/2022, 3:05 AM
Using a custom logging.yml does seem to work.
I was also able to get the behavior I want (i.e. INFO level logs on Orion, but ERROR logs on the console) by changing to level: ERROR for the console handler in the default yml file.
I am still a bit confused why my original try of PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=999 run_flow.py does not work. From the [docs](https://docs.prefect.io/concepts/logs/#logging-configuration) it seems like it should. I've tried PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=ERROR and PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL="ERROR", but this doesn't work either.
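A closing aside on why such overrides can miss: Prefect-style environment overrides map an underscore-separated variable name onto a nested path in the logging config (PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL → handlers.console.level). The sketch below is an illustrative re-implementation of that mapping, not Prefect's actual parser; note that the variable only takes effect if it is applied before the logging config is materialized, which is one plausible reason setting it at flow-run time appears to do nothing:

```python
# Illustrative sketch (not Prefect's parser): map an env-style key like
# PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL onto the nested path
# handlers -> console -> level in a logging config dict.
config = {"handlers": {"console": {"level": "INFO"}}}

def apply_override(config, env_key, value, prefix="PREFECT_LOGGING_"):
    # Strip the prefix, lowercase, and split on "_" to get the key path;
    # keys that themselves contain "_" are ambiguous under this scheme.
    path = env_key[len(prefix):].lower().split("_")
    node = config
    for key in path[:-1]:
        node = node.setdefault(key, {})
    node[path[-1]] = value

apply_override(config, "PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL", "ERROR")
print(config["handlers"]["console"]["level"])  # ERROR
```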