Tadej Svetina
01/06/2023, 2:41 PMNate
01/06/2023, 3:45 PMTadej Svetina
01/06/2023, 4:07 PMNate
01/06/2023, 4:45 PM
from prefect import get_run_logger
def log_my_args(my_task):
    """Wrap ``my_task`` so that each call logs the arguments it received.

    Args:
        my_task: The callable to wrap. It must expose ``__name__``; in the
            usage shown in this thread it is a Prefect task.

    Returns:
        A plain function that logs ``args``/``kwargs`` through the Prefect
        run logger and then delegates to ``my_task``, returning its result.
    """

    def func(*args, **kwargs):
        # The logger is looked up on every call rather than at decoration
        # time. NOTE(review): get_run_logger presumably needs an active
        # flow/task run context -- confirm against the Prefect docs.
        logger = get_run_logger()
        # The colon after "received" matches the log output quoted in the
        # thread ("'something' received: args=(), kwargs={...}").
        logger.info(f"{my_task.__name__!r} received: {args=}, {kwargs=}")
        return my_task(*args, **kwargs)

    return func
which you could then use like this:
from prefect import flow, task


# log_my_args is the outermost decorator, so the name `something` is bound to
# the plain logging function it returns, which in turn calls the task.
@log_my_args
@task
def something(x, y, z):
    pass


@flow
def my_flow():
    # Keyword arguments are used so the logged kwargs carry the argument names.
    something(x=1, y=2, z='hi')
which would give this output:
10:39:31.663 | INFO | prefect.engine - Created flow run 'quantum-sawfish' for flow 'my-flow'
10:39:33.553 | INFO | Flow run 'quantum-sawfish' - 'something' received: args=(), kwargs={'x': 1, 'y': 2, 'z': 'hi'}
10:39:33.890 | INFO | Flow run 'quantum-sawfish' - Created task run 'something-d639da25-0' for task 'something'
10:39:33.892 | INFO | Flow run 'quantum-sawfish' - Executing 'something-d639da25-0' immediately...
10:39:34.781 | INFO | Task run 'something-d639da25-0' - Finished in state Completed()
10:39:35.014 | INFO | Flow run 'quantum-sawfish' - Finished in state Completed('All states completed.')
Out[33]: [Completed(message=None, type=COMPLETED, result=LiteralResult(type='literal', value=None))]
Tadej Svetina
01/08/2023, 9:25 PMsomething.submit()
on my task. If I switch the order of the decorators I can do this, but then, for some reason, all arguments are passed as positional args, which among other things means I lose the ability to log the argument names. So I am back to logging everything manually...
Nate
01/08/2023, 10:52 PM
import asyncio
from functools import wraps
from prefect import flow, get_run_logger, task
from prefect.utilities.asyncutils import is_async_fn
from prefect.utilities.callables import get_call_parameters
def log_my_args(func):
    """Decorate ``func`` so that each call logs the parameters it received.

    Intended to sit *under* ``@task`` (as in the examples that follow), so the
    decorated object is still a Prefect task and keeps ``.submit``.

    Args:
        func: The sync or async function to wrap.

    Returns:
        A wrapper of the same kind as ``func`` (a coroutine function for an
        async ``func``, a regular function otherwise) that logs the call
        parameters and then returns the result of ``func``.
    """
    # Decide once which kind of wrapper to build; the two branches differ only
    # in `async def` / `await`.
    if is_async_fn(func):

        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = get_run_logger()
            # NOTE(review): get_call_parameters presumably binds args/kwargs
            # to func's parameter names, so positional arguments are logged
            # by name -- confirm against the Prefect docs.
            logger.info(f"received: {get_call_parameters(func, args, kwargs)}")
            return await func(*args, **kwargs)

    else:

        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = get_run_logger()
            logger.info(f"received: {get_call_parameters(func, args, kwargs)}")
            return func(*args, **kwargs)

    return wrapper
# ----- Sync example -----
print('\n\n\n' + '-'*20 + ' Sync example ' + '-'*20 + '\n\n\n')


# @task is outermost, so `test` is a Prefect task built from the logging
# wrapper and `.submit` is available on it.
@task
@log_my_args
def test(x, y, z):
    # Joins the three string arguments with single spaces.
    return " ".join([x, y, z])


@flow(name="Sync example")
def test_flow():
    # Exercise both ways of invoking the task. The returned values are not
    # used afterwards.
    result = test("sync", "task", "call")
    future = test.submit("sync", "task", "submit")


test_flow()
# ----- Async example -----
print('\n\n\n' + '-'*20 + ' Async example ' + '-'*20 + '\n\n\n')


# @task is outermost, so `async_test` is a Prefect task built from the async
# logging wrapper and `.submit` is available on it.
@task
@log_my_args
async def async_test(x, y, z):
    # Joins the three string arguments with single spaces.
    return " ".join([x, y, z])


@flow(name="Async example")
async def async_test_flow():
    # Exercise both ways of invoking the task; both are awaited here. The
    # returned values are not used afterwards.
    result = await async_test("async", "task", "call")
    future = await async_test.submit("async", "task", "submit")


asyncio.run(async_test_flow())