
Michael Reeves

08/19/2020, 10:16 PM
Question: if I have a task I want to perform over all the lines in a file, but need each line's task to depend on the previous one (I don't want to continue iterating if a previous line fails), what's the best Prefect way to do this? Right now I'm doing:
from prefect import Flow, Parameter

with Flow('file') as flow:
    name = Parameter('name')
    lines = read_file(name=name)
    for l in lines:
        do_line(l)
    finish()
I know there's a Prefect Map functionality, but I didn't see an easy way to make each line's task depend on the previous line's task in an iterative manner

Chris White

08/19/2020, 11:00 PM
Hey Michael! This is an interesting question - if you use the LocalExecutor with mapping, then your tasks will execute sequentially as you expect. I’m going to keep thinking about this though - I am intrigued by the idea of dynamic dependencies on dynamic tasks
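A minimal sketch of the mapping approach Chris describes, assuming Prefect 0.x-era APIs; the read_file and do_line task bodies here are hypothetical stand-ins:

from prefect import Flow, Parameter, task
from prefect.engine.executors import LocalExecutor

@task
def read_file(name):
    # hypothetical: return the lines of the file as a list
    with open(name) as f:
        return f.read().splitlines()

@task
def do_line(line):
    # hypothetical per-line work
    print(f"processing {line}")

with Flow('file') as flow:
    name = Parameter('name')
    lines = read_file(name=name)
    do_line.map(lines)  # with LocalExecutor, mapped children run one at a time

flow.run(parameters={'name': 'input.txt'}, executor=LocalExecutor())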

Michael Reeves

08/19/2020, 11:01 PM
yeah I want to use the dask executor so that doesn't really help unfortunately 😞 what I'm doing is creating a for loop and initializing tasks based on the number of lines in a file, kinda like above, is that not possible?
and then manually setting the upstream dependency of each new task to the previous instantiated task

Chris White

08/19/2020, 11:02 PM
All Prefect tasks represent deferred computation, so it is possible but only if you read the file at Flow build time, not at runtime
so if you know how many lines the file has before the flow runs, then you can make this work
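A rough sketch of that build-time approach: since the flow's structure depends on the file, the file is read before the Flow is constructed and each task is manually chained to the previous one (do_line and the filename are hypothetical):

from prefect import Flow, task

@task
def do_line(line):
    # hypothetical per-line work
    print(f"processing {line}")

# read the file at build time -- the number of tasks depends on it
with open('input.txt') as f:
    file_lines = f.read().splitlines()

with Flow('file') as flow:
    previous = None
    for line in file_lines:
        current = do_line(line)
        if previous is not None:
            # chain the tasks so each line waits on the previous one; the
            # default trigger stops downstream tasks when an upstream fails
            current.set_upstream(previous)
        previous = current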

Maxwell Dylla

08/19/2020, 11:03 PM
Could you use Task.map, and then Flow.add_edge to introduce new execution dependencies between the mapped tasks?

Chris White

08/19/2020, 11:04 PM
You can’t add edges to a Flow at runtime, because those edges are used when traversing the computational graph

Michael Reeves

08/19/2020, 11:05 PM
dang 😞 so I'll need a flow constructor based on the size of the file? that messes up the whole flow unfortunately 😞 the other option is to just make a REALLY big task, which is less robust, but ah well
But if I can't make tasks at runtime, looks like that's the solution for now

Chris White

08/19/2020, 11:05 PM
out of curiosity - why do you want to use the dask executor for this? The Flow you appear to be describing doesn’t seem to benefit from parallelism

Michael Reeves

08/19/2020, 11:08 PM
I'm running the same flow for different files, where no file depends on another, so lines are only dependent on other lines within the same file. These "file executions" can all happen in parallel, so I want to run them across dask
idk if that makes sense, but ideally I can say "read in these files and do things" and it will run them all in parallel across the dask architecture

Chris White

08/19/2020, 11:09 PM
each flow run will occur independently / in parallel with other flow runs, so you don’t need dask to parallelize at the flow run level

Michael Reeves

08/19/2020, 11:10 PM
will the LocalExecutor automatically parallelize flows?

Chris White

08/19/2020, 11:11 PM
Executors don’t know about Flows. Agents submit Flows to run in whatever execution environment (subprocesses / k8s / Fargate / etc.). These submissions occur in parallel. Executors are configured on each individual flow - executors are responsible for running the tasks within a single flow.
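For concreteness, a sketch of where the executor plugs in under this model, assuming Prefect 0.x-era APIs (the scheduler address and do_line task are hypothetical):

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def do_line(line):
    print(f"processing {line}")

with Flow('file') as flow:
    do_line('example')

# the executor only governs how tasks *within* this one flow run execute;
# running separate flow runs in parallel happens at the agent/submission layer
flow.run(executor=DaskExecutor(address='tcp://dask-scheduler:8786'))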

Michael Reeves

08/19/2020, 11:18 PM
hmmm ok, seems like the LocalExecutor may be the better way to go, but the only other issue is that I want the stdin/stdout/stderr of the underlying python functions I call in these tasks, and dask makes this REALLY easy; I haven't found a really good way to do that with a local executor
i understand tasks have a @task(log_stdout=True) option, but that only captures output if I do print("asdf") in the task itself, and not the stdout of internal functions being called within that task. I really appreciate all this help with abnormal requests haha
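One possible workaround, sketched under the assumption of Prefect 0.x's log_stdout task option plus standard-library stdout redirection (process is a hypothetical internal function that prints):

import contextlib
import io

import prefect
from prefect import task

@task(log_stdout=True)  # prints made directly in the task body go to Prefect's logger
def do_line(line):
    logger = prefect.context.get('logger')
    # capture stdout produced by internal functions and forward it to the task logger
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        process(line)  # hypothetical helper that prints to stdout
    for out in buf.getvalue().splitlines():
        logger.info(out)

Note that redirect_stdout only captures Python-level writes to sys.stdout, not output from C extensions or subprocesses.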

Chris White

08/19/2020, 11:22 PM
No worries at all! I think I’m a bit confused - the local executor doesn’t manipulate output in any way so it should be much easier to see the output from your tasks in the local executor vs. the dask executor
You might benefit from the --show-flow-logs flag on the agent also

Michael Reeves

08/19/2020, 11:24 PM
I'm not running an agent

Chris White

08/19/2020, 11:24 PM
Ah then all your stdout / stderr from your tasks should already be displayed

Michael Reeves

08/19/2020, 11:25 PM
yeah but it's not recorded very easily without doing:
python3 flows.py > out.log
which isn't very scalable when I want this to be a process that runs for a while
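A lighter-weight sketch that avoids shell redirection: attach a standard-library FileHandler to Prefect's logger so flow and task logs persist to a file (the filename and format here are arbitrary):

import logging

from prefect.utilities.logging import get_logger

# standard Python logging: everything Prefect logs also lands in flow.log
handler = logging.FileHandler('flow.log')
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
get_logger().addHandler(handler)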

Chris White

08/19/2020, 11:26 PM
Yea it sounds like you should be using a Prefect backend (either server or Cloud) to manage this and persist your logs

Michael Reeves

08/19/2020, 11:26 PM
yeah lol I should be XD, but another issue is I'm trying to avoid using Docker with my project; as I said, it's a little complicated

Chris White

08/19/2020, 11:27 PM
if you use Cloud you don’t need to use Docker

Michael Reeves

08/19/2020, 11:27 PM
i really like Prefect Server, it's really nice, but it's not very lightweight 😕 (versus depending only on python code)
and I really do want to use Cloud 😞 unfortunately, due to what I'm running and where I'm running code, we can't have a lot of outbound connectivity 😞

Chris White

08/19/2020, 11:29 PM
gotcha - for what it’s worth, we have a lot of highly secure clients on Cloud; if you’d like to have a discussion on security we’re open to it, but I know these things can be complicated, so no pressure!

Michael Reeves

08/19/2020, 11:30 PM
yeah thanks! atm I don't think it'd be possible, but it's something I'd be interested in in the future