Jerry Thomas
08/23/2019, 11:31 AM
```python
import json

import prefect
from prefect import Flow, task
from prefect.utilities.logging import get_logger


def get_data():
    # Read the input values from a local JSON file
    with open("data/data.json", "r") as f:
        data = json.load(f)
    return data


@task
def test(config):
    # Log the values this task receives when the flow runs
    logger = prefect.context.get("logger")
    logger.info(config)


def main(values):
    with Flow("custom") as flow:
        logger = get_logger("Flow")
        logger.info("flow started")
        logger.info(values)  # this works
        test(values)  # this does not start
    flow.run()


main(get_data())
```
If I use the flow logger to print the values fetched from the file, it works, but the task does not seem to accept the values when they are fetched from a file. If I convert `get_data()` into a task and run it in a distributed environment, where would it read from?
Chris White
08/24/2019, 2:49 AM
The `test(values)` line that you have commented is within a Flow context, which is just building the flow (not executing it), so I wouldn't expect it to run until `flow.run()` is called. The behavior here is independent of how your `get_data` function is implemented, so I'm not sure why you're seeing a difference there.
> If I convert get_data() into a task and run it on a distributed environment where would it read from?
If you've implemented it to read from a file, it will attempt to locate and read the file on the local file system of whatever machine the task is running on. This is why depending on filesystems isn't usually a good pattern for distributed computing.
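To make the filesystem point concrete: the relative path `"data/data.json"` in `get_data()` is resolved against the current working directory of whichever process runs the code, so a task executing on a remote worker would look in that worker's directory, not on your machine. A minimal stdlib-only sketch; the helper `resolve_data_path` is hypothetical and only shows where `open()` would look.

```python
import os
import tempfile

DATA_PATH = "data/data.json"  # same relative path used by get_data()


def resolve_data_path(path=DATA_PATH):
    # Hypothetical helper: relative paths are joined onto the
    # current working directory of the process that calls open().
    return os.path.abspath(path)


if __name__ == "__main__":
    original_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as worker_dir:
        # Simulate a worker process that runs from a different directory.
        os.chdir(worker_dir)
        try:
            print(resolve_data_path())  # a path inside worker_dir
            print(os.path.exists(resolve_data_path()))  # False: no data file there
        finally:
            # Restore the original directory before the temp dir is deleted.
            os.chdir(original_cwd)
```

The same code can succeed on your laptop and fail on a worker simply because the two processes start in different directories on different machines.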
Jerry Thomas
08/24/2019, 6:29 AM
`test` also logs the data on the screen:
```
python pf1.py
[2019-08-24 06:26:27,319] INFO - prefect.Flow | flow started
[2019-08-24 06:26:27,320] INFO - prefect.FlowRunner | Beginning Flow run for 'custom'
[2019-08-24 06:26:27,321] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-24 06:26:27,326] INFO - prefect.TaskRunner | Task '4': Starting task run...
[2019-08-24 06:26:27,327] INFO - prefect.TaskRunner | Task '4': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,327] INFO - prefect.TaskRunner | Task '3': Starting task run...
[2019-08-24 06:26:27,328] INFO - prefect.TaskRunner | Task '3': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,328] INFO - prefect.TaskRunner | Task '1': Starting task run...
[2019-08-24 06:26:27,329] INFO - prefect.TaskRunner | Task '1': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,329] INFO - prefect.TaskRunner | Task '2': Starting task run...
[2019-08-24 06:26:27,330] INFO - prefect.TaskRunner | Task '2': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,330] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2019-08-24 06:26:27,331] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,331] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2019-08-24 06:26:27,332] INFO - prefect.Task | [1, 2, 3, 4]
[2019-08-24 06:26:27,332] INFO - prefect.TaskRunner | Task 'test': finished task run for task with final state: 'Success'
[2019-08-24 06:26:27,332] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
However, when I use a larger file, this is where it gets stuck, and the task never moves forward. To check whether the file is read, I added the logger in the flow, and that prints, but the `test` function does not. What could be the issue?
```
python pf1.py
[2019-08-24 06:24:13,090] INFO - prefect.Flow | flow started
```
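For context on why the larger file stalls: in the successful run above, the list `[1, 2, 3, 4]` did not reach `test` as one object. The log shows a separate task for each element ('1' to '4') plus a 'List' task that collects them. In the stuck run, `flow started` prints but `Beginning Flow run` never does, which suggests the time goes into building the flow rather than into `test`. The rough model below, which assumes only what the log shows (one task per scalar, one extra task per list), estimates how the task count grows with the data. `estimate_auto_tasks` is hypothetical, not Prefect's code, and treating dicts like lists over their values is a guess.

```python
import json


def estimate_auto_tasks(obj):
    """Rough estimate of the tasks auto-generated when obj is passed to a task.

    Assumption from the log above: each scalar becomes one task and each
    list adds one collecting task. Dicts are counted like lists over their
    values, which is a guess and not documented Prefect behavior.
    """
    if isinstance(obj, (list, tuple, set)):
        return 1 + sum(estimate_auto_tasks(item) for item in obj)
    if isinstance(obj, dict):
        return 1 + sum(estimate_auto_tasks(value) for value in obj.values())
    return 1  # scalar leaf: one task


if __name__ == "__main__":
    print(estimate_auto_tasks([1, 2, 3, 4]))  # 5, matching the 5 tasks in the log
    # A modest JSON document: 10,000 records with 3 fields each.
    records = [{"id": i, "name": f"row{i}", "ok": True} for i in range(10_000)]
    print(estimate_auto_tasks(json.loads(json.dumps(records))))  # 40001
```

Wrapping the whole object in a single `Constant`, or passing it through a `Parameter` as suggested in the reply below, keeps it as one task no matter how large the file is.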
Chris White
08/24/2019, 6:35 PM
You can use a `Constant` task to prevent the auto-generation:
```python
from prefect.tasks.core.constants import Constant

test(Constant(values))  # treats the entire object as a fixed Constant
```
Alternatively, if you do intend to change this from run to run, I'd recommend using a `Parameter` instead:
```python
from prefect import Parameter

data = Parameter("values", default=values)
test(data)
```
Jerry Thomas
08/27/2019, 8:38 AM