John Shearer
12/17/2021, 2:33 PM
Pedro Machado
12/17/2021, 3:30 PM
I am using a FilterTask as explained here to allow the reduce task to save the data. It works OK with small results, but when I try it with the real API output, I am getting "State payload is too large".
I tried setting result=None for this task, but that did not fix it. Any suggestions?
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
    result = self.graphql(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I also tried setting the trigger on the reduce task to trigger=any_successful and it worked, but I wonder why the filter task is not working. Thanks!
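One common workaround for "State payload is too large" (a sketch only, assuming Prefect 1.x and an object store you control; the task and bucket names below are placeholders) is to attach a file- or cloud-backed Result to the heavy task, so the data is written externally and only its location travels with the state:

from prefect import task
from prefect.engine.results import S3Result  # or GCSResult / LocalResult

# Placeholder reduce task: with an S3Result attached, the task's output is
# written to the bucket and the state payload only records its location.
@task(result=S3Result(bucket="my-bucket"))
def reduce_task(items):
    return [i for i in items if i is not None]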
Alejandro Sanchez Losa
12/17/2021, 3:44 PM
Alejandro Sanchez Losa
12/17/2021, 4:12 PM
Jadei
12/17/2021, 5:45 PM
import pendulum

from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

weekday_schedule = CronSchedule(
    "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
)

with Flow("parent-flow", schedule=weekday_schedule) as flow:
    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow_b = create_flow_run(flow_name="B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    flow_c = create_flow_run(flow_name="C", project_name="examples")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
    flow_d = create_flow_run(flow_name="D", project_name="examples")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
    # B and C should wait for A; D should wait for both B and C
    b = wait_for_flow_b(upstream_tasks=[wait_for_flow_a])
    c = wait_for_flow_c(upstream_tasks=[wait_for_flow_a])
    d = wait_for_flow_d(upstream_tasks=[b, c])

flow.run()
Constantino Schillebeeckx
12/17/2021, 5:52 PM
import prefect
from prefect import Flow, Parameter, case, task

@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    logger = prefect.context.logger
    logger.critical(f"{days_from=}")

with Flow('foo') as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        anchor_date = run_task(anchor_date, days_from=0)
    with case(cond, False):
        anchor_date = run_task(anchor_date, days_from=-1)
However, the False condition never gets run. When check_cond returns True, the flow runs as expected. Am I missing something?
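One thing worth checking (a sketch of the documented case/merge pattern in Prefect 1.x, not a confirmed diagnosis of this flow): because anchor_date is rebound inside the True block, the False-branch run_task takes the skipped True-branch task as an input, and skips propagate downstream by default. Keeping the branch outputs separate and combining them with merge avoids that:

import prefect
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def check_cond():
    return False

@task
def run_task(anchor_date, days_from):
    prefect.context.logger.critical(f"{days_from=}")
    return anchor_date

with Flow("foo") as flow:
    anchor_date = Parameter(name="anchor_date", default=None)
    cond = check_cond()
    with case(cond, True):
        result_true = run_task(anchor_date, days_from=0)
    with case(cond, False):
        result_false = run_task(anchor_date, days_from=-1)
    # merge returns the output of whichever branch actually ran
    new_anchor_date = merge(result_true, result_false)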
Chris Reuter
12/17/2021, 8:55 PM
on YouTube
Danny Vilela
12/17/2021, 9:51 PM
How do I get the `today` template filled into a result location or task target string? I keep getting a `KeyError`:
from prefect.engine.results import LocalResult
# Create a new local result pointing to current working directory.
result = LocalResult(dir=".", location="{today}.txt")
assert result.location == "{today}.txt"
# This raises `KeyError: 'today'`.
result.write(value_="hello!")
I'm following this doc: https://docs.prefect.io/core/concepts/results.html#choose-a-serializer (under "Templating Result locations").
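A sketch of what may be going on (assuming Prefect 1.x): during a flow run the task runner fills the location template from prefect.context, which includes today, but a direct result.write call only formats the location with the keyword arguments you pass it, so the key has to be supplied explicitly:

import pendulum
from prefect.engine.results import LocalResult

result = LocalResult(dir=".", location="{today}.txt")

# Outside a flow run, supply the template keys yourself; inside a run the
# task runner provides them from prefect.context.
result.write(value_="hello!", today=pendulum.now().strftime("%Y-%m-%d"))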
Ovo Ojameruaye
12/17/2021, 10:44 PM
Danny Vilela
12/17/2021, 10:58 PM
Is there a way to template other values into a location or task target? I know the Prefect context gives you a bunch of variables (notably including parameters, which looks promising!), but what if we have some data (the strings "foo" or "bar") computed by some task compute_foo_or_bar and we'd like to reference that output in a subsequent task's output target?
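A possible pattern (a sketch, assuming Prefect 1.x; the task and input names are made up): if I recall the 1.x behavior correctly, a task's own inputs are available when its target is rendered, so passing the upstream output in as an input lets you reference it by name in the target template:

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task
def compute_foo_or_bar():
    return "foo"

# "label" is a hypothetical input name; it can be referenced in the target
# template because task inputs are included in the formatting kwargs.
@task(target="{label}.txt", result=LocalResult(dir="."), checkpoint=True)
def downstream(label):
    return f"result for {label}"

with Flow("templated-target") as flow:
    label = compute_foo_or_bar()
    downstream(label)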
Jelle Vegter
12/18/2021, 12:40 PM
Alejandro Sanchez Losa
12/18/2021, 2:30 PM
Ovo Ojameruaye
12/19/2021, 1:31 AM
I am getting ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it. I can't tell if this is related. I created a config.toml file:
# base configuration directory
home_dir = "C:/Users/XXX/.prefect"
backend = "server"
[server]
host = "<http://XXXXXX>"
port = "4200"
host_port = "4200"
endpoint = "${server.host}:${server.port}"
I am certain the server is running on the host IP address.
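A quick way to separate a networking problem from a Prefect configuration problem (a sketch; the host name is a placeholder for your server's address) is to check that port 4200 is reachable from the machine that raises the error:

import socket

# ConnectionRefusedError here means the port is not reachable from this
# machine (firewall, binding, wrong host), independent of Prefect itself.
with socket.create_connection(("XXXXXX", 4200), timeout=5):
    print("port 4200 is reachable")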
Anh Nguyen
12/19/2021, 12:47 PM
Yash
12/19/2021, 1:19 PM
ale
12/20/2021, 9:47 AM
Samuel Hinton
12/20/2021, 1:14 PM
Tasks have a timeout which we can pass in, but I'm currently experiencing some odd issues where tasks seem to get lost in Dask somewhere (they are submitted to Dask but never come back and never time out), and this means my flows never end. Ideally I'll dig into our environment, Dask, and Prefect to figure out what is causing the silent, untracked failure, but as an interim solution, does anyone know of a way I can say "Cancel the flow and all tasks if it's been an hour since you started"?
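For reference, the per-task knob mentioned above (a sketch, assuming Prefect 1.x; it bounds a single task's runtime rather than the whole flow):

from prefect import task

# timeout is given in seconds; the task is failed if it runs longer.
@task(timeout=3600)
def long_running_work():
    ...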
Brian Phillips
12/20/2021, 1:52 PM
jars
12/20/2021, 1:54 PM
Anna Geller
12/20/2021, 1:59 PM
Anna Geller
12/20/2021, 2:24 PM
jars
12/20/2021, 2:29 PM
Daniel Komisar
12/20/2021, 3:21 PM
Dekel R
12/20/2021, 3:25 PM
I'm getting this error when I try to register my flow:
raise InterruptedError(line.get("error"))
InterruptedError: name invalid: Request contains an invalid argument.
In order to register my flow I use the following command:
prefect register --project "Project_name" --path path_to_flow_file.py --name "brands-recognition"
Project_name is the project I have in Prefect Cloud.
path_to_flow_file is the path to the file which contains my flow.
brands-recognition is the flow name.
This is my flow snippet (for debug purposes, just 2 tasks):
from datetime import timedelta
from prefect import Flow, Parameter
from prefect.run_configs import VertexRun
from prefect.schedules import IntervalSchedule
from prefect.storage import Docker

schedule = IntervalSchedule(interval=timedelta(days=7))
with Flow("brands-recognition",
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                         dockerfile="./Dockerfile"),
          schedule=schedule) as flow:
    mode = Parameter('FULL_UPDATE', default=None)
    task_a(mode)
flow.run_config = VertexRun(machine_type='e2-standard-8', labels=["ml"],
                            service_account='prefect-integration@xx')
# flow.run()
Now when running locally, the flow runs smoothly and generates a result.
It seems like the problem is only at the pushing phase (when I register the flow).
What am I missing?
Thanks.
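One thing that may be worth checking (a sketch only; the project, repository, and image names below are placeholders, and the real registry_url is redacted above): Google Artifact Registry expects image names of the form LOCATION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE, and a "name invalid" / "invalid argument" response from the push often points at a registry_url or image name that doesn't match that shape:

from prefect.storage import Docker

# Fully qualified Artifact Registry path: region endpoint, project, repository.
storage = Docker(
    registry_url="us-central1-docker.pkg.dev/my-project/my-repo",
    image_name="brands-recognition",
    image_tag="latest",
    dockerfile="./Dockerfile",
)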
Constantino Schillebeeckx
12/20/2021, 3:35 PM
Run History keeps re-materializing as I scroll
Alejandro Sanchez Losa
12/20/2021, 4:39 PM
Alejandro Sanchez Losa
12/20/2021, 4:41 PM
Billy McMonagle
12/20/2021, 4:51 PM
Vipul
12/20/2021, 5:00 PM
Kyle Heath
12/20/2021, 9:01 PM
Kyle Heath
12/20/2021, 9:01 PM
Anna Geller
12/20/2021, 9:14 PM