Thread
#prefect-community
    Joël Luijmes

    Joël Luijmes

    1 year ago
    Hi there again, got different error. I’m running dbt through prefect with custom ShellTask (thus not the existing DbtShellTask). I tried couple commands and it works fine. However now Im running
    dbt test
    which generates loads of logging output, and it seems something starts to break then:
    RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
    First I got hundreds okay log lines looking something like
    INFO - prefect.ShellTask | 13:26:55 | 692 of 1563 START test not_null_src_stitch_pipedrive_us_organizations_snapshots_pipedrive_us_organization_id [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 695 of 1563 START test not_null_src_student_to_group_links_student_id [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 701 of 1563 START test not_null_src_user_attributes_updated_at....... [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 688 of 1563 START test not_null_src_stitch_pipedrive_us_deals_snapshots_status [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 702 of 1563 START test not_null_src_user_attributes_user_attribute_id [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 698 of 1563 START test not_null_src_teacher_to_group_links_teacher_id [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 690 of 1563 START test not_null_src_stitch_pipedrive_us_organizations_snapshots__sdc_batched_at [RUN]
    INFO - prefect.ShellTask | 13:26:55 | 651 of 1563 PASS not_null_src_stitch_pipedrive_nl_organizations_snapshots_is_active [PASS in 2.78s]
    Before prefect or dbt can’t cope with it anylonger and the following stack is printed like thousands and thousands of times
    INFO - prefect.ShellTask | Traceback (most recent call last):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 216, in handle
    INFO - prefect.ShellTask |     self.emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/dbt/logger.py", line 458, in emit
    INFO - prefect.ShellTask |     super().emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 840, in emit
    INFO - prefect.ShellTask |     if self.should_rollover(record, len(msg)):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 818, in should_rollover
    INFO - prefect.ShellTask |     self.stream.seek(0, 2)
    INFO - prefect.ShellTask | RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
    INFO - prefect.ShellTask | Logged from file /usr/local/lib/python3.8/site-packages/logbook/handlers.py, line 563
    INFO - prefect.ShellTask | Traceback (most recent call last):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 216, in handle
    INFO - prefect.ShellTask |     self.emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/dbt/logger.py", line 458, in emit
    INFO - prefect.ShellTask |     super().emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 840, in emit
    INFO - prefect.ShellTask |     if self.should_rollover(record, len(msg)):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 818, in should_rollover
    INFO - prefect.ShellTask |     self.stream.seek(0, 2)
    INFO - prefect.ShellTask | RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
    INFO - prefect.ShellTask | Logged from file /usr/local/lib/python3.8/site-packages/logbook/handlers.py, line 563
    INFO - prefect.ShellTask | Traceback (most recent call last):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 216, in handle
    INFO - prefect.ShellTask |     self.emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/dbt/logger.py", line 458, in emit
    INFO - prefect.ShellTask |     super().emit(record)
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 840, in emit
    INFO - prefect.ShellTask |     if self.should_rollover(record, len(msg)):
    INFO - prefect.ShellTask |   File "/usr/local/lib/python3.8/site-packages/logbook/handlers.py", line 818, in should_rollover
    INFO - prefect.ShellTask |     self.stream.seek(0, 2)
    INFO - prefect.ShellTask | RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
    This is my custom dbt_task
    @task
    def dbt_task(dbt_command: str, target: str = "prod", dbt_env: Optional[dict] = None):
        if not isinstance(dbt_command, str) or not dbt_command.startswith("dbt "):
            raise ValueError(f"Expected command to start with 'dbt', got '{dbt_command}'")
    
        command = f"{dbt_command} --target {target}"
    
        if dbt_env is None:
            dbt_env = {}
    
        dbt_env.setdefault("DBT_PROFILES_DIR", "./")
        dbt_env.setdefault("BIGQUERY_SERVICE_ACCOUNT_KEYFILE_PATH", prefect.config["dbt_bigquery_keyfile_path"])
    
        <http://prefect.context.logger.info|prefect.context.logger.info>(f"Executing '{command}' with env: {dbt_env}")
    
        dbt_env
        ShellTask(
            command=command,
            env=dbt_env,
            stream_output=True,
        ).run(helper_script="cd gynzy-dbt")
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey again, I would not know this off the top of my head but the custom task looks fine to me. I haven’t used dbt myself. Could you try
    dbt test
    with the standard task?
    Joël Luijmes

    Joël Luijmes

    1 year ago
    I don’t think dbt is the culprit here.
    RuntimeError: reentrant call inside <_io.BufferedWriter
    Seems to be something to do with the OS / python / streaming of stdout it self: https://stackoverflow.com/questions/45680378/how-to-explain-the-reentrant-runtimeerror-caused-by-printing-in-signal-handlers It also doesn’t occur when running locally on MacOS (at server it runs on kubernetes).
    (Using the existing DbtShellTask is bit trickey as it does more than I want)
    Im going to try disable stream_output, maybe it has something to do with that
    Aha, it appears to be a memory issue. The pod is OOMKilled, subsequently the shelltask doesn’t cope with this correctly and things escalate.
    I suppose the sigterm isn’t correctly passed to the child process, giving the weird reentrant call logs
    Kevin Kho

    Kevin Kho

    1 year ago
    Oh I see. Thanks for getting back to me on this!
    In depth article on the topic, might make a PR to mimick this behavior.