# show-us-what-you-got
p
Is there anywhere I can get an example workflow, or get pointed to an example workflow, which looks something like the following:
• starts a flow and creates a unique local directory for that run on disk
• executes a task which creates a file, e.g. downloads a file from some URL using `urllib` and saves it in `$RUN_LOC/my_file`
• executes a task which runs a bash command, e.g. `wc -l $RUN_LOC/my_file > file_len`
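A plain-Python sketch of the first two steps, i.e. the bodies you might wrap in tasks: create a unique directory for the run, then download a URL into it with `urllib`. The function names, the `runs` base directory and the timestamp-plus-UUID naming scheme are assumptions for illustration, not anything Prefect provides; in Prefect 1.x each function could presumably be decorated with `@task`.

```python
import urllib.request
import uuid
from datetime import datetime
from pathlib import Path


def make_run_dir(base: str = "runs") -> Path:
    """Create a unique directory for this run, e.g. runs/20200701-120000-1a2b3c4d."""
    # Timestamp for readability, UUID fragment to avoid collisions between runs.
    name = f"{datetime.now():%Y%m%d-%H%M%S}-{uuid.uuid4().hex[:8]}"
    run_dir = Path(base) / name
    run_dir.mkdir(parents=True, exist_ok=False)
    return run_dir


def download(url: str, run_dir: Path, filename: str = "my_file") -> Path:
    """Download `url` and save it as run_dir/filename (the $RUN_LOC/my_file step)."""
    target = run_dir / filename
    # urlretrieve writes the response body straight to `target`.
    urllib.request.urlretrieve(url, str(target))
    return target


# Example (not executed here, needs network access):
#   run_dir = make_run_dir()
#   download("https://example.com/", run_dir)
```

Each step returns a `Path`, so the next step receives the exact location instead of guessing it.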
Obviously the tasks here are silly. My aim is to have an example in which a series of tasks do work on files. The task results are almost unimportant; the result might be nothing more than `RAN_OK`/`NOT_OK`. What matters to me is having a mechanism to operate on files, shell out to other utilities that operate on files, and check their return status. I've been looking at https://docs.prefect.io/core/concepts/results.html#how-to-configure-task-result-persistence
```python
from prefect import task, Flow
from prefect.engine.results import LocalResult


@task(result=LocalResult(location="initial_data.prefect"))
def root_task():
    return [1, 2, 3]

@task(result=LocalResult(location="{date:%A}/{task_name}.prefect"))
def downstream_task(x):
    return [i * 10 for i in x]

with Flow("local-results") as flow:
    downstream_task(root_task)
```
I guess at a push, if creating a global unique root per run is not possible, and every task keeps passing back a different location and I keep a record of these, it wouldn't be the end of the world. It's possible that Prefect just isn't suited for this kind of application? Or is this heat making me miss things? I'm just failing to see how to do this, and it's getting to make-or-break time.
z
Hi @Philip MacMenamin, just want to clarify your use case. For each run, you'd like to:
• create a unique directory wherever your flow is running
• download something to that unique directory
• execute some arbitrary shell command on the file in that directory
Correct?
p
The unique dir per run isn't essential if that's very difficult. What I do need is to be able to run tasks on files, and have each task know where that file is.
Meaning, if every time a task executes it creates a file in a random place, then I at least have to know where this random place is.
Ideally I'd prefer if these were all in the same place, and I could just tell a task to open `file_a` and it would know that it needs to open `file_a` for that specific run, so I wouldn't have to keep explicitly passing around full paths. But that's for neatness; it's not a deal breaker, I suppose.
As in, I would prefer to be able to say `open file_a`, as opposed to having an argument passed in at the top of the task, prepending it to the file path, looking up where this task's set of files would be written, and then returning that path to the next task. I hope I'm making sense here. 😕
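One way to get the "just open `file_a`" behaviour, sketched in plain Python under the assumption that all tasks run in the same process (or in child processes, which normally inherit environment variables): decide the run directory once, publish it in an environment variable (`RUN_LOC`, borrowed from the question), and have a small hypothetical helper `run_path()` resolve bare names against it. None of these names are Prefect APIs, and with a distributed executor the variable would have to be propagated to the workers some other way.

```python
import os
from pathlib import Path


def set_run_dir(run_dir: str) -> None:
    """Record the run directory once, at the start of the flow run (hypothetical helper)."""
    os.environ["RUN_LOC"] = str(Path(run_dir).resolve())


def run_path(name: str) -> Path:
    """Resolve a bare file name such as "file_a" inside the current run directory."""
    run_loc = os.environ.get("RUN_LOC")
    if run_loc is None:
        raise RuntimeError("RUN_LOC is not set; call set_run_dir() first")
    return Path(run_loc) / name


def write_file_a() -> None:
    # The task body only needs the short name, not a path passed in as an argument.
    run_path("file_a").write_text("hello\n")


def read_file_a() -> str:
    return run_path("file_a").read_text()
```

Because `RUN_LOC` lives in the environment, shell commands started from the same process can also refer to `$RUN_LOC/file_a`.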
When I run the example given in the docs, the flow runs successfully, but I cannot find where any of these files are. They are not in `~/.prefect/results/`, for example. I've set `PREFECT_FLOWS_CHECKPOINTING` to true in the `~/.prefect/config.toml` file.
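A sketch of where the docs example would be expected to write its results, assuming Prefect 1.x defaults (not confirmed in this thread): `LocalResult` places a relative `location` under its `dir`, which defaults to `~/.prefect/results`, and `{date:%A}` expands to the weekday name. Also assuming the usual Prefect 1.x config mapping, local `flow.run()` calls only checkpoint when `flows.checkpointing` is true; in `config.toml` that would be `checkpointing = true` under a `[flows]` table, and as an environment variable it is spelled `PREFECT__FLOWS__CHECKPOINTING=true` (double underscores), set before `prefect` is imported. The function below only does the path arithmetic and does not call Prefect.

```python
from datetime import datetime
from pathlib import Path

# Assumed Prefect 1.x default for LocalResult(dir=None): ~/.prefect/results
DEFAULT_RESULTS_DIR = Path.home() / ".prefect" / "results"


def expected_result_path(location: str, task_name: str, when: datetime,
                         results_dir: Path = DEFAULT_RESULTS_DIR) -> Path:
    """Expand a LocalResult-style location template and place it under results_dir."""
    # str.format hands "%A" to datetime.__format__, i.e. strftime("%A"), the weekday name.
    relative = location.format(date=when, task_name=task_name)
    return results_dir / relative
```

For a run on Wednesday 1 July 2020 this gives `~/.prefect/results/initial_data.prefect` and `~/.prefect/results/Wednesday/downstream_task.prefect` (weekday names depend on the locale).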
z
I have a potentially silly question: if the results aren't important, are you attached to using results? This almost sounds like a use case better suited to two `ShellTask`s, potentially with a Python task sandwiched in between to download your file.
p
No! I initially didn't think results was what I wanted to use.
So, no, I am not wedded to results. All I care about is having the workflow, being able to operate on the files in Python if needed, and being able to shell out to other utils (and get the return status of that shell command).
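For the "shell out and get the return status" part, a minimal plain-Python sketch using `subprocess.run`: it runs a command with bash inside a given directory and maps the exit code to the `RAN_OK`/`NOT_OK` labels from the question. The `run_shell` name and the label strings are illustrative only; Prefect's `ShellTask` has its own behaviour on non-zero exit codes, so check its docs rather than assuming it matches this.

```python
import subprocess
from pathlib import Path
from typing import Tuple


def run_shell(command: str, cwd: Path) -> Tuple[str, int]:
    """Run `command` with bash in `cwd`; return ("RAN_OK" or "NOT_OK", exit code)."""
    completed = subprocess.run(
        ["bash", "-c", command],  # bash, so redirection like "> file_len" works
        cwd=cwd,
        capture_output=True,
        text=True,
    )
    status = "RAN_OK" if completed.returncode == 0 else "NOT_OK"
    return status, completed.returncode


# Example: run_shell("wc -l my_file > file_len", run_dir)
```

Returning the status instead of raising lets a downstream step decide what to do with a failure; use `check=True` instead if any non-zero exit should stop the run.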
z
Okay, awesome. This is a toy flow, so you'll need to customize it to whatever you're trying to do, but here's a super naive example:
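```python
from prefect import Flow
from prefect.tasks.shell import ShellTask

directory = "testing456"

mkdir_task = ShellTask()
# in your use case you'd download the file
# rather than using touch_task
touch_task = ShellTask()

with Flow("shelltaskflow") as flow:
    # -p lets mkdir succeed if the directory already exists (e.g. on a re-run)
    made_dir = mkdir_task(command=f"mkdir -p {directory}")
    # upstream_tasks makes touch wait for mkdir; the two tasks share no data,
    # so without it Prefect has no reason to run them in this order
    touch_task(command=f"touch {directory}/test.txt", upstream_tasks=[made_dir])

out = flow.run()
```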
If you want to make your directory unique per run, you could easily replace `testing456` with a UUID, a timestamp, or anything along those lines that floats your boat.
Does something like this look like it'd work for your pattern?
p
OK. And, if I touch nothing else, I should see `testing456` in... the `~/.prefect/results` dir?
I know I can set that using TOML config (or I think I can).
z
Ah, not quite. So in the case of the script I sent you, you'll see `testing456` in whatever directory you ran your flow in.
But if you wanted to persist things at a certain root, you could do so using the `helper_script` argument: https://docs.prefect.io/api/latest/tasks/shell.html#shelltask
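To illustrate why `helper_script` helps with a fixed root: `ShellTask`'s documentation describes it as a script executed before `command` in the same process, usable for example to change directories. The sketch below reproduces that idea with plain `subprocess` by writing both parts into one bash script; it is not Prefect's code, and `run_with_helper` and the root path are hypothetical.

```python
import subprocess
import tempfile
from pathlib import Path


def run_with_helper(command: str, helper_script: str = "") -> int:
    """Run helper_script, then command, in a single bash process; return the exit code."""
    # Both parts go into one script file, so state set by the helper (working
    # directory, variables, functions) is still in effect when the command runs.
    with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as tmp:
        if helper_script:
            tmp.write(helper_script + "\n")
        tmp.write(command + "\n")
        script = tmp.name
    try:
        return subprocess.run(["bash", script]).returncode
    finally:
        Path(script).unlink()


# Example: every relative path in the command now lands under /data/flow-root.
#   run_with_helper("mkdir -p testing456 && touch testing456/test.txt",
#                   helper_script="mkdir -p /data/flow-root && cd /data/flow-root")
```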
p
ok, that's fine. Np. Again, I can set this. So you simply define the dir somewhere visible to the tasks, and everything uses that. That seems fine.
z
Okay, solid! Is there anything else we can help with?
p
ok. Yes, I think that's good.
No, I think I can get working. thanks Zachery!
*Zachary 🤦‍♂️
z
Woohoo, glad I could help. 🎉
(And no worries, I've gotten everything from Zak to Zaquery. 🙂 )
p
sounds like the next version of JQuery
z
As I spelled it out again, I kicked myself for not snagging that as a GitHub handle. 😂