Richard Hughes
02/18/2022, 8:28 PMWilliam Grim
02/18/2022, 8:39 PMFailed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'create_params_file\' on <module \'our_filename.py\' from \'/our_filename.py\'>")')
The signature of create_params_file
, which is not a task but a method that can be called looks like:
def create_params_file(base_filename: str, **kwargs) -> str:
Henning Holgersen
02/18/2022, 9:15 PMDexter Antonio
02/18/2022, 9:36 PMMY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix',location='my_output_folder')
prefect.config.flows.checkpointing = True
!export PREFECT__FLOWS__CHECKPOINTING=true
with Flow("please work", result=MY_RESULTS) as f:
t1 = my_task()
state = f.run()
!aws s3 ls <s3://my_bucket_name_witohut_s3_prefix/my_output_folder> # nothing is here
Is there something obvious, which I am missing?Brian Lorenz
02/19/2022, 12:27 AMline 282, in get_profile_context raise MissingContextError("No profile context found.")
Any suggestions on how to fix this?Heeje Cho
02/19/2022, 1:07 AMcreate_flow_run
to create a persistent scheduled flow run? Not a flow that runs only ones at a scheduled time but a flow that runs at intervals?Aric Huang
02/19/2022, 1:38 AMmap
- With the following sample flow, I was expecting the mapped task f
to run concurrently with wait
because there are no dependencies and LocalDaskExecutor
is being used. However, the behavior I see is that only `wait`'s mapped tasks get executed, so f
is not executed until all the wait
tasks return.
from prefect import Flow, task
import time
from prefect.executors import LocalDaskExecutor
@task
def f(x):
return x*2
@task
def wait(x):
time.sleep(x)
with Flow("test") as flow:
a = list(range(4))
wait.map(a)
result = f.map(a)
flow.executor = LocalDaskExecutor()
Kivanc Yuksel
02/19/2022, 3:18 PMtarget
, however, from time to time I want to re-run these tasks without manually deleting target files. Is there a way to "force" re-run for such tasks?Dexter Antonio
02/19/2022, 5:58 PMOmar Sultan
02/19/2022, 8:55 PMSamay Kapadia
02/20/2022, 2:35 PMpip install prefect[azure]
on my m1 mac 😞Brian Lorenz
02/21/2022, 1:09 AMMax Lei
02/21/2022, 4:39 AMAntonio Manuel BR
02/21/2022, 7:47 AMGuillaume Latour
02/21/2022, 9:20 AMMichael Hadorn
02/21/2022, 11:15 AMDotan Asselmann
02/21/2022, 12:09 PMiñigo
02/21/2022, 12:15 PMLucas Hosoya
02/21/2022, 12:54 PMArnaldo Russo
02/21/2022, 2:02 PMnew_flow_context=prefect.context.get('config')
Tomek Florek
02/21/2022, 2:29 PMMarwan Sarieddine
02/21/2022, 3:19 PMAqib Fayyaz
02/21/2022, 3:28 PMNick Hart
02/21/2022, 4:52 PMfrom prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import threading
def thread_flows(flowname):
print("Running thread for: ",flowname)
flow_id = create_flow_run.run(flow_name=flowname)
flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)#
if __name__ == "__main__":
flow_list = ["FlowA", "FlowB", "FlowC"]
threads = []
for flowname in flow_list:
x = threading.Thread(target = thread_flows, args=(flowname,))
threads.append(x)
x.start()
for thread in threads:
thread.join()
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
self._target(*self._args, **self._kwargs)
File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
flow_id = create_flow_run.run(flow_name=flowname)
File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
logger = prefect.context.logger
AttributeError: 'Context' object has no attribute 'logger'
Aaron Rnd
02/21/2022, 4:56 PMRaimundo Pereira De Souza Neto
02/21/2022, 7:58 PMprefect=2.0a12
, and I would like to schedule my flow like a cronJob. It's possible with decorator?
from prefect import flow, task
@task()
def s1(message):
print(message)
@flow() # where I put schedule params?
def update_flow() -> None:
s1("hello prefect")
Anna Geller
Patrick Tan
02/21/2022, 8:51 PMDaniel Komisar
02/21/2022, 10:40 PMupdate_agent_config
? Thanks!shijas km
02/22/2022, 4:39 AM