Nick Hart
02/21/2022, 4:52 PMfrom prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
logger = prefect.context.logger
AttributeError: 'Context' object has no attribute 'logger'
Zanie
create_flow_run
is intended to be called from a Flow
Nick Hart
02/21/2022, 4:54 PMimport prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
with prefect.context(logger = prefect.context.get("logger")):
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 8, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 124, in create_flow_run
logger.debug("Looking up flow metadata...")
AttributeError: 'NoneType' object has no attribute 'debug'
Zanie
.run
to call the task outside a flow. I think the issue here is that the context does not pass data across threads and the logger is only present in the main thread.Nick Hart
02/21/2022, 4:55 PMZanie
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname, logger):
print("Running thread for: ", flowname)
with prefect.context(logger=logger):
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True) #
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(
target=thread_flows, args=(flowname, prefect.context.logger)
)
threads.append(x)
x.start()
for thread in threads:
thread.join()
Anna Geller
Nick Hart
02/21/2022, 5:10 PM