davzucky
11/22/2022, 5:09 AM

Ikkyu Choi
11/22/2022, 5:44 AM

latif
11/22/2022, 8:07 AM

Deepanshu Aggarwal
11/22/2022, 8:22 AM

Tim-Oliver
11/22/2022, 8:40 AM

Nic
11/22/2022, 11:00 AM

```python
import unittest

from tasks import address_matching as am
from prefect import flow, task


class TestDatahandling(unittest.TestCase):
    def test_tilslutning_forsyning(self):
        am.address_matching()


if __name__ == '__main__':
    unittest.main()
```
I get the following error message:
```
ERROR: test_tilslutning_forsyning (__main__.TestDatahandling)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "c:\Users\nho\Desktop\git\Prefect\etl\geoserver\tilslutning_forsyning\test.py", line 9, in test_tilslutning_forsyning
    am.address_matching()
  File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\tasks.py", line 353, in __call__
    return enter_task_run_engine(
  File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 674, in enter_task_run_engine
    raise RuntimeError(
RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`.
```
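The last line of that traceback names the workaround: in Prefect 2, a `@task`-decorated object keeps the plain Python function on its `.fn` attribute, so a unit test can call `am.address_matching.fn()` directly (with whatever arguments the real function takes) without starting a flow. The sketch below is a hedged illustration, not the fix for this exact module: the `address_matching` body is a hypothetical stand-in because the `tasks.address_matching` module is not shown, and the test skips itself when Prefect is not installed.

```python
import unittest
from importlib.util import find_spec

# Skip cleanly when Prefect is not installed in the test environment.
HAS_PREFECT = find_spec("prefect") is not None


@unittest.skipUnless(HAS_PREFECT, "prefect is not installed")
class TestDatahandling(unittest.TestCase):
    def test_task_fn_outside_flow(self):
        from prefect import task

        # Hypothetical stand-in for tasks.address_matching.address_matching;
        # in the real test the call would be am.address_matching.fn(...)
        @task
        def address_matching(value: str) -> str:
            return value.strip().upper()

        # .fn is the undecorated function, so no flow or task engine is involved
        self.assertEqual(address_matching.fn("  main st "), "MAIN ST")
```

Run it with `python -m unittest` as before. For tests that need to execute whole flows rather than single task functions, Prefect 2's testing documentation describes a `prefect_test_harness` context manager in `prefect.testing.utilities`, which runs flows against a temporary database.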
Two questions:
1. What would the syntax for `task.fn()` be in my example? I can't get it to run.
2. Are there better ways or best practices for running tests on flows, or does this setup seem okay?

ash
11/22/2022, 2:51 PM

```python
from prefect import Flow

# DaskCluster (presumably a @resource_manager), extract, transform,
# save_data and n_workers are defined elsewhere
with Flow("Test flow") as flow:
    with DaskCluster(n_workers=n_workers) as client:
        data = extract()
        processed_data = transform(data)
        save_data(processed_data)
```

The problem is that the Dask cluster is not shut down until the `save_data` function completes, but I expected the cluster cleanup to happen as soon as the `transform` function completes. Is there any way I can start `save_data()` after `transform()` is done and the `cluster_cleanup` is done as well?

Xavier Babu
11/22/2022, 4:19 PM

Philip MacMenamin
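11/22/2022, 4:50 PM

```python
from prefect import Flow, task
from prefect.engine import signals


@task(name="Task A")
def task_a(x: int) -> int:
    if x == 2:
        raise signals.FAIL
    return x + 2


@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2


@task(name="check")
def checker(x: int) -> None:
    # Pseudocode: anything_upstream_broke, this_did_not_work and this_was_ok
    # are placeholders; ID stands for the identifier of the failed input
    if anything_upstream_broke():
        this_did_not_work(ID)
    else:
        this_was_ok(x)


with Flow("x example") as flow:
    l = [2, 5, 7]
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)
```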
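The per-input check this mapped flow is meant to perform can be sketched in plain Python, outside Prefect: run each input through the A then B chain, keep any exception as that input's result instead of letting it propagate, and have the checker branch on it. This only illustrates the logic; in Prefect 1 the checker would additionally need to run when upstream tasks failed (for example via a trigger such as `prefect.triggers.all_finished`), since the default trigger skips it. `run_one` is a hypothetical helper and `ValueError` stands in for `signals.FAIL`.

```python
from typing import List, Union

Result = Union[int, Exception]


def task_a(x: int) -> int:
    # Same rule as the example: input 2 fails (ValueError stands in for signals.FAIL)
    if x == 2:
        raise ValueError(f"task_a failed for x={x}")
    return x + 2


def task_b(x: int) -> int:
    return x + 2


def run_one(x: int) -> Result:
    """Run a single input through A then B, returning the error instead of raising."""
    try:
        return task_b(task_a(x))
    except Exception as exc:  # keep the failure attached to this input
        return exc


def checker(x: int, result: Result) -> str:
    # Branch per input: the original x travels next to its result
    if isinstance(result, Exception):
        return f"failed: {x} ({result})"
    return f"ok: {x} -> {result}"


inputs: List[int] = [2, 5, 7]
results = [run_one(x) for x in inputs]  # results[i] belongs to inputs[i]
report = [checker(x, r) for x, r in zip(inputs, results)]
for line in report:
    print(line)
```

Keeping each input next to its result is what lets the checker act on "that specific input"; mapped task results in Prefect line up with the input list by index in the same way.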
That is, I have a list of things I want to run through the workflow, and I map over them. Sometimes some of the elements in the list won't run properly. I'd like a way to look through all of the upstream tasks, check whether any failed, and do something for that specific input.

Dmitrii Egunov
11/22/2022, 6:03 PM

Jimmy Le
11/22/2022, 6:04 PM

Joshua Grant
11/22/2022, 6:47 PM

`apply_map`? Details in 🧵

James Zhang
11/22/2022, 8:09 PM

`httpx.ConnectTimeout` error. My prefect-orion runs on our own k8s and should be reachable from our GitLab CI pipeline. Has anyone seen this error? Any idea how I could debug it? Thanks!

jack
11/22/2022, 9:27 PM

Geoff Coyner
11/23/2022, 1:31 AM

`create_flow_run.map` will create all of the flow runs at once, which clogs up my agent's backlog when N gets too large. This causes a lot of failures, which occur before my child flow runs can even begin. Unfortunately, I am not the admin and don't have control over the agent we're using, so I have no easy way to investigate why this happens or to resolve it. Even if I were, kicking off this many flows at once probably isn't a good idea. So I'm wondering if there is some way to throttle `create_flow_run` in my flow without pre-defining my N child flows. I'm even OK with running all N flows in sequence, i.e. kick off flow run 1 -> wait for flow run 1 -> kick off flow run 2 -> etc.
Here is the structure I'm using now:
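```python
from prefect import Flow, apply_map
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


def create_wait_on_child(parameters):
    # Create one child flow run, then block until it reaches a final state
    create_id = create_flow_run(flow_name="GenericChild",
                                project_name="Project",
                                parameters=parameters)
    return wait_for_flow_run(create_id,
                             raise_final_state=True)


with Flow('Parent Flow') as flow:
    # get_flow_runs_params is a task defined elsewhere
    flow_runs_params_list = get_flow_runs_params()
    apply_map(create_wait_on_child, flow_runs_params_list)
```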
My question: is there some way to combine the functionality of `create_flow_run` and `wait_for_flow_run` in the same task? I know I can't just add a `@task` decorator to `create_wait_on_child` in the example above (since that would mean calling tasks within tasks), but that is the functionality I'm going for. Or does anyone have alternative recommendations?
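One way to get "create, then wait, with a cap on how many children are running" is to make create-and-wait a single unit of work and feed those units through a bounded worker pool. The plain-Python sketch below shows only that throttling logic and does not use Prefect 1's API: `FakeOrchestrator` and its `create_run`/`wait_for_run` methods are hypothetical stand-ins for whatever actually creates and polls the child flow runs, and the thread pool is just one possible throttling mechanism.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


class FakeOrchestrator:
    """Hypothetical stand-in for the backend that creates and runs child flows.

    It only records how many child runs are in flight at the same time.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.in_flight = 0
        self.peak = 0

    def create_run(self, params: Dict[str, Any]) -> str:
        with self._lock:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
        return f"run-{params['n']}"

    def wait_for_run(self, run_id: str) -> str:
        time.sleep(0.01)  # pretend the child run takes a while
        with self._lock:
            self.in_flight -= 1
        return f"{run_id}: Success"


def run_children(backend: FakeOrchestrator,
                 params_list: List[Dict[str, Any]],
                 max_concurrent: int = 1) -> List[str]:
    """Create-then-wait for each child, with at most max_concurrent in flight."""

    def create_and_wait(params: Dict[str, Any]) -> str:
        # One unit of work: the child is created and awaited by the same worker
        return backend.wait_for_run(backend.create_run(params))

    # Each worker handles one child at a time, so the pool size caps the number
    # of child runs that have been created but not yet finished.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # pool.map returns results in the same order as params_list
        return list(pool.map(create_and_wait, params_list))
```

With `max_concurrent=1` this reduces to the strictly sequential loop described in the question (create run 1 -> wait for run 1 -> create run 2); raising it trades a longer agent backlog for a shorter total runtime.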
Appreciate anyone's input.

Tim Galvin
11/23/2022, 7:47 AM

Pekka
11/23/2022, 7:49 AM

ash
11/23/2022, 11:17 AM

```python
from prefect import Flow

# DaskCluster (presumably a @resource_manager), extract, transform,
# save_data and n_workers are defined elsewhere
with Flow("Test flow") as flow:
    with DaskCluster(n_workers=n_workers) as client:
        data = extract()
        processed_data = transform(data)
        save_data(processed_data)
```

The problem is that the Dask cluster is not shut down until the `save_data` function completes, but I expected the cluster cleanup to happen as soon as the `transform` function completes. Is there any way I can start `save_data()` after `transform()` is done and the `cluster_cleanup` is done as well?

Dan Wise
11/23/2022, 12:54 PM

Vadym Dytyniak
11/23/2022, 1:01 PM

Jared Robbins
11/23/2022, 1:16 PM

Tibs
11/23/2022, 1:30 PM

Slackbot
11/23/2022, 1:49 PM

Tim-Oliver
11/23/2022, 2:11 PM

`asyncio` exception when using `DaskTaskRunner`, which leads to tasks hanging and not completing. -->

Khuyen Tran
11/23/2022, 3:28 PM

Luca Schneider
11/23/2022, 4:29 PM

`PREFECT_LOGGING_EXTRA_LOGGERS`
Does it have to be set in the flow, on the agent, or on the Orion server? Thanks!

Esdras Lopes Nani
11/23/2022, 4:30 PM

```
Agent started! Looking for work from queue(s): xxxxx...
```
Don't know what else I can do 😅
Thanks!

Tim-Oliver
11/23/2022, 4:59 PM

`Permission denied` errors with the GitHub storage blocks since today. Has anyone else seen this? The error appears if the GitHub repo is already present from a previous run. If the repo is deleted, it is cloned without any error.

Chris Marchetti [Datateer]
11/23/2022, 6:31 PM

`Unknown Opcode` error in our Prefect flow. I looked up the error and saw several issues relating to mismatched Python versions. I have verified that we are using Python 3.8 (3.8.7 and 3.8) for everything. Does anyone have any ideas why this error may be occurring? Other tasks in our flow ran successfully; only one failed.

Ashley Felber
11/23/2022, 6:40 PM