Michael Reynolds
07/25/2022, 9:40 PM@flow
. I will attach a micro example of what i am trying to accomplish... would someone mind helping me understanding the error message:
FAILED my-prefect-project/tests/flows.py::test_pipeline - TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'
Michael Reynolds
07/25/2022, 9:40 PM########################################################################
# Source interfaces
########################################################################
@dataclass
class Source:
__metaclass__ = ABCMeta
@abstractmethod
def poll() -> List[ Tuple[ str,str ] ]:
return NotImplemented()
@dataclass
class MySource( Source ):
def __init__( self, host: str, port: int ):
self.host = host
self.port = port
self.stream = MyStream.connect( host, port )
def poll() : List[ Tuple[ str,str ] ]:
return self.stream.poll( 100 )
########################################################################
# Sink interfaces
########################################################################
@dataclass
class Sink:
__metaclass__ = ABCMeta
@abstractmethod
def do_sink( data : List[ dict ] ) -> int:
raise NotImplemented()
@dataclass
class MySink:
def __init__( self, host: str, port: int ):
self.host = host
self.port = port
self.datastore = MyDatastore.connect( host, port )
def do_sink( data : List[dict] ):
self.datastore.batch_save( data )
return len( data )
@task( name = 'read-source' )
async def read_source( source : Source ):
return source.poll()
@task( name = 'process-items' )
async def process_items( data : List[ dict] ) -> List[dict]:
return process( data )
@task( name = 'sink-results' )
async def sink_results( data : List[ dict ], sink : Sink ):
return sink.do_sink( data )
@flow( name = 'myflow' )
def run_flow( sources : List[ Source ], sinks : List[ Sink ] ):
aggregated = []
for src in sources:
msgs = await read_source( source )
results = await process_items( msgs )
aggregated.append( results )
for sink in sinks:
sink_results( aggregated, sinks )
def run():
sources = [ MySource( 'host', 1010 ) ]
sinks = [ MySink( 'host', 1011 ) ]
run_flow( sources, sinks )
Michael Reynolds
07/25/2022, 9:42 PMMichael Reynolds
07/25/2022, 9:42 PMexpr = MySource(), visit_fn = <function resolve_inputs.<locals>.visit_fn at 0x7fd4fb6b4710>, return_data = True
async def visit_collection(
expr, visit_fn: Callable[[Any], Awaitable[Any]], return_data: bool = False
):
"""
This function visits every element of an arbitrary Python collection. If an element
is a Python collection, it will be visited recursively. If an element is not a
collection, `visit_fn` will be called with the element. The return value of
`visit_fn` can be used to alter the element if `return_data` is set.
Note that when using `return_data` a copy of each collection is created to avoid
mutating the original object. This may have significant performance penalities and
should only be used if you intend to transform the collection.
Supported types:
- List
- Tuple
- Set
- Dict (note: keys are also visited recursively)
- Dataclass
- Pydantic model
Args:
expr (Any): a Python object or expression
visit_fn (Callable[[Any], Awaitable[Any]]): an async function that
will be applied to every non-collection element of expr.
return_data (bool): if `True`, a copy of `expr` containing data modified
by `visit_fn` will be returned. This is slower than `return_data=False`
(the default).
"""
def visit_nested(expr):
# Utility for a recursive call, preserving options.
# Returns a `partial` for use with `gather`.
return partial(
visit_collection, expr, visit_fn=visit_fn, return_data=return_data
)
# Get the expression type; treat iterators like lists
typ = list if isinstance(expr, IteratorABC) else type(expr)
typ = cast(type, typ) # mypy treats this as 'object' otherwise and complains
# do not visit mock objects
if isinstance(expr, Mock):
return expr if return_data else None
elif typ in (list, tuple, set):
result = await gather(*[visit_nested(o) for o in expr])
return typ(result) if return_data else None
elif typ in (dict, OrderedDict):
assert isinstance(expr, (dict, OrderedDict)) # typecheck assertion
keys, values = zip(*expr.items()) if expr else ([], [])
keys = await gather(*[visit_nested(k) for k in keys])
values = await gather(*[visit_nested(v) for v in values])
return typ(zip(keys, values)) if return_data else None
elif is_dataclass(expr) and not isinstance(expr, type):
values = await gather(
*[visit_nested(getattr(expr, f.name)) for f in fields(expr)]
)
result = {field.name: value for field, value in zip(fields(expr), values)}
> return typ(**result) if return_data else None
E TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'
venv/lib/python3.7/site-packages/prefect/utilities/collections.py:280: TypeError
=========================================================================================================================== short test summary info ===========================================================================================================================
FAILED my-prefect-project/tests/flows.py::test_pipeline - TypeError: __init__() missing 2 required positional arguments: 'host' and 'port'
Andrew Huang
07/25/2022, 9:57 PM@dataclass
class MySink:
host: str
port: int
def __init__( self, host: str, port: int ):
self.host = host
self.port = port
self.datastore = MyDatastore.connect( host, port )
def do_sink( data : List[dict] ):
self.datastore.batch_save( data )
return len( data )
Then:
sources = [ MySource(host='host', port=1010 ) ]
Michael Reynolds
07/25/2022, 10:56 PMprefecty
way to implement what i am trying to do here?Andrew Huang
07/25/2022, 11:01 PMprefecty
way is using Blocks; that way, you can add and save configs on the UI https://orion-docs.prefect.io/concepts/blocks/#blocks-overview
(simply remove your dataclass decorators and inherit from Block)
then register the block in the cli using prefect block register file.py
(need >=2.0b13 for the CLI)Michael Reynolds
07/25/2022, 11:03 PMAndrew Huang
07/25/2022, 11:05 PMMichael Reynolds
07/26/2022, 2:27 PMMichael Reynolds
07/26/2022, 2:27 PMMichael Reynolds
07/26/2022, 2:28 PMAnna Geller
Anna Geller
Anna Geller
Anna Geller