#prefect-community

Jerry Thomas

08/23/2019, 11:31 AM
This code just hangs after the logger. It seems to have something to do with the fact that I am reading a file.
import json

import prefect
from prefect import Flow, task
from prefect.utilities.logging import get_logger

def get_data():
    # Read the input file on the machine where this script runs.
    with open("data/data.json", "r") as f:
        return json.load(f)

@task
def test(config):
    # Task-level logger provided by the Prefect context at run time.
    logger = prefect.context.get("logger")
    logger.info(config)

def main(values):
    with Flow("custom") as flow:
        logger = get_logger("Flow")
        logger.info("flow started")
        logger.info(values)  # this works
        test(values)  # this does not start

    flow.run()

main(get_data())
If I use the flow logger to print the values fetched from a file, it works, but the task does not seem to accept the values when they are fetched from a file. If I convert get_data() into a task and run it in a distributed environment, where would it read from?

Chris White

08/24/2019, 2:49 AM
The test(values) line that you have commented is within a Flow context, which is just building the flow (not executing it), so I wouldn’t expect it to run until flow.run() is called. The behavior here is independent of how your get_data function is implemented, so I’m not sure why you’re seeing a difference there.
> If I convert get_data() into a task and run it on a distributed environment where would it read from?
If you’ve implemented it to read from a file, it will attempt to locate / read from the file in the local file system of whatever machine the task is running on — this is why depending on filesystems isn’t usually a good pattern for distributed computing
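A small standard-library sketch of why a relative path such as data/data.json is fragile: it is resolved against the working directory of whichever process opens it. The resolve helper is a hypothetical name used only for this illustration, and changing the working directory here merely stands in for "running on a different machine".

```python
import os
import tempfile


def resolve(path):
    # A relative path is resolved against the current working directory
    # of the process that uses it, not the directory of the script.
    return os.path.abspath(path)


original_cwd = os.getcwd()
first = resolve("data/data.json")

with tempfile.TemporaryDirectory() as other_dir:
    os.chdir(other_dir)
    try:
        # Same relative path, different process location.
        second = resolve("data/data.json")
    finally:
        # Restore the working directory before the temp dir is removed.
        os.chdir(original_cwd)

print(first)
print(second)
print(first != second)
```

The same string points at two different files depending on where the code runs, so a worker would only find the data if an identical file exists at the corresponding location on that worker.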

Jerry Thomas

08/24/2019, 6:29 AM
Chris, the code includes flow.run(). When the file has only a few lines, everything works, which means that the task test also logs the data on the screen.
python pf1.py
[2019-08-24 06:26:27,319] INFO - prefect.Flow | flow started
[2019-08-24 06:26:27,320] INFO - prefect.FlowRunner | Beginning Flow run for 'custom'
[2019-08-24 06:26:27,321] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-24 06:26:27,326] INFO - prefect.TaskRunner | Task '4': Starting task run...
[2019-08-24 06:26:27,327] INFO - prefect.TaskRunner | Task '4': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,327] INFO - prefect.TaskRunner | Task '3': Starting task run...
[2019-08-24 06:26:27,328] INFO - prefect.TaskRunner | Task '3': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,328] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-08-24 06:26:27,329] INFO - prefect.TaskRunner | Task '1': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,329] INFO - prefect.TaskRunner | Task '2': Starting task run...
[2019-08-24 06:26:27,330] INFO - prefect.TaskRunner | Task '2': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,330] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2019-08-24 06:26:27,331] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,331] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2019-08-24 06:26:27,332] INFO - prefect.Task | [1, 2, 3, 4]
[2019-08-24 06:26:27,332] INFO - prefect.TaskRunner | Task 'test': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,332] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
However, when I have a larger file, this is where it gets stuck and the task never moves forward. To check whether the file is read, I added the logger in the flow; that prints, but the test function does not. What could be the issue?
python pf1.py
[2019-08-24 06:24:13,090] INFO - prefect.Flow | flow started
It’s a 30 MB file. A plain read takes only a few seconds, but the flow seems to evaluate the data element by element, creating a task for each nested element. As written, the file is read and the values are passed to the flow, but inside the flow the data appears to be rebuilt element-wise. In the simple example, where the data was a plain list of numbers, a task was auto-created for each number, plus one task for the list. So for deeply nested JSON data of 5000 items with 100 keys and further nested paths, this will end up creating a huge tree of tasks. Is it possible to process the input without generating a huge tree of subtasks?
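To get a rough feel for the size of such a tree, here is a simplified counting model in plain Python. It assumes one node per container and one per leaf value, which matches the five auto-created tasks ('1' through '4' plus 'List') in the small log above; how Prefect actually expands dictionaries may differ, and count_nodes is a hypothetical helper, not a Prefect function.

```python
def count_nodes(obj):
    """Count one node per container and one per leaf value (simplified model)."""
    if isinstance(obj, dict):
        return 1 + sum(count_nodes(value) for value in obj.values())
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(count_nodes(item) for item in obj)
    return 1


# The small example from the log: four numbers plus the enclosing list.
small = [1, 2, 3, 4]

# Stand-in for the large input: 5000 items with 100 flat keys each.
large = [{f"key{k}": k for k in range(100)} for _ in range(5000)]

small_count = count_nodes(small)
large_count = count_nodes(large)

print(small_count)  # 5
print(large_count)  # 505001
```

Even without any further nesting, this model yields roughly half a million nodes for the large input, which is consistent with the run appearing to hang while the flow is being built.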

Chris White

08/24/2019, 6:35 PM
Ah, my apologies, I understand. If this configuration object is truly unchanging, you can wrap it in a Constant task to prevent the auto-generation:
from prefect.tasks.core.constants import Constant

test(Constant(values)) # treats the entire object as a fixed Constant
Alternatively, if you do intend to change this from run to run, I’d recommend using a Parameter instead:
from prefect import Parameter

data = Parameter("values", default=values)
test(data)
Either way will prevent the deep auto-generation of constants.
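The effect of wrapping the value can be illustrated with a toy model in plain Python. Opaque and count_nodes are hypothetical names made up for this sketch; it assumes only that a wrapped value is treated as a single unit instead of being traversed, and it does not reproduce how Prefect's Constant or Parameter are implemented.

```python
class Opaque:
    """Hypothetical wrapper marking a value as a single, indivisible node."""

    def __init__(self, value):
        self.value = value


def count_nodes(obj):
    """Count nodes in a simplified model: wrapped values are not traversed."""
    if isinstance(obj, Opaque):
        return 1  # the whole object counts as one node
    if isinstance(obj, dict):
        return 1 + sum(count_nodes(value) for value in obj.values())
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(count_nodes(item) for item in obj)
    return 1


# Three items with two keys each: 1 list + 3 * (1 dict + 2 leaves).
values = [{"a": 1, "b": 2} for _ in range(3)]

unwrapped_count = count_nodes(values)
wrapped_count = count_nodes(Opaque(values))

print(unwrapped_count)  # 10
print(wrapped_count)    # 1
```

The data handed to the consumer is unchanged in both cases; only the number of nodes created around it differs.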

Jerry Thomas

08/27/2019, 8:38 AM
Thanks Chris.