Ben Welsh
03/31/2022, 8:28 PM
Ben Welsh
03/31/2022, 8:28 PM
Ben Welsh
03/31/2022, 8:28 PM
Erik Schomburg
03/31/2022, 9:07 PM
Chu Lục Ninh
04/01/2022, 12:51 AM
Atul Anand
04/01/2022, 1:12 AM
Ken Nguyen
04/01/2022, 3:27 AM
Andrei Aldescu
04/01/2022, 7:47 AM
Iván Sánchez
04/01/2022, 7:49 AM
Domenico Di Gangi
04/01/2022, 8:49 AM
Andrei Aldescu
04/01/2022, 11:14 AM
Patrick Tan
04/01/2022, 12:00 PM
with Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        print(i)
Matthew Seligson
04/01/2022, 12:14 PM
Maria
04/01/2022, 3:09 PM
Wei Mei
04/01/2022, 3:30 PM
today, dir = get_data(endpoints=api_endpoints)
upload_to_s3(today, dir, endpoints=api_endpoints)
snowflake_load(today, schema="statistics", endpoints=api_endpoints)
Donnchadh McAuliffe
04/01/2022, 3:51 PM
POST <https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run>
PAYLOAD
{
  "state": {
    "type": "SCHEDULED",
    "message": "Quick run through UI"
  }
}
Headers {Authorization: Bearer api_key}
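For readers who want to reproduce the call, the request described above could be built in Python roughly as follows. This is a minimal sketch using only the standard library: the helper name `build_create_flow_run_request` is hypothetical, the `Content-Type` header is an assumption that does not appear in the original message, and the account, workspace, and deployment IDs and the `api_key` value are the placeholders from the message.

```python
import json
import urllib.request


def build_create_flow_run_request(url: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) the POST request described in the message."""
    # Payload copied from the message above.
    payload = {"state": {"type": "SCHEDULED", "message": "Quick run through UI"}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            # Assumption: the API expects JSON; this header is not in the original message.
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_create_flow_run_request(
    "https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run",
    api_key="api_key",  # placeholder, as in the message
)
```

Sending it with `urllib.request.urlopen(request)` performs the TLS handshake, which is the step where a certificate error like the one reported would surface.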
Getting an SSL Error: Certificate has expired. Any ideas?
Joshua Greenhalgh
04/01/2022, 4:00 PM
Collecting flows...
Processing 'flows':
Building `Module` storage...
Building 'first_flow'... Done
Writing output to 'flows.json'
========================== 1 built ==========================
Collecting flows...
Processing 'flows.json':
Registering 'first_flow'... Skipped (metadata unchanged)
================== 0 registered, 1 skipped ==================
Amogh Kulkarni
04/01/2022, 9:49 PM
Ken Nguyen
04/02/2022, 9:05 AM
with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_run_flow_run_id = create_flow_run(
        flow_name="test-dbt-run-flow",
        project_name="data_quality_tracking"
    )
    flow_run = wait_for_flow_run(
        dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
    )
    flow_logs = get_logs(
        dbt_run_flow_run_id,
        task_args={"name": "Getting logs", "trigger": all_finished},
        upstream_tasks=[flow_run],
    )
get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both upstream_tasks defined AND a FlowRunView.get_latest() for the get_logs task, I'm still getting logs that are incomplete.
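One way to probe this, assuming (this is not confirmed anywhere in the thread) that the child flow's logs reach the backend slightly after the run enters its final state, is to re-fetch until the number of log entries stops changing. The sketch below is library-agnostic: `fetch_until_stable` and its parameters are hypothetical names, and `fetch_logs` stands in for whatever call returns the current logs.

```python
import time
from typing import Callable, List, Sequence


def fetch_until_stable(
    fetch_logs: Callable[[], Sequence[str]],
    max_attempts: int = 5,
    delay_seconds: float = 2.0,
) -> List[str]:
    """Re-fetch logs until two consecutive fetches return the same number of entries."""
    previous = list(fetch_logs())
    for _ in range(max_attempts - 1):
        time.sleep(delay_seconds)
        current = list(fetch_logs())
        if len(current) == len(previous):
            # No new entries arrived between two fetches: treat the logs as complete.
            return current
        previous = current
    # Attempts exhausted: return the most recent fetch, which may still be incomplete.
    return previous
```

Inside the get_logs task, `fetch_logs` could be a small function that looks up the FlowRunView again and returns its logs, so each attempt sees fresh data.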
Do you have any suggestions for why my get_logs function is prematurely retrieving logs of the child flow/retrieving incomplete logs?
R Zo
04/02/2022, 11:42 AM
Hi prefect team,
I am trying to spin up a flow of flows, so I started a prefect server and an agent with the label "test". Below is a snippet of code that should run child_flow_1, which is a flow with multiple tasks. However, child_flow_1 does not run, while prefect quickly returns a few success logs and no error is reported. What am I missing? I have run child_flow_1 without the flow of flows and it works fine. Following is a snippet of the log using Dask, but I have tried LocalDaskExecutor as well.
test_flows created
Flow URL: <http://localhost:8080/default/flow/cd6dc2bb-8f54-4f5a-b046-d90e5c853dbe>
└── ID: e405d852-5c75-4001-815e-619687e592d3
└── Project: test_flows
└── Labels: ['mymachinexxx', 'test']
[2022-04-02 22:30:01+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flows'
[2022-04-02 22:30:01+1100] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at <tcp://192.168.20.9:8786>
[2022-04-02 22:30:02+1100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
test_flows = make_three_flows()
for tfl in test_flows:
    tfl.register("test_flows")
    # tfl.executor = LocalDaskExecutor(scheduler="threads",num_workers=10)
with Flow("parent_flow", run_config=LocalRun(labels=["test"])) as parent_flow_complete:
    working_dir = Parameter("working_dir") #
    create_flow_run(
        flow_name="child_flow_1",parameters={"working_dir": working_dir})
parent_flow_complete.executor = DaskExecutor(address="tcp://localhost:8786")
# parent_flow_complete.register("test_flows")
parent_flow_complete.run(parameters={"working_dir": working_dir},)
Jai P
04/02/2022, 5:06 PM
flow needs to be defined up front, but correct me if I'm wrong. I'll try to add a few more details in the thread, and thanks in advance for your help!
Shiyu Gan
04/03/2022, 2:36 AM
Shiyu Gan
04/03/2022, 2:36 AM
Atul Anand
04/03/2022, 1:13 PM
Jai P
04/03/2022, 6:44 PM
flow.visualize() in prefect 1.0 (not using radar)? If not, are there plans to add that functionality?
Bruno Nunes
04/03/2022, 6:54 PM
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=<http://0.0.0.0:4200/api>
Check out the dashboard at <http://0.0.0.0:4200>
12:51:20.774 | WARNING | prefect.agent - No work queue found named 'kubernetes'
I've edited the service to be of type LoadBalancer and used the external IP to set the PREFECT_API_URL
prefect config set PREFECT_API_URL=<http://xx.xx.xx.xx:4200/api>
I've updated sqlalchemy as suggested here and created a new work queue called kubernetes.
I've created a new storage pointing to my Azure blob storage, created the new deployment, and ran it. Everything finishes without errors, but I don't see any activity in my cluster, and nothing is created in the UI.
Can you give me some guidance on what I might be missing?
Atul Anand
04/04/2022, 2:46 AM
Shiyu Gan
04/04/2022, 5:56 AM
Jeff Kehler
04/04/2022, 6:18 AM
create_flow_run function. I would like to use a Parameter to configure the top level flow but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?
Konstantin
04/04/2022, 6:56 AM
Konstantin
04/04/2022, 6:56 AM
Anna Geller
04/04/2022, 9:12 AM
Konstantin
04/04/2022, 3:38 PM
Kevin Kho
04/04/2022, 5:42 PM
Konstantin
04/05/2022, 6:45 PM
Kevin Kho
04/05/2022, 8:27 PM
Konstantin
04/06/2022, 8:14 AM
Kevin Kho
04/06/2022, 2:07 PM