Nick Hart

02/21/2022, 4:52 PM
I'm trying to call create_flow_run from multiple threads in a custom module I'm writing, and for some reason I'm getting an AttributeError. Below is my test code and the error. Would you know how to fix this? I'm assuming this is more of a threading problem than a Prefect problem, but I was hoping someone would be able to help! Thanks in advance.
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading

def thread_flows(flowname):
    print("Running thread for: ", flowname)
    flow_id = create_flow_run.run(flow_name=flowname)
    flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)

if __name__ == "__main__":
    flow_list = ["FlowA", "FlowB", "FlowC"]

    threads = []
    for flowname in flow_list:
        x = threading.Thread(target = thread_flows, args=(flowname,))
        threads.append(x)
        x.start()

    for thread in threads:
        thread.join()
Error:
  File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
    flow_id = create_flow_run.run(flow_name=flowname)
  File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
    logger = prefect.context.logger
AttributeError: 'Context' object has no attribute 'logger'

Zanie

02/21/2022, 4:53 PM
Hey Nick, `create_flow_run` is intended to be called from a `Flow`.

Nick Hart

02/21/2022, 4:54 PM
I've also tried this code but I get a new error:
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading

def thread_flows(flowname):
    print("Running thread for: ", flowname)
    with prefect.context(logger=prefect.context.get("logger")):
        flow_id = create_flow_run.run(flow_name=flowname)
        flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)

if __name__ == "__main__":
    flow_list = ["FlowA", "FlowB", "FlowC"]

    threads = []
    for flowname in flow_list:
        x = threading.Thread(target = thread_flows, args=(flowname,))
        threads.append(x)
        x.start()

    for thread in threads:
        thread.join()
Error:
  File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/test/Documents/create-flow1.py", line 8, in thread_flows
    flow_id = create_flow_run.run(flow_name=flowname)
  File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 124, in create_flow_run
    logger.debug("Looking up flow metadata...")
AttributeError: 'NoneType' object has no attribute 'debug'

Zanie

02/21/2022, 4:54 PM
I see you’re using `.run` to call the task outside a flow. I think the issue here is that the context does not pass data across threads and the logger is only present in the main thread.
You’d have to send the logger as an argument to your thread and retrieve it in your main block not in the thread target function.
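Editor's illustration (not part of the original thread): a minimal standard-library sketch of the behavior described above, assuming the Prefect context behaves like a `threading.local` object. The names `ctx`, `results`, `read_from_context`, and `read_from_argument` are hypothetical stand-ins, and the logger is a plain string rather than a real logger.

```python
import threading

# Hypothetical stand-in for a thread-local context object (not Prefect's API).
ctx = threading.local()
ctx.logger = "main-thread-logger"  # set in the main thread only

results = {}


def read_from_context():
    # A new thread gets its own empty thread-local namespace,
    # so the attribute set in the main thread is not visible here.
    results["from_context"] = getattr(ctx, "logger", None)


def read_from_argument(logger):
    # A value passed as an argument is an ordinary object and crosses threads.
    results["from_argument"] = logger


# ctx.logger is read here, in the main thread, before the worker starts.
t1 = threading.Thread(target=read_from_context)
t2 = threading.Thread(target=read_from_argument, args=(ctx.logger,))
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

print(results)
```

In this sketch the lookup inside the worker thread finds nothing, while the value fetched in the main thread and handed over through `args` arrives intact, which is the same pattern as retrieving the logger in the main block and passing it to the thread target.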

Nick Hart

02/21/2022, 4:55 PM
@Zanie I usually like to call create_flow_run within a flow, but for our module we do not want it to be a flow 😕 How would I go about sending the logger as an argument to my thread?

Zanie

02/21/2022, 4:56 PM
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading


def thread_flows(flowname, logger):
    print("Running thread for: ", flowname)
    with prefect.context(logger=logger):
        flow_id = create_flow_run.run(flow_name=flowname)
        flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)


if __name__ == "__main__":
    flow_list = ["FlowA", "FlowB", "FlowC"]

    threads = []
    for flowname in flow_list:
        x = threading.Thread(
            target=thread_flows, args=(flowname, prefect.context.logger)
        )
        threads.append(x)
        x.start()

    for thread in threads:
        thread.join()

Anna Geller

02/21/2022, 4:57 PM
this may also be helpful

Nick Hart

02/21/2022, 5:10 PM
Oh wow, that worked! I didn't even think of passing it as a variable 🙌 Thanks @Zanie, and thank you @Anna Geller for the other example!