<@ULVA73B9P> is it possible to define a flow that ...
# ask-marvin
p
@Marvin is it possible to define a flow that takes a list as an input? I have a flow defined like
Copy code
@flow
def a_flow(
    arg1: list[str]
) -> bool:
    # etc.
    pass
and I get this error:
Copy code
TypeError: issubclass() arg 1 must be a class
and it's complaining because the type annotation
list[str]
isn't a class
m
In Prefect, flows are like functions. They can take inputs, perform work, and return an output. You can turn any function into a Prefect flow by adding the
@flow
decorator. When a function becomes a flow, its behavior changes, giving it several advantages, including the ability to have persistent state and type-validated input arguments. However, the error you're seeing,
TypeError: issubclass() arg 1 must be a class
, is not directly related to Prefect. It's a Python error that occurs when the first argument to the
issubclass()
function is not a class. It's possible that this error is being raised because of how you're trying to use a list as an argument in your flow definition. Here's an example of how you can define a flow that takes a list as an argument:
Copy code
from prefect import Flow, Parameter

def a_flow(arg1: list):
    # your flow logic here
    pass

with Flow('My Flow') as flow:
    arg1 = Parameter('arg1', default=[])
    result = a_flow(arg1)
In this example,
arg1
is a parameter that defaults to an empty list. You can pass a list to your flow when you run it. If you're still having trouble, could you provide more context or a more complete example of your code? That would help me give a more precise answer.
p
ah... it seems like there's no way to annotate flow inputs as typed containers, but I can annotate task inpuhts like so;
n
wait im confused by this
ah... it seems like there's no way to annotate flow inputs as typed containers, but I can annotate task inpuhts like so
Copy code
In [1]: from prefect import flow

In [2]: @flow
   ...: def foo(anything: list[str]) -> list[str]:
   ...:     return anything
   ...:

In [3]: foo(['a', 'b', 'c'])
11:38:21.766 | INFO    | prefect.engine - Created flow run 'rampant-chowchow' for flow 'foo'
11:38:22.591 | INFO    | Flow run 'rampant-chowchow' - Finished in state Completed()
Out[3]: ['a', 'b', 'c']
is my example missing something?
p
oh huh
no that's exactly how I have my flow set up
n
do you have something else in your namespace called
flow
?
p
well, not exaclty, it's a more complicated flow, but let me try setting up this minimal example and see what happens
`nope;. only
nope;
prefect.flow
is the only
flow
variable in the namespace
πŸ‘ 1
okay this is what my code looks like, and it's sitll failing on the same error:
Copy code
from prefect import flow


@flow
def mini_flow(arg1: list[str]) -> str:
    print(arg1)
    return ",".join(arg1)


if __name__ == "__main__":
    mini_flow.serve(name="mini-flow")
do I need to provide it some default arguments or something?
n
shouldnt, lemme try
p
my installed version is 2.14.3
n
wait when do you that error? upon deployment run? or at serve time
mine is serving okay
p
at serve time
this file is in
run_flow.py
and I call
python run_flow.py
oh sorry
I guess at deploy time is when it crashes
by "serve time" you mean when I trigger a run?
n
lol i made up that language, sorry. i just meant when you start the serve process vs when you trigger a run
p
got it; the crash happens the moment I run my
python run_flow.py
script
so I don't even get to the part when I trigger a run
n
hmm - i am not seeing that. can you show output of
prefect version
(contains some other stuff besides just the version)
p
Version: 2.14.3 API version: 0.8.4 Python version: 3.10.12 Git commit: f1ff9257 Built: Thu, Nov 2, 2023 4:12 PM OS/Arch: darwin/arm64 Profile: default Server type: server
n
that is real strange
can you give the full trace here please?
p
yeah give me a sec to obfuscate some stuff
n
πŸ™
p
Copy code
Traceback (most recent call last):
  File "/Users/big-n00b/local-repo/run_flow.py", line 5, in <module>
    def mini_flow(arg1: list[str]) -> str:
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 1339, in flow
    Flow(
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/context.py", line 185, in __register_init__
    __init__(__self__, *args, **kwargs)
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 299, in __init__
    self.parameters = parameter_schema(self.fn)
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/utilities/callables.py", line 322, in parameter_schema
    if HAS_PYDANTIC_V2 and has_v2_model_as_param(signature):
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/_internal/pydantic/v2_schema.py", line 22, in has_v2_model_as_param
    return any(
  File "/Users/big-n00b/Library/Caches/pypoetry/virtualenvs/my-project-RcQVLUoH-py3.10/lib/python3.10/site-packages/prefect/_internal/pydantic/v2_schema.py", line 24, in <genexpr>
    or (inspect.isclass(p.annotation) and issubclass(p.annotation, V2BaseModel))
  File "/Users/big-n00b/.pyenv/versions/3.10.12/lib/python3.10/abc.py", line 123, in __subclasscheck__
    return _abc_subclasscheck(cls, subclass)
TypeError: issubclass() arg 1 must be a class
n
ah jeez
can you try
pip install 'pydantic<2'
and try again
p
yep that worked
but sadly we need our pydantic version
n
ok this seems like a bug related to our validation of annotations with pydantic 2 - would you mind opening an issue? ill play with this on pydantic v2 for a moment to see if there's a quick workaround
p
goot it; will do!
n
πŸ™
p
thank you for your help!
n
sure thing
weird. i am still not seeing this
Copy code
(bleeding-prefect) pad-2 :: testing/prefect-sandbox/testing-prefect β€Ήmain*β€Ί
Β» cat run_python.py

       β”‚ File: run_python.py

   1   β”‚ from prefect import flow
   2   β”‚
   3   β”‚
   4   β”‚ @flow
   5   β”‚ def mini_flow(arg1: list[str]) -> str:
   6   β”‚     print(arg1)
   7   β”‚     return ",".join(arg1)
   8   β”‚
   9   β”‚
  10   β”‚ if __name__ == "__main__":
  11   β”‚     mini_flow.serve(name="mini-flow")

(bleeding-prefect) pad-2 :: testing/prefect-sandbox/testing-prefect β€Ήmain*β€Ί
Β» python run_python.py


β”‚ Your flow 'mini-flow' is being served and polling for scheduled runs!                                                                                                          β”‚
β”‚                                                                                                                                                                                β”‚
β”‚ To trigger a run for this flow, use the following command:                                                                                                                     β”‚
β”‚                                                                                                                                                                                β”‚
β”‚         $ prefect deployment run 'mini-flow/mini-flow'                                                                                                                         β”‚
β”‚                                                                                                                                                                                β”‚
β”‚ You can also run your flow via the Prefect UI:                                                                                                                                 

β”‚ <https://app.prefect.cloud/account/xxx>

^C11:58:32.559 | INFO    | prefect.runner - Pausing schedules for all deployments...
11:58:32.683 | INFO    | prefect.runner - All deployment schedules have been paused!

(bleeding-prefect) pad-2 :: testing/prefect-sandbox/testing-prefect β€Ήmain*β€Ί
Β» pip list | rg 'prefect|pydantic'
prefect                                      2.14.3
pydantic                                     2.4.2
pydantic_core                                2.10.1
the only tangible difference i notice is that you're on server and im on cloud, but the error seems entirely client side / SDK - so it is sort of strange. probably warrants an issue
p
woah, interesting
n
aha. so i was on 3.11, if i switch to 3.10 then I reproduce (on the flow definition, this doesnt have to do with serve)
we think it has to do with the fact that before 3.11 you couldnt use builtins as type annotations
but still feels like something we should handle - just thinking out loud πŸ™‚
p
oh wait, it was python 3.11 that allows you to use builtins as type annotations?
I thought it was 3.9?
n
not positive, there’s some difference here between 3.10 and 3.11
p
got it
I can give this a try later with 3.11 and see if I can cause that to happen