Nick Hart
02/21/2022, 4:52 PMfrom prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
logger = prefect.context.logger
AttributeError: 'Context' object has no attribute 'logger'
Aaron Rnd
02/21/2022, 4:56 PMRaimundo Pereira De Souza Neto
02/21/2022, 7:58 PMprefect=2.0a12
, and I would like to schedule my flow like a cronJob. It's possible with decorator?
from prefect import flow, task
@task()
def s1(message):
print(message)
@flow() # where I put schedule params?
def update_flow() -> None:
s1("hello prefect")
Anna Geller
02/21/2022, 8:13 PMPatrick Tan
02/21/2022, 8:51 PMDaniel Komisar
02/21/2022, 10:40 PMupdate_agent_config
? Thanks!shijas km
02/22/2022, 4:39 AMNoam polak
02/22/2022, 7:01 AMKonstantin
02/22/2022, 9:48 AMHedgar
02/22/2022, 9:49 AMawswrangler
to save my dataframe to s3. my environment is already awscli
compliant, do I need to go through prefect Storage?
Secondly if I'm doing all this with aws ec2 remote instance how do I activate my flow. Can I do on the command line: python myfirstprefectflow.py
? Assuming I have already indicated a schedule in my flow code ? Or do I use crontab to schedule again 🤔. I would be glad if I could be pointed in the right direction on using aws ec2 to run prefect.Steph Clacksman
02/22/2022, 10:31 AMwith Flow("my-flow) as flow:
# do some things here
# EITHER flow.register("project_name")
# OR flow.run(executor=DaskExecutor())
I want to run my flow through prefect cloud, so I understand that I need to register my flow , but when I want to run it it looks like I need to edit the code, which seems a bit weird when it's dockerised and running on a remote server.
Is it normal to register the flow like above before building the docker image, then switch it to the run command before building? Or is there something I'm missing?Daniel Nilsen
02/22/2022, 10:44 AMModuleNotFoundError("No module named 'parameters'")
root
- src
- flows
- flow_1
- flow_1.py
- parameters.py
I am registering the flow with this
bin/prefect register --project «myProject» --path ./src/flows/flow_1/flow_1.py
When I run the flow locally with flow.run()
it works fine 🤔Frederick Thomas
02/22/2022, 1:02 PMFrederick Thomas
02/22/2022, 4:13 PMDonnchadh McAuliffe
02/22/2022, 4:37 PM@task(retries=2, retry_delay_seconds=10)
def task_1(time_to_sleep: int):
raise Exception()
time.sleep(time_to_sleep)
@task(retries=3, retry_delay_seconds=10)
def task_2(time_to_sleep: int):
time.sleep(time_to_sleep)
@flow(name="steps_flow_dask", task_runner=DaskTaskRunner(address={dask_address}))
def flow_to_run_tasks():
task_1(3)
task_2(2)
Currently when task_1
throws the exception task_2
immediately executes. The behaviour I want is that task_2
doesn't execute unless task_1
is successful. Also, if task_1
hits its' maximum number of retries then task_2
should not execute.
Any ideas? This is on Orion.David Elliott
02/22/2022, 4:53 PMDeliveroo
prefect tenant, happy to send over some specifics / URLs if helpfulGonzalo Stillo
02/22/2022, 5:04 PMbrian
02/22/2022, 5:21 PMMehdi Nazari
02/22/2022, 5:28 PMMatthew Seligson
02/22/2022, 5:56 PMScott Aefsky
02/22/2022, 6:57 PMJacqueline Riley Garrahan
02/22/2022, 8:05 PMruns = FlowRunView._query_for_flow_run(where={"flow_id": {"_eq": id}})
Constantino Schillebeeckx
02/22/2022, 8:59 PMRUNNING
tasks like those shown below which have been running for days? Are those truly still running? Am I being billed for idle compute here?Chris Martinez
02/22/2022, 9:00 PM0.15.12
from 0.14.21
Failed to load and execute Flow's environment: NotGitRepository('No git repository was found ...
I am using the git storage interface and have no issues when using version 0.14.21
Matthias
02/22/2022, 9:04 PMprefect run
in Orion? Or do you have to run flows with python flow.py
?Lee Cullen
02/22/2022, 9:11 PMFailed to load and execute Flow's environment: ValueError('No flows found in file.')
.
The flow context manager is defined within the body of a function that returns a flow, I'm wondering if that is the reason why cloud can't find the flow?Leon Kozlowski
02/22/2022, 9:28 PMdbt-ol run
)Wesley Jin
02/22/2022, 10:08 PMFailed to load and execute Flow's environment: ModuleNotFoundError("No module named 'constants'")
I’m importing constants which will be shared across many flows: the project name, ECS cluster name, Github Token secret name from a constants.py
file in the same directory. Running the flow locally with flow.run()
works just fine as expected, but when running it in ECS constants
is not found. How do I register other module dependencies with Github storage, if possible? Or do I have to add these constant values to each of my flow files? Thank you!
Example below:
. (repo root)
|____flows
| |____hello.py
| |____constants.py
| |______init__.py
Jack Chang
02/22/2022, 10:17 PMJack Chang
02/22/2022, 10:17 PMAnna Geller
02/22/2022, 10:19 PMJack Chang
02/22/2022, 10:21 PM