Iván Sánchez
04/01/2022, 7:49 AM

Domenico Di Gangi
04/01/2022, 8:49 AM

Andrei Aldescu
04/01/2022, 11:14 AM

Patrick Tan
04/01/2022, 12:00 PM
with Flow("parent-flow") as flow:
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        print(i)
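In the snippet above the two child runs are created independently, so nothing forces the second one to wait for the first. A minimal sketch of chaining them sequentially, assuming Prefect 1.x and that sequential execution is the intent:

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-flow") as flow:
    previous_wait = None
    for i in range(2):
        flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        if previous_wait is not None:
            # start this child run only after the previous one has finished
            flow_a.set_upstream(previous_wait)
        previous_wait = wait_for_flow_a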
Matthew Seligson
04/01/2022, 12:14 PM

Mia
04/01/2022, 3:09 PM

Wei Mei
04/01/2022, 3:30 PM
today, dir = get_data(endpoints=api_endpoints)
upload_to_s3(today, dir, endpoints=api_endpoints)
snowflake_load(today, schema="statistics", endpoints=api_endpoints)
Donnchadh McAuliffe
04/01/2022, 3:51 PM
POST <https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run>
PAYLOAD
{
  "state": {
    "type": "SCHEDULED",
    "message": "Quick run through UI"
  }
}
Headers {Authorization: Bearer api_key}
Getting an SSL error: "Certificate has expired". Any ideas?
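For reference, the same call made from Python (a sketch; the IDs and api_key are the placeholders from the message above). A "certificate has expired" failure at this layer usually points at an outdated local CA bundle rather than the request itself:

import requests

url = "https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run"
payload = {"state": {"type": "SCHEDULED", "message": "Quick run through UI"}}
headers = {"Authorization": "Bearer api_key"}

# requests validates TLS against the certifi CA bundle; upgrading certifi
# (pip install -U certifi) often clears "certificate has expired" errors
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
print(response.json())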
Joshua Greenhalgh
04/01/2022, 4:00 PM
Collecting flows...
Processing 'flows':
Building `Module` storage...
Building 'first_flow'... Done
Writing output to 'flows.json'
========================== 1 built ==========================
Collecting flows...
Processing 'flows.json':
Registering 'first_flow'... Skipped (metadata unchanged)
================== 0 registered, 1 skipped ==================
Amogh Kulkarni
04/01/2022, 9:49 PM

Ken Nguyen
04/02/2022, 9:05 AM
with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_run_flow_run_id = create_flow_run(
        flow_name="test-dbt-run-flow",
        project_name="data_quality_tracking"
    )
    flow_run = wait_for_flow_run(
        dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
    )
    flow_logs = get_logs(
        dbt_run_flow_run_id,
        task_args={"name": "Getting logs", "trigger": all_finished},
        upstream_tasks=[flow_run],
    )
get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both the upstream_tasks defined AND a FlowRunView.get_latest() for the get_logs task, I'm still getting logs that are incomplete.
Do you have any suggestions for why my get_logs function is prematurely retrieving logs of the child flow / retrieving incomplete logs?
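For context, a sketch of what such a get_logs task might look like (an assumption based on the description above, not the poster's exact code):

from prefect import task
from prefect.backend import FlowRunView

@task
def get_logs(flow_run_id):
    # Build a view of the child flow run and pull its logs; the logs returned
    # are only whatever the backend has recorded at the moment this task runs.
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
    flow_run = flow_run.get_latest()
    return flow_run.get_logs()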
R Zo
04/02/2022, 11:42 AM
Hi prefect team,
I am trying to spin up a flow of flows, so I started a Prefect server and an agent with the label "test". Below is a snippet of code that should run child_flow_1, which is a flow with multiple tasks. However, child_flow_1 does not run, while Prefect quickly returns a few success logs and no error is reported. What am I missing? I have run child_flow_1 on its own, without the flow of flows, and it works fine. Following is a snippet of the log using the DaskExecutor, but I have tried LocalDaskExecutor as well.
test_flows created
Flow URL: <http://localhost:8080/default/flow/cd6dc2bb-8f54-4f5a-b046-d90e5c853dbe>
└── ID: e405d852-5c75-4001-815e-619687e592d3
└── Project: test_flows
└── Labels: ['mymachinexxx', 'test']
[2022-04-02 22:30:01+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flows'
[2022-04-02 22:30:01+1100] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at <tcp://192.168.20.9:8786>
[2022-04-02 22:30:02+1100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
test_flows = make_three_flows()
for tfl in test_flows:
    tfl.register("test_flows")
    # tfl.executor = LocalDaskExecutor(scheduler="threads", num_workers=10)

with Flow("parent_flow", run_config=LocalRun(labels=["test"])) as parent_flow_complete:
    working_dir = Parameter("working_dir")
    create_flow_run(
        flow_name="child_flow_1", parameters={"working_dir": working_dir})

parent_flow_complete.executor = DaskExecutor(address="<tcp://localhost:8786>")
# parent_flow_complete.register("test_flows")
parent_flow_complete.run(parameters={"working_dir": working_dir})
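One thing that stands out: the parent only creates the child run and then finishes, so it can report SUCCESS immediately without tracking child_flow_1 at all. A minimal sketch of making the parent wait for the child (assuming Prefect 1.x, the same project "test_flows", and the same labels):

from prefect import Flow, Parameter
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow", run_config=LocalRun(labels=["test"])) as parent_flow_complete:
    working_dir = Parameter("working_dir")
    child_run_id = create_flow_run(
        flow_name="child_flow_1",
        project_name="test_flows",
        parameters={"working_dir": working_dir},
    )
    # block the parent until the child finishes and propagate its final state
    wait_for_flow_run(child_run_id, raise_final_state=True, stream_logs=True)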
Jai P
04/02/2022, 5:06 PM
flow needs to be defined up front, but correct me if i'm wrong. I'll try to add a few more details in the thread, and thanks in advance for your help!

Shiyu Gan
04/03/2022, 2:36 AM

Shiyu Gan
04/03/2022, 2:36 AM

Atul Anand
04/03/2022, 1:13 PM

Jai P
04/03/2022, 6:44 PM
flow.visualize() in prefect 1.0 (not using radar)? If not, are there plans to add that functionality?

Bruno Nunes
04/03/2022, 6:54 PM
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=<http://0.0.0.0:4200/api>
Check out the dashboard at <http://0.0.0.0:4200>
12:51:20.774 | WARNING | prefect.agent - No work queue found named 'kubernetes'
I've edited the service to be of type LoadBalancer and used the external IP to set the PREFECT_API_URL:
prefect config set PREFECT_API_URL=<http://xx.xx.xx.xx:4200/api>
I've updated sqlalchemy as suggested here and created a new work queue called kubernetes.
I've created a new storage pointing to my Azure blob storage, created the new deployment, and ran it. Everything finishes without errors, but I don't see any activity in my cluster and nothing is created in the UI.
Can you give me some guidance on what I might be missing?
Atul Anand
04/04/2022, 2:46 AM

Shiyu Gan
04/04/2022, 5:56 AM

Jeff Kehler
04/04/2022, 6:18 AM
create_flow_run function. I would like to use a Parameter to configure the top level flow but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?
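One pattern that may help (a sketch, assuming Prefect 1.x): a Parameter is itself a task, so its value only exists at runtime; it can be consumed inside another task or forwarded into a child flow's parameters, but it cannot be read as a plain Python value while the Flow is being defined.

from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run

@task
def build_child_parameters(env):
    # inside a task the Parameter value is a concrete Python object
    return {"env": env}

with Flow("top-level-flow") as flow:
    env = Parameter("env", default="dev")
    child_params = build_child_parameters(env)
    # child flow and project names are hypothetical, for illustration only
    create_flow_run(flow_name="child-flow", project_name="my-project", parameters=child_params)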
Konstantin
04/04/2022, 6:56 AM

Jan Nitschke
04/04/2022, 9:52 AM
from tasks import my_task
from prefect.storage import GitHub
from prefect import Flow
from prefect.run_configs import ECSRun
storage = GitHub(
    repo="repo",  # name of repo
    path="path/to/myflow.py",  # location of flow file in repo
    access_token_secret="GITHUB_ACCESS_KEY",  # name of personal access token secret
)

with Flow(name="foobar",
          run_config=ECSRun(),
          storage=storage) as flow:
    my_task()
The problem seems to be that the GitHub storage only clones the single file and not the entire project, which causes my import to fail (ModuleNotFoundError("No module named 'tasks'")). I've seen that there has been some discussion around this issue, but it hasn't really helped me solve it... Is my only option to clone the repo into the custom image that I use for my ECS task? But that would mean I would have to rebuild that image every time I change something in my underlying modules, right?
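One alternative worth considering (a sketch, assuming Prefect 1.x; parameter names as I recall them from prefect.storage.Git, so worth double-checking against the docs): the Git storage class clones the whole repository at flow-run time instead of fetching a single file, so sibling modules such as tasks.py are available for import alongside the flow file.

from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.storage import Git

# clones the full repo into the run environment, not just the flow file
storage = Git(
    repo="org/repo",                            # repo name
    flow_path="path/to/myflow.py",              # location of the flow file in the repo
    repo_host="github.com",
    git_token_secret_name="GITHUB_ACCESS_KEY",  # secret holding the access token
)

with Flow(name="foobar", run_config=ECSRun(), storage=storage) as flow:
    pass  # my_task() from the cloned repo would be used here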
Andres
04/04/2022, 11:54 AM
Some reference tasks failed.
I was investigating a bit and it seems that the state contains the results of all tasks when running it locally (state.result), while on the server this is empty (I printed it using the logger).
Any idea on how to address this?
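For context, a sketch of the two access patterns (assuming Prefect 1.x): state.result is only populated by an in-process flow.run(); for a backend (server/cloud) run the results have to be pulled through the API, e.g. via FlowRunView, and they only exist if the tasks persist their output with a Result such as LocalResult or S3Result.

from prefect.backend import FlowRunView

# local, in-process run: the returned state holds task states and results directly
# state = flow.run()
# task_state = state.result[some_task]

# backend run: query the run through the API instead
flow_run = FlowRunView.from_flow_run_id("<flow-run-id>")   # placeholder run id
task_run = flow_run.get_task_run(task_slug="my_task-1")    # slug is hypothetical
result = task_run.get_result()  # requires a persisted Result on the task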
Atul Anand
04/04/2022, 12:37 PM

Tom Klein
04/04/2022, 1:16 PM

Shuchita Tripathi
04/04/2022, 1:59 PM

Rajan Subramanian
04/04/2022, 2:08 PM

Joshua Weber
04/04/2022, 2:09 PM

Rajan Subramanian
04/04/2022, 2:44 PM
prefect deployment create deployment_name
again for those new changes to take effect?
2) if the above is true, do I need to rerun the tasks again in the UI?
3) sometimes I inadvertently press run twice and end up with two running processes. Is there any way to stop a process after it has been started?
4) when I delete the workspace to start over, I notice that when I type
ps aux | grep python | wc -l
the python processes are still running and I have to do a
pkill python
to kill all the python processes. Is there any way that once a workspace is killed all the python processes are killed along with it?