Conrad Dobberstein

02/28/2023, 10:38 AM
Hello Everyone, I have a question regarding the behaviour of flow call parameters. In the example below, the flow my_flow takes an iterable of strings as the first parameter. When calling the flow with an iterator, its first element is missing inside the flow run. This also applies when calling the flow with a list and enabling validate_parameters. Is this a bug, or am I missing something obvious here? I'm on the latest version of Prefect, i.e. Prefect 2.8.3.
from collections.abc import Iterable

import prefect


@prefect.flow
def my_flow(texts: Iterable[str]) -> None:
    print(f"{type(texts)=}")
    print(f"{list(texts)=}")


if __name__ == "__main__":
    # type(texts)=<class 'list_iterator'>
    # list(texts)=['World', '!']
    my_flow.with_options(validate_parameters=True)(["Hello", "World", "!"])

    # type(texts)=<class 'list_iterator'>
    # list(texts)=['World', '!']
    my_flow.with_options(validate_parameters=False)(iter(["Hello", "World", "!"]))

    # type(texts)=<class 'list'>
    # list(texts)=['Hello', 'World', '!']
    my_flow.with_options(validate_parameters=False)(["Hello", "World", "!"])

Mason Menges

02/28/2023, 7:05 PM
Hey @Conrad Dobberstein, after playing around with this, I think something is going wrong when we attempt to serialize the parameter inputs. The list_iterator isn't serializable, so we may be missing something there. I don't know for sure if this is a bug, but the behavior is definitely reproducible. If you're open to it, I think it would be worth opening an issue with the example you have so it can be discussed there.
For some extra context: validate_parameters is True by default and validation goes through pydantic, so you don't need to set it to True in the first flow run. When we validate the parameters, we also attempt to coerce each parameter to the specified type. The second flow run in your example essentially mimics that behavior, which is why the outputs are the same. Stepping through the process, it seems like the first element is lost when we attempt to serialize the parameters, but I'm not personally sure if that's actually a bug 😅
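For readers following along, here is a minimal sketch of the general mechanism that could produce this symptom. It is not Prefect's actual code: inspect_first is a hypothetical stand-in for any step that reads from a parameter before the flow body sees it. An iterator cannot be rewound, so whatever such a step reads is gone afterwards, while a list is unaffected.

```python
from collections.abc import Iterable, Iterator


def inspect_first(value: Iterable) -> Iterable:
    """Hypothetical helper (not a Prefect API): read one element, return the same object."""
    if isinstance(value, Iterator):
        # next() advances the iterator; the element it returns cannot be put back.
        next(value, None)
    return value


# A list is not an iterator, so nothing is consumed.
from_list = list(inspect_first(["Hello", "World", "!"]))

# A list_iterator loses the element that was read during inspection.
from_iterator = list(inspect_first(iter(["Hello", "World", "!"])))

print(f"{from_list=}")
print(f"{from_iterator=}")
```

Whether Prefect's serialization or validation path does something like this is exactly the open question in this thread; the sketch only shows that reading one element ahead of time is enough to reproduce the output.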

Conrad Dobberstein

03/01/2023, 11:44 AM
Thanks for the quick answer and the useful insights. I have also noticed that for generators the flow receives an empty generator. I'm open to migrating this into an issue. Are you opening one, or should I? Complete Example:
import prefect


@prefect.flow
def my_flow(numbers):
    print(f"{type(numbers)=}")
    print(f"{list(numbers)=}")


if __name__ == "__main__":
    # type(numbers)=<class 'generator'>
    # list(numbers)=[]
    my_flow((x for x in range(3)))

    # type(numbers)=<class 'list_iterator'>
    # list(numbers)=[1, 2]
    my_flow(iter([0, 1, 2]))
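
Until this is settled in an issue, one possible workaround is to copy one-shot iterables into a list at the call site, so that nothing upstream of the flow body can exhaust them. The sketch below is plain Python and assumes nothing about Prefect; materialize is a hypothetical helper name. Note that the first example in this thread lost an element even when a list was passed with validation enabled and an Iterable[str] annotation, so the annotation may also need to be list[str]; that part is an untested assumption.

```python
from collections.abc import Iterable


def materialize(values: Iterable) -> list:
    """Hypothetical helper: copy a one-shot iterable into a list that can be re-read."""
    return list(values)


numbers_from_gen = materialize(x for x in range(3))
numbers_from_iter = materialize(iter([0, 1, 2]))

# A list can be iterated any number of times without losing elements.
first_pass = list(numbers_from_gen)
second_pass = list(numbers_from_gen)

print(f"{first_pass=}")
print(f"{second_pass=}")
print(f"{numbers_from_iter=}")
```

The trade-off is memory: the whole sequence is held at once, which defeats the purpose of a generator for very large inputs.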