Daniel Komisar
02/16/2022, 8:16 PM
…`where` filter with a task id, but it works when I query by flow run id.

Daniel Komisar
02/16/2022, 9:23 PM
`case(False)`?

Farid
02/16/2022, 9:32 PM
…`Result` using Prefect? I dump the dict objects to str JSONs using `json.dumps` and then save them using `S3Result` or `LocalResult`, and noticed both of them get some extra characters added to the beginning or end of the JSON object:
��QXQ >> to the beginning
�. >> to the end
which makes it un-parseable. Saving the same object using `with open()` on my local machine does not produce those extra characters.
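
Those stray leading/trailing bytes look like the binary pickle framing that Prefect 1.x results apply by default; attaching a JSON serializer keeps the stored object as plain text. A minimal sketch, assuming Prefect 1.x (the bucket and directory names are placeholders):

```python
from prefect.engine.results import LocalResult, S3Result
from prefect.engine.serializers import JSONSerializer

# With a JSONSerializer the result is written as plain JSON text, so there is
# no need to json.dumps the dict first and no pickle bytes around the payload.
s3_result = S3Result(bucket="my-bucket", serializer=JSONSerializer())      # placeholder bucket
local_result = LocalResult(dir="./results", serializer=JSONSerializer())   # placeholder directory
```
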
Kelly Huang
02/17/2022, 1:37 AM

An Hoang
02/17/2022, 5:02 AM
…(`task_B`) downstream of a mapped task (`task_A`). Some of the input for `task_B` (output of `task_A`) is `prefect.engine.signals.SKIP(None)`. In `task_B`, do I have to filter out these skip signals? If the output of `task_B` is mapped to `task_C` ... `task_X`, do I have to do `if isinstance(input, prefect.signals.SKIP): raise prefect.signals.SKIP` for every single task in the chain?
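
One way to avoid repeating that isinstance check in every downstream task is to put a reduce-style filter between the mapped stages. A minimal sketch, assuming Prefect 1.x; `task_A` and `task_B` here are placeholder re-implementations, and `FilterTask`'s default filter drops exceptions, which includes SKIP signals:

```python
from prefect import Flow, task
from prefect.engine.signals import SKIP
from prefect.tasks.control_flow import FilterTask

@task
def task_A(x):
    # Placeholder mapped task that skips odd inputs.
    if x % 2:
        raise SKIP("odd input")
    return x

@task
def task_B(x):
    # Placeholder downstream task; only sees values that survived the filter.
    return x * 10

# FilterTask's default filter_func drops NoResult and Exception instances,
# and SKIP signals are Exceptions, so they are removed in one reduce step.
filter_skipped = FilterTask()

with Flow("skip-filter-sketch") as flow:
    a = task_A.map([1, 2, 3, 4])
    kept = filter_skipped(a)
    b = task_B.map(kept)
```
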
Ayah Safeen
02/17/2022, 8:46 AM

damien michelle
02/17/2022, 10:50 AM

Daniel Nilsen
02/17/2022, 12:47 PM
…`data`. But when I try to access this in the flow I get 'FunctionTask' object has no attribute 'data'. How do I access the response.data?

with Flow('data_transformation') as flow:
    response = myTask()
    response.data  # error: 'FunctionTask' object has no attribute 'data'
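
Inside a `with Flow(...)` block, `response` is a task object rather than the value it will return, so attribute access has to happen inside a task. A minimal sketch, assuming Prefect 1.x, with placeholder task names standing in for the original `myTask`:

```python
from types import SimpleNamespace
from prefect import Flow, task

@task
def my_task():
    # Stand-in for the original myTask: returns an object with a .data attribute.
    return SimpleNamespace(data={"rows": [1, 2, 3]})

@task
def get_data(response):
    # Runs at flow-run time, where `response` is the actual returned object,
    # not the FunctionTask you see while the flow is being defined.
    return response.data

with Flow("data_transformation") as flow:
    response = my_task()
    data = get_data(response)
```
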
Matthias Roels
02/17/2022, 1:07 PM

Marcel M
02/17/2022, 3:16 PM

Marcel M
02/17/2022, 3:17 PM

Tim Enders
02/17/2022, 4:04 PM
…`flow.run()`?

Lorenzo Randazzo
02/17/2022, 4:26 PM

Andrew Lawlor
02/17/2022, 5:37 PM

David Yang
02/17/2022, 6:43 PM

Rio McMahon
02/17/2022, 8:54 PM
…State Message: `{'_schema': 'Invalid data type: None'}` but I am unclear on what that means. Could you clarify what this might indicate or ways to get more informative error messages? Thanks.

Heeje Cho
02/17/2022, 9:30 PM

Billy McMonagle
02/17/2022, 9:50 PM

Lee Cullen
02/18/2022, 2:21 AM
…`Invalid Project!` even though the project exists.

Romain
02/18/2022, 7:21 AM

Daniel Nilsen
02/18/2022, 10:12 AM

Michael Hadorn
02/18/2022, 10:47 AM

Bogdan Bliznyuk
02/18/2022, 11:50 AM
…`az acr login`. But it seems that the Prefect docker agent only reads the token during start-up and stores it in memory. Unless we restart the docker agent, it is unable to pull Docker image flows after 3 hrs (once the ACR token has expired).

iñigo
02/18/2022, 1:57 PM

Andrew Black
02/18/2022, 2:01 PM

Christopher Schmitt
02/17/2022, 3:09 PM

Pedro Machado
02/18/2022, 4:00 PM
…`prefect register` even though the code has not changed? The only thing I see that could change is the Docker tag, which I am not setting explicitly. Would this cause it to increment the version number? Docker build log in thread.
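
For reference, in Prefect 1.x you can pass an idempotency key at registration so that re-registering an unchanged flow does not bump the version; reusing `flow.serialized_hash()` is the commonly used key. A minimal sketch (flow and project names are placeholders):

```python
from prefect import Flow, task

@task
def say_hi():
    print("hi")

with Flow("my-flow") as flow:
    say_hi()

# Reusing the hash of the serialized flow as the idempotency key means a
# re-registration with an unchanged flow should keep the same version.
flow.register(
    project_name="my-project",  # placeholder project name
    idempotency_key=flow.serialized_hash(),
)
```
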
Olivér Atanaszov
02/18/2022, 4:06 PM
[2022-02-17 12:29:19+0000] INFO - prefect.wait_for_flow_run | Flow 'affable-piculet-flow-build-dataset': Entered state <Failed>: Unexpected error while running flow: KeyError('Task slug do_someting-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
when trying to run a flow of flows like this:
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

from flows.a import flow as flow_a  # flow's name is "flow-a"
from flows.b import flow as flow_b  # flow's name is "flow-b"

kwargs = {
    "project_name": "foo",
    "labels": ["test"],
}
flow_a.register(**kwargs)
flow_b.register(**kwargs)

with Flow("flow-of-flows", run_config=run_config, storage=storage, result=result) as flow:
    run_id_flow_a = create_flow_run(flow_name="flow-a", **kwargs)
    wait_for_flow_a = wait_for_flow_run(run_id_flow_a, raise_final_state=True)
    output_a = get_task_run_result(run_id_flow_a, task_slug="do_something-1")
    run_id_flow_b = create_flow_run(flow_name="flow-b", parameters={"input": output_a}, **kwargs)
    run_id_flow_b.set_upstream(wait_for_flow_a)

flow.register(**kwargs)
@George Coyne

Donnchadh McAuliffe
02/18/2022, 4:08 PM
POST: http://localhost:4200/api/flow_runs/
PAYLOAD:
{
    "name": "my_scheduled_flow",
    "deployment_id": "0ff88fc8-a71c-4b71-b8dd-078d6a36fb39",
    "flow_id": "94374bea-70ae-4e49-8988-e9077daa280a",
    "flow_version": "1.0",
    "state": {
        "type": "SCHEDULED",
        "name": "my_scheduled_flow",
        "message": "a message",
        "state_details": {
            "scheduled_time": "2022-02-18T16:10:49.005Z"
        }
    }
}
The flow run then appears in the server UI (as yellow) but never gets run at the correct time. Does anyone have any idea on this? Am I missing an important part of the payload?
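
For reference, the same request expressed with `requests`; this is only a sketch that assumes the local API shown above is reachable, and it reproduces the payload from the message verbatim:

```python
import requests

payload = {
    "name": "my_scheduled_flow",
    "deployment_id": "0ff88fc8-a71c-4b71-b8dd-078d6a36fb39",
    "flow_id": "94374bea-70ae-4e49-8988-e9077daa280a",
    "flow_version": "1.0",
    "state": {
        "type": "SCHEDULED",
        "name": "my_scheduled_flow",
        "message": "a message",
        "state_details": {"scheduled_time": "2022-02-18T16:10:49.005Z"},
    },
}

# Create the scheduled flow run and print the server's response for inspection.
resp = requests.post("http://localhost:4200/api/flow_runs/", json=payload)
resp.raise_for_status()
print(resp.json())
```
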
David Yang
02/18/2022, 4:27 PM
`pip install markupsafe==2.0.1`
Kevin Kho
02/18/2022, 4:28 PM

David Yang
02/18/2022, 4:29 PM

Kevin Kho
02/18/2022, 4:30 PM

David Yang
02/18/2022, 4:30 PM

Matthias
02/18/2022, 4:33 PM

David Yang
02/18/2022, 6:19 PM

Matthias
02/18/2022, 6:31 PM

David Yang
02/18/2022, 6:32 PM

Matthias
02/18/2022, 6:50 PM
…`>0.23`, so that’s why you explicitly had to downgrade it. An alternative fix is to pin the version in the requirements of your application and use dbt 1.0.1.
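
A sketch of that pinning approach in a requirements file (the `dbt-core` package name is assumed; swap in whichever dbt adapter you actually install):

```
# requirements.txt
# Pin markupsafe below 2.1 (which removed soft_unicode and broke older Jinja2),
# and use dbt 1.0.1 as suggested above.
dbt-core==1.0.1
markupsafe==2.0.1
```
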
David Yang
02/18/2022, 8:19 PM

Matthias
02/18/2022, 8:26 PM