
Tom Klein

07/06/2022, 1:35 PM
Hello, maybe this is kind of a silly question but - assuming we have a big CSV that we then split into multiple CSVs (as a task), and then, we wanna run a set of tasks for each of these “mini-CSVs” (either in parallel or sequentially, probably sequentially though) — we’re trying to understand what’s the best approach here (continued in thread…)
e.g., we wanna:
• alter an ENV var to point at that mini CSV location (e.g. MY_INPUT=chunk1.csv) - since this is the way to communicate with the NodeJS script we run on this CSV
• execute the NodeJS script locally as a ShellTask
• read the output CSV it has created (e.g. chunk1-output.csv)
• use the S3Upload task to send it to S3
and we wanna do this for a list of e.g. 10-20 CSVs that were split from a big one
i see that there’s both mapping and looping, but i’m not sure how exactly all the bullets i wrote above can be compressed into a single Task that we then map on? specifically im not sure how to combine for example ShellTask and the S3Upload task, the rest (e.g. env stuff or CSV read) can be done simply with Python

Anna Geller

07/06/2022, 1:47 PM
you would need to use iterated mapping https://docs.prefect.io/core/concepts/mapping.html#iterated-mapping but I agree this is challenging if some processes are not Python. Rather than env var, perhaps you can push that info to some KV Store, grab that info in a Python process/Prefect task and pass it to downstream tasks? just some ideas

Tom Klein

07/06/2022, 2:11 PM
hmm, not sure - what is the KV store meant to solve? i dont have a problem altering the env for the shell task (this could be done at runtime by passing it an env param from what i understand) - i just dont understand how to combine multiple prefect tasks into “one” that can then be mapped?

Sylvain Hazard

07/06/2022, 2:21 PM
You don't have to combine them into a single task, you can just "chain" mapped tasks together, creating some sort of "pipeline" for each of your initial objects.
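For illustration, a minimal Prefect 1 sketch of what such a chained-mapping pipeline could look like (the task names and bodies here are placeholders, not the actual NodeJS/S3 logic from this thread):
from prefect import Flow, task

@task
def split_csv():
    # placeholder: would return the list of chunk file names produced by the split
    return ["chunk1.csv", "chunk2.csv", "chunk3.csv"]

@task
def process_chunk(chunk_name):
    # placeholder for the per-chunk work (e.g. running the enrichment script)
    return "enriched-" + chunk_name

@task
def upload_chunk(output_name):
    # placeholder for the S3 upload step
    print(f"uploading {output_name}")

with Flow("chunk-pipeline") as flow:
    chunks = split_csv()
    outputs = process_chunk.map(chunks)  # one task run per chunk
    upload_chunk.map(outputs)            # chained mapping: each output feeds its own upload

flow.run()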

Tom Klein

07/06/2022, 3:11 PM
@Sylvain Hazard oh, interesting, is there an example for this?

Sylvain Hazard

07/06/2022, 3:12 PM
Yes, this is a good example

Tom Klein

07/06/2022, 3:12 PM
so i guess now i just need to figure out a way of wrapping a ShellTask in a way that i can convert a name of a file (e.g. my_chunk3.csv) to an env param

Sylvain Hazard

07/06/2022, 3:14 PM
You could maybe add a task that generates the env param name from the file name?
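For example, a rough sketch of such a helper task (the env variable names here are just placeholders):
from prefect import task

@task
def build_env(input_file_name):
    # derive the env vars the NodeJS script expects from the chunk file name
    return {
        "MY_INPUT": input_file_name,
        "MY_OUTPUT": "enriched-" + input_file_name,
    }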

Tom Klein

07/06/2022, 3:14 PM
for example:
saved_main_csv = save_as_csv(accounts_data)
chunk_names = split_csv(file_name="/tmp/myin.csv", rowsize=5000, upstream_tasks=[saved_main_csv])
run_output = sh_task(command='npm run partial_enrich').map(chunk_names)
^^ this wouldn’t work because i don’t think sh_task can be mapped this way, and even if it could, just giving it the list of filenames wouldn’t inject them as an env param
sure but how would the mapping work?
this sh_task is an invocation, not a task “object”
im just not sure how mapping works for built-in prefect tasks like ShellTask
i guess i can do something like this:
from prefect import task
from prefect.tasks.shell import ShellTask

sh_task = ShellTask(helper_script="cd /usr/src/app",
                    log_stderr=True,
                    return_all=True,
                    stream_output=True,
                    log_stdout=False)

@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_name):
    output = sh_task.run(command='npm run partial_enrich', env={
        "LE_INPUT_FILE": input_file_name,
        "LE_OUTPUT_FILE": 'enriched-' + input_file_name
    })
and then in the flow itself:
run_outputs = wrapped_enrich_shell_task.map(chunk_names)
will this work? and is there a way to make it sequential?

Anna Geller

07/06/2022, 4:39 PM
try it and you'll know 😉 to make it sequential, you would need to attach LocalExecutor to your flow
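For reference, attaching it would look roughly like this (assuming Prefect 1.x imports):
from prefect.executors import LocalExecutor

flow.executor = LocalExecutor()  # runs mapped children one at a time, in a single process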

Tom Klein

07/06/2022, 5:01 PM
@Anna Geller hmm - im not sure if it works or not cause the mapped tasks are stuck in the Mapped state… is that related somehow to not having a LocalExecutor?
isn’t it the default executor anyway?
it could be related to the fact i added state_handlers based on the solution suggested in this github issue: https://github.com/PrefectHQ/prefect/issues/3951
nope, it happens even without the state_handlers
@Anna Geller @Kevin Kho ?

Kevin Kho

07/06/2022, 8:27 PM
What was your executor again? I scrolled through but didn’t see?

Tom Klein

07/06/2022, 8:36 PM
i didn’t define any specifically; later, based on Anna’s suggestion, i explicitly set it to LocalExecutor
it doesn’t seem to make any difference, the tasks won’t progress from a mapped state and don’t seem to generate any “child tasks”

Kevin Kho

07/06/2022, 8:41 PM
I can’t really see anything wrong, and we normally see this on LocalDaskExecutor. Are you open to trying task looping if you need this to be iterative anyway? Also, I think you can try something like:
for chunk in chunk_names:
    wrapped_enrich_shell_task.run(chunk)
so that there is no Prefect Flow and you can see if the loop still hangs

Tom Klein

07/06/2022, 8:42 PM
but there is no loop, it just doesn’t create any child tasks. i’d very much like to understand why it doesn’t work since it seems pretty straightforward (and surely we’ll need mapping elsewhere anyway)
this is the code:
saved = save_as_csv(accounts_data)
chunk_names = split_csv(file_name="/tmp/myin.csv", rowsize=5000, upstream_tasks=[saved])
run_output_filenames = wrapped_enrich_shell_task.map(chunk_names, upstream_tasks=[touch_output, chunk_names])
my_output_csvs = read_from_csv.map(run_output_filenames)
my_uploads = wrapped_s3_upload_task.map(my_output_csvs)
first line just saves some snowflake results as CSV, second line splits the big CSV into smaller CSVs, third line is supposed to run a ShellTask on each of the smaller CSV filenames (injecting it as an env of the shell task), the last two lines are kinda obvious: read csv and upload to s3
i confirmed (using logging) that the split_csv task indeed splits it into chunks (in my test, 1) and gives back an array of strings (in my test, an array of size 1)

Kevin Kho

07/06/2022, 8:44 PM
The map is basically like a for loop for LocalExecutor. I’m suggesting to use it so we can see if it hangs when running outside of a Flow context

Tom Klein

07/06/2022, 8:45 PM
how can i run it outside of a flow context though? not sure i follow
there’s a snowflake query task that precedes it..

Kevin Kho

07/06/2022, 8:46 PM
If you have a task,
@task
def mytask():
    ...

mytask.run()
will just call the Python function under the hood. No Flow object needed. This way we can debug the Python under the hood

Tom Klein

07/06/2022, 8:46 PM
are u saying that instead of mapping i move the for loop inside the wrapped task?

Kevin Kho

07/06/2022, 8:46 PM
Cuz I think something might be getting stuck there

Tom Klein

07/06/2022, 8:47 PM
this is the wrapped shell task:
@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_name):
    logger = prefect.context.get('logger')
    logger.info(f"starting to work on file {input_file_name}")
    output = sh_task.run(command='npm run partial_enrich', env={
        "LE_INPUT_FILE": input_file_name,
        "LE_OUTPUT_FILE": 'enriched-' + input_file_name
    })
    return 'enriched-' + input_file_name

Kevin Kho

07/06/2022, 8:47 PM
So I am proposing that instead of mytask.map(..) we try the for loop

Tom Klein

07/06/2022, 8:47 PM
ok so i move the loop inside this task you mean?

Kevin Kho

07/06/2022, 8:49 PM
No no. Let me make an example

Tom Klein

07/06/2022, 8:49 PM
like so?
@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_names):
    logger = prefect.context.get('logger')
    output_file_names = []
    for input_file_name in input_file_names:
        logger.info(f"starting to work on file {input_file_name}")
        output = sh_task.run(command='npm run partial_enrich', env={
            "LE_INPUT_FILE": input_file_name,
            "LE_OUTPUT_FILE": 'enriched-' + input_file_name
        })
        output_file_names.append('enriched-' + input_file_name)
    return output_file_names
before i had this mapping, i just took the CSV and ran it as a single input to the task, and it worked fine. the issue only happened when i tried to run it mapped

Kevin Kho

07/06/2022, 8:51 PM
If you have this:
from prefect import Flow, task

@task
def mytask(x):
    return 1
and have a Flow:
with Flow("...") as flow:
    mytask.map([1,2,3,4])
take out the Flow and do:
for x in [1,2,3,4]:
    mytask.run(x)
so in your case,
for chunk in chunk_names:
    wrapped_enrich_shell_task.run(chunk)
and just comment out the flow for now

Tom Klein

07/06/2022, 8:52 PM
i can’t run this task out of context, there’s too much stuff it needs to run, env vars, the input from snowflake, etc.
here, i did what i pasted above ^^^ and it’s running fine:
in other words if i execute the for loop explicitly within the task (and make it accept an array instead of a single input), it works fine
but if i do it mapped, i have this thing (the wrapped shell task) enter a “Mapped” state, and that’s it. nothing happens. nothing hangs either. the flow ends successfully, it just doesn’t do anything. it doesn’t create any “children”

Kevin Kho

07/06/2022, 8:55 PM
I think the issue might be the multiple shell tasks though. It’s ok if you can’t run the task out of Flow context, can we try then just commenting out the content of the wrapped_enrich_shell_task and just running something like echo something and then mapping over that to see if that can reproduce it?

Tom Klein

07/06/2022, 8:55 PM
sure

Kevin Kho

07/06/2022, 8:56 PM
Also what happens on LocalDaskExecutor?

Tom Klein

07/06/2022, 8:57 PM
i never tried it
here is what im trying now:
@task
def wrapped_enrich_shell_task2(input_file_name):
    sh_task.run(command=f"echo {input_file_name}")
    return 'enriched-' + input_file_name

Kevin Kho

07/06/2022, 8:58 PM
Yes that would help at least isolate if the behavior is the shell task or potentially related to the npm run. Worth trying the default LocalDaskExecutor too:
flow.executor = LocalDaskExecutor()
I really don’t know if the result will be the same as LocalExecutor()

Tom Klein

07/06/2022, 8:59 PM
same thing:
becomes “mapped” and nothing happens, flow finishes

Kevin Kho

07/06/2022, 9:00 PM
Ok that’s helpful. I can try to replicate on my end

Tom Klein

07/06/2022, 9:00 PM
but i understood that LocalDaskExecutor will execute in parallel, im not interested in parallel execution. i can try it just for the sport but it’s not what im interested in

Kevin Kho

07/06/2022, 9:00 PM
Yeah that would help

Tom Klein

07/06/2022, 9:01 PM
same thing:

Kevin Kho

07/06/2022, 9:03 PM
Trying to replicate with this:
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

sh_task = ShellTask(helper_script="cd /usr/src/app",
                    log_stderr=True,
                    return_all=True,
                    stream_output=True,
                    log_stdout=False)

@task
def wrapped_enrich_shell_task2(input_file_name):
    sh_task.run(command=f"echo {input_file_name}")
    return 'enriched-' + input_file_name

@task
def get_file_names():
    return ['a.txt','b.txt','c.txt','d.txt']


with Flow("..") as flow:
    names = get_file_names()
    wrapped_enrich_shell_task2.map(names)

flow.run()

Tom Klein

07/06/2022, 9:06 PM
same thing
i removed the snowflake query as well
this is my code right now:
touch_output = sh_task(command="touch src/.env")
chunk_names = get_file_names()
run_output_filenames = wrapped_enrich_shell_task2.map(chunk_names, upstream_tasks=[touch_output, chunk_names])

Kevin Kho

07/06/2022, 9:08 PM
I get something similar, though it seems the mapped task runs do run
Will fiddle with it a bit

Tom Klein

07/06/2022, 9:09 PM
ok
could it be related to me doing #flow.set_reference_tasks([run_output_filenames])?

Kevin Kho

07/06/2022, 9:11 PM
No. I think I know one issue

Tom Klein

07/06/2022, 9:11 PM
looks better now though:

Kevin Kho

07/06/2022, 9:11 PM
Can you try:
from prefect import unmapped
run_output_filenames = wrapped_enrich_shell_task2.map(chunk_names, upstream_tasks=[unmapped(touch_output)])

Tom Klein

07/06/2022, 9:11 PM
(i did some other changes though, like removing the upstream_tasks and the other upstream task)
oh interesting, i didn’t really get what the unmapped thing was

Kevin Kho

07/06/2022, 9:13 PM
I will explain if it ends up helping

Tom Klein

07/06/2022, 9:14 PM
yep, looks good:

Kevin Kho

07/06/2022, 9:14 PM
Ok I can draft up an example to show you the difference

Tom Klein

07/06/2022, 9:15 PM
hmm ok sure, thanks
(also, is there a way to guarantee serial execution rather than parallel? if i move it back to LocalExecutor would that do the trick?)
(and does checkpointing work for mapped tasks? e.g. let’s say i run it - and only 3 out of 15 mapped tasks finish, if i restart the job is there a way to make it continue from the 4th mapped task?)
(sorry for the question bombardment)

Kevin Kho

07/06/2022, 9:20 PM
LocalExecutor is serial execution, but order is not guaranteed. Only looping guarantees order. Checkpointing works, but there is a caveat that you need distinct results for each of the mapped tasks because it checks if the result exists. So you need to use templating to give them distinct names.
Try to see if you can understand this code snippet:
from prefect import Flow, task, unmapped

@task
def abc(x, y):
    return x+y

with Flow("..") as flow:
    # This version is working well
    abc.map([1,2,3],[4,5,6])

with Flow("..") as flow2:
    # Note for uneven list, it only runs once
    abc.map([1,2,3],[4])

@task
def items():
    return [1]

with Flow("..") as flow3:
    a = items()
    # The behavior is similar for upstream mapped tasks.
    # They become linked element-wise
    abc.map([1,2,3], [4,5,6], upstream_tasks=[a])

with Flow("..") as flow4:
    a = items()
    # Indicating unmapped will say not to pair them element-wise
    abc.map([1,2,3], [4,5,6], upstream_tasks=[unmapped(a)])

flow.run()
flow2.run()
flow3.run()
flow4.run()
Each flow behaves differently. Your issue was that you had an empty list (I assume) as an upstream to a mapped task

Tom Klein

07/06/2022, 9:24 PM
i thought setting upstream only tells it the dependency though, not that it influences what it tries to run on
ya the order is not important in our case, we just have e.g. 50K inputs and we wanna operate on them in batches, the only thing important to us is that we can finish them all, and have retries in case of failures (without having to rerun the entire thing), etc.

Kevin Kho

07/06/2022, 9:25 PM
So if the upstream is a list also, the assumption is to pair the downstream execution to each upstream element-wise so that we can do depth-first execution as opposed to breadth-first. unmapped on the upstream says to just run the upstream to completion and then run the downstream

Tom Klein

07/06/2022, 9:26 PM
huh, interesting

Kevin Kho

07/06/2022, 9:26 PM
These are very tricky, but I believe it has to be that way to support the depth first execution of consecutive mapped calls.

Tom Klein

07/06/2022, 9:26 PM
regarding templating - i think it would be easy in our case because we split the big CSV into smaller ones where each one is called for example 5000A, 5000B, and so on… so that could be in the task name too, right?
do we need to do anything to enable it though? like say the result is a PrefectResult or something?

Kevin Kho

07/06/2022, 9:29 PM
Yes that can be in the Task Run name. It will work with any Result we have, but the PrefectResult has a size limit I think because that is stored in our database compared to something like S3 result which is a file in a bucket you own.
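For example, a hedged sketch of a task-level S3Result with a templated location, so each mapped child persists its result to its own key (the bucket name and prefix are placeholders; the template keys would be filled from prefect.context at runtime):
from prefect import task
from prefect.engine.results import S3Result

enrich_result = S3Result(
    bucket="my-prefect-results",  # placeholder bucket
    # distinct location per mapped child so results don't overwrite each other
    location="enrich/{flow_run_id}/{task_name}-{map_index}.prefect",
)

@task(checkpoint=True, result=enrich_result, log_stdout=True)
def wrapped_enrich_shell_task(input_file_name):
    ...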

Tom Klein

07/06/2022, 9:29 PM
but we have to have it set to something other than local i presume?
we actually don’t need to use it for persistence because we upload the results to S3 in the end anyway… so if local would work too for checkpointing that’s even better

Kevin Kho

07/06/2022, 9:31 PM
LocalResult can work too with the checkpointing. But if you restart the Flow, you might lose those files if you use a container and it restarts, unless you use some kind of hard disk?

Tom Klein

07/06/2022, 9:32 PM
we upload the results to S3, i guess what you’re saying is that we’ll lose the data if the flow somehow dies in the middle before all results were persisted?
this is what it looks like:
run_output_filenames = wrapped_enrich_shell_task.map(chunk_names, upstream_tasks=[unmapped(touch_output)])
my_output_csvs = read_from_csv.map(run_output_filenames)
my_uploads = wrapped_s3_upload_task.map(my_output_csvs)
run (as a shell task), read the output, push it to S3

Kevin Kho

07/06/2022, 9:33 PM
If you are relying on container storage, which is ephemeral, right?

Tom Klein

07/06/2022, 9:33 PM
while running, yes, but the assumption is that pushing it to s3 upon success would persist it
that’s kind of why i wanted to “compress” the tasks originally, so that each mapped task would be paired with its next step, i guess what you called depth-first
e.g. run subtask A, read mini CSV A, push output A to S3 THEN run subtask B, read mini CSV B, push output B to s3, and so on

Kevin Kho

07/06/2022, 9:34 PM
I think this will be fine, but the assumption is that you never have to restart between saving locally and uploading to s3. If a file is saved but the upload fails, then you won’t have it upon the Flow restart because the container died right?

Tom Klein

07/06/2022, 9:35 PM
rather than - run all, then read all, then push all
yes but there’s literally nothing that happens between finishing the run and uploading (other than reading the CSV)

Kevin Kho

07/06/2022, 9:36 PM
I get that, but the upload can fail right? That would require a restart

Tom Klein

07/06/2022, 9:36 PM
presumably, yes. but im not worried about that. it never failed for us 🙂 and i can add retries
in other words, i agree that there are ways to make it more robust, im more worried about being able to just rerun the entire flow and have each batch only run if it never completed before --- cause the thing that can realistically fail is the shell task
but i dont even mind trying to use an S3Result for practice.. i am more trying to understand what are the necessary preconditions for checkpointing to function correctly

Kevin Kho

07/06/2022, 9:40 PM
But I think anyway successful tasks are not re-run? We can’t enforce depth first execution though, so the risk here is that if you have Save File A -> Save File B -> Save File C -> Upload File A -> Upload File B -> Upload File C and then Save File C fails, you lose the work of A and B and upon restart, it’s not there to upload anymore. This feels a lot simpler if you directly use the S3Result
S3Result will likely be a lot better because it’s persistent and will be around for the restarts. And then maybe you don’t need to upload anymore yourself.
Anyway I think you have enough stuff to try for now

Tom Klein

07/06/2022, 9:43 PM
wait so depth first is not enforced?

Kevin Kho

07/06/2022, 9:44 PM
For Local and LocalDask, not enforced. For DaskExecutor, highly preferred but not enforced

Tom Klein

07/06/2022, 9:45 PM
interesting. ok thanks, i will play around with it a bit, much appreciated 🙏

Kevin Kho

07/06/2022, 9:45 PM
Of course!

Tom Klein

07/06/2022, 10:09 PM
k, just tested it out with an S3Result and added random failures to the (mock) shell task, tried a restart and it did pick up well from the failures and finished everything (in the end i had to set an S3Result for the entire flow, cause otherwise the mapped task has nothing to work on, which i kind of missed earlier — although i suppose i can also turn off checkpointing for the earlier steps). i think the only thing i dislike is that it just throws everything to the bucket unless i manually tell it to go to a specific dir (now the top level of my entire prefect bucket is filled with persisted task results…) but at least it’s working 😄
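For the bucket clutter, one possible tweak (a sketch, with a placeholder bucket and prefix) is to give the flow-level result a templated location so persisted results land under a prefix instead of the bucket root:
from prefect import Flow
from prefect.engine.results import S3Result

flow_result = S3Result(
    bucket="my-prefect-bucket",  # placeholder
    # keep persisted task results under a per-flow prefix instead of the bucket root
    location="prefect-results/{flow_name}/{task_run_id}.prefect",
)

with Flow("enrich-flow", result=flow_result) as flow:
    ...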

Anna Geller

07/06/2022, 10:11 PM
wohoo! those 100 messages paid off 🎉 😄
thx for the update Tom

Tom Klein

07/06/2022, 10:20 PM
hug