#prefect-community

Philip MacMenamin

04/14/2022, 6:50 PM
Cryptic errors in ShellTask - Hi, I'm running a flow with a number of ShellTasks and encountering errors like:
[2022-04-14 09:42:06-0600] ERROR - prefect.TaskRunner | Task 'ShellTask[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/blah/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/blah/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/blah/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/blah/python3.9/site-packages/prefect/tasks/shell.py", line 131, in run
    tmp.write(command.encode())
AttributeError: 'list' object has no attribute 'encode'
I have a couple of questions:
• Is there a way to tag ShellTasks so that you can see some clue as to which one failed?
• Can I get a better description of the failure?
At the moment I have:
shell_task = ShellTask(log_stderr=True, return_all=True, stream_output=True)

Kevin Kho

04/14/2022, 6:52 PM
I think that is the most you can do to get the traceback. You could template the name of the task run based on the input, though, so that you can more easily identify which one failed.
I think something empty might be being returned by one of the mapped tasks that causes this. Are you on Dask also?
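A note on the traceback itself: the last frame fails on `command.encode()`, which suggests that `command` reached the task as a list rather than a string. One plausible source, given `return_all=True`, is the list output of one shell task being passed on as the command of another, but that is a guess from the log alone. The sketch below reproduces the error in plain Python and shows one way to normalise a list of arguments into a single command string before handing it to a shell task. The helper name `to_command_string` is hypothetical and not part of Prefect.

```python
import shlex


def to_command_string(command):
    """Return one shell command string from either a str or a list of arguments.

    Hypothetical helper, not part of Prefect.
    """
    if isinstance(command, str):
        return command
    # shlex.join quotes each argument, so values containing spaces stay intact
    return shlex.join(command)


# Reproduce the failure from the traceback: a list has no .encode() method
try:
    ["ls", "-l"].encode()
except AttributeError as exc:
    print(exc)

# A list of arguments becomes a single, safely quoted string
print(to_command_string(["ls", "-l", "my dir"]))
```

If the list actually holds lines of output from an earlier task rather than arguments, joining is probably the wrong fix, and the upstream wiring is the thing to check.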

Philip MacMenamin

04/14/2022, 6:55 PM
At the moment I've defined one `shell_task` and I'm throwing different commands at it. Ideally I'd like to keep that, and just give it different labels.
I've switched off Dask at the moment, because it's a nightmare to debug.

Kevin Kho

04/14/2022, 6:57 PM
Yeah just change the run name specifically

Philip MacMenamin

04/14/2022, 7:04 PM
I don't think I want to vary the names of the runs, I would have thought I'd just want to vary the names of the shell_job

Kevin Kho

04/14/2022, 7:07 PM
I'm confused, because if the shell_job maps over 3-4 items, you want that run name to change, right? That is the task run name.

Philip MacMenamin

04/14/2022, 10:31 PM
Ah, yes, I misread that. So how would you do that? I guess subclass the ShellTask object and change the run method to accept `task_run_name`, or something along those lines?

Kevin Kho

04/14/2022, 10:45 PM
You can pass the `task_run_name` to the ShellTask also. The task library should take the standard task kwargs; otherwise it's a bug:
ShellTask(…, task_run_name=…)

Philip MacMenamin

04/15/2022, 1:56 PM
I can create the `ShellTask` object with `task_run_name="{val}"`, but then I can't load anything into `val` because `run()` doesn't take kwargs. Or no?
As in, I want to do something like:
task = ShellTask(task_run_name="{val}")
with Flow("My Flow") as f:
    contents = task(command='ls', task_run_name='list task')
    things = task(command='other command', task_run_name='other task')

Kevin Kho

04/15/2022, 1:58 PM
Your ShellTask would need to take in a keyword named `val` there. But you can make your own copy of the ShellTask that just takes the `val` and doesn't do anything with it.
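To make that point concrete, here is a simplified model of how a templated run name gets filled in. It assumes that the template is formatted with the keyword inputs of `run()`, which is how I understand Prefect 1.x to behave, but it does not call Prefect at all. The function `render_run_name` is a hypothetical stand-in for illustration only.

```python
def render_run_name(template, task_inputs):
    """Fill a run-name template from the task's run() inputs.

    Simplified, hypothetical model of the templating step; not Prefect code.
    """
    return template.format(**task_inputs)


# When run() receives a `val` input, the placeholder can be resolved
print(render_run_name("{val}", {"command": "ls", "val": "list task"}))

# The stock ShellTask.run() has no `val` input, so there is nothing to fill in
try:
    render_run_name("{val}", {"command": "ls"})
except KeyError as exc:
    print("missing input:", exc)
```

This is why the subclass only needs to accept the extra keyword: the name template can only refer to inputs that `run()` actually takes.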

Philip MacMenamin

04/15/2022, 2:03 PM
So, you're saying I'd need to do something like:
task_list = ShellTask(task_run_name="does listing")
task_thing = ShellTask(task_run_name="does thing")
As in, create a ShellTask object per type of command? The idea was that I didn't want to have to create another ShellTask object per command; I just wanted to give them different labels, or some way I could ID them.

Kevin Kho

04/15/2022, 2:05 PM
No no let me make an example
Maybe something like this. You’d need a real flow run to test it though
from typing import List, Optional, Union

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


def echo(command):
    return f"echo {command}"


class MyShellTask(ShellTask):
    @defaults_from_attrs("command", "env", "helper_script")
    def run(
        self,
        command: str = None,
        env: dict = None,
        helper_script: str = None,
        new_arg: str = None,
    ) -> Optional[Union[str, List[str]]]:
        # new_arg is an extra input so that task_run_name="{new_arg}" can be templated
        new_command = echo(command) + (new_arg or "")
        # Return the parent's result so downstream tasks receive the shell output
        return super().run(command=new_command, env=env, helper_script=helper_script)


MyShellTask(log_stdout=True, stream_output=True, task_run_name="{new_arg}").run(command="1", new_arg="test")

Philip MacMenamin

04/15/2022, 2:17 PM
ok, right. Subclass the ShellTask class, and add to the run method. Nice one! Thanks!