Cansu Kavılı
11/21/2022, 8:47 PM
Devin McCabe
11/21/2022, 8:54 PM
Michael Z
11/21/2022, 9:34 PM
Ashley Felber
11/21/2022, 10:07 PM
Madison Schott
11/21/2022, 10:40 PM
FROM prefecthq/prefect:1.4.0-python3.9
and now I get this error when running dbt deps
Ashley Felber
11/21/2022, 11:03 PM
Steven Wilber
11/22/2022, 12:02 AM
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 4200)
I'm assuming the flow running in the docker container cannot see the Orion server on port 4200; however, I'm not sure what to do to fix that.
No doubt the next failure will be accessing the Airbyte server, but I'm not there yet.
Any help is much appreciated.
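One likely direction, sketched below: inside the container, 127.0.0.1 is the container itself, so the flow needs PREFECT_API_URL pointed at an address it can actually reach. This is only a sketch assuming the flow runs via a DockerContainer infrastructure block; the image name and URL are placeholders, and on Linux hosts host.docker.internal may need extra configuration or host networking. If the container is started some other way, setting the same PREFECT_API_URL environment variable on it is the equivalent move.

from prefect.infrastructure import DockerContainer

# Sketch: point the flow run's API URL at the host machine instead of
# 127.0.0.1 inside the container. "my-image:latest" and the URL are
# placeholders, not values from this thread.
docker_block = DockerContainer(
    image="my-image:latest",
    env={"PREFECT_API_URL": "http://host.docker.internal:4200/api"},
)
docker_block.save("my-docker-block", overwrite=True)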
Ben Muller
11/22/2022, 12:08 AM
Tuoyi Zhao
11/22/2022, 12:31 AM
Fernando Silveira
11/22/2022, 3:39 AM
DockerContainer storage block to my deployment indicating that it should just pull my docker image from AWS ECR and run the flow.
However, as far as I understand, the DockerContainer block is also an infrastructure block and is meant to run the image against docker directly on a machine - i.e., NOT in a KubernetesJob.
This means that so far, I've been relegated to using the S3 storage block, which pushes my flow code to S3 only to pull it back into my docker image at flow run time (when the image already contains all the code it needs to run).
This feels wrong, so I'm sure I must be missing something here. How do I configure KubernetesJob to just pull a docker image and not rely on any additional storage block like S3?
PS: so far, I'm really enjoying how quickly I'm getting up to speed on Prefect v2. Despite kinks like this, I'm really enjoying the development experience.
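One pattern that may answer this (a sketch, not confirmed anywhere in this thread): when the flow code is already baked into the image, a deployment can skip uploading to any storage block by setting skip_upload and pointing path at where the code lives inside the image; the KubernetesJob infrastructure then just pulls the ECR image. The image name and path below are placeholder assumptions, and the entrypoint build_from_flow derives locally must match the layout under that path inside the image, so this is worth verifying.

from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob

from my_project.flows import my_flow  # hypothetical import of the flow being deployed

# Sketch: no storage block at all; the agent pulls the image and runs the
# code that is already inside it at the given path.
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="k8s-image-only",
    infrastructure=KubernetesJob(
        image="123456789.dkr.ecr.us-east-1.amazonaws.com/my-image:latest",  # placeholder
    ),
    path="/opt/prefect/flows",  # where the flow code lives inside the image (assumption)
    skip_upload=True,           # nothing gets pushed to S3 or any other storage
)
deployment.apply()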
Mahesh
11/22/2022, 3:56 AM
davzucky
11/22/2022, 5:09 AM
Ikkyu Choi
11/22/2022, 5:44 AM
latif
11/22/2022, 8:07 AM
Deepanshu Aggarwal
11/22/2022, 8:22 AM
Tim-Oliver
11/22/2022, 8:40 AM
Nic
11/22/2022, 11:00 AM
import unittest
from tasks import address_matching as am
from prefect import flow, task

class TestDatahandling(unittest.TestCase):
    def test_tilslutning_forsyning(self):
        am.address_matching()

if __name__ == '__main__':
    unittest.main()
I get the following error message:
ERROR: test_tilslutning_forsyning (__main__.TestDatahandling)
----------------------------------------------------------------------
Traceback (most recent call last):
File "c:\Users\nho\Desktop\git\Prefect\etl\geoserver\tilslutning_forsyning\test.py", line 9, in test_tilslutning_forsyning
am.address_matching()
File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\tasks.py", line 353, in __call__
return enter_task_run_engine(
File "C:\Users\nho\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 674, in enter_task_run_engine
raise RuntimeError(
RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`.
Two questions:
1. What would the syntax for task.fn() be in my example? I can't get it to run.
2. Are there better ways, or best practices, for running tests on flows, or does this setup seem okay?
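For question 1, assuming address_matching is decorated with @task, the error message's suggestion translates to calling .fn() on it; for flow-level tests, Prefect 2 also provides a prefect_test_harness context manager that runs against a temporary local API and database. A sketch (the assertions and the flow placeholder are assumptions, not from the original test):

import unittest

from prefect.testing.utilities import prefect_test_harness
from tasks import address_matching as am


class TestDatahandling(unittest.TestCase):
    def test_tilslutning_forsyning(self):
        # Call the task's underlying function directly, outside any flow.
        result = am.address_matching.fn()
        self.assertIsNotNone(result)  # placeholder assertion

    def test_whole_flow(self):
        # For testing a full flow end to end, run it inside the test harness.
        with prefect_test_harness():
            ...  # call the flow under test here


if __name__ == "__main__":
    unittest.main()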
ash
11/22/2022, 2:51 PM
with Flow("Test flow") as flow:
    with DaskCluster(n_workers=n_workers) as client:
        data = extract()
        processed_data = transform(data)
        save_data(processed_data)
The problem is that the Dask cluster is not shutting down until the save_data function completes, but I expected the cluster cleanup to happen as soon as transform completes.
Is there any way I can initiate save_data() after transform() is done and the cluster cleanup is done as well?
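One way to express that ordering, sketched below with plain tasks instead of the resource manager so the cleanup task is an explicit handle: create_cluster and cluster_cleanup are hypothetical stand-ins, extract, transform and save_data are the existing tasks from the snippet, and the extra upstream_tasks keyword is something Prefect 1 accepts when calling a task inside a Flow context.

from prefect import Flow, task


@task
def create_cluster(n_workers):
    ...  # start the Dask cluster and return a client (sketch)


@task
def cluster_cleanup(client):
    ...  # shut the cluster down (sketch)


with Flow("Test flow") as flow:
    client = create_cluster(n_workers)
    data = extract(upstream_tasks=[client])
    processed_data = transform(data)
    # cleanup now only waits for transform, not for save_data
    cleanup = cluster_cleanup(client, upstream_tasks=[processed_data])
    # save_data waits for both the transformed data and the cleanup
    save_data(processed_data, upstream_tasks=[cleanup])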
Xavier Babu
11/22/2022, 4:19 PM
Philip MacMenamin
11/22/2022, 4:50 PM
from prefect import task, Flow
from prefect.engine import signals

@task(name="Task A")
def task_a(x: int) -> int:
    if x == 2:
        raise signals.FAIL
    return x + 2

@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2

# pseudocode: what I want the check task to do
@task(name="check")
def checker(x: int) -> None:
    if anything_upstream_broke():
        this_didnt_work(ID)
    else:
        this_was_ok(x)

with Flow("x example") as flow:
    l = [2, 5, 7]
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)
That is, I have a list of things I want to run through the workflow, and I map these. Sometimes some of the elements in the list won't run properly. I'd like a way to look through all of the upstream tasks, check if any failed, and do a thing for that specific input.
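One approach that may fit, heavily hedged: give the checker an all_finished trigger so it still runs for mapped children whose upstream failed, and inspect whether the value it received is an exception. My understanding is that Prefect 1 passes a failed upstream's raised exception/signal downstream in that case, but that is an assumption worth verifying; handle_failure and handle_success are hypothetical helpers, and task_a/task_b are the tasks from the snippet above.

from prefect import Flow, task
from prefect.triggers import all_finished


@task(name="check", trigger=all_finished)
def checker(x):
    # Assumption to verify: if the upstream mapped child failed, x arrives
    # here as the raised exception/signal rather than a normal value.
    if isinstance(x, BaseException):
        handle_failure(x)   # hypothetical handler for the failed input
    else:
        handle_success(x)   # hypothetical handler for the good input


with Flow("x example") as flow:
    l = [2, 5, 7]
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)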
Dmitrii Egunov
11/22/2022, 6:03 PM
Jimmy Le
11/22/2022, 6:04 PM
Joshua Grant
11/22/2022, 6:47 PM
apply_map? Details in 🧵
James Zhang
11/22/2022, 8:09 PM
httpx.ConnectTimeout error: my prefect-orion runs on our own k8s and should be reachable from our gitlab-ci pipeline. Has anyone seen this error? Any idea how I could debug? Thanks!
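A small probe that may help narrow this down: run it from inside the gitlab-ci job itself to check whether the Orion API's health endpoint is reachable at all, which separates a networking/DNS/ingress problem from a Prefect-level one. The fallback URL below is a placeholder; the real value should be whatever PREFECT_API_URL the CI job uses.

import os

import httpx

# Placeholder default; use the same PREFECT_API_URL the CI job is configured with.
api_url = os.environ.get("PREFECT_API_URL", "http://prefect-orion.example.svc:4200/api")

try:
    resp = httpx.get(f"{api_url}/health", timeout=10)
    print("reachable:", resp.status_code, resp.text)
except httpx.ConnectTimeout:
    print("connect timeout: likely a network, DNS, or ingress issue rather than a Prefect one")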
jack
11/22/2022, 9:27 PM
geoffc
11/23/2022, 1:31 AM
create_flow_run.map will create all of the flow runs at once, which is clogging up my agent's backlog when N gets too large. This creates a lot of failures, which occur before my child flow runs can even begin. Unfortunately, I am not the admin and don't have control over the agent we're using, so I don't have an easy way to investigate why this is or resolve it. Even if I were, it seems like kicking off this many flows at once probably isn't a good idea. Thus, I'm wondering if there is some way to throttle create_flow_run in my flow without pre-defining my N child flows. I'm even OK with running all N flows in sequence, i.e. kick off flow run 1 -> wait for flow run 1 -> kick off flow run 2 -> etc.
Here is the structure I'm using now:
def create_wait_on_child(parameters):
    create_id = create_flow_run(flow_name="GenericChild",
                                project_name="Project",
                                parameters=parameters)
    return wait_for_flow_run(create_id,
                             raise_final_state=True)

with Flow('Parent Flow') as flow:
    flow_runs_params_list = get_flow_runs_params()
    apply_map(create_wait_on_child, flow_runs_params_list)
My Question: Is there some way to combine the functionality of `create_flow_run` and `wait_for_flow_run` in the same task? I know I can't just add a task() decorator to create_wait_on_child in the example above (since that would involve calling tasks within tasks), but that is the functionality I'm going for. Or does anyone have alternative recommendations?
Appreciate anyone's input.
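One pattern that may give exactly that (a sketch, worth verifying against the Prefect 1 version in use): wrap both built-in tasks inside a single task and call their underlying .run() methods, so each child run is created and awaited before the mapped task finishes. With a LocalExecutor the mapped children then execute one at a time, which matches the sequential behaviour described above; get_flow_runs_params is the existing task from the snippet.

from prefect import Flow, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def create_and_wait(parameters):
    # Call the built-in tasks' logic directly via .run(), inside one task,
    # so creating and waiting for a child happen together.
    flow_run_id = create_flow_run.run(
        flow_name="GenericChild",
        project_name="Project",
        parameters=parameters,
    )
    return wait_for_flow_run.run(flow_run_id, raise_final_state=True)


with Flow("Parent Flow") as flow:
    flow_runs_params_list = get_flow_runs_params()
    create_and_wait.map(flow_runs_params_list)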
Tim Galvin
11/23/2022, 7:47 AM
Pekka
11/23/2022, 7:49 AM
ash
11/23/2022, 11:17 AM
with Flow("Test flow") as flow:
    with DaskCluster(n_workers=n_workers) as client:
        data = extract()
        processed_data = transform(data)
        save_data(processed_data)
The problem is that the Dask cluster is not shutting down until the save_data function completes, but I expected the cluster cleanup to happen as soon as transform completes.
Is there any way I can initiate save_data() after transform() is done and the cluster cleanup is done as well?
Dan Wise
11/23/2022, 12:54 PM
Dan Wise
11/23/2022, 12:54 PM
Anna Geller
11/23/2022, 1:36 PM
Dan Wise
11/23/2022, 1:54 PM
Anna Geller
11/23/2022, 2:54 PM