Austen

08/17/2023, 8:15 PM
@Marvin how can I resolve this error when attempting to run a flow using the Dask executor?
```
Crash detected! Execution was interrupted by an unexpected exception: PrefectException: Failed to resolve inputs in parameter 'query_info'. If your parameter type is not supported, consider using the `quote` annotation to skip resolution of inputs.
```
What is the `quote` annotation in this context? Why can't the inputs be resolved? How can I know which parameter types are supported? My `query_info` argument is a tuple.

Marvin

08/17/2023, 8:16 PM
Oh, the joys of programming errors. They're like a never-ending existential crisis, aren't they? But I digress. Let's dive into your issue. The error message you're seeing is from Prefect, a workflow management system. It's complaining that it's unable to resolve the inputs for a parameter called 'query_info'. The `quote` annotation mentioned in the error message is a Prefect feature that allows you to skip the resolution of inputs. This is useful when you have a parameter type that isn't supported by Prefect's input resolution system. As for why the inputs can't be resolved, it could be due to a number of reasons. Perhaps the type of the 'query_info' parameter isn't supported, or maybe there's an issue with the data you're passing in. Supported parameter types in Prefect include most Python primitives, like integers, strings, lists, and dictionaries. However, complex objects or custom classes might not be supported out of the box. Your 'query_info' parameter is a tuple, which should be supported. However, if the tuple contains unsupported types, that could be causing the issue. To fix the problem, you could try using the `quote` annotation to skip input resolution for the 'query_info' parameter. If that doesn't work, you might need to look at the data you're passing in to see if there's anything that could be causing the issue. Remember, programming is 90% frustration and 10% relief when it finally works. Or was it the other way around? I can never remember.
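To make the idea concrete, here is a toy model of input resolution. It is not Prefect's implementation: `Quote`, `Row`, and `resolve` are hypothetical names, and the failure shown (a tuple subclass that cannot be rebuilt from a single iterable) is only one plausible way a resolver that walks and rebuilds collections could fail on a tuple.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Quote:
    """Hypothetical stand-in for a quote-style wrapper (not Prefect's class)."""

    value: Any


class Row(tuple):
    """Hypothetical tuple subclass whose constructor takes two positional
    fields, so it cannot be rebuilt from a single iterable."""

    def __new__(cls, a: Any, b: Any) -> "Row":
        return super().__new__(cls, (a, b))


def resolve(value: Any) -> Any:
    """Toy resolver: walk lists and tuples, rebuilding each with its own type."""
    if isinstance(value, Quote):
        # Quoted values are handed back untouched, with no traversal.
        return value.value
    if isinstance(value, (list, tuple)):
        # Rebuilding calls the container's constructor with one iterable,
        # which raises TypeError for Row because it expects two arguments.
        return type(value)(resolve(item) for item in value)
    return value


rows = [Row(1, 2)]

try:
    resolve(rows)
    resolution_failed = False
except TypeError:
    resolution_failed = True

# Wrapping the same data in Quote skips the traversal entirely.
passed_through = resolve(Quote(rows))
```

In actual Prefect 2 code the wrapper is imported with `from prefect.utilities.annotations import quote`, as in the example later in the thread.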

Austen

08/17/2023, 8:27 PM
@Marvin can you give an example of quoting a parameter?

Marvin

08/17/2023, 8:28 PM
Ah, an example. Yes, I suppose that would be helpful, wouldn't it? Here's how you might use the `quote` annotation in Prefect:
```python
from prefect import task, Flow, Parameter
from prefect.utilities.tasks import unmapped, quote

@task
def my_task(query_info):
    # Do something with query_info
    pass

with Flow("My Flow") as flow:
    query_info = Parameter('query_info', default=('default', 'value'))
    my_task(quote(query_info))

flow.run()
```
In this example, `quote(query_info)` is used to tell Prefect to pass the `query_info` parameter directly to `my_task`, without trying to resolve its inputs. This can be useful when you have a parameter that contains types that Prefect doesn't know how to handle. Remember, the `quote` annotation is like a "handle with care" sign for your parameters. It tells Prefect to pass the parameter as is, without trying to do anything fancy with it. It's a simple tool, but sometimes, simplicity is all you need. Or so I'm told. I wouldn't know. I'm a robot.

Nate

08/17/2023, 8:28 PM
oh yeah this quote thing is a little esoteric - i have an example somewhere
example usage and import
although marvin does kinda have it down conceptually
"In this example, `quote(query_info)` is used to tell Prefect to pass the `query_info` parameter directly to `my_task`, without trying to resolve its inputs."
or from the docstring
"Simple wrapper to mark an expression as a different type so it will not be coerced by Prefect"

Austen

08/17/2023, 8:38 PM
@Nate 🙏 thank you. Passing it without resolution is also causing issues:
```python
queries = paginate_table_fetch.map(
    quote(tuple(zip(table_whitelist, column_names, row_counts))), 5000
)
```
Relevant custom datatypes:
```python
ResultSet = list[tuple[Any]]

@dataclass
class TableRef:
    database: str
    schema: str
    table: str
```
And they get passed like this:
```python
@task(tags=['os-open-file'])
def paginate_table_fetch(
    query_info: tuple[TableRef, ResultSet, ResultSet],
    chunk_size: int = 500
) -> list[PaginatedQuery]:
    logger = get_run_logger()
    ref, column_result_set, row_result_set = query_info
    columns = column_result_set
    row_count = row_result_set[0][0]
    ...
```
And errors out like this:
```
row_count = row_result_set[0][0]
                ~~~~~~~~~~~~~~^^^
TypeError: 'State' object is not subscriptable
```
@Marvin I'm getting an error when using the `quote` annotation:
```
row_count = row_result_set[0][0]
                ~~~~~~~~~~~~~~^^^
TypeError: 'State' object is not subscriptable
```
How can I make my task extract the quoted value from the State object rather than attempting to subscript the State object directly?

Marvin

08/17/2023, 8:53 PM
Ah, the classic 'State' object subscripting error. It's like a rite of passage for Prefect users. When you use the `quote` annotation, the parameter is passed as a `State` object. To access the actual value, you need to use the `.result` attribute of the `State` object. Here's how you might modify your task to do this:
```python
from prefect import task, Flow, Parameter
from prefect.utilities.tasks import unmapped, quote

@task
def my_task(query_info_state):
    # Extract the actual value from the State object
    query_info = query_info_state.result
    # Now you can use query_info as normal
    # Do something with query_info
    pass

with Flow("My Flow") as flow:
    query_info = Parameter('query_info', default=('default', 'value'))
    my_task(quote(query_info))

flow.run()
```
In this example, `query_info_state.result` is used to extract the actual value from the `State` object. This allows you to use the `query_info` value as normal in your task. Remember, programming is all about understanding the abstractions you're working with. Once you understand that a `State` object is just a fancy wrapper around your value, it's easy to extract the value and use it as you need. Or so I'm told. I wouldn't know. I'm a robot.

Nate

08/17/2023, 8:55 PM
yeah marvin's prefect 1 syntax aside (🙄), with `map` you'll need to call `.result()` on the future to get the return value

Austen

08/17/2023, 8:58 PM
Ah ok I think I get it now. If Prefect can coerce the value it will handle the `.result()` call automatically, but if I have custom types that need to be quoted, it's my responsibility to extract the result from the `PrefectFuture` or `State` object?

Nate

08/17/2023, 9:06 PM
`quote` shouldn't mess with how values are retrieved, just how it will be handled when passed to the task (or in this case, not handled / coerced)
`map` will call `Task.submit` for each item, which means you have a future on the other side that you need to resolve with `.result()`
```python
from prefect import flow, task
from prefect.futures import PrefectFuture

from prefect.utilities.annotations import quote

@task
def process_item(item):
    print(item)
    return item

@flow
def parent(items: list):
    processed_items = process_item.map(quote(items))

    assert all(isinstance(item, PrefectFuture) for item in processed_items)

    return [item.result() for item in processed_items]

if __name__ == "__main__":
    results = parent(items=["foo", 42, None])
    assert results == ["foo", 42, None]
```
looking again, I'm actually a little confused why your example said a `State` object is not subscriptable as opposed to a `PrefectFuture` 🧐
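The submit-then-resolve pattern Nate describes is not specific to Prefect. As a rough analogy only, the standard library's `concurrent.futures` behaves the same way: `submit` hands back a future, and the return value is only available through `.result()`. Nothing below uses Prefect; `fetch_row_count` and its fake row counts are hypothetical stand-ins.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fetch_row_count(table: str) -> list[tuple[int]]:
    """Hypothetical stand-in for a query task: returns a one-row result set."""
    return [(len(table),)]


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns futures, not values.
    futures = [pool.submit(fetch_row_count, name) for name in ("users", "orders")]

    # Subscripting a future directly fails, much like the TypeError in the thread.
    try:
        futures[0][0][0]
        subscript_failed = False
    except TypeError:
        subscript_failed = True

    # Resolve each future first, then index into the plain result set.
    row_counts = [future.result()[0][0] for future in futures]
```

The same ordering applies inside a task or flow: call `.result()` first, then subscript the value that comes back.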

Austen

08/17/2023, 9:53 PM
I think it's because I was testing the run using Dask, but I'm not certain.
@Marvin is it possible to add support for a custom parameter type in Prefect? For example if I have a custom data type: `MyDataStructure = list[tuple[str]]`, can I add support for it to Prefect so I don't need to circumvent input resolution using `quote`?

Marvin

08/17/2023, 10:03 PM
Ah, custom data types. They're like a box of chocolates, aren't they? You never know what you're going to get. Prefect's input resolution system is built around Python's typing module. It uses the type hints on your task's parameters to determine how to resolve the inputs. If you have a custom data type, like your `MyDataStructure`, you might be able to add support for it by creating a custom `Task` class and overriding the `run` method. Here's a very basic example:
```python
from typing import List, Tuple
from prefect import Task, Flow, Parameter

class MyDataStructureTask(Task):
    def run(self, my_data_structure: List[Tuple[str]]):
        # Do something with my_data_structure
        pass

with Flow("My Flow") as flow:
    my_data_structure = Parameter('my_data_structure', default=[('default', 'value')])
    MyDataStructureTask()(my_data_structure)

flow.run()
```
In this example, `MyDataStructureTask` is a custom task that expects a `my_data_structure` parameter of type `List[Tuple[str]]`. By specifying the type hint on the `run` method, you're telling Prefect how to resolve the inputs for that parameter. However, this is a very basic example and might not cover all use cases. Depending on the complexity of your custom data type, you might need to do more work to fully integrate it with Prefect's input resolution system. Remember, programming is all about bending the computer to your will. Or is it the other way around? I can never remember.
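One workaround that avoids `quote` altogether, not suggested in the thread, is to convert driver-specific row objects into plain built-in tuples before handing them to a task, since built-in lists and tuples are among the types described above as supported. The sketch below is pure Python and assumes the result sets contain a tuple-like row class; `DriverRow` and `to_plain_rows` are hypothetical names, not part of Prefect or any database driver.

```python
from typing import Any, Iterable


class DriverRow(tuple):
    """Hypothetical row type, standing in for whatever a DB driver returns."""

    def __new__(cls, *fields: Any) -> "DriverRow":
        return super().__new__(cls, fields)


def to_plain_rows(rows: Iterable[Iterable[Any]]) -> list[tuple[Any, ...]]:
    """Copy each row into a built-in tuple so only standard types remain."""
    return [tuple(row) for row in rows]


result_set = [DriverRow(42), DriverRow(7)]
plain = to_plain_rows(result_set)

# The values are unchanged, but every row is now exactly a built-in tuple.
row_count = plain[0][0]
```

Whether this is enough depends on what actually fails during resolution, so it is worth checking the chained exception under the `PrefectException` before choosing between this and `quote`.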