David Yang
03/10/2022, 7:02 PM
Gabriel Milan
03/10/2022, 8:13 PM
bot_schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(hours=1),
            start_date=datetime(2021, 1, 1),
            labels=[
                ...,
            ],
            parameter_defaults={
                ...,
            },
        ),
    ],
)
At first, we thought it was a UI issue, but then we confirmed that the flow wasn't being scheduled at all for this time window. Any ideas?
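A minimal sketch, assuming Prefect 1.x's Schedule.next() API, for checking what the clock actually emits around the missing window (the clock below is illustrative, without the real labels and parameter_defaults):
from datetime import datetime, timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

# Illustrative clock; substitute the real labels and parameter_defaults.
bot_schedule = Schedule(
    clocks=[
        IntervalClock(interval=timedelta(hours=1), start_date=datetime(2021, 1, 1)),
    ],
)

# Print the next few scheduled times to confirm runs fall in the expected window.
for run_time in bot_schedule.next(5):
    print(run_time)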
Stephen Herron
03/10/2022, 9:38 PM
Rio McMahon
03/10/2022, 10:33 PM
No heartbeat detected from the flow run; marking the run as failed.
The SQL query takes ~45 seconds to run on my local machine, so I am curious what interval the zombie-killer process polls at, and whether you have any suggestions for debugging this interruption. The flow and associated scripts run fine on my local machine. Thanks.
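A minimal sketch of one common workaround, assuming Prefect 1.x's heartbeat configuration: running heartbeats in a thread (instead of the default subprocess) so a long, blocking SQL call is less likely to be marked as a zombie. Here flow is assumed to be the flow object being registered:
from prefect.run_configs import UniversalRun

# Run heartbeats in a thread so a blocking query doesn't starve them.
flow.run_config = UniversalRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
)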
Brett Naul
03/11/2022, 1:02 AM
kevin
03/11/2022, 4:08 AM
I'm sending a select * from table query to Snowflake using a SnowflakeQuery task. It appears that this task uses fetchall() to return the data into memory: https://github.com/PrefectHQ/prefect/blob/5d2732a30563591410cb11fe0f7e7dfe65cc5669/src/prefect/tasks/snowflake/snowflake.py#L186
I expect this to cause performance issues with extremely large queries, so I am wondering what Prefect Cloud's tolerance for this is. Ideally, I think it would be preferable to lazy-load query results and/or allow for query pagination. Perhaps there's an architecture limitation I'm overlooking? I'd appreciate any insight 🙂
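A minimal sketch of batched fetching with fetchmany() instead of fetchall(), assuming the snowflake-connector-python cursor API; the connection parameters and task name are illustrative:
import snowflake.connector
from prefect import task

@task
def paginated_query(query: str, batch_size: int = 10_000):
    # Illustrative connection details; substitute real credentials.
    conn = snowflake.connector.connect(account="...", user="...", password="...")
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows_seen = 0
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            # Process or write out each batch here instead of holding all rows in memory.
            rows_seen += len(batch)
        return rows_seen
    finally:
        conn.close()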
satyapal reddy
03/11/2022, 6:00 AM
satyapal reddy
03/11/2022, 6:01 AM
satyapal reddy
03/11/2022, 6:01 AM
Liezl Puzon
03/11/2022, 6:20 AM
J. Martins
03/11/2022, 9:39 AM
ale
03/11/2022, 10:36 AM
with raw_data as
(
    select "start_time"::date as task_run_date,
           state,
           count(id) as task_run_per_date_and_state
    from public.task_run
    group by 1, 2
)
select state,
       avg(task_run_per_date_and_state)::int as avg_task_runs_per_day
from raw_data
where state = 'Success'
group by 1
order by 1
Vipul
03/11/2022, 11:27 AM
Lalit Pagaria
03/11/2022, 12:09 PM
Jean-Baptiste Six
03/11/2022, 12:42 PM
Adi Gandra
03/11/2022, 3:32 PM
"cpu_request": "4",
When I describe the pod that is spun up:
Requests:
  cpu:     6
  memory:  16Gi
Why is this happening on the restarted flow run that I'm trying?
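A minimal sketch, assuming a Prefect 1.x KubernetesRun run config, of pinning the resources explicitly so a restarted run should pick them up; note that if a custom job template also sets resources, it can override these values (the memory value here is illustrative):
from prefect.run_configs import KubernetesRun

# Explicit requests on the run config; a restarted run should reuse these.
flow.run_config = KubernetesRun(
    cpu_request="4",
    memory_request="8Gi",
)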
Constantino Schillebeeckx
03/11/2022, 3:45 PM
StartFlowRun(...).run()
- when this executes in the cloud I'm seeing: 🧵
Chris Reuter
03/11/2022, 4:11 PM
Andreas Nord
03/11/2022, 4:34 PM
flow.run_configs = DockerRun(image="myrepo/image")
flow.register(project_name)
But it shows up incorrectly as UniversalRun in the Cloud UI.
If I add the run config when I define the flow, it works perfectly:
with Flow("myflow", run_config=DockerRun(image="myrepo/image")) as flow:
Any suggestion as to what I am doing wrong in the first approach would be appreciated.
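A minimal sketch of the likely fix, assuming Prefect 1.x's Flow API: the attribute is run_config (singular), and assigning to a misspelled run_configs attribute is silently ignored, so the flow registers with the default UniversalRun:
from prefect import Flow
from prefect.run_configs import DockerRun

flow = Flow("myflow")
# Singular attribute name; "run_configs" would just create an unused attribute.
flow.run_config = DockerRun(image="myrepo/image")
flow.register(project_name="myproject")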
Tim Enders
03/11/2022, 8:54 PM
Bradley Hurley
03/11/2022, 9:25 PM
David Beck
03/11/2022, 9:27 PM
kensuke matsuura
03/12/2022, 10:41 AM
Mukamisha jocelyne
03/12/2022, 11:22 AM
James Harr
03/12/2022, 2:20 PM
After I run prefect deployment run test-flow/test-deployment, the UI shows that the flow run is Pending. When I look at the logs in Kubernetes with kubectl logs -l job-name=voracious-antelope2b89g, I see a FileNotFound stack trace.
Thank you for your help!
Tomer Cagan
03/13/2022, 12:21 PM
Dekel R
03/13/2022, 4:50 PM
CREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'
with Flow('comparable-products-data-extraction-development',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                         dockerfile="./Dockerfile"),
          executor=LocalDaskExecutor(scheduler="processes")) as flow:  # , schedule=daily_schedule
    raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
    # raw_credentials = get_local_credentials()
    google_credentials = parse_credentials(credentials=raw_credentials)
And here is the parse_credentials task:
from prefect import task
from google.oauth2 import service_account

@task()
def parse_credentials(credentials):
    return service_account.Credentials.from_service_account_info(credentials)
This works fine in multiple flows when not using the Dask executor, and it also runs without any problem locally when using Dask.
When running this in Prefect Cloud (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a JSON secret, and it works in other flows), I get a Dask error (see my first comment).
Any solution for this? What am I missing?
Thanks
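A minimal sketch of one possible workaround, assuming the Dask error is a pickling failure: with LocalDaskExecutor(scheduler="processes"), values passed between tasks must be picklable, and a google.oauth2 Credentials object may not be, so the Credentials can be built inside each task that uses them, passing only the plain secret dict between tasks (the task name extract_data is hypothetical):
from prefect import task
from google.oauth2 import service_account

@task()
def extract_data(raw_credentials):
    # Build the (possibly unpicklable) Credentials object inside the task,
    # so only the plain secret dict crosses process boundaries.
    credentials = service_account.Credentials.from_service_account_info(raw_credentials)
    ...  # use credentials here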
Raymond Yu
03/14/2022, 2:23 AM
Ayah Safeen
03/14/2022, 6:42 AM
When I run prefect server start, I get the following error:
Pulling postgres ... error
Pulling hasura ... error
Pulling graphql ... error
Pulling apollo ... error
Pulling towel ... error
Pulling ui ...
richard-social 812
03/14/2022, 8:35 AM
@task(result=LocalResult(dir=f'{os.getcwd()}/storage/pre_delinquency_models',
                         serializer=H2OModelSerializer()),
      checkpoint=True, target='{date:%Y}-Week {date:%U}', log_stdout=True)
def train_new_model(data: pd.DataFrame):
    ...
I have it running on Prefect Cloud with a run_config that looks like below:
run_config = LocalRun(env={'PREFECT__FLOWS__CHECKPOINTING': os.environ.get('PREFECT__FLOWS__CHECKPOINTING', 'true')})
However, looking at the results folder, I see that the task result file is created anew with each daily run. What am I missing?
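A minimal sketch for sanity-checking the target template outside Prefect, assuming Prefect 1.x renders targets with Python's str.format against the context date: two dates in the same calendar week should produce the same target string, so a fresh file per day suggests the template or the checkpointing env var is not taking effect where the flow actually runs:
from datetime import datetime

template = '{date:%Y}-Week {date:%U}'

# Two dates in the same calendar week should render the same target.
print(template.format(date=datetime(2022, 3, 14)))  # 2022-Week 11
print(template.format(date=datetime(2022, 3, 15)))  # 2022-Week 11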