# prefect-community
**Shaun Cutts**
I want to create a flow that abstracts away from where its inputs come from, so that I can run it with inputs from different sources: for instance, a string, or an S3 object. To do this I thought of creating a flow with some dummy input tasks, then running the flow with a flow runner, starting those tasks in an initial state of `Success` with results that wrap the inputs. For a string this seems straightforward enough. For an object in S3, loaded lazily, I can subclass `Result` and define `value` as a property that loads and caches the object when accessed. My first thought, however, was that I should use an `S3Result`, but that doesn't seem to be the right sort of object: it only takes a bucket, not a key. It seems more like a "result storage location" than a `Result` to me, and I'm wondering why it has an is-a relation to `Result`. Maybe I'm not understanding the object hierarchy properly, and/or I should be approaching my goal differently? [Update: I had overlooked that the base `Result` takes a `location`. Still, `S3Result` has no way to load results on demand, except via `read`, which takes a new location.]

A related question is how I should trigger an already-created flow, one that lives on the Prefect server, with a new runner. `Client.create_flow_run` lets me create a new run, but not pass a dictionary of initial states. It would seem that I am abusing flows a little… Perhaps I need to create initial tasks that take a serialized specification of where their "real" inputs live, which can be passed in as parameters. Thinking about it, this seems more like the intended way of doing things, but it also feels as though I am duplicating what Prefect is doing somehow. Guidance on best practice would be greatly appreciated.
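For illustration, a minimal sketch of the lazy-loading `Result` subclass described above, assuming Prefect 1.x (`prefect.engine.result.Result`) and boto3. The class name `LazyS3Result` and its constructor arguments are hypothetical, not part of Prefect's API, and this only demonstrates the lazy `value` property idea, not how the engine treats such a result in every code path:

```python
import boto3
from prefect.engine.result import Result


class LazyS3Result(Result):
    """A Result whose value is fetched from S3 only on first access."""

    def __init__(self, bucket: str, key: str, **kwargs):
        self.bucket = bucket
        self.key = key
        self._cached = None
        self._loaded = False
        super().__init__(location=f"s3://{bucket}/{key}", **kwargs)

    @property
    def value(self):
        # Load and cache the S3 object the first time the value is requested.
        if not self._loaded:
            obj = boto3.client("s3").get_object(Bucket=self.bucket, Key=self.key)
            self._cached = obj["Body"].read()
            self._loaded = True
        return self._cached

    @value.setter
    def value(self, new_value):
        # The base Result.__init__ assigns value=None; ignore that assignment
        # so laziness is preserved, but accept any real value set later.
        if new_value is not None:
            self._cached = new_value
            self._loaded = True
```

Such a result could then, in principle, be handed to a local flow runner as described above, e.g. via `FlowRunner(flow=flow).run(task_states={dummy_task: Success(result=LazyS3Result(...))})`, where `flow` and `dummy_task` are assumed to exist elsewhere.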
**Kyle Moon-Wright**
Hey @Shaun Cutts, I feel like you may be right that you're starting to step into what Prefect is doing a bit: creating a flow run and declaring initial states for tasks isn't meant to be a manual process. I'm curious about your intentions for this flow: would a flow that does different things based on its initial inputs satisfy your requirements? In that case, a task that checks the input type and determines the downstream tasks via conditional branching logic could be handy, and should be easier (or at least more explicit) than initializing tasks in a specified state. You would also be able to use `create_flow_run` via a `Client` or GraphQL API call as expected.
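As a rough sketch of this conditional-branching suggestion, assuming Prefect 1.x (`case` and `merge` live in `prefect.tasks.control_flow`); the task names, parameter name, and spec shape are made up for the example:

```python
import boto3
from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import case, merge


@task
def source_kind(spec: dict) -> str:
    # Decide which branch to take based on the shape of the input spec.
    return "s3" if "s3" in spec else "string"


@task
def load_from_string(spec: dict):
    return spec["value"]


@task
def load_from_s3(spec: dict):
    s3 = spec["s3"]
    obj = boto3.client("s3").get_object(Bucket=s3["bucket"], Key=s3["key"])
    return obj["Body"].read()


@task
def process(data):
    ...  # the "real" flow logic, independent of where the data came from


with Flow("input-agnostic") as flow:
    spec = Parameter("input_spec")
    kind = source_kind(spec)

    with case(kind, "string"):
        from_string = load_from_string(spec)
    with case(kind, "s3"):
        from_s3 = load_from_s3(spec)

    process(merge(from_string, from_s3))
```

Once registered, a run could then be kicked off with parameters via the `Client`, for example (the flow id and parameter values here are placeholders):

```python
from prefect import Client

Client().create_flow_run(
    flow_id="<registered-flow-id>",
    parameters={"input_spec": {"s3": {"bucket": "my-bucket", "key": "data.csv"}}},
)
```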
**Shaun Cutts**
@Kyle Moon-Wright We are compiling a declarative, machine-learning-oriented data-ops language into flows. If a flow fails, whether due to a code or data change, we want to be able to rerun it with substitute data, which may not come from the same source. E.g. data usually arrives via a trigger in S3, but to investigate what is going on, an engineer may want to rerun using a manually generated string or a local file. Indeed, for the moment we have decided to specify inputs as parameters that describe the data source (e.g. just `{value: …}` or `{s3: {bucket: …, key: …}}`, etc.). Still, it seems a shame to have to replicate some of the machinery you've built into `Result` and your caching apparatus. The output of one task is the input of another, so it would seem that if you can configure outputs, you should also be able to configure inputs.

[Edit] …indeed, a related question is how to reconfigure outputs. I see you can get task state per task via the client, but is it possible to change the availability of results from one run to the next? Is this possible with the GraphQL interface? (I haven't come across a reference document for it yet….)