Nitin Bansal
03/16/2022, 2:34 AM

davzucky
03/16/2022, 6:21 AM

Serge Tarkovski
03/16/2022, 8:45 AM

Muddassir Shaikh
03/16/2022, 9:16 AM

Junhyun Park
03/16/2022, 10:53 AM

Vadym Dytyniak
03/16/2022, 11:26 AM

Chris Reuter
03/16/2022, 12:26 PM

Noam polak
03/16/2022, 12:35 PM

Azer Rustamov
03/16/2022, 12:36 PM

Brett Naul
03/16/2022, 2:11 PM
prefect.engine.signals.FAIL: Flow finished in state <Failed: "Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("\'Comment\' object has no attribute \'_end\'")')">
The flows are stored in GCS; Python is 3.9.10 both locally (macOS) and remote (Docker + k8s), with the same prefect and cloudpickle versions. I've had the same thing happen before when Python versions don't match, but bumping has always fixed it. Not really sure what else to look into now, any suggestions...?

Adam Roderick
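That AttributeError usually means the unpickling environment differs from the pickling one in some library (whatever defines that Comment class), even when Python and cloudpickle versions match, so diffing a full pip freeze between the build image and the run image is a reasonable next step. As a general, Prefect-independent sketch of catching this kind of drift early, one can store environment info alongside the pickle and verify it at load time; dump_with_env and load_with_env_check are hypothetical helpers, not Prefect APIs:

```python
import os
import pickle
import platform
import tempfile

# Hypothetical helpers (not Prefect APIs): record the build environment next
# to the pickled object so a mismatched runtime fails with a clear message
# instead of a cryptic AttributeError deep inside unpickling.

def dump_with_env(obj, path):
    meta = {
        "python": platform.python_version(),
        "payload": pickle.dumps(obj),
    }
    with open(path, "wb") as f:
        pickle.dump(meta, f)

def load_with_env_check(path):
    with open(path, "rb") as f:
        meta = pickle.load(f)
    if meta["python"] != platform.python_version():
        raise RuntimeError(
            f"pickled under Python {meta['python']}, "
            f"running {platform.python_version()}"
        )
    return pickle.loads(meta["payload"])

path = os.path.join(tempfile.mkdtemp(), "flow.pkl")
dump_with_env({"name": "my_flow"}, path)
print(load_with_env_check(path))  # same interpreter, so the check passes
```

The same idea extends to recording individual library versions in the metadata dict, which would pinpoint exactly which package drifted between build and runtime.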
03/16/2022, 2:25 PM

Bradley Hurley
03/16/2022, 2:43 PM
List and Dict from the visual schematic displayed in the UI?

Chris Reuter
03/16/2022, 3:00 PM

Juan David Barreto
03/16/2022, 3:19 PM

Arun Giridharan
03/16/2022, 3:48 PM

Rajan Subramanian
03/16/2022, 6:36 PM

Chris Reuter
03/16/2022, 6:48 PM

E Li
03/16/2022, 8:13 PM
def get_target_name(x, **kwargs):
    …
    return target_name

def get_task_name(x, **kwargs):
    …
    return task_name

@task(task_run_name=get_task_name, target=get_target_name, checkpoint=True, result=LocalResult())
def task_a(x, y, z):
    …
    return …
Jared Robbins
03/16/2022, 8:47 PM

Darshan
03/16/2022, 10:13 PM

dherincx
03/16/2022, 10:21 PM

Shaoyi Zhang
03/16/2022, 11:56 PM

dherincx
03/17/2022, 2:03 AM
I have a task execute_ddls that only executes if new_ddl_exist is True. When execute_ddls is skipped, all my downstream dbt tasks are skipped, but I want all downstream tasks to run regardless of whether the case statement is entered. I tried skip_on_upstream_skip = False on the dbtShellTask, but it doesn't work. I'm sure I'm missing something trivial...
with Flow('bi_test_flow') as flow:
    # new DDLs (if any)
    ddls = new_ddls_to_run(loaded_files, os.listdir(DDL_PATH))

    # execute new DDLs ONLY if they exist
    new_ddl_exist = do_new_ddl_scripts_exist(ddls)
    with case(new_ddl_exist, True):
        execute_ddls = execute_sql(ddls)

    dbt_run = dbt(
        command="dbt run -m anlyz_base.views",
        upstream_tasks=[execute_ddls],
    )
    dbt_operations = dbt(
        command="dbt run-operation materialize_views"
    )
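For what it's worth, Prefect 1.x skip states cascade downstream by default. Below is a toy, stdlib-only model of that cascading (illustrative only, not Prefect code), under the assumption that skip_on_upstream_skip=False stops the cascade at exactly the task that sets it; all task names are hypothetical:

```python
# Toy model of Prefect 1.x skip cascading (illustrative only, not Prefect
# code). Assumption: a task whose upstream is Skipped is itself Skipped,
# unless it sets skip_on_upstream_skip=False, which stops the cascade there.

def run_dag(order, upstreams, skipped_sources, skip_on_upstream_skip):
    states = {}
    for t in order:  # 'order' is assumed to be topologically sorted
        if t in skipped_sources:
            states[t] = "Skipped"
        elif (any(states.get(u) == "Skipped" for u in upstreams.get(t, []))
              and skip_on_upstream_skip.get(t, True)):
            states[t] = "Skipped"          # default: the skip propagates
        else:
            states[t] = "Success"
    return states

order = ["execute_ddls", "dbt_run", "dbt_operations"]
upstreams = {"dbt_run": ["execute_ddls"], "dbt_operations": ["dbt_run"]}

print(run_dag(order, upstreams, {"execute_ddls"}, {}))
# everything downstream of the skip is Skipped
print(run_dag(order, upstreams, {"execute_ddls"}, {"dbt_run": False}))
# dbt_run opts out of the cascade, so it and dbt_operations both run
```

If this model matches Prefect's behavior, the flag has to be set on the first task directly downstream of the skipped one; setting it further down the chain is too late, because that task's immediate upstream is already Skipped.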
Madhup Sukoon
03/17/2022, 8:06 AM
from prefect.utilities.notifications import slack_notifier
.
.
flow.state_handlers = [slack_notifier()]
I have added the Prefect app and also added the webhook URL to the SLACK_WEBHOOK_URL secret. Any pointers on why this is happening / how to debug?

Ievgenii Martynenko
03/17/2022, 10:54 AM
logger = logging.getLogger(__name__)
and I want log messages from that module to appear in Prefect.
I registered an extra logger, and log messages defined in the Task (AWSPOC) appear, but not the log messages defined inside 'bear'.
My code for "prefect_flow.py":
import logging

format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
logger = logging.getLogger('magic_logger')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(format_string)
handler.setFormatter(formatter)
logger.addHandler(handler)

class AWSPOC(Task):
    def __init__(self, name: str, config_file: str):
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        logger.info("This message appears in Prefect log output")
        loader.run()

executor = LocalDaskExecutor()
host_config = {...}
storage = S3(...)
env = {
    "PREFECT__LOGGING__EXTRA_LOGGERS": "['magic_logger']"
}
docker_run_config = DockerRun(image=..., host_config=host_config, env=env)

with Flow(name="AWS POC", executor=executor, storage=storage, run_config=docker_run_config) as flow:
    task = AWSPOC(name='...', config_file='...')
    task()

flow.register(project_name='AWS POC')
Where am I wrong? The only idea I have is that 'bear' should have its own named logger.

Vadym Dytyniak
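One likely reading of the problem above: extra-logger registration attaches a handler to loggers by name, and a module that calls logging.getLogger(__name__) gets a logger named after the module (here 'bear'), not 'magic_logger', so 'bear' is never wired up. A stdlib-only sketch of that name-matching behavior (the names are taken from the message above, the mechanics are plain Python logging):

```python
import io
import logging

# Stdlib-only sketch of why the logger *name* matters: a handler registered
# on "magic_logger" never sees records emitted on a logger named "bear",
# which is what logging.getLogger(__name__) produces inside module bear.

stream = io.StringIO()
handler = logging.StreamHandler(stream)

for name in ["magic_logger"]:            # what gets registered here
    lg = logging.getLogger(name)
    lg.setLevel(logging.INFO)
    lg.addHandler(handler)

logging.getLogger("magic_logger").info("from the flow module")  # captured
logging.getLogger("bear").info("from the bear module")          # not captured

captured = stream.getvalue()
print(captured)  # only the magic_logger line appears
```

If that is the cause, adding 'bear' to the extra-loggers list alongside 'magic_logger' should surface its messages.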
03/17/2022, 11:37 AM

Daniel Nilsen
03/17/2022, 12:00 PM

Vishnu
03/17/2022, 3:03 PM

Marc Lipoff
03/17/2022, 3:58 PM
(while has_next_url: ...). Is there a better "prefect" way to do this? One of the downsides of my basic way is that, if there is an error along the way (let's say at record 199,999 of 200,000), I lose what I had.

Kevin Mullins
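One common pattern for not losing progress is to checkpoint each page to durable storage before fetching the next, then stitch the pages together at the end; a restart skips pages already on disk. A stdlib sketch with a canned fetch_page stand-in for the real HTTP call (all names hypothetical):

```python
import json
import os
import tempfile

# Sketch of checkpointing a paginated pull: persist every page before moving
# on, so a failure at record 199,999 only costs the page in flight.

PAGES = {0: ([1, 2], 1), 1: ([3, 4], 2), 2: ([5], None)}  # page -> (records, next_page)

def fetch_page(page):
    # stand-in for the real paginated HTTP call
    return PAGES[page]

def pull_all(checkpoint_dir):
    page = 0
    while page is not None:
        path = os.path.join(checkpoint_dir, f"page_{page:06d}.json")
        if os.path.exists(path):          # fetched on a previous run: skip
            with open(path) as f:
                page = json.load(f)["next"]
            continue
        records, nxt = fetch_page(page)
        with open(path, "w") as f:        # checkpoint before advancing
            json.dump({"records": records, "next": nxt}, f)
        page = nxt
    out = []                              # stitch the pages back together
    for name in sorted(os.listdir(checkpoint_dir)):
        with open(os.path.join(checkpoint_dir, name)) as f:
            out.extend(json.load(f)["records"])
    return out

print(pull_all(tempfile.mkdtemp()))  # [1, 2, 3, 4, 5]
```

In Prefect terms, the same idea maps naturally onto one task per page with checkpointed results, so a retried flow run re-uses the pages it already has.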
03/17/2022, 4:43 PMLatestOnlyOperator
or using depends_on_past
. In Oozie (I know, not my favorite orchestrator), this could be achieved by setting a concurrency
to 1
and execution
to FIFO
or ONLYLAST
.
Are there any plans in Orion to provide similar functionality so users don’t have to implement custom solutions to this common pattern?
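The two Oozie policies mentioned can be pictured as queueing rules in front of a single worker; a toy, stdlib-only illustration of the semantics (not Prefect or Oozie code, and drain/run names are invented):

```python
from collections import deque

# Toy scheduler showing the two Oozie-style policies at concurrency 1.
# FIFO: queued runs execute one at a time, in arrival order.
# ONLYLAST: a newly queued run supersedes any run still waiting.

def drain(queued, mode):
    queue = deque()
    for run in queued:
        if mode == "ONLYLAST":
            queue.clear()          # drop anything still waiting
        queue.append(run)
    executed = []
    while queue:                   # concurrency 1: strictly one at a time
        executed.append(queue.popleft())
    return executed

print(drain(["run1", "run2", "run3"], "FIFO"))      # ['run1', 'run2', 'run3']
print(drain(["run1", "run2", "run3"], "ONLYLAST"))  # ['run3']
```

ONLYLAST is the queueing analogue of Airflow's latest-only behavior: when runs pile up behind a slow one, only the most recent still matters.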