Robert Bastian
02/24/2021, 11:11 PM
import prefect
from prefect import Flow, task
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.secrets import PrefectSecret

@task
def get_url(run_id, hook):
    logger = prefect.context.get("logger")
    logger.info("RUN ID: %s", run_id)
    url = hook.get_run_page_url(run_id=run_id)
    return url

SubmitRun = DatabricksSubmitRun()

with Flow("test_databricks", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    conn = PrefectSecret('DATABRICKS_CONNECTION_STRING')
    json = get_job_config()
    run_id = SubmitRun(json=json, databricks_conn_secret=conn)
    hook = SubmitRun.get_hook()
    url = get_url(run_id, hook)
Here is the exception:
ERROR:prefect.TaskRunner:Unexpected error: TypeError("argument of type 'NoneType' is not iterable")
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/rbastian/enverus/RAI/prefect/flows/test-databricks.py", line 32, in get_url
    url = hook.get_run_page_url(run_id=run_id)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 248, in get_run_page_url
    response = self._do_api_call(GET_RUN_ENDPOINT, json)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/tasks/databricks/databricks_hook.py", line 148, in _do_api_call
    if "token" in self.databricks_conn:
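A likely reading of this traceback: `SubmitRun.get_hook()` runs while the flow is being *built*, before `conn` has a value, because the secret is only handed to `SubmitRun` when the task runs. The hook therefore appears to be created with `databricks_conn=None`, and `"token" in None` raises exactly this `TypeError`. One possible fix is to pass `conn` into `get_url` and construct the hook inside the task, for example `DatabricksHook(conn)` from `prefect.tasks.databricks.databricks_hook` (assuming its first constructor argument is the connection dict; worth checking against the installed version). The stubs below (`FakeHook`, `FakeSubmitRun`, and the URL shape) are hypothetical and only model that timing.

```python
from typing import Optional


class FakeHook:
    """Hypothetical stand-in for a Databricks hook; only models the connection check."""

    def __init__(self, conn: Optional[dict]) -> None:
        self.conn = conn

    def get_run_page_url(self, run_id: int) -> str:
        # Same membership test as in the traceback: with conn=None this raises TypeError.
        if "token" in self.conn:
            return f"{self.conn['host']}/runs/{run_id}"  # made-up URL shape
        raise ValueError("connection has no token")


class FakeSubmitRun:
    """Stand-in for a task whose connection is only supplied when it runs."""

    def __init__(self, conn: Optional[dict] = None) -> None:
        self.conn = conn  # None at build time: nothing was passed to the constructor

    def get_hook(self) -> FakeHook:
        return FakeHook(self.conn)


def get_url(run_id: int, conn: dict) -> str:
    """Build the hook at run time, from the connection value the task receives."""
    return FakeHook(conn).get_run_page_url(run_id)
```

In the flow, that corresponds to `url = get_url(run_id, conn)` with the hook created inside `get_url` instead of calling `SubmitRun.get_hook()` in the `with Flow(...)` block.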
Thanks in advance!
Dhiraj Golhar
02/25/2021, 6:05 AM
Joël Luijmes
02/25/2021, 8:33 AM
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {},
  "status": "Failure",
  "message": "Operation cannot be fulfilled on resourcequotas \"gke-resource-quotas\": the object has been modified; please apply your changes to the latest version and try again",
  "reason": "Conflict",
  "details": {
    "name": "gke-resource-quotas",
    "kind": "resourcequotas"
  },
  "code": 409
}
Googling didn’t turn up much, but it seems like an internal Kubernetes issue.
So to fix this, I think there are three ways:
1. Use prefect retry mechanism
2. Modify prefect tasks to retry on this error (willing to contribute myself)
3. Modify my code to retry
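Option 3, combined with the random delay asked about below, could look like a small retry helper with jittered exponential backoff. This is a sketch under assumptions: `ConflictError` is a hypothetical stand-in; with the official `kubernetes` Python client you would instead catch `kubernetes.client.rest.ApiException` and retry only when `exc.status == 409`. Wrapping each individual create call, rather than the whole resource manager, means only the call that hit the conflict is retried, so resources that were already created are not recreated.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ConflictError(Exception):
    """Hypothetical stand-in for a 409 Conflict raised by the Kubernetes API."""


def retry_on_conflict(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only on ConflictError, with jittered exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except ConflictError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original conflict
            # Full jitter: wait a random time between 0 and base_delay * 2**attempt,
            # so parallel callers that collided spread out instead of colliding again.
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise RuntimeError("unreachable")
```

The `sleep` parameter is only there so the backoff can be tested without waiting. Prefect's own task-level retries (`@task(max_retries=..., retry_delay=timedelta(...))`) rerun the whole task instead, which is why they fit option 1 less well for multi-resource tasks.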
My question: What would be the best approach here?
With 1), it might still fail, because on retry the same burst of Kubernetes object creation happens again (or can I add a random delay?). Also, in a resource manager I may create multiple resources, and as far as I know retries don't exist there; if they did, how would I track which resources already exist?
With 2), I don’t know if this is the right approach; I can imagine that retrying inside task libraries is an anti-pattern.
With 3), no downsides, except that I have to implement this everywhere.
Michael Hadorn
02/25/2021, 9:40 AM
Carl
02/25/2021, 10:15 AM
upstream_task will fix this because the load(clean) task needs to run after the DataFrame() bit.
clean = impute.map(data_cols, replacement_dict=unmapped({np.nan: 0})) # task
clean = DataFrame(clean, column_names) # function, not a task
load(clean) # task
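A point worth checking in the snippet above: inside `with Flow(...)`, calling a plain Python function runs it immediately, at build time, on whatever the task calls returned, and those are task placeholders rather than data. If `DataFrame` is a plain function (pandas' constructor or otherwise), it receives the mapped `impute` task instead of the imputed columns, and adding upstream dependencies cannot change that; a small `@task` wrapper around it moves it into the graph, after `impute` and before `load`. The toy model below is not Prefect's implementation: `Deferred`, `as_task`, and `to_rows` are hypothetical, and mapping is not modeled.

```python
from typing import Any, Callable


class Deferred:
    """Toy stand-in for a task call recorded inside `with Flow(...)`."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn, self.args = fn, args

    def run(self) -> Any:
        # Resolve upstream Deferred arguments first, then call this node's function.
        resolved = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*resolved)


def as_task(fn: Callable[..., Any]) -> Callable[..., Deferred]:
    """Hypothetical decorator: calling the result records work instead of doing it."""
    return lambda *args: Deferred(fn, *args)


@as_task
def impute(values: list) -> list:
    return [0 if v is None else v for v in values]


def to_rows(values: list, columns: list) -> list:
    """Plain function standing in for DataFrame(...): runs as soon as it is called."""
    return [dict(zip(columns, values))]


@as_task
def load(rows: list) -> list:
    return rows
```

Calling `to_rows(clean, columns)` directly fails because `clean` is still a `Deferred`; `as_task(to_rows)(clean, columns)` becomes a node that runs after `impute` when the graph executes.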
Adam
02/25/2021, 12:07 PM
register with an idempotency_key set, the flow is uploaded to storage even though it has not changed. Is this intended? Command used in the comments.
Nikhil Akki
02/25/2021, 1:46 PM
Andor Tóth
02/25/2021, 2:41 PM
Andor Tóth
02/25/2021, 2:43 PM
date becomes a string
Like this
import prefect
from prefect import task

@task(task_run_name='{name}-{date:%FT%T}')
def say_hello(name):
    logger = prefect.context.get("logger")
    logger.info("Hello, %s!", name)
    logger.info('Flow run name: %s', prefect.context.flow_run_name)
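Why a string `date` breaks this template: `str.format` passes the text after `:` to the value's own `__format__`. A `datetime` forwards it to `strftime`, but a `str` only understands the ordinary format mini-language, so `%FT%T` raises `ValueError`. A minimal sketch in plain Python (no Prefect), assuming the string is ISO-8601 and can be parsed back; support for `%F`/`%T` also depends on the platform's C `strftime`, so the template spells them out.

```python
from datetime import datetime

# Portable spelling of "%FT%T".
TEMPLATE = "{name}-{date:%Y-%m-%dT%H:%M:%S}"


def render(name: str, date: datetime) -> str:
    # A datetime hands the spec after ":" to its own __format__, i.e. strftime.
    return TEMPLATE.format(name=name, date=date)


def render_from_string(name: str, date_str: str) -> str:
    # Assumes an ISO-8601 string; parse it back to a datetime before formatting.
    return render(name, datetime.fromisoformat(date_str))
```

`render("world", datetime(2021, 2, 25, 14, 43, 5))` gives `"world-2021-02-25T14:43:05"`, while formatting the same template with a plain string for `date` raises `ValueError`.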
jorwoods
02/25/2021, 3:37 PM
Pedro Machado
02/25/2021, 4:56 PM
Andor Tóth
02/25/2021, 5:12 PM
Andor Tóth
02/25/2021, 5:30 PM
Andor Tóth
02/25/2021, 5:33 PM
Peyton Runyan
02/25/2021, 5:47 PM
ValueError: Could not infer an active Flow context.
Details in thread
with Flow("Batch update airtable flow") as f:
    proj, stry = get_selected_bases()
    cmdf = get_communities_2017()
    scrubbed_proj, scrubbed_stry = scrub_dataframes(proj, stry, cmdf)
    debug_task(scrubbed_proj, scrubbed_stry)
Kieran
02/25/2021, 6:13 PM
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 151, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'flows.tasks'
Registering the same flow locally is passing my check.
from prefect.utilities.debug import is_serializable
if is_serializable(flow): ...
My PYTHONPATH is the root of the directory and when SSH'ing into the container I am able to import the "missing" module...
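One way to read this error: `cloudpickle`, like the stdlib `pickle`, stores a function or class that lives in an importable module as a *reference* (module path plus attribute name), not as code. So `cloudpickle.loads` in `/opt/prefect/healthcheck.py` needs `flows.tasks` to be importable from that interpreter at that moment; importing it in an SSH session does not guarantee the healthcheck step sees the same `PYTHONPATH` or working directory. The sketch uses stdlib `pickle` and a throwaway package whose name, `demo_flows`, is hypothetical.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def roundtrip_without_module() -> str:
    """Pickle a function from a throwaway package, then try to load it after the
    package is no longer importable. Returns the missing module's name."""
    with tempfile.TemporaryDirectory() as root:
        pkg = Path(root) / "demo_flows"  # hypothetical package name
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "tasks.py").write_text("def add_one(x):\n    return x + 1\n")

        sys.path.insert(0, root)
        try:
            tasks = importlib.import_module("demo_flows.tasks")
            # Stored by reference: "demo_flows.tasks" + "add_one", no function body.
            payload = pickle.dumps(tasks.add_one)
        finally:
            # Simulate an interpreter that cannot see the package.
            sys.path.remove(root)
            for name in ("demo_flows.tasks", "demo_flows"):
                sys.modules.pop(name, None)
            importlib.invalidate_caches()

    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return exc.name or ""
    return "loaded"
```

The pickle bytes are valid; loading fails only because the referenced module cannot be imported where `loads` runs, which matches a check that passes locally but fails inside the image build.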
Any pointers?
Matheus Calvelli
02/25/2021, 7:22 PM
Dave
02/25/2021, 7:31 PM
Peyton Runyan
02/25/2021, 7:34 PM
with Flow("my_flow") as f:
    x, y = func_1()
    no_return_func()
    res = final_func(x, y)
Using the example above, how do I signal for final_func to wait on no_return_func?
Do I just add a third argument to the function and have no_return_func return True? Or is there a cleaner way?
Alex Welch
02/25/2021, 10:03 PM
Sean Talia
02/25/2021, 10:26 PM
Maria
02/25/2021, 11:59 PM
from prefect import Flow
from prefect.tasks.shell import ShellTask
task = ShellTask(helper_script="cd ~")
with Flow("My Flow") as f:
contents = task(command='ls')
out = f.run()
[2021-02-26 10:32:28+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
[2021-02-26 10:32:28+1100] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2021-02-26 10:32:28+1100] ERROR - prefect.TaskRunner | Unexpected error: FileNotFoundError(2, 'The system cannot find the file specified', None, 2, None)
Traceback (most recent call last):
.....
....
python\python38-32\lib\subprocess.py", line 1307, in _execute_child
hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
FileNotFoundError: [WinError 2] The system cannot find the file specified
I do have bash installed, and specifying the path doesn't help either:
task = ShellTask(shell="C:\Windows\System32\bash.exe", helper_script="cd ~")
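Two things may be worth checking in the explicit-path attempt. First, in a normal (non-raw) Python string `\b` is the escape for a backspace character, so `"C:\Windows\System32\bash.exe"` does not contain the path it appears to; a raw string or doubled backslashes avoid that. Second, the traceback shows a 32-bit Python (`python38-32`): on 64-bit Windows, 32-bit processes see `System32` redirected to `SysWOW64`, and the `Sysnative` alias is the usual way to reach the real `System32` (an assumption to verify on that machine). The constants below are illustrative.

```python
from pathlib import PureWindowsPath

RAW_PATH = r"C:\Windows\System32\bash.exe"        # raw string: backslashes kept as-is
ESCAPED_PATH = "C:\\Windows\\System32\\bash.exe"   # same value, backslashes escaped by hand
BROKEN_TAIL = "\bash.exe"                          # "\b" becomes a backspace character

# Assumed alternative for a 32-bit Python on 64-bit Windows.
SYSNATIVE_PATH = r"C:\Windows\Sysnative\bash.exe"


def executable_name(path: str) -> str:
    """Final path component, parsed with Windows path rules."""
    return PureWindowsPath(path).name
```

With that, the task would be created as `ShellTask(shell=RAW_PATH, helper_script="cd ~")` (or with `SYSNATIVE_PATH` if the redirection turns out to be the problem).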
matta
02/26/2021, 1:24 AM
Unexpected error: ClientError('400 Client Error: Bad Request for url: http://prefect-apollo:4200/graphql/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "user" on type "Query".\n\nThe GraphQL query was:\n\n query {\n user {\n default_membership {\n tenant {\n slug\n }\n }\n }\n }\n\nThe passed variables were:\n\n null\n',)
Josh
02/26/2021, 2:18 AM
case with mypy? When I try to add a case to the flow, mypy throws "Module not callable".
with Flow("flow") as flow:
    with case(task_result, True):
        other_task()
Alex Welch
02/26/2021, 4:30 AM
Kamil Okáč
02/26/2021, 12:34 PM
Laura Vaida
02/26/2021, 4:36 PM
Docker(
    env_vars={
        # append modules directory to PYTHONPATH
        "PYTHONPATH": "C://Users/laura.vaida.000/anaconda3/envs/prefect/Lib/site-packages"
    },
    stored_as_script=True
)
Brian Mesick
02/26/2021, 5:29 PM
Container 'flow' state: terminated
Exit Code:: 2
Reason: Error
The job shows up as submitted, then ~30 seconds later we get 4 of those errors. Other nearly identical flows are working. The pods don’t stick around long enough to get logs (if they even start). Has anyone else run into this?
Alex Welch
02/26/2021, 7:30 PM
KeyError. The second is that it shows me the correct key and yet says it was not found. I have the secrets defined in the Prefect UI.
Failed to load and execute Flow's environment: KeyError('The secret <secret> was not found. Please ensure that it was set correctly in your tenant: <https://docs.prefect.io/orchestration/concepts/secrets.html>')
I followed the docs to set up the secret and am using the same name in both the UI and the code: GH_TOKEN
from prefect.client import Secret
from prefect.storage import GitHub

GH_TOKEN = Secret("GH_TOKEN").get()
STORAGE = GitHub(
    repo="<my_repo_name",
    path="flows/my_flow.py",
    access_token_secret=GH_TOKEN
)
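A likely culprit in this snippet: `GitHub(access_token_secret=...)` takes the *name* of a Prefect secret, not its value, so passing the result of `Secret("GH_TOKEN").get()` makes storage look up a secret whose name is the token itself. Passing `access_token_secret="GH_TOKEN"` and dropping the module-level `.get()` defers the lookup to wherever the flow is actually fetched. The toy model below (plain Python, no Prefect; `get_secret`, `RepoStorage`, and `"org/repo"` are hypothetical) shows the name-versus-value difference.

```python
import os
from typing import Mapping


def get_secret(name: str, store: Mapping[str, str] = os.environ) -> str:
    """Hypothetical secret lookup: KeyError when the name isn't available here."""
    return store[name]


class RepoStorage:
    """Hypothetical storage that keeps the secret's *name* and resolves it lazily."""

    def __init__(self, repo: str, access_token_secret: str) -> None:
        self.repo = repo
        self.access_token_secret = access_token_secret  # a name, not a token

    def token(self, store: Mapping[str, str]) -> str:
        # Looked up only when the flow is fetched, in that environment.
        return get_secret(self.access_token_secret, store)
```

Constructing `RepoStorage("org/repo", access_token_secret="GH_TOKEN")` works even where the secret is absent; only `token(...)` needs it, and only in the environment that runs the flow.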
S K
02/26/2021, 8:02 PM
@task()
def mainprocess(a):
    global df_single_record
    try:
        for indx in df_data_extracted.index + 1:
            df_single_record = df_data_extracted.iloc[indx - 1:indx]
            converttojson(df_single_record)
            postjsontoqapi(df_json_data)
    except Exception as e:
        logger = get_logger()
        logger.error(str(get_pst_time())
                     + '==========ERROR IN mainprocess() WHILE CONVERTING TO JSON/POSTING TO Q-API: '
                     + str(e))
        raise SystemExit(0)
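Why `raise SystemExit(0)` is a problem here: `SystemExit` derives from `BaseException`, not `Exception`, so a runner that traps ordinary exceptions to record a failed state does not catch it, and it propagates out of the task and can stop the process instead. The toy runner below is hypothetical (`TaskFailed` only stands in for a framework failure signal such as Prefect's `FAIL` from `prefect.engine.signals`).

```python
class TaskFailed(Exception):
    """Hypothetical stand-in for a framework failure signal such as Prefect's FAIL."""


def run_task(fn) -> str:
    """Toy task runner: traps ordinary exceptions and reports a state string."""
    try:
        fn()
    except Exception as exc:  # SystemExit is a BaseException, so it is NOT caught here
        return f"Failed: {type(exc).__name__}"
    return "Success"


def fails_with_signal() -> None:
    raise TaskFailed("error while converting to JSON / posting to the API")


def exits() -> None:
    raise SystemExit(0)
```

`run_task(fails_with_signal)` returns `"Failed: TaskFailed"`, while `run_task(exits)` lets `SystemExit` escape to the caller.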
Michael Adkins
02/26/2021, 8:12 PM
SystemExit is a reserved exception type https://docs.python.org/3/library/exceptions.html#SystemExit
S K
02/26/2021, 8:28 PM
Michael Adkins
02/26/2021, 8:29 PM
raise FAIL()) https://docs.prefect.io/core/concepts/execution.html#state-signals
S K
02/26/2021, 9:57 PM
Michael Adkins
02/26/2021, 9:58 PM
S K
02/26/2021, 10:01 PM
with Flow('flow_name', storage=Local()) as flow:
    check_if_flow_is_running()
    getdata = readssmandextractdata()
    vmainprocess = mainprocess(getdata)
    updatecontroltable(vmainprocess)
flow.run()
@Michael Adkins This is how I am executing the tasks:
checkflow = check_if_flow_is_running()
getdata = readssmandextractdata(checkflow)
mainlogic = mainprocess(getdata)
updatecontroltable(mainlogic)
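Passing `checkflow` into `readssmandextractdata` adds a dependency edge from the check to the rest of the chain. With Prefect's default trigger (run only if all upstream tasks succeeded), a failed check then stops every downstream task; in the earlier version the check had no downstream edges, so the other tasks ran regardless. In Prefect an edge without passing data can also be declared with `upstream_tasks=[...]` when calling a task. The toy runner below is hypothetical (stdlib `graphlib`, Python 3.9+) and only models that trigger behavior.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, Set


def run_flow(tasks: Dict[str, Callable[[], None]],
             upstream: Dict[str, Set[str]]) -> Dict[str, str]:
    """Toy runner: runs tasks in dependency order, skipping any whose upstream failed."""
    graph = {name: upstream.get(name, set()) for name in tasks}
    states: Dict[str, str] = {}
    for name in TopologicalSorter(graph).static_order():
        if any(states[dep] != "Success" for dep in graph[name]):
            states[name] = "TriggerFailed"  # mirrors an "all upstream succeeded" trigger
            continue
        try:
            tasks[name]()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return states
```

With the edge `extract <- check`, a failing check marks `extract`, `main`, and `update` as `TriggerFailed`; without it, `extract` and everything after it still run.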
@Michael Adkins thx much, able to stop the flow by doing as above...
Michael Adkins
02/26/2021, 10:32 PM