Lucian Rosu  11/25/2021, 11:33 AM
Marko Jamedzija  11/25/2021, 4:47 PM
Anh Nguyen  11/26/2021, 7:38 AM
M. Siddiqui  11/26/2021, 3:20 PM
Christopher Chong Tau Teng  11/29/2021, 8:09 AM
I am trying to use DockerRun, but I am faced with an issue importing my flow task from another module — when I run my flow registration script below, it complains that the src module is not found. I think I am doing something wrong, because I am confused about how DockerRun works.
I have my Python tasks defined, such as task_test_flow, which reside in a different Python module. I then built a Docker image of that module as gcr.io/some-bucket/prefect-flows:v1, which I set inside DockerRun(image='') here. Then I created the following registration script, where I believe I need to import the task (as done in https://docs.prefect.io/orchestration/flow_config/docker.html#dependency-requirements). But here's the problem: task_test_flow is defined in the Docker image gcr.io/some-bucket/prefect-flows:v1, and it's not located in the same directory as this registration script below…
I guess my question is: since I want to define my task separately from my flow registration script, how can I import my task (which is already built into a Docker image) into my flow registration script? Do I even need to do that in the first place?
from datetime import timedelta
from prefect import Flow
from prefect.schedules import IntervalSchedule
from prefect.storage import GCS
from prefect.run_configs import DockerRun
from src.test_flow import task_test_flow

schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("test-flow", schedule) as flow:
    task_test_flow()

flow.storage = GCS(bucket="some-bucket")
flow.run_config = DockerRun(image="gcr.io/some-bucket/prefect-flows:v1")
flow.register()
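(A side note on the import error above, not from the thread: with script-based GCS storage plus DockerRun, the registration script also has to be able to import src on the machine that runs it. One hedged sketch is to make src an installable package and install it both locally and in the image; every name below is illustrative.)
# setup.py at the repository root -- a minimal sketch, names are made up.
from setuptools import setup, find_packages

setup(
    name="prefect-flows",       # hypothetical distribution name
    version="0.1.0",
    packages=find_packages(),   # assumes `src` is a proper package with __init__.py
    install_requires=["prefect"],
)
With pip install -e . run locally before registering, and pip install . added to the Dockerfile, from src.test_flow import task_test_flow would resolve both on the registering machine and inside the gcr.io/some-bucket/prefect-flows:v1 image.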
Christopher Chong Tau Teng  11/29/2021, 9:02 AM
William Clark  11/29/2021, 5:28 PM
Omar Alvarado  11/30/2021, 1:08 AM
Cormac Long  11/30/2021, 8:30 AM
Sylvain Hazard  11/30/2021, 8:36 AM
… .sql files as if they were .py scripts. Explanation and relevant code below.
Gagan Singh Saluja  11/30/2021, 1:16 PM
Gagan Singh Saluja  11/30/2021, 1:20 PM
11/30/2021, 1:20 PMPayam Vaezi
11/30/2021, 7:10 PMRUNNING
instead of SUCCESS
or FAILURE
and resulted in PENDING
status in all downstream tasks. Can you please advise what such behavior happened and what we need to do on our side to address such behavior?Bruno Kuasney
…pushgateway_url parameter). How is that example supposed to work, since I'm not able to hit the endpoint? I also kind of forced the endpoint to be created by using start_http_server(8000) from prometheus_client.
Like I said, it's not breaking the code, so maybe it is working; I'm just not able to see it somehow.
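(Not from the thread; a minimal prometheus_client sketch. One reason the start_http_server endpoint may be unreachable is that it only lives inside the short-lived flow-run process; pushing to a Pushgateway avoids exposing an endpoint at all. The metric name and gateway address below are placeholders.)
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from prefect import task

@task
def push_metric(value: float):
    # Build a dedicated registry, set one gauge, and push it to the gateway.
    registry = CollectorRegistry()
    gauge = Gauge("example_rows_processed", "Rows processed by the flow", registry=registry)
    gauge.set(value)
    push_to_gateway("pushgateway.example.com:9091", job="example-flow", registry=registry)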
William Clark  12/01/2021, 12:29 PM
Gagan Singh Saluja  12/01/2021, 3:18 PM
Lana Dann  12/01/2021, 7:37 PM
…flows directory that registers the same flows in different runtime environments, but some flows I only want to run in staging and not in production, for example. I register all staging flows using:
prefect register -p lib/flows/ --project {project} --label staging
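(A hedged sketch, not from the thread. If I remember right, prefect register -p collects the Flow objects it finds at module level in each file, so one option is to only build the flow when an environment variable matches; PREFECT_ENVIRONMENT and the flow name below are made up.)
import os
from prefect import Flow, task

@task
def say_hello():
    print("hello from staging")

def build_flow() -> Flow:
    with Flow("staging-only-flow") as flow:
        say_hello()
    return flow

# Only expose the flow for registration in allowed environments, e.g.
#   PREFECT_ENVIRONMENT=staging prefect register -p lib/flows/ --project x --label staging
if os.environ.get("PREFECT_ENVIRONMENT") == "staging":
    flow = build_flow()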
Christopher Chong Tau Teng  12/02/2021, 11:42 AM
-flows/
---flow_1.py
---flow_2.py
-src/
---task_flow_1.py
---task_flow_2.py
-Dockerfile
and samples from each file:
flow_1.py
from datetime import timedelta
from prefect import Flow
from prefect.schedules import IntervalSchedule
from prefect.storage import GCS
from prefect.run_configs import DockerRun
import sys
sys.path.append('.../src')
from task_flow_1 import task_test_flow

schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("test-flow-1", schedule) as flow:
    task_test_flow()

flow.storage = GCS(bucket="xxx")
flow.run_config = DockerRun(image="xxx/prefect:v1")
flow.register(project_name='docker-runner-01')
task_flow_1.py
import prefect
from prefect import task
import numpy as np

@task
def task_test_flow():
    logger = prefect.context.get("logger")
    test_arr = np.array([1, 2, 3])
    logger.info(f"{test_arr}")
Dockerfile
FROM prefecthq/prefect:latest
WORKDIR /app
ADD . .
RUN prefect backend server
Now assume I have registered both flows with the server and they are running as expected.
One day, flow_1 suddenly breaks and I need to change task_flow_1.py to fix the bug. I then updated the image to v2 in flow_1.py as follows:
flow.run_config = DockerRun(image="xxx/prefect:v2")
I then built a new Docker image and pushed it to xxx/prefect:v2.
Here's my question: before I register these 2 flows with the server again, do I also need to update the image in flow_2.py to use xxx/prefect:v2, or can it continue to use xxx/prefect:v1?
Vaibhav Ariyur  12/02/2021, 8:10 PM
…the on_failure param of a flow? Should the function called by on_failure be able to read your flow's results?
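(Sketch only, not an answer from the thread. In Prefect 1.x the flow-level on_failure callable receives the flow object and the failed state; whether task results are readable from it depends on whether results were kept or persisted, so the result access below is an assumption.)
from prefect import Flow, task
from prefect.engine.state import State

def notify_on_failure(flow: Flow, state: State):
    # state.result is usually a dict of {task: final_state}; each task state's
    # .result is only readable if the task's output was kept or persisted.
    task_states = state.result if isinstance(state.result, dict) else {}
    failed = [t.name for t, s in task_states.items() if s.is_failed()]
    print(f"Flow '{flow.name}' failed. Failed tasks: {failed}")

@task
def boom():
    raise ValueError("whoops")

with Flow("on-failure-demo", on_failure=notify_on_failure) as flow:
    boom()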
Lana Dann  12/02/2021, 9:20 PM
…PYTHONPATH for storage? For context, I'm using GitLab storage for my flow and my flows are in lib/flows, but when the flow tries to run, it errors with ModuleNotFoundError: No module named 'lib' when it tries to access the storage.
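(Not from the thread; a hedged sketch. Run configs accept an env mapping, so PYTHONPATH can be set on the run itself. As far as I recall, GitLab storage only pulls the single flow file at runtime, so the lib package has to already exist in the execution image; /opt/repo and the other names below are placeholders.)
from prefect import Flow
from prefect.run_configs import KubernetesRun  # or DockerRun / ECSRun, whichever is in use
from prefect.storage import GitLab

flow = Flow("example-flow")
flow.storage = GitLab(repo="group/repo", path="lib/flows/example_flow.py")
# Point PYTHONPATH at wherever the `lib` package lives inside the image.
flow.run_config = KubernetesRun(
    image="registry.example.com/my-flows:latest",
    env={"PYTHONPATH": "/opt/repo"},
)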
Gagan Singh Saluja  12/03/2021, 7:23 AM
jack  12/03/2021, 4:31 PM
…client.create_flow_run() to create several flow runs (using ECSRun), and then polling each with client.get_flow_run_state to know when all the flows have completed.
When one of the flows fails (and Prefect starts a new flow run to take its place), how can we check when the rerun is complete (and whether it succeeded)?
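(For reference only, a polling sketch using the client calls named in the question; it doesn't solve the rerun-tracking part, since the replacement run would have a different flow run id.)
import time
from prefect import Client

client = Client()

def wait_for_flow_run(flow_run_id: str, poll_seconds: int = 30) -> bool:
    # Poll until the run reaches a terminal state, then report success/failure.
    while True:
        state = client.get_flow_run_state(flow_run_id)
        if state.is_finished():
            return state.is_successful()
        time.sleep(poll_seconds)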
jack  12/03/2021, 6:20 PM
No heartbeat detected from the remote task; marking the run as failed.
For 20+ minutes following that log message, fetching the flow run state from Prefect Cloud still shows <Running: "Running flow.">.
Ideally, as soon as the flow run is marked as failed, the state from Prefect Cloud would say Failed. Suggestions?
Lukas N.  12/03/2021, 6:51 PM
Does CloudFlowRunner support graceful shutdown? We're running flows as Kubernetes jobs on EC2 spot instances, which get terminated from time to time. Let's say the job starts a pod a on a node that terminates. Kubernetes will quickly reschedule it and spawn a pod b, but b does nothing because the state of the tasks is Running, even though they are not (a is dead). We need to wait for the heartbeat to time out before Prefect reschedules it, which takes a long time. Instead, setting something like Pending or even Failed on tasks that are Running when a SIGTERM is received would be nice.
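(Not an established Prefect feature; a rough sketch under the assumption that Client.set_flow_run_state accepts flow_run_id and state keyword arguments. Trapping SIGTERM in the flow-run process and failing the run explicitly would avoid waiting for the heartbeat timeout.)
import signal
import prefect
from prefect import Client
from prefect.engine.state import Failed

def _handle_sigterm(signum, frame):
    # Mark the current flow run Failed so it can be rescheduled immediately
    # instead of waiting for the heartbeat to expire.
    flow_run_id = prefect.context.get("flow_run_id")
    if flow_run_id:
        Client().set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Failed("Received SIGTERM (spot instance reclaimed)"),
        )

signal.signal(signal.SIGTERM, _handle_sigterm)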
Benson Mwangi  12/05/2021, 1:10 AM
Sylvain Hazard  12/06/2021, 7:55 AM
ids = get_ids()
step_1 = do_step_1.map(ids)
step_2 = do_step_2.map(step_1)
step_3 = do_step_3.map(step_2)
The number of ids retrieved can vary by a few orders of magnitude and I cannot predict it.
The issue I see is that while the flow runs, the memory footprint keeps increasing, which sometimes results in an OOM kill of the pod running the flow.
Is there any way to keep the memory footprint near constant with regard to the number of executed mapped tasks in the flow? I understand that the initial mapping requires a fair amount of memory and that there is no way around that.
I am running on K8s, using a LocalDaskExecutor (threaded), and had hoped that depth-first execution would mean there would be some amount of garbage collection of fully executed branches. Would setting up a Result in the mapped tasks help in any way?
I tried manually releasing memory inside the tasks' code (with del and gc mostly) but saw no improvement.
Another solution I see would be to have steps 1-3 executed in their own separate flow, but that means we would spin up a bunch of ephemeral pods and lengthen the flow overall, I suppose?
Thanks!
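(Sketch only, not a confirmed fix for the memory growth. This is what attaching a Result to one of the mapped tasks could look like; the bucket name is a placeholder, and checkpoint=True is needed for the result to actually be written.)
from prefect import task
from prefect.engine.results import GCSResult

@task(result=GCSResult(bucket="my-intermediate-results"), checkpoint=True)
def do_step_1(id_):
    # Each mapped child persists its output to GCS instead of only keeping it in memory.
    return id_ * 2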
Robert Hales  12/06/2021, 10:03 AM
…Pending, which I think led to double runs. From what I understand, Pending shouldn't be a terminal state?
Guilherme Petris  12/06/2021, 10:13 AM
from analytics_toolbox.analytics_toolbox import *
from analytics_toolbox.analytics_toolbox import PendoUtils as pendo_utils
from analytics_toolbox.analytics_toolbox import SnowflakeUtils as snowflake_utils
from prefect import task, Flow
from prefect.utilities.notifications import slack_notifier
@task()
def main():
    ......

with Flow("pendo_featureEvents") as flow:
    main()

flow.register(project_name="pendo_api")
Faizan Qazi  12/06/2021, 10:51 AM
…config.toml file. But when I access it at http://ip.add.re.ss:8080, the page displays but then redirects to the getting started page. So basically I want to access my Prefect instance from any other machine.
[server]
[server.ui]
apollo_url = "http://ip.add.re.ss:4200/graphql"
Lukas Brower  12/06/2021, 3:06 PM
…concurrent.futures._base.CancelledError
We are trying to determine if there is something in Prefect or Dask that kills the Dask scheduler when execution nears the 12-hour mark. I don't see any major resource constraints in the workers at the time of the flow cancellations, so I figured I would start here and ask if there is anything special about the 12-hour mark in Prefect, or if this is likely a Dask-specific issue.