# ask-community
m
Question: if I have a task I want to perform over all the lines in a file, but have the task for each line depend on the one before it (I don't want to continue iterating if a previous line fails), what's the best Prefect way to do this? For right now I'm doing:
```python
from prefect import Flow, Parameter

# read_file, do_line, and finish are @task-decorated functions defined elsewhere
with Flow('file') as flow:
    name = Parameter('name')
    lines = read_file(name=name)
    for l in lines:
        do_line(l)
    finish()
```
I know there's a Prefect `Map` functionality, but I didn't see an easy way to make each line's task depend on the previous line's task in an iterative manner
c
Hey Michael! This is an interesting question - if you use the `LocalExecutor` with mapping, then your tasks will execute sequentially as you expect. I'm going to keep thinking about this though - I am intrigued by the idea of dynamic dependencies on dynamic tasks
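For reference, a minimal sketch of that suggestion, assuming a 1.x-era Prefect where executors are importable from `prefect.executors`, and with `read_file`/`do_line` standing in for the task functions from the snippet above:

```python
from prefect import Flow, Parameter, task
from prefect.executors import LocalExecutor

@task
def read_file(name):
    with open(name) as f:
        return f.read().splitlines()

@task
def do_line(line):
    ...  # whatever needs to happen per line

with Flow("file") as flow:
    name = Parameter("name")
    lines = read_file(name=name)
    do_line.map(lines)  # one dynamic child task per line

# The LocalExecutor walks the mapped children one at a time, in order.
flow.run(parameters={"name": "input.txt"}, executor=LocalExecutor())
```

Note that mapping only gives sequential ordering here; it doesn't by itself add the line-to-line dependency the question asks about, which is why the discussion moves to build-time wiring below.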
m
yeah, I want to use the Dask executor, so that doesn't really help unfortunately 😞 What I'm doing is creating a for loop and initializing tasks based on the number of lines in the file, kinda like above - is that not possible?
and then manually setting the upstream dependency of each new task to the previously instantiated task
c
All Prefect tasks represent deferred computation, so it is possible but only if you read the file at Flow build time, not at runtime
so if you know how many lines the file has before the flow runs, then you can make this work
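A rough sketch of that build-time approach - read the file before constructing the Flow, create one `do_line` task per line, and chain them with `set_upstream` so a failure stops everything downstream (the default `all_successful` trigger takes care of that). The file name and task bodies here are illustrative:

```python
from prefect import Flow, task

@task
def do_line(line):
    ...  # work on a single line

@task
def finish():
    ...

# Read at flow *build* time, so the number of tasks is known up front.
with open("input.txt") as f:
    lines = f.read().splitlines()

with Flow("file") as flow:
    previous = None
    for line in lines:
        current = do_line(line)             # a separate task copy per line
        if previous is not None:
            current.set_upstream(previous)  # each line waits on the one before it
        previous = current
    if previous is not None:
        finish().set_upstream(previous)
```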
m
Could you use `Task.map`, and then `Flow.add_edge` to introduce new execution dependencies between the mapped tasks?
c
You can’t add edges to a Flow at runtime, because those edges are used when traversing the computational graph
m
dang 😞 so I'll need a flow constructor based on the size of the file? That messes up the whole flow unfortunately 😞 The other option is to just make a REALLY big task, which is less robust, but ah well
But if I can't make tasks at runtime, looks like that's the solution for now
c
out of curiosity - why do you want to use the Dask executor for this? The Flow you appear to be describing doesn't seem to benefit from parallelism
m
I'm running the same flow for different files; no file depends on another, so lines are only dependent on each other within the same file. These "file executions" can all happen in parallel, so I want to run them across Dask
idk if that makes sense, but ideally I can say "read in these files and do things" and it will run them all in parallel across the Dask cluster
c
each flow run will occur independently / in parallel with other flow runs, so you don’t need dask to parallelize at the flow run level
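If the flow is run locally rather than through a backend, one rough way to get that per-file parallelism without Dask is to launch a separate process per file, each of which builds the flow and calls `flow.run` with its own `name` parameter. `build_flow` below is a hypothetical helper that returns the Flow defined earlier:

```python
from multiprocessing import Pool

def run_for_file(path):
    flow = build_flow()                          # hypothetical: constructs the Flow above
    state = flow.run(parameters={"name": path})
    return path, state.is_successful()

if __name__ == "__main__":
    files = ["a.txt", "b.txt", "c.txt"]
    with Pool(processes=len(files)) as pool:
        for path, ok in pool.map(run_for_file, files):
            print(path, "ok" if ok else "failed")
```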
m
will the LocalExecutor automatically parallelize flows?
c
Executors don’t know about Flows. Agents submit Flows to run in whatever execution environment (subprocesses / k8s / fargate / etc). These submissions occur in parallel. Executors are configured on each individual flow - executors are responsible for running the tasks within a single flow.
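In code, that separation looks roughly like this (1.x-era Prefect): the executor is attached to the flow and only governs how that flow's tasks run, while agents are started separately and just submit flow runs.

```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor, LocalExecutor

with Flow("file") as flow:
    ...  # tasks as before

# Run this flow's tasks one at a time...
flow.executor = LocalExecutor()

# ...or run independent tasks within this one flow run in parallel:
# flow.executor = LocalDaskExecutor(scheduler="threads")
```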
m
hmmm ok, seems like the LocalExecutor may be the better way to go, but the only other issue is that I want the stdin/stdout/stderr of the underlying Python functions I call in these tasks, and Dask makes this REALLY easy. I haven't found a really good way to do that with a local executor
I understand tasks have a `@task(log_stdout=True)` option, but that only captures output if I do `print("asdf")` in the task itself, and not the stdout of internal functions being called within that task. I really appreciate all this help with abnormal requests haha
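One possible workaround for capturing the output of functions called inside a task is to redirect stdout to a buffer for the duration of the call and forward it to the task's logger - `some_noisy_function` here is a stand-in for your own code:

```python
import io
from contextlib import redirect_stdout

import prefect
from prefect import task

@task
def do_line(line):
    logger = prefect.context.get("logger")
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        some_noisy_function(line)   # hypothetical inner call that prints
    for out in buffer.getvalue().splitlines():
        logger.info(out)
```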
c
No worries at all! I think I’m a bit confused - the local executor doesn’t manipulate output in any way so it should be much easier to see the output from your tasks in the local executor vs. the dask executor
You might also benefit from the `--show-flow-logs` flag on the agent
m
I'm not running an agent
c
Ah then all your stdout / stderr from your tasks should already be displayed
m
yeah, but it's not recorded very easily without doing `python3 flows.py > out.log`, which isn't very scalable when I want this to be a process that runs for a while
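A rough alternative to shell redirection, if you stay backend-less, is to attach a file handler to Prefect's logger before running the flow (path and format here are illustrative):

```python
import logging

handler = logging.FileHandler("flow_runs.log")
handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
)
logging.getLogger("prefect").addHandler(handler)

# flow is the Flow object built earlier
flow.run(parameters={"name": "input.txt"})
```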
c
Yea it sounds like you should be using a Prefect backend (either server or Cloud) to manage this and persist your logs
m
yeah lol I should be XD, but another issue is I'm trying to avoid using Docker with my project - as I said, it's a little complicated
c
if you use Cloud you don’t need to use Docker
m
I really like Prefect Server, it's really nice, but it's not very lightweight 😕 (versus just depending on Python code only)
and I really do want to use Cloud 😞 Unfortunately, due to what I'm running and where I'm running code, we can't have a lot of outbound connectivity 😞
c
gotcha - for what it’s worth we have a lot of highly secure clients on Cloud if you’d like to have a discussion on security we’re open to it, but I know these things can be complicated so no pressure!
m
yeah thanks! Atm I don't think it'd be possible, but I think it's something I'd be interested in in the future