# ask-community
j
Hey everyone, I'm really struggling to wrap a custom Executor into Prefect and could use some help! I know it's a little late in DC, so let me know if this should wait til tomorrow lol. So I have
future = CustomExecutor(flow.run, *my_args, **my_kwargs)
working just fine. The issue comes when I try
flow.executor = CustomExecutor(); state = flow.run()
. When I do this, I receive...
Unexpected error occured in FlowRunner: AttributeError("'CustomFuture' object has no attribute 'is_mapped'")
I read through the FlowRunner code and have a good guess as to what's happening, but I could use some insight from a dev. Has anyone seen this before?
c
Hey Jack — we haven’t really seen anyone build a custom executor yet and the interface is subject to change without warning, so tread carefully here! It might be worthwhile to open an issue with your needs and why you need to build a custom executor in the first place. That being said, this error message suggests to me that your executor is not resolving a “CustomFuture” to a state object when gathered with the
wait
method. This will probably be tricky to track down right now because most of our executor logic for delayed objects is built on assumptions that Dask satisfies (e.g., Dask will sometimes resolve futures implicitly without requiring an explicit
wait
call)
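To illustrate what "resolving a future to a state object" could look like, here is a minimal sketch built on the standard library's `concurrent.futures` instead of Prefect. The helper name `resolve_futures` and the string stand-ins for state objects are hypothetical, and this is not Prefect's actual `wait` implementation. It only shows the general idea of walking a nested structure and swapping each future for its result.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve_futures(obj, timeout=None):
    """Recursively replace every Future inside obj with its result.

    Hypothetical helper: a custom executor's wait method would need
    something similar so callers only ever see resolved values.
    """
    if isinstance(obj, Future):
        # A future may resolve to a container that holds more futures.
        return resolve_futures(obj.result(timeout=timeout), timeout)
    if isinstance(obj, dict):
        return {k: resolve_futures(v, timeout) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return type(obj)(resolve_futures(v, timeout) for v in obj)
    return obj


def demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Strings and ints stand in for state objects here.
        upstream = {
            "edge_a": pool.submit(lambda: "Success"),
            "edge_b": [pool.submit(lambda: 1)],
        }
        return resolve_futures(upstream)
```

If a future leaks through unresolved, downstream code that expects a state (and its attributes, such as `is_mapped`) would fail in the way the error above describes.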
j
Here's my thought... FlowRunner submits run_task jobs to the Executor with the upstream_state kwarg set as an {Edge: Future} dict. However, one job's run_task attempts to update another's upstream_state, even though it's already been submitted. This update changes the 2nd job's upstream state from {Edge: Future} to {Edge: State}. So FlowRunner assumes we can update a job's kwargs even after it's been submitted to the Executor. This assumption works in Dask, but fails in many other Executors that submit via a message broker (where the job is serialized and final once sent).
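A toy illustration of that hypothesis, assuming nothing about Prefect's internals: the two executor classes and `run_task` below are hypothetical stand-ins, and `pickle` plays the role of the message broker's serialization step. One executor keeps a live reference to the kwargs, the other snapshots them at submit time.

```python
import pickle


class InProcessExecutor:
    """Keeps a reference to kwargs, so later mutation is visible at run time."""

    def submit(self, fn, **kwargs):
        return lambda: fn(**kwargs)


class BrokerStyleExecutor:
    """Serializes kwargs at submit time, as a message broker would."""

    def submit(self, fn, **kwargs):
        payload = pickle.dumps(kwargs)  # the job is final once "sent"
        return lambda: fn(**pickle.loads(payload))


def run_task(upstream_states):
    # Stand-in for the real task runner: just report what it received.
    return dict(upstream_states)


def demo(executor):
    upstream = {"edge": "PENDING_FUTURE"}
    job = executor.submit(run_task, upstream_states=upstream)
    # Mutation after submission, as the runner is assumed to do.
    upstream["edge"] = "Success"
    return job()
```

With `InProcessExecutor` the job sees the updated value, while with `BrokerStyleExecutor` it still sees the placeholder, which matches the failure mode described here for broker-backed executors.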
Ah man, that stinks to hear... I already typed up my last message right before you sent yours, so I just sent it along anyways.
👍 1
I believe I know the "design-level" fix needed in FlowRunner, but this may be best for when Prefect jumps versions/refactors. Should I open this as a feature-request issue, or add this to discussions?
c
Yea I think you should open a feature request issue — we need a place to discuss more extensible executors so I recommend phrasing it in that way
j
Sweet. I'm writing it up now and will submit later tonight. I'll add some extra details on my specific Executor for justification too. Thanks for the feedback!
c
Awesome - anytime!