Hi, for some reason I am not seeing task inputs in...
# prefect-community
t
Hi, for some reason I am not seeing task inputs in the UI - see the images below. I am running the latest version of prefect (2.7.6)
n
Hi @Tadej Svetina - apologies that this is unclear, but this page is meant to track relationships between tasks. We're currently working on clarifying the purpose of this page! (Also see link for an example.)
t
Thanks for the clarification. I think it would be useful if that page showed all input arguments - right now I log them manually at the start of the task, which feels a bit redundant.
n
I can see why that would be useful for you - thanks for the suggestion! It's a bit tricky, since many folks do not want to / cannot (for compliance reasons) share the data that passes through their flow with Prefect Cloud. Here's some (slightly dated) background on our thinking in that regard. If you wanted to, you could create a simple decorator that logs your task args automatically. The logged values will be visible in Prefect Cloud flow run logs, so you should only do this if you're comfortable exposing your task run input values to Prefect Cloud.
from prefect import get_run_logger

def log_my_args(my_task):
    # Wrap a task so its call arguments are logged before it runs
    def func(*args, **kwargs):
        logger = get_run_logger()
        logger.info(f"{my_task.__name__!r} received: {args=}, {kwargs=}")
        return my_task(*args, **kwargs)
    return func
which you could then use like
from prefect import flow, task

@log_my_args
@task
def something(x, y, z):
    pass

@flow
def my_flow():
    something(x=1, y=2, z='hi')
which would give this
10:39:31.663 | INFO    | prefect.engine - Created flow run 'quantum-sawfish' for flow 'my-flow'
10:39:33.553 | INFO    | Flow run 'quantum-sawfish' - 'something' received: args=(), kwargs={'x': 1, 'y': 2, 'z': 'hi'}
10:39:33.890 | INFO    | Flow run 'quantum-sawfish' - Created task run 'something-d639da25-0' for task 'something'
10:39:33.892 | INFO    | Flow run 'quantum-sawfish' - Executing 'something-d639da25-0' immediately...
10:39:34.781 | INFO    | Task run 'something-d639da25-0' - Finished in state Completed()
10:39:35.014 | INFO    | Flow run 'quantum-sawfish' - Finished in state Completed('All states completed.')
Out[33]: [Completed(message=None, type=COMPLETED, result=LiteralResult(type='literal', value=None))]
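Given the compliance concern above, one option is to mask selected arguments before they reach the logs. Here's a rough sketch of that idea, not a Prefect feature: it uses the standard library `logging` module as a stand-in for `get_run_logger` so it runs outside a flow, and `mask_args`, `log_args_redacted`, and `connect` are hypothetical names made up for illustration.

```python
import inspect
import logging
from functools import wraps

# Stand-in for Prefect's get_run_logger(); an assumption so this runs anywhere
logger = logging.getLogger("task_args")


def mask_args(func, args, kwargs, redact=()):
    """Bind call arguments to parameter names and mask the redacted ones."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return {
        name: "***" if name in redact else value
        for name, value in bound.arguments.items()
    }


def log_args_redacted(*redact):
    """Hypothetical decorator factory: log inputs, hiding the named parameters."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            safe = mask_args(func, args, kwargs, redact)
            logger.info("%r received: %s", func.__name__, safe)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@log_args_redacted("password")
def connect(host, password, port=5432):
    return f"{host}:{port}"
```

Inside a real task you would presumably swap `logger` for the one returned by `get_run_logger()`; only the masked dictionary is ever passed to the logger.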
t
Hi @Nate, thanks for the idea. Unfortunately, I can't seem to get it to work like I want. Using your solution, I am unable to call `something.submit()` on my task. If I switch the order of the decorators I can do this, but then, for some reason, all arguments are given as `args`, which among other things means I lose the ability to log the arg names. So I am back to logging everything manually...
n
Hi @Tadej Svetina - ahh, that's true. How about this? This should work for both sync and async tasks, whether you call them directly or use submit.
import asyncio
from functools import wraps
from prefect import flow, get_run_logger, task
from prefect.utilities.asyncutils import is_async_fn
from prefect.utilities.callables import get_call_parameters

def log_my_args(func):
    # Wrap the undecorated function (apply this beneath @task) so that
    # both direct calls and .submit() go through the logging wrapper.
    if not is_async_fn(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_run_logger()
            # get_call_parameters maps positional args to their parameter names
            logger.info(f"received: {get_call_parameters(func, args, kwargs)}")
            return func(*args, **kwargs)
    else:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_run_logger()
            logger.info(f"received: {get_call_parameters(func, args, kwargs)}")
            return await func(*args, **kwargs)
    return wrapper


# ----- Sync example -----
print('\n\n\n' + '-'*20 + ' Sync example ' + '-'*20 + '\n\n\n')


@task
@log_my_args
def test(x, y, z):
    return " ".join([x, y, z])

@flow(name="Sync example")
def test_flow():
    result = test("sync", "task", "call")
    
    future = test.submit("sync", "task", "submit")
        
test_flow()

# ----- Async example -----
print('\n\n\n' + '-'*20 + ' Async example ' + '-'*20 + '\n\n\n')

@task
@log_my_args
async def async_test(x, y, z):
    return " ".join([x, y, z])

@flow(name="Async example")
async def async_test_flow():
    result = await async_test("async", "task", "call")
    
    future = await async_test.submit("async", "task", "submit")
    
asyncio.run(async_test_flow())
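
If you want to check the wrapper logic without starting a flow run, the same pattern can be sketched with the standard library only. This is a sketch under assumptions, not Prefect's implementation: `inspect.signature(...).bind` stands in for `get_call_parameters`, `inspect.iscoroutinefunction` stands in for `is_async_fn`, and `record_args`, `join_sync`, and `join_async` are made-up names. Arguments are collected in a list instead of being logged so they're easy to inspect.

```python
import asyncio
import inspect
from functools import wraps

calls = []  # collected instead of logged, so the result is easy to inspect


def record_args(func):
    """Record each call's arguments by parameter name, for sync or async functions."""
    signature = inspect.signature(func)

    def capture(args, kwargs):
        # Map positional and keyword arguments onto the function's parameter names
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        calls.append(dict(bound.arguments))

    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            capture(args, kwargs)
            return await func(*args, **kwargs)
    else:
        @wraps(func)
        def wrapper(*args, **kwargs):
            capture(args, kwargs)
            return func(*args, **kwargs)
    return wrapper


@record_args
def join_sync(x, y, z="!"):
    return " ".join([x, y, z])


@record_args
async def join_async(x, y, z="!"):
    return " ".join([x, y, z])


sync_result = join_sync("a", "b")
async_result = asyncio.run(join_async("c", y="d", z="e"))
```

Because the wrapper sees the original function's signature, purely positional calls still come out keyed by `x`, `y`, and `z`, which is the part that got lost when the decorator wrapped the task object instead.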