# ask-community
a
Hey everyone, I am having some weirdness when invoking a child flow from a parent using flow.from_source. The child flow consists of a sequence of tasks. The first task is unmapped and outputs a list, which is fed into a sequence of mapped tasks. The mapped tasks all converge on one final unmapped task which collects the upstream results and performs a single action. The flow works perfectly when I run it directly, but when I run it using flow.from_source, it runs only the first and last unmapped tasks. I was under the impression that it should execute exactly the same way regardless of how it's invoked. Really hoping someone can help me out on this one because I'm totally stumped. Thanks!
n
hey @Alex de Geofroy - do you have an MRE for this by chance? that would be helpful 🙏
a
Hey @Nate, forgive my ignorance but I'm not sure what you mean by MRE
n
a Minimal Reproducible Example, @Marvin can you give a very simple example of a dummy MRE - no commentary
m
Sure, here's a simple example of a Minimal Reproducible Example (MRE):
# Import necessary libraries
import pandas as pd

# Create a simple dataframe
df = pd.DataFrame({
    'A': [1, 2, 3, 4, 5],
    'B': [5, 15, 10, 20, 15]
})

# Define a function to calculate average
def calculate_average(column):
    return column.mean()

# Apply the function to the 'B' column
average = calculate_average(df['B'])

print(average)
This MRE creates a simple dataframe and calculates the average of one of its columns. It's a complete, self-contained piece of code that can be run independently to reproduce the issue (in this case, calculating the average).
n
well i forgot he doesnt have the thread history whoops, but yeah a minimal example using flow.from_source with a subflow as you described
a
Thanks Nate. I have actually figured out what my issue was. As it turns out, when you run a flow from source, the .py file referenced in the entrypoint is cloned to a temporary directory. In the first task in my flow I was calling os.path.dirname(__file__) as a reference point for reading some files. Simply changing to use os.getcwd() fixed the problem. It would be nice if the documentation was explicit about the behavior of flow.from_source and that it clones to a temp directory outside of the CWD.
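For anyone who hits the same thing, here is a rough, standard-library-only sketch of the path difference described above. It does not use Prefect at all: it only imitates the "copy the file to a temp directory and run it from there" behavior, and the names `child_flow.py` and `run_copied_script` are made up for illustration.

```python
import os
import shutil
import subprocess
import sys
import tempfile

# A tiny script that reports where it thinks it lives and where it is running.
SCRIPT = (
    "import os\n"
    "print(os.path.dirname(os.path.abspath(__file__)))\n"
    "print(os.getcwd())\n"
)


def run_copied_script():
    """Copy a script from a 'project' dir into a separate temp dir, then run
    the copy while the working directory is still the project dir.

    This only simulates a runner that clones source code elsewhere; it is an
    assumption-laden stand-in, not Prefect's actual implementation.
    """
    with tempfile.TemporaryDirectory() as project_dir, \
            tempfile.TemporaryDirectory() as clone_dir:
        original = os.path.join(project_dir, "child_flow.py")  # hypothetical name
        with open(original, "w") as f:
            f.write(SCRIPT)

        # Stand-in for the "clone to a temporary directory" step.
        cloned = shutil.copy(original, clone_dir)

        result = subprocess.run(
            [sys.executable, cloned],
            cwd=project_dir,  # the process still runs from the project dir
            capture_output=True,
            text=True,
            check=True,
        )
        file_dir, cwd = result.stdout.splitlines()

        # realpath() normalizes symlinked temp locations (e.g. on macOS).
        return (
            os.path.realpath(file_dir),
            os.path.realpath(cwd),
            os.path.realpath(project_dir),
            os.path.realpath(clone_dir),
        )


if __name__ == "__main__":
    file_dir, cwd, project_dir, clone_dir = run_copied_script()
    print("dirname(__file__) is the clone dir:", file_dir == clone_dir)
    print("getcwd() is the project dir:", cwd == project_dir)
```

In this sketch, files sitting next to the original script are only reachable through the working directory, because `__file__` resolves inside the copy's directory. That matches what I saw, though the exact layout a real run uses may differ.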
n
oh okay, thanks for following up! using tmp dirs for stuff like that is a common pattern for us. you could run a process worker against a process work pool if you wanted to have direct control over the working directory though
a
Thanks @Nate I appreciate your help with this.