Andor Tóth
03/08/2021, 5:11 PM
Trevor Kramer
03/08/2021, 5:29 PM
prefect.context.get('flow_run_name')
How do I get the flow run name (like zealous-dormouse) from inside the flow?
kevin
03/08/2021, 5:47 PM
SlackTask() be configured to send messages to different channels depending on what flow it is being run in?
Luis Gallegos
03/08/2021, 6:44 PM
james.lamb
03/08/2021, 8:59 PM
flow.serialize()?
Will share more details in thread, thanks!
Soren Daugaard
03/08/2021, 9:25 PM
validation_operators in my great_expectations.yml file so I call the task as follows:
validation_task = RunGreatExpectationsValidation()
validation_results = validation_task(
    batch_kwargs=validation.get_batch_kwargs(records, 'pandas_s3'),
    expectation_suite_name=GE_EXPECTATION_SUITE_NAME,
    validation_operator=None,
)
However when I execute my flow I get the following error:
Unexpected error: DataContextError('No validation operator `None` was found in your project. Please verify this in your great_expectations.yml')
Traceback (most recent call last):
File "/Users/soren.daugaard/Code/data-pipelines/.direnv/python-3.8.6/lib/python3.8/site-packages/great_expectations/data_context/data_context.py", line 1272, in run_validation_operator
validation_operator = self.validation_operators[validation_operator_name]
KeyError: None
According to the documentation that parameter should be optional, but I am not able to figure out how to set it to nothing. I tried None and the empty string but with similar errors. Any pointers would be much appreciated.
MasatoShima
03/09/2021, 7:56 AM
Uncaught (in promise) Error: passed invalid or empty tenant object
at l.setDefaultTenant (index.js:51)
at vuex.esm.js:840
at vuex.esm.js:462
at Array.forEach (<anonymous>)
at vuex.esm.js:461
at l._withCommit (vuex.esm.js:620)
at l.commit (vuex.esm.js:460)
at l.commit (vuex.esm.js:405)
at tenantNavGuard.js:12
at u (runtime.js:63)
Does anyone have any idea how to solve this problem?
By the way, the architecture on AWS is as follows.
client - ALB - ECS (Fargate)
Till
03/09/2021, 9:41 AM
Till
03/09/2021, 9:43 AM
Vitaly Shulgin
03/09/2021, 10:07 AM
Laura Vaida
03/09/2021, 10:38 AM
flow.storage = Docker(
    registry_url="gcr.io/keller-steering-enabling/flows",
    image_name="billwerk-contracts",
    python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "datetime", "oauth2client", "snowflake", "datetime"],
    secrets=["GCP_CREDENTIALS"],
)
The above doesn't work properly.
Thanks!
Trevor Kramer
03/09/2021, 1:05 PM
Florian Kühnlenz
03/09/2021, 2:45 PM
Konstantinos
03/09/2021, 2:59 PM
Adam Brusselback
03/09/2021, 3:45 PM
Mitchell Bregman
03/09/2021, 4:11 PM
haf
03/09/2021, 4:39 PM
prefect register flow --file ... --project ...? Is the Python code in the file executed?
Similarly, if I am using Prefect Cloud and a k8s agent that spawns jobs, what env vars are available when:
1. The job starts the Python file (thereby declaring the flow / DAG)
2. The flow is run
Charles Liu
03/09/2021, 5:33 PM
Laura Vaida
03/09/2021, 6:03 PM
haf
03/09/2021, 8:19 PM
haf
03/09/2021, 8:21 PM
Parameter to a value in a def?
Brian Mesick
03/09/2021, 8:30 PM
haf
03/09/2021, 9:04 PM
haf
03/09/2021, 9:57 PM
start command that also takes --job-template <path>; how do I configure my own job template when using prefect agent kubernetes install?
Maria
03/09/2021, 10:42 PM
Adam Brusselback
03/09/2021, 11:11 PM
Adam Brusselback
03/09/2021, 11:12 PM
Trevor Kramer
03/10/2021, 12:15 AM
n_estimators = Parameter('n_estimators', default=500)
BatchSubmit()(..., batch_kwargs={'parameters': {'blah': n_estimators}})
The issue is that batch requires the parameter values to be strings, but I want the input Parameters to be integers. Is there a way to convert a Parameter from one type to another? Should I have a task that takes the parameter and returns the string version?
Hui Zheng
03/10/2021, 12:29 AM
retries=3 for a task_A, we want to retry task_A up to 3 times when it fails due to a network connection issue or other intermittent failures. Meanwhile, however, the logic in task_A explicitly and intentionally raises a FAIL_A signal for downstream tasks. That FAIL_A is not meant to be retried by task_A. When task_A encounters this intentional FAIL_A, it should skip retries and return it directly. How could I do that?
Carl
03/10/2021, 4:20 AM
with Flow('MyFlow') as flow:
    do_thing = Parameter('do_thing', default=False, required=False)
    data = get_data()
    with case(do_thing, True):
        data = do_thing_task(data)
    data = do_another_thing(data)  # This gets skipped when do_thing = False
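Note on the snippet above: the skip is consistent with a skipped branch result being passed straight into the next task, so the skip carries downstream. Prefect ships a merge task in prefect.tasks.control_flow for rejoining branches, which may be worth trying here. The sketch below is a plain-Python toy model of that idea, not Prefect code: SKIP, run_case, merge and run_flow are hypothetical stand-ins, and the two task functions are placeholders named after the ones in the snippet.

```python
# Toy model only: SKIP, run_case and merge are hypothetical stand-ins,
# not Prefect objects. They mimic "skip the branch, then rejoin".
SKIP = object()  # sentinel standing in for a skipped task result


def run_case(condition, expected, branch, value):
    """Run branch(value) only when condition matches expected, else report a skip."""
    return branch(value) if condition == expected else SKIP


def merge(*results):
    """Return the first result that was not skipped (None if all were skipped)."""
    for result in results:
        if result is not SKIP:
            return result
    return None


def do_thing_task(data):
    # Placeholder for the optional step.
    return data + ["thing"]


def do_another_thing(data):
    # Placeholder for the step that should always run.
    return data + ["another"]


def run_flow(do_thing):
    data = ["raw"]
    branched = run_case(do_thing, True, do_thing_task, data)
    # Rejoin: prefer the branch output, fall back to the untouched data.
    data = merge(branched, data)
    return do_another_thing(data)
```

In this model, run_flow(False) still reaches do_another_thing because merge falls back to the original data when the branch was skipped. Whether the same shape fixes the real flow depends on the Prefect version in use, so treat it as a starting point.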