Mohit Singhal
11/29/2022, 7:07 AM

Mohit Singhal
11/29/2022, 7:25 AM

Andreas Nigg
11/29/2022, 8:19 AM

James Zhang
11/29/2022, 9:04 AM

Javier Ruere
11/29/2022, 9:26 AM

Seif Harrathi
11/29/2022, 9:38 AM
@flow(name="Run Check", description="Flow that controls the data integrity")
def run_check(layers_data):
    """
    1- Extract data from ArcGIS database
    2- Transform data
    3- Load result into Excel output
    @param layers_data:
    @return:
    """
    # 1- Extract data from database
    state = extract_data(layers_data, return_state=True)
    raw = state.result()
    # 2- Transform data
    state = DataTransformer.transform_results(raw, return_state=True)
    data = state.result()
    # 3- Load result into Excel output
    state = DataWriter.load_results(data, return_state=True)
    output = state.result()
    return {
        "Result": {
            "nb_errors": 12,
            "nb_lines_checked": 456,
        },
        "output_path": "path_to_s3",
    }
My question is: how do I get the results returned by my flow run? I want to get
{
    "Result": {
        "nb_errors": 12,
        "nb_lines_checked": 456
    },
    "output_path": "path_to_s3"
}
I used the endpoint /api/flow_runs/{id}, but I don't see my result in the response 😕
Any help? Any ideas? I thought about saving the result to the S3 bucket and then retrieving it, but I'm not sure if that's the best practice.
Thanks in advance!
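One way to get that return value (a minimal sketch, assuming Prefect 2.x with result persistence enabled on the flow, e.g. @flow(persist_result=True); the flow run ID below is a placeholder): the /api/flow_runs/{id} endpoint only returns run metadata, so the return value has to be read back through the Python client from the persisted result.

import asyncio

from prefect.client import get_client

async def fetch_flow_run_result(flow_run_id: str):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        # state.result() resolves the state's data; fetch=True pulls the
        # persisted result back from result storage (e.g. an S3 block).
        return await flow_run.state.result(fetch=True)

result = asyncio.run(fetch_flow_run_result("your-flow-run-id"))  # placeholder ID
print(result)  # {"Result": {...}, "output_path": "path_to_s3"}

Writing the output to S3 yourself and returning only the path, as suggested above, is also a reasonable pattern when other systems need to consume the result.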
alvin goh
11/29/2022, 10:23 AM

Mihai H
11/29/2022, 1:30 PM

Mihai H
11/29/2022, 1:30 PM

Mihai H
11/29/2022, 1:30 PM

Mihai H
11/29/2022, 1:31 PM
Encountered exception during execution:
Traceback (most recent call last):
File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 612, in orchestrate_flow_run
waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 1325, in wait_for_task_runs_and_report_crashes
if not state.type == StateType.CRASHED:
AttributeError: 'coroutine' object has no attribute 'type'
03:25:35 PM
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'
Mihai H
11/29/2022, 1:32 PM

Kelvin Garcia
11/29/2022, 1:51 PM
Quick question about the on_failure function handler: can I get the failure information from either the flow variable or the state variable that are passed to that function?
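For context, a minimal sketch of how such a handler is wired up (assuming Prefect 1.x here, since the callback receives flow and state arguments; all names below are illustrative):

from prefect import Flow, task

# In Prefect 1.x an on_failure callback receives the Flow object and the
# final State, so failure details can be read from either argument.
def notify_on_failure(flow, state):
    print(f"Flow {flow.name!r} failed: {state.message}")

@task
def always_fails():
    raise ValueError("boom")

with Flow("example-flow", on_failure=notify_on_failure) as flow:
    always_fails()

if __name__ == "__main__":
    flow.run()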
Mihai H
11/29/2022, 2:41 PM

Mihai H
11/29/2022, 2:42 PM

Mihai H
11/29/2022, 2:42 PM

Slackbot
11/29/2022, 2:42 PM

FuETL
11/29/2022, 2:45 PM
create_flow_run(flow_id=flow_id, parameters=parameters)
But sometimes, when there's nothing running or queued, the flow is scheduled to run in 20~30 minutes, even though I'm not providing the scheduled_start_time parameter. How can I make my flows always get scheduled to run immediately? (I thought that was the default behaviour.) Maybe it's the number of agents I'm running? Thanks.
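One thing worth trying (a sketch, not a confirmed fix, assuming Prefect 1.x's create_flow_run task; the flow and project names are placeholders): pass scheduled_start_time explicitly so the child run is scheduled for "now", and check agent capacity and label matching separately.

import pendulum
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as flow:
    # Explicitly schedule the child run for "now". If runs still sit in a
    # Scheduled state for 20-30 minutes, the bottleneck is more likely agent
    # availability or label mismatches than the schedule itself.
    create_flow_run(
        flow_name="child-flow",      # placeholder
        project_name="my-project",   # placeholder
        parameters={"key": "value"},
        scheduled_start_time=pendulum.now("UTC"),
    )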
Sunjay
11/29/2022, 3:04 PM

Matt Delacour
11/29/2022, 3:34 PM

Jean-Michel Provencher
11/29/2022, 4:40 PM

Patrick Tan
11/29/2022, 4:59 PM

Kalise Richmond
11/29/2022, 6:03 PM

Bo
11/29/2022, 10:01 PM
Seeing "RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family." errors
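That message is AWS throttling concurrent RegisterTaskDefinition calls, which can happen when many runs register a new task definition revision at once. A generic mitigation sketch (plain boto3 with botocore's adaptive retry mode, independent of Prefect's own ECS integration; the family and image names are placeholders):

import boto3
from botocore.config import Config

# Adaptive retry mode backs off client-side when AWS starts throttling,
# smoothing out bursts of concurrent RegisterTaskDefinition calls.
ecs = boto3.client(
    "ecs",
    config=Config(retries={"max_attempts": 10, "mode": "adaptive"}),
)

response = ecs.register_task_definition(
    family="my-task-family",  # placeholder
    containerDefinitions=[
        {"name": "app", "image": "my-image:latest", "memory": 512},
    ],
)
print(response["taskDefinition"]["taskDefinitionArn"])

Reusing one pre-registered task definition across runs, rather than registering a new revision per run, also sidesteps the limit entirely.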
Bradley Hurley
11/30/2022, 4:17 AM
Github-flavored Markdown, but I don't think <details> are actually supported. I searched for existing GitHub issues, but wasn't able to find anything.

Tim Galvin
11/30/2022, 8:51 AM
PREFECT_LOGGING_EXTRA_LOGGERS mechanism? I am running a DaskTaskExecutor with a SLURMCluster backend to create the set of dask-workers.
I can see that the loggers in modules I am using in my prefect2 pipeline are being reported in the slurm stderr output that is written to disk through the sbatch --error log.err argument. These messages follow the prefect log handler configuration ("Time | State | __name__ - Message"), but they are not being saved to the orion database / presented by the orion UI.
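For reference, a minimal sketch of the extra-loggers setup (assuming Prefect 2.x; "mymodule" is a placeholder for the library whose logs should be captured, and PREFECT_LOGGING_EXTRA_LOGGERS takes a comma-separated list of logger names set in the worker environment):

import logging

from prefect import flow, get_run_logger

# With PREFECT_LOGGING_EXTRA_LOGGERS=mymodule set in the environment,
# Prefect attaches its handlers to this logger.
module_logger = logging.getLogger("mymodule")

@flow
def my_flow():
    get_run_logger().info("sent via the run logger")
    # Records from an extra logger can only be forwarded to the Orion API
    # while a flow or task run context is active; outside of one they only
    # reach the console handler (e.g. the slurm stderr file).
    module_logger.info("sent via an extra logger")

if __name__ == "__main__":
    my_flow()

That run-context requirement may be why the messages show up with Prefect's console formatting in the sbatch stderr file but never reach the orion database.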
Vadym Dytyniak
11/30/2022, 9:43 AM

eddy davies
11/30/2022, 10:34 AM

roady
11/30/2022, 10:44 AM

Christopher
11/30/2022, 2:29 PM

Christopher
11/30/2022, 2:29 PM

Bianca Hoch
11/30/2022, 10:04 PM