Rafael Sá
03/15/2022, 6:03 PM
Sarah Floris
03/15/2022, 6:27 PM
Rafael Sá
03/15/2022, 8:16 PM
Sarah Floris
03/15/2022, 8:38 PM
System Version check: OK
/opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
flows = cloudpickle_deserialization_check(flow_file_paths)
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 130, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named "Module"
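This healthcheck failure means the flow was pickled with a reference to a custom module that is not importable inside the Docker image (per the linked storage docs, the usual fix is to copy the module's files into the image or pip-install it there). The underlying pickling behavior can be reproduced with the standard library alone; the module name below is invented for illustration:

```python
import pickle
import sys
import types

# Build a throwaway module at runtime and register it, mimicking a
# custom module that exists on the machine where the flow is pickled.
mod = types.ModuleType("my_custom_module")
exec("def helper():\n    return 42", mod.__dict__)
sys.modules["my_custom_module"] = mod

# Functions are pickled *by reference*: only the module and name are stored.
data = pickle.dumps(mod.helper)

# Simulate the Docker image, where the module is not importable.
del sys.modules["my_custom_module"]
try:
    pickle.loads(data)
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'my_custom_module'
```

Because only a reference is stored, deserialization (what the healthcheck's `cloudpickle_deserialization_check` does) must be able to import the same module inside the container.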
Brad
03/15/2022, 9:04 PM
Set default storage to 'local'.
Traceback (most recent call last):
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 58, in wrapper
return fn(*args, **kwargs)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
return asynclib.run(func, *args, **backend_options)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
return await func(*args)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/storage.py", line 136, in create
exit_with_success(f"Set default storage to {name!r}.")
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 193, in exit_with_success
raise typer.Exit(0)
click.exceptions.Exit: 0
An exception occurred.
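For what it's worth, this traceback ends in `click.exceptions.Exit: 0`, i.e. the command appears to have actually succeeded ("Set default storage to 'local'.") and the CLI's error handler in that build seems to have printed a traceback for the clean-exit signal anyway. A minimal sketch of the pattern, using stand-in classes rather than Prefect's or Typer's actual code:

```python
# Stand-in for click.exceptions.Exit / typer.Exit: it signals a clean
# process exit, not an error, even though it is raised like an exception.
class Exit(Exception):
    def __init__(self, code=0):
        self.code = code
        super().__init__(code)

def create_storage():
    """Hypothetical CLI command: succeed, then raise Exit(0) as control flow."""
    print("Set default storage to 'local'.")
    raise Exit(0)

def cli_wrapper(fn):
    """Error handler that treats Exit(0) as success, unlike the buggy one."""
    def inner():
        try:
            return fn()
        except Exit as exc:
            if exc.code != 0:  # only non-zero exit codes are errors
                print("An exception occurred.")
        except Exception:
            print("An exception occurred.")
    return inner

cli_wrapper(create_storage)()  # prints only the success message
```

A handler that catches bare `Exception` without special-casing `Exit` will report "An exception occurred." even on the success path, which matches the output above.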
Darshan
03/15/2022, 9:33 PM
Brad
03/15/2022, 10:11 PM
Nitin Bansal
03/16/2022, 2:34 AM
davzucky
03/16/2022, 6:21 AM
Serge Tarkovski
03/16/2022, 8:45 AM
Muddassir Shaikh
03/16/2022, 9:16 AM
Junhyun Park
03/16/2022, 10:53 AM
Vadym Dytyniak
03/16/2022, 11:26 AM
Chris Reuter
03/16/2022, 12:26 PM
Noam polak
03/16/2022, 12:35 PM
Azer Rustamov
03/16/2022, 12:36 PM
Brett Naul
03/16/2022, 2:11 PM
prefect.engine.signals.FAIL: Flow finished in state <Failed: "Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("\'Comment\' object has no attribute \'_end\'")')">
The flows are stored in GCS; Python is 3.9.10 both locally (macOS) and remote (Docker + k8s), with the same prefect and cloudpickle versions. I've had the same thing happen before when Python versions don't match, but bumping has always fixed it; not really sure what else to look into now. Any suggestions...?
Adam Roderick
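An `AttributeError` during unpickling like this usually points at a library version mismatch rather than a Python mismatch: a pickle stores only the instance's attribute dict plus a reference to the class, and `__init__` is not re-run on load, so bytes produced against one version of a class can lack attributes another version's code expects. A self-contained illustration (the `Comment` class here is a stand-in, not the actual library class):

```python
import pickle

class Comment:                      # "old" definition, e.g. an older dependency
    pass

payload = pickle.dumps(Comment())   # pickled in the "old" environment

class Comment:                      # "new" definition expects a `_end` attribute
    def __init__(self):
        self._end = None

    def end(self):
        return self._end

restored = pickle.loads(payload)    # unpickling does NOT re-run __init__
try:
    restored.end()
except AttributeError as exc:
    print(exc)                      # 'Comment' object has no attribute '_end'
```

So beyond Python and cloudpickle versions, it may be worth diffing the full `pip freeze` of the local environment against the Docker image for the library that defines `Comment`.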
03/16/2022, 2:25 PM
Bradley Hurley
03/16/2022, 2:43 PM
`List` and `Dict` from the visual schematic displayed in the UI?
Chris Reuter
03/16/2022, 3:00 PM
Juan David Barreto
03/16/2022, 3:19 PM
Arun Giridharan
03/16/2022, 3:48 PM
Rajan Subramanian
03/16/2022, 6:36 PM
Chris Reuter
03/16/2022, 6:48 PM
E Li
03/16/2022, 8:13 PM
def get_target_name(x, **kwargs):
    …
    return target_name

def get_task_name(x, **kwargs):
    …
    return task_name

@task(task_run_name=get_task_name, target=get_target_name, checkpoint=True, result=LocalResult())
def task_a(x, y, z):
    …
    return …
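For readers following along, a hypothetical filling of the elided bodies: in Prefect 1, callables given to `task_run_name` and `target` are called with the task's inputs and context values as keyword arguments and must return a string. Every name and context key below is invented for illustration, not taken from the message above:

```python
def get_target_name(x, **kwargs):
    # Hypothetical: build a cache filename from a task input plus a
    # context value (assumed key "date") passed in through kwargs.
    date = kwargs.get("date", "no-date")
    return f"task_a-{x}-{date}.pkl"

def get_task_name(x, **kwargs):
    # Hypothetical: a human-readable run name derived from the input.
    return f"task_a(x={x})"

print(get_target_name(3, date="2022-03-16"))  # task_a-3-2022-03-16.pkl
print(get_task_name(3))                       # task_a(x=3)
```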
Jared Robbins
03/16/2022, 8:47 PM
Darshan
03/16/2022, 10:13 PM
dherincx
03/16/2022, 10:21 PM
Shaoyi Zhang
03/16/2022, 11:56 PM
dherincx
03/17/2022, 2:03 AM
`execute_ddls` that only executes if `new_ddl_exist` is True. When `execute_ddls` is skipped, all my downstream dbt tasks are skipped, but regardless of whether the case statement is entered, I want all downstream tasks to run. I tried `skip_on_upstream_skip=False` on the DbtShellTask but it doesn't work. I'm sure I'm missing something so trivial...
with Flow('bi_test_flow') as flow:
    # new DDLs (if any)
    ddls = new_ddls_to_run(loaded_files, os.listdir(DDL_PATH))

    # execute new DDLs ONLY if they exist
    new_ddl_exist = do_new_ddl_scripts_exist(ddls)
    with case(new_ddl_exist, True):
        execute_ddls = execute_sql(ddls)

    dbt = dbt(
        command="dbt run -m anlyz_base.views",
        upstream_tasks=[execute_ddls],
    )
    dbt_operations = dbt(
        command="dbt run-operation materialize_views"
    )
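The skip cascade being described can be modeled in a few lines of plain Python. This is a toy model of Prefect 1's behavior (SKIP signals propagate downstream by default; a trigger such as `always_run` together with `skip_on_upstream_skip=False` lets a task run anyway), not the engine's actual implementation:

```python
def resolve_state(upstream_states, trigger="all_successful",
                  skip_on_upstream_skip=True):
    """Toy model: decide a task's fate from its upstream task states."""
    # Default behavior: an upstream SKIP cascades to this task.
    if skip_on_upstream_skip and "Skipped" in upstream_states:
        return "Skipped"
    if trigger == "always_run":
        return "Running"
    # all_successful: Skipped counts as a success for trigger purposes.
    if all(s in ("Success", "Skipped") for s in upstream_states):
        return "Running"
    return "TriggerFailed"

# case() branch not taken -> execute_ddls ends up Skipped,
# so the downstream dbt task skips too under the defaults:
print(resolve_state(["Skipped"]))                      # Skipped

# An always_run-style configuration runs the dbt task regardless:
print(resolve_state(["Skipped"], trigger="always_run",
                    skip_on_upstream_skip=False))      # Running
```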
Kevin Kho
03/17/2022, 2:21 AM
`execute_ddls` raises SKIP and then dbt raises SKIP. To avoid that, set dbt's trigger to `always_run`. You can find more on triggers here.
dherincx
03/17/2022, 2:43 AM
`unexpected keyword argument: trigger`. I understand the propagation, but I'm not sure why the trigger parameter isn't recognized here, despite inheriting from the Task class:
dbt_operations = dbt(
    command="dbt run-operation materialize_views",
    trigger=all_successful
)
Kevin Kho
03/17/2022, 2:53 AM
dherincx
03/17/2022, 2:55 AM