Tom Klein (07/06/2022, 1:35 PM):
• set an env var (e.g. MY_INPUT=chunk1.csv) - since this is the way to communicate with the NodeJS script we run on this CSV
• execute the NodeJS script locally as a ShellTask
• read the output CSV it has created (e.g. chunk1-output.csv)
• use the S3Upload task to send it to S3
and we wanna do this for a list of e.g. 10-20 CSVs that were split from a big one. we looked into mapping and looping, but i'm not sure how exactly all the bullets i wrote above can be compressed into a single Task that we then map on? specifically i'm not sure how to combine, for example, ShellTask and the S3Upload task - the rest (e.g. env stuff or CSV read) can be done with simply Python
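(For reference, the pattern this thread converges on is to call each imperative task's .run() method inside one @task function, which can then be mapped. A minimal sketch, with the bucket name and output-file naming as placeholders:)

from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.tasks.aws import S3Upload

sh_task = ShellTask(helper_script="cd /usr/src/app", return_all=True)
s3_upload = S3Upload(bucket="my-bucket")  # placeholder bucket name

@task
def process_chunk(input_file_name):
    # hand the chunk name to the NodeJS script via an env var
    sh_task.run(command="npm run partial_enrich",
                env={"MY_INPUT": input_file_name})
    # read the CSV the script created and push it to S3
    output_name = input_file_name.replace(".csv", "-output.csv")
    with open(output_name) as f:
        s3_upload.run(data=f.read(), key=output_name)
    return output_name

(In a flow, this single task can then be mapped over the list of chunk names: process_chunk.map(chunk_names).)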
Anna Geller:

Tom Klein (07/06/2022, 2:11 PM):
(it accepts an env param, from what i understand) - i just don't understand how to combine multiple prefect tasks into "one" that can then be mapped?
Sylvain Hazard (07/06/2022, 2:21 PM):

Tom Klein (07/06/2022, 3:11 PM):

Sylvain Hazard (07/06/2022, 3:12 PM):

Tom Klein (07/06/2022, 3:12 PM):
i'm not sure how to run the ShellTask in a way that i can convert a name of a file (e.g. my_chunk3.csv) to an env param

Sylvain Hazard (07/06/2022, 3:14 PM):

Tom Klein (07/06/2022, 3:14 PM):
saved_main_csv = save_as_csv(accounts_data)
chunk_names = split_csv(file_name="/tmp/myin.csv", rowsize=5000, upstream_tasks=saved_main_csv)
run_output = sh_task(command='npm run partial_enrich').map(chunk_names)
^^ this wouldn't work because i don't think sh_task can be mapped this way, and even if it could, just giving it the list of filenames wouldn't inject them as an env param. sh_task is an invocation, not a task "object"; the ShellTask itself is defined as:
sh_task = ShellTask(helper_script="cd /usr/src/app",
                    log_stderr=True,
                    return_all=True,
                    stream_output=True,
                    log_stdout=False)
@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_name):
    output = sh_task.run(command='npm run partial_enrich', env={
        "LE_INPUT_FILE": input_file_name,
        "LE_OUTPUT_FILE": 'enriched-' + input_file_name
    })
and then in the flow itself:
run_outputs = wrapped_enrich_shell_task.map(chunk_names)
will this work?
and is there a way to make it sequential?

Anna Geller:
Tom Klein (07/06/2022, 5:01 PM):
the tasks seem stuck in a Mapped state… is that related somehow to not having a LocalExecutor? we added state_handlers based on the solution suggested in this github issue:
https://github.com/PrefectHQ/prefect/issues/3951
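(For reference, a Prefect 1.x state handler has the shape below; the handler name and body are illustrative only:)

from prefect import task

# a state handler receives the object and the state transition,
# and can return a (possibly modified) state
def my_state_handler(obj, old_state, new_state):
    if new_state.is_failed():
        ...  # e.g. log or send a notification
    return new_state

@task(state_handlers=[my_state_handler])
def some_task():
    ...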
Kevin Kho:

Tom Klein (07/06/2022, 8:36 PM):
even with LocalExecutor it doesn't seem to make any difference, the tasks won't progress from a mapped state and don't seem to generate any "child tasks"

Kevin Kho:
for chunk in chunk_names:
    wrapped_enrich_shell_task.run(chunk)

so that there is no Prefect Flow and you can see if the loop still hangs

Tom Klein (07/06/2022, 8:42 PM):
saved = save_as_csv(accounts_data)
chunk_names = split_csv(file_name="/tmp/myin.csv", rowsize=5000, upstream_tasks=[saved])
run_output_filenames = wrapped_enrich_shell_task.map(chunk_names, upstream_tasks=[touch_output, chunk_names])
my_output_csvs = read_from_csv.map(run_output_filenames)
my_uploads = wrapped_s3_upload_task.map(my_output_csvs)
first line just saves some snowflake results as CSV
second line splits the big CSV into smaller CSVs
third line is supposed to run a ShellTask on each of the smaller CSV filenames (injecting it as the env of the shell task)
the last two lines are kinda obvious, read csv and upload to s3

Kevin Kho:
Tom Klein (07/06/2022, 8:45 PM):

Kevin Kho:

@task
def mytask():
    ...

mytask.run()

will just call the Python function under the hood. No Flow object needed. This way we can debug the Python under the hood
Tom Klein (07/06/2022, 8:46 PM):

Kevin Kho:

Tom Klein (07/06/2022, 8:47 PM):
@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_name):
    logger = prefect.context.get('logger')
    logger.info(f"starting to work on file {input_file_name}")
    output = sh_task.run(command='npm run partial_enrich', env={
        "LE_INPUT_FILE": input_file_name,
        "LE_OUTPUT_FILE": 'enriched-' + input_file_name
    })
    return 'enriched-' + input_file_name
Kevin Kho:
instead of mytask.map(..) we try the for loop

Tom Klein (07/06/2022, 8:47 PM):
Kevin Kho:

Tom Klein (07/06/2022, 8:49 PM):
@task(log_stdout=True)
def wrapped_enrich_shell_task(input_file_names):
    logger = prefect.context.get('logger')
    output_file_names = []
    for input_file_name in input_file_names:
        logger.info(f"starting to work on file {input_file_name}")
        output = sh_task.run(command='npm run partial_enrich', env={
            "LE_INPUT_FILE": input_file_name,
            "LE_OUTPUT_FILE": 'enriched-' + input_file_name
        })
        output_file_names.append('enriched-' + input_file_name)
    return output_file_names
Kevin Kho:

from prefect import Flow, task

@task
def mytask(x):
    return 1

and have a Flow:

with Flow("...") as flow:
    mytask.map([1,2,3,4])

take out the Flow and do:

for x in [1,2,3,4]:
    mytask.run(x)

so in your case,

for chunk in chunk_names:
    wrapped_enrich_shell_task.run(chunk)
and just comment out the flow for now

Tom Klein (07/06/2022, 8:52 PM):
Kevin Kho:
could you try simplifying wrapped_enrich_shell_task to just run something like echo something, and then mapping over that to see if that can reproduce it?

Tom Klein (07/06/2022, 8:55 PM):
Kevin Kho:

Tom Klein (07/06/2022, 8:57 PM):
@task
def wrapped_enrich_shell_task2(input_file_name):
    sh_task.run(command=f"echo {input_file_name}")
    return 'enriched-' + input_file_name
Kevin Kho:
that way the npm run itself is out of the picture. you could also try:

flow.executor = LocalDaskExecutor()

I really don't know if the result will be the same as LocalExecutor()

Tom Klein (07/06/2022, 8:59 PM):
Kevin Kho:

Tom Klein (07/06/2022, 9:00 PM):
LocalDaskExecutor will execute in parallel, i'm not interested in parallel execution. i can try it just for the sport but it's not what i'm interested in
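(For reference: in Prefect 1.x the executor decides how mapped children run. A sketch, assuming `flow` is the Flow object built above:)

from prefect.executors import LocalDaskExecutor, LocalExecutor

flow.executor = LocalExecutor()        # mapped children run one at a time, in order
# flow.executor = LocalDaskExecutor()  # mapped children run in parallel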
Kevin Kho:

Tom Klein (07/06/2022, 9:01 PM):

Kevin Kho:
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

sh_task = ShellTask(helper_script="cd /usr/src/app",
                    log_stderr=True,
                    return_all=True,
                    stream_output=True,
                    log_stdout=False)

@task
def wrapped_enrich_shell_task2(input_file_name):
    sh_task.run(command=f"echo {input_file_name}")
    return 'enriched-' + input_file_name

@task
def get_file_names():
    return ['a.txt','b.txt','c.txt','d.txt']

with Flow("..") as flow:
    names = get_file_names()
    wrapped_enrich_shell_task2.map(names)

flow.run()
Tom Klein (07/06/2022, 9:06 PM):
touch_output = sh_task(command="touch src/.env")
chunk_names = get_file_names()
run_output_filenames = wrapped_enrich_shell_task2.map(chunk_names, upstream_tasks=[touch_output, chunk_names])
Kevin Kho:

Tom Klein (07/06/2022, 9:09 PM):
#flow.set_reference_tasks([run_output_filenames])
?

Kevin Kho:
Tom Klein (07/06/2022, 9:11 PM):

Kevin Kho:
from prefect import unmapped
run_output_filenames = wrapped_enrich_shell_task2.map(chunk_names, upstream_tasks=[unmapped(touch_output)])
Tom Klein (07/06/2022, 9:11 PM):
…upstream_tasks and the other upstream task)

Kevin Kho:

Tom Klein (07/06/2022, 9:14 PM):

Kevin Kho:

Tom Klein (07/06/2022, 9:15 PM):
(if i use a LocalExecutor, would that do the trick?)

Kevin Kho:
from prefect import Flow, task, unmapped

@task
def abc(x, y):
    return x+y

with Flow("..") as flow:
    # This version is working well
    abc.map([1,2,3],[4,5,6])

with Flow("..") as flow2:
    # Note for uneven list, it only runs once
    abc.map([1,2,3],[4])

@task
def items():
    return [1]

with Flow("..") as flow3:
    a = items()
    # The behavior is similar for upstream mapped tasks.
    # They become linked element-wise
    abc.map([1,2,3], [4,5,6], upstream_tasks=[a])

with Flow("..") as flow4:
    a = items()
    # Indicating unmapped will say not to pair them element-wise
    abc.map([1,2,3], [4,5,6], upstream_tasks=[unmapped(a)])

flow.run()
flow2.run()
flow3.run()
flow4.run()
Tom Klein (07/06/2022, 9:24 PM):
i thought upstream only tells it the dependency though, not that it influences what it tries to run on

Kevin Kho:
unmapped on upstream will say: just run the upstream to completion and then run the downstream

Tom Klein (07/06/2022, 9:26 PM):
Kevin Kho:

Tom Klein (07/06/2022, 9:26 PM):
PrefectResult or something?

Kevin Kho:
Tom Klein (07/06/2022, 9:29 PM):

Kevin Kho:

Tom Klein (07/06/2022, 9:32 PM):
run_output_filenames = wrapped_enrich_shell_task.map(chunk_names, upstream_tasks=[unmapped(touch_output)])
my_output_csvs = read_from_csv.map(run_output_filenames)
my_uploads = wrapped_s3_upload_task.map(my_output_csvs)
Kevin Kho:

Tom Klein (07/06/2022, 9:33 PM):

Kevin Kho:

Tom Klein (07/06/2022, 9:35 PM):

Kevin Kho:

Tom Klein (07/06/2022, 9:36 PM):

Kevin Kho:
if the order of operations is:

Save File A -> Save File B -> Save File C -> Upload File A -> Upload File B -> Upload File C

and then Save File C fails, you lose the work of A and B, and upon restart it's not there to upload anymore. This feels a lot simpler if you directly use the S3Result
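(For reference, a minimal sketch of attaching an S3Result; the bucket name and location template are placeholders, and the location template is filled from prefect.context at runtime, so each mapped child persists its return value under its own key:)

from prefect import task
from prefect.engine.results import S3Result

s3_result = S3Result(bucket="my-bucket",
                     location="enriched/{task_name}-{map_index}.csv")

@task(result=s3_result, checkpoint=True)
def enrich_chunk(input_file_name):
    ...  # the task's return value is persisted to S3 automatically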
Tom Klein (07/06/2022, 9:43 PM):

Kevin Kho:

Tom Klein (07/06/2022, 9:45 PM):

Kevin Kho:

Tom Klein (07/06/2022, 10:09 PM):

Anna Geller:

Tom Klein (07/06/2022, 10:20 PM):