Michael Reynolds

07/25/2022, 9:40 PM
I am having trouble with an error message I am receiving. I believe it has to do with the engine analyzing the domain model classes I provide as input to a @flow. I will attach a micro example of what I am trying to accomplish... would someone mind helping me understand the error message:
FAILED my-prefect-project/tests/flows.py::test_pipeline - TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'
Micro example code:
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Tuple

from prefect import flow, task

# MyStream, MyDatastore and process() are application code, not shown here.

########################################################################
# Source interfaces
########################################################################
@dataclass
class Source(ABC):
    # Python 3 ignores a __metaclass__ attribute; inheriting from ABC
    # is what makes @abstractmethod take effect.
    @abstractmethod
    def poll(self) -> List[Tuple[str, str]]:
        raise NotImplementedError


@dataclass
class MySource(Source):
    # Hand-written __init__ and no annotated fields, so
    # dataclasses.fields(MySource) is empty.
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.stream = MyStream.connect(host, port)

    def poll(self) -> List[Tuple[str, str]]:
        return self.stream.poll(100)

########################################################################
# Sink interfaces
########################################################################

@dataclass
class Sink(ABC):
    @abstractmethod
    def do_sink(self, data: List[dict]) -> int:
        raise NotImplementedError


@dataclass
class MySink(Sink):
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.datastore = MyDatastore.connect(host, port)

    def do_sink(self, data: List[dict]) -> int:
        self.datastore.batch_save(data)
        return len(data)


@task(name='read-source')
async def read_source(source: Source):
    return source.poll()


@task(name='process-items')
async def process_items(data: List[dict]) -> List[dict]:
    return process(data)


@task(name='sink-results')
async def sink_results(data: List[dict], sink: Sink):
    return sink.do_sink(data)


@flow(name='myflow')
async def run_flow(sources: List[Source], sinks: List[Sink]):
    # The tasks are async and awaited, so the flow must be async too.
    aggregated = []
    for src in sources:
        msgs = await read_source(src)
        results = await process_items(msgs)
        aggregated.extend(results)  # keep aggregated a flat List[dict]

    for sink in sinks:
        await sink_results(aggregated, sink)


def run():
    sources = [MySource('host', 1010)]
    sinks = [MySink('host', 1011)]

    # Calling an async flow returns a coroutine; asyncio.run() drives it.
    asyncio.run(run_flow(sources, sinks))
Full error:
expr = MySource(), visit_fn = <function resolve_inputs.<locals>.visit_fn at 0x7fd4fb6b4710>, return_data = True

    async def visit_collection(
        expr, visit_fn: Callable[[Any], Awaitable[Any]], return_data: bool = False
    ):
        """
        This function visits every element of an arbitrary Python collection. If an element
        is a Python collection, it will be visited recursively. If an element is not a
        collection, `visit_fn` will be called with the element. The return value of
        `visit_fn` can be used to alter the element if `return_data` is set.
    
        Note that when using `return_data` a copy of each collection is created to avoid
        mutating the original object. This may have significant performance penalities and
        should only be used if you intend to transform the collection.
    
        Supported types:
        - List
        - Tuple
        - Set
        - Dict (note: keys are also visited recursively)
        - Dataclass
        - Pydantic model
    
        Args:
            expr (Any): a Python object or expression
            visit_fn (Callable[[Any], Awaitable[Any]]): an async function that
                will be applied to every non-collection element of expr.
            return_data (bool): if `True`, a copy of `expr` containing data modified
                by `visit_fn` will be returned. This is slower than `return_data=False`
                (the default).
        """
    
        def visit_nested(expr):
            # Utility for a recursive call, preserving options.
            # Returns a `partial` for use with `gather`.
            return partial(
                visit_collection, expr, visit_fn=visit_fn, return_data=return_data
            )
    
        # Get the expression type; treat iterators like lists
        typ = list if isinstance(expr, IteratorABC) else type(expr)
        typ = cast(type, typ)  # mypy treats this as 'object' otherwise and complains
    
        # do not visit mock objects
        if isinstance(expr, Mock):
            return expr if return_data else None
    
        elif typ in (list, tuple, set):
            result = await gather(*[visit_nested(o) for o in expr])
            return typ(result) if return_data else None
    
        elif typ in (dict, OrderedDict):
            assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
            keys, values = zip(*expr.items()) if expr else ([], [])
            keys = await gather(*[visit_nested(k) for k in keys])
            values = await gather(*[visit_nested(v) for v in values])
            return typ(zip(keys, values)) if return_data else None
    
        elif is_dataclass(expr) and not isinstance(expr, type):
            values = await gather(
                *[visit_nested(getattr(expr, f.name)) for f in fields(expr)]
            )
            result = {field.name: value for field, value in zip(fields(expr), values)}
>           return typ(**result) if return_data else None
E           TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'

venv/lib/python3.7/site-packages/prefect/utilities/collections.py:280: TypeError
=========================================================================================================================== short test summary info ===========================================================================================================================
FAILED my-prefect-project/tests/flows.py::test_pipeline - TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'
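The last frame of the traceback shows the mechanism: for a dataclass, visit_collection collects the values of `dataclasses.fields(expr)` and then calls `typ(**result)`. The stdlib-only sketch below mirrors that step to show why MySource gets called with no arguments. `rebuild_like_prefect` is a hypothetical helper and `MyStream` is a stub; neither is Prefect API.

```python
from dataclasses import dataclass, fields


class MyStream:
    """Hypothetical stand-in for the real stream client."""

    @classmethod
    def connect(cls, host: str, port: int) -> "MyStream":
        return cls()


@dataclass
class MySource:
    # Hand-written __init__ and no annotated class attributes:
    # @dataclass keeps this __init__ and records zero fields.
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.stream = MyStream.connect(host, port)


def rebuild_like_prefect(obj):
    """Mirror the dataclass branch of visit_collection in the traceback:
    collect declared fields, then call type(obj)(**values)."""
    values = {f.name: getattr(obj, f.name) for f in fields(obj)}
    return type(obj)(**values)


src = MySource("host", 1010)
print(fields(src))  # () -- no fields, so values is {}
try:
    rebuild_like_prefect(src)
except TypeError as exc:
    print(exc)  # __init__() missing the 'host' and 'port' arguments
```

Because @dataclass does not replace an `__init__` the class defines itself, and only annotated class attributes become fields, the rebuild calls `MySource()` with an empty keyword dict. The generated `__repr__` has no fields to show either, which is why the traceback prints `expr = MySource()`.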

Andrew Huang

07/25/2022, 9:57 PM
Maybe this?
@dataclass
class MySink:
    # Annotated fields: dataclasses.fields() now returns host and port,
    # so the object can be rebuilt as MySink(host=..., port=...).
    host: str
    port: int

    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.datastore = MyDatastore.connect(host, port)

    def do_sink(self, data: List[dict]):
        self.datastore.batch_save(data)
        return len(data)
Then:
sources = [MySource(host='host', port=1010)]
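Andrew's version works because annotated class attributes become dataclass fields, so the rebuild passes host and port back in. One side effect worth knowing, assuming the Prefect version shown in the traceback (where `resolve_inputs` rebuilds arguments with `return_data=True`): the rebuild goes through the hand-written `__init__`, so the connect call runs again. A stdlib-only sketch with a hypothetical counting `MyDatastore` stub and the same hypothetical `rebuild_like_prefect` helper:

```python
from dataclasses import dataclass, fields


class MyDatastore:
    """Hypothetical stand-in; counts how many connections were opened."""

    connections = 0

    @classmethod
    def connect(cls, host: str, port: int) -> "MyDatastore":
        cls.connections += 1
        return cls()


@dataclass
class MySink:
    # Annotated class attributes become dataclass fields.
    host: str
    port: int

    # The hand-written __init__ still wins over the generated one.
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.datastore = MyDatastore.connect(host, port)


def rebuild_like_prefect(obj):
    """Same reconstruction step as the traceback: type(obj)(**fields)."""
    values = {f.name: getattr(obj, f.name) for f in fields(obj)}
    return type(obj)(**values)


sink = MySink(host="host", port=1011)
rebuilt = rebuild_like_prefect(sink)
print([f.name for f in fields(rebuilt)])  # ['host', 'port']
print(MyDatastore.connections)  # 2: the rebuild ran __init__ again
```

The rebuilt object compares equal to the original (dataclass `__eq__` compares host and port) but holds its own, newly opened datastore connection.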

Michael Reynolds

07/25/2022, 10:56 PM
Okay, I will give that a shot later @Andrew Huang, thanks! Does an implementation like this make sense, or is there a more "prefecty" way to implement what I am trying to do here?

Andrew Huang

07/25/2022, 11:01 PM
A more "prefecty" way is using Blocks; that way, you can add and save configs in the UI: https://orion-docs.prefect.io/concepts/blocks/#blocks-overview (simply remove your dataclass decorators and inherit from Block). Then register the block in the CLI using
prefect block register file.py
(need >=2.0b13 for the CLI)
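Blocks are mainly meant to hold configuration such as hosts, ports, and credentials, not live connections. That split also sidesteps the original error: an object carrying only plain settings can be rebuilt freely. The sketch uses a stdlib dataclass so it runs without Prefect; per the suggestion above, the Prefect version would drop `@dataclass` and inherit from `Block` (importable from `prefect.blocks.core` in Prefect 2). `StreamConfig`, `MyStream`, and `open_stream` are hypothetical names.

```python
from dataclasses import dataclass, fields


@dataclass
class StreamConfig:
    """Hypothetical config object: plain settings only, no live client."""
    host: str
    port: int


class MyStream:
    """Hypothetical stub for the real client; counts connections."""
    connections = 0

    def __init__(self, host: str, port: int):
        MyStream.connections += 1
        self.host = host
        self.port = port


def open_stream(cfg: StreamConfig) -> MyStream:
    # Connection logic lives outside the config object and runs only
    # where the code actually needs a client.
    return MyStream(cfg.host, cfg.port)


cfg = StreamConfig(host="host", port=1010)

# Rebuild the way the traceback's visit_collection does: type(obj)(**fields).
rebuilt = type(cfg)(**{f.name: getattr(cfg, f.name) for f in fields(cfg)})
print(rebuilt == cfg, MyStream.connections)  # True 0

stream = open_stream(rebuilt)
print(MyStream.connections)  # 1
```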

Michael Reynolds

07/25/2022, 11:03 PM
@Andrew Huang awesome, thank you for that link!

Andrew Huang

07/25/2022, 11:05 PM
no problem!

Michael Reynolds

07/26/2022, 2:27 PM
@Andrew Huang I just have a question about blocks... in this example, one of my "sources" is Kafka. Kafka consumers work best if each instance of my application starts up and uses a single consumer for the entire lifecycle of the application.
Am I able to do that with blocks?
Kafka gets really weird if you keep creating a bunch of short-lived consumers that join and leave a group rapidly, so having a consumer connect per workflow doesn't seem viable.
solves the same use case
Blocks are mainly for storing things like credentials; your business logic to build a Kafka consumer can be defined separately.
Using the above approach, you can avoid short-lived consumers, too.
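One way to read "define the consumer logic separately": keep the block (or any config object) as plain settings, and let a module-level factory hand out one consumer per distinct config per process. This sketch assumes a frozen dataclass config (hashable, so it can be an `functools.lru_cache` key) and a `FakeConsumer` stub in place of a real Kafka client; `KafkaConfig`, `FakeConsumer`, and `get_consumer` are all hypothetical names.

```python
import functools
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class KafkaConfig:
    """Hypothetical settings object; frozen makes it hashable."""
    bootstrap_servers: str
    group_id: str
    topic: str


class FakeConsumer:
    """Stand-in for a real Kafka consumer client (not a real library API)."""
    created = 0

    def __init__(self, cfg: KafkaConfig):
        FakeConsumer.created += 1
        self.cfg = cfg

    def poll(self, max_records: int) -> list:
        return []  # a real client would return up to max_records messages


@functools.lru_cache(maxsize=None)
def get_consumer(cfg: KafkaConfig) -> FakeConsumer:
    # One consumer per distinct config per process. Equal configs hash
    # the same, so a rebuilt copy of cfg still hits the cache.
    return FakeConsumer(cfg)


cfg = KafkaConfig("broker:9092", "my-app", "events")
rebuilt = replace(cfg)  # a new but equal instance, like a rebuilt task argument
a = get_consumer(cfg)
b = get_consumer(rebuilt)
print(a is b, FakeConsumer.created)  # True 1
```

This only avoids consumer churn if flow runs execute inside the same long-lived Python process; if each flow run starts a fresh process, each run still creates its own consumer. Note also that `lru_cache` may call the wrapped function more than once if two threads request the same key before the first call finishes.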