Hugo Polloli
09/14/2021, 7:35 AM
@task(state_handlers=[my_handler])
def my_task(my_input):
    self.message = f"Input {my_input} wasn't ok"
    assert input_is_ok(my_input)
    self.message = f"Failed to do_some_things for {my_input}"
    res = do_some_things(my_input)
    self.message = f"Failed to do_a_few_more for {my_input}"
    res2 = do_a_few_more(res)
    return res2
I understand I could use signals, using the value property of a signal to store what I want to print, but that would mean wrapping each row where I'm performing an action inside a try/except block and raising the signal inside the except block. I've done that and it gets really cluttered.
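To show the clutter, the signal-based version looks roughly like this (sketch, using the same placeholder helpers as above):
from prefect import task
from prefect.engine.signals import FAIL

@task
def my_task(my_input):
    # each step needs its own try/except just to attach a useful message
    try:
        assert input_is_ok(my_input)
    except Exception:
        raise FAIL(f"Input {my_input} wasn't ok")
    try:
        res = do_some_things(my_input)
    except Exception:
        raise FAIL(f"Failed to do_some_things for {my_input}")
    try:
        res2 = do_a_few_more(res)
    except Exception:
        raise FAIL(f"Failed to do_a_few_more for {my_input}")
    return res2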
I maybe missed something obvious and am going in the wrong direction though...
Issam Assafi
09/14/2021, 7:43 AM
shekhar koirala
09/14/2021, 8:01 AM
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def fetch_data(count, check):
    data = []
    logger = prefect.context.get("logger")
    logger.info(data)
    return data

def run():
    with Flow("fetch-data") as local_flow:
        count = Parameter("count", default=100)
        check = Parameter("limit", default=10)
        fetch_data(count, check)
    local_flow.register(project_name="test")
    id = local_flow.run()
    print(id)

if __name__ == "__main__":
    run()
Lucas Beck
09/14/2021, 8:20 AM
We have ETL-like jobs to run, some on schedule and some on demand. We usually break down big computing tasks into smaller parts, each contained in its own docker container. These tasks, when combined, form a computation DAG. This DAG can mostly be parallelized (mapped). We then leverage an Azure Kubernetes Service (AKS) cluster to horizontally scale the compute jobs into hundreds, sometimes thousands of containers. So far we have been using our own internal tool to orchestrate tasks and manage dependencies, and that is what we are currently investigating replacing with prefect. Our prefect setup has a self-hosted server instead of the cloud solution.
Challenge 1
The first and most important challenge we are facing regards scaling jobs. I will use the flow we tried to migrate to prefect as the example here. I am attaching the DAG schematic as a picture. In this flow, each task uses prefect's RunNamespacedJob to spin up a new job that has exactly one pod, which will perform a given computation. We then use other functions from the prefect SDK to read the logs after completion and delete the kubernetes jobs. The whole DAG can be run in parallel (mapped) for multiple inputs. Usually the job works fine for 3-5 inputs, but as soon as we try to scale to as little as 30 inputs we start seeing a ton of heartbeat errors or no pod statuses (also attaching an image here). With our internal tool we have already scaled up to around 500 inputs in parallel in the same cluster setup. Has anyone else experienced this? By the way, we are using the DaskExecutor for that with default parameters.
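For reference, the executor today is just the default; a Dask-on-Kubernetes variant with an explicit worker count would look roughly like this (sketch; the cluster_class string and n_workers value are placeholders, pod spec configuration omitted):
from prefect.executors import DaskExecutor

# current setup: DaskExecutor with default parameters (temporary local Dask cluster)
flow.executor = DaskExecutor()

# hypothetical variant: an ephemeral Dask cluster on AKS, sized explicitly
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"n_workers": 30},  # placeholder worker count
)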
Challenge 2
We currently have a single repo where we want to keep all prefect flows under version control. In order to reuse code, we also made it a package that can be imported elsewhere. The structure looks like this:
prefect-flows/
    setup.py
    requirements.txt
    prefectflows/
        flows/
            flow1.py
            flow2.py
            ...
        core/
            kubernetes.py
            ...
The idea is that the actual flows sit under the flows folder and can import boilerplate/utils from the core folder. For instance, kubernetes.py contains all the functions that use prefect's SDK to interact with kubernetes. An example function is run_job, which will run a job, read its logs and eventually delete that job.
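For illustration, a flow file imports from the package roughly like this (simplified sketch; the Parameter and the run_job call are placeholders):
# prefectflows/flows/flow1.py (simplified sketch)
from prefect import Flow, Parameter

from prefectflows.core.kubernetes import run_job

with Flow("flow1") as flow:
    inputs = Parameter("inputs")
    # run_job wraps RunNamespacedJob plus log reading and job deletion
    run_job(inputs)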
This all works fine when running locally, but fails once we register a flow and try to run it through the server. The error is:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'prefectflows.core\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
It is important to mention that I have dockerized the package using the prefect base image, and I am telling prefect to run the flow with that image. Something like the following:
Flow(
    flow_name,
    storage=Azure(
        container=container,
        stored_as_script=False,
        connection_string=azure_connection_string,
    ),
    run_config=KubernetesRun(
        image="imagewithourpackageinstalled:latest",
        ...
Maybe I misunderstood where the unpickling is happening, but I would expect that if the flow uses an image that has all the packages that were used when registering that flow, then that should work.
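For reference, the script-based variant of that storage block would look roughly like this (sketch; blob_name is a placeholder):
flow.storage = Azure(
    container=container,
    stored_as_script=True,
    blob_name="flows/flow1.py",  # placeholder path to the flow script in the container
    connection_string=azure_connection_string,
)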
I think that summarizes it, but please let us know if you need further description or want us to share more of the code.
Thanks in advance for your help!
Nadav Nuni
09/14/2021, 8:21 AM
unmapped? (code added in a reply)
Marko Herkaliuk
09/14/2021, 8:50 AM
Maikel Penz
09/14/2021, 9:16 AM
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7ffb221c2160>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')': /simple/pip/
see the 'Temporary failure in name resolution'.
In summary: I'm not sure how much this is a Kubernetes issue vs. a Prefect issue. I'm throwing it here as there could be a tweak we could do on the Prefect side to make it work.
Any ideas?
Abhishek
09/14/2021, 12:02 PM
Chris McPherson
09/14/2021, 1:53 PM
Constantino Schillebeeckx
09/14/2021, 4:45 PM
Kathryn Klarich
09/14/2021, 5:41 PM
Is there a way to update max_retries on a task from the UI (or without having to upload new code)? I was hoping I could template the value and pass it to the @task decorator, but it doesn't look like you can use templates with the max_retries argument.
Vasudevan Balakrishnan
09/14/2021, 6:10 PM
Joe Schmid
09/14/2021, 6:35 PM
Lucas Hosoya
09/14/2021, 6:56 PM
Exception: Java gateway process exited before sending its port number
Sean Talia
09/14/2021, 9:34 PM
from prefect.utilities.logging import get_logger
from prefect import task

@task
def log_value(val: str) -> None:
    logger = get_logger()
    logger.info(val)
Abhas P
09/14/2021, 10:28 PM
with Flow("example") as flow:
    credentials = {'id': ..., 'pass': ...}
    apply_map(transform, result, unmapped(credentials))  # can we do this ?
    apply_map(transform, result, id=unmapped(credentials['id']), pass=unmapped(credentials['pass']))  # or is this the standard way?
Kathryn Klarich
09/14/2021, 10:55 PM
Bastian Röhrig
09/15/2021, 9:35 AM
I have a state_handler that uses the set_flow_run_name method on the prefect client to rename the flow run when it moves to the running state. Now, I have set up automations in prefect cloud that alert me when a flow run fails. I am now experiencing the following behaviour: if a flow run fails within a few minutes (approx. 5-10 minutes), I see the prefect-generated run name (e.g. neat-kudu) in the alerts. If the flow run fails after a longer time (50 minutes), I see my custom flow run name in the alert.
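For context, the handler is roughly this (simplified sketch; the naming logic is a placeholder):
import prefect
from prefect.client import Client
from prefect.engine.state import Running

def rename_flow_run(flow, old_state, new_state):
    # when the run enters the Running state, rename it via the Prefect client
    if isinstance(new_state, Running):
        flow_run_id = prefect.context.get("flow_run_id")
        Client().set_flow_run_name(flow_run_id, f"custom-name-{flow_run_id[:8]}")  # placeholder naming logic
    return new_state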
Is this expected? Is there anything I can do on my side to fix it?
Jérémy Trudel
09/15/2021, 4:00 PM
Qin XIA
09/15/2021, 4:34 PM
Luca Schneider
09/15/2021, 6:34 PM
query GetLatest {
  project(where: {name: {_eq: "project-test"}}) {
    flows(
      where: {name: {_eq: "flow-test"}, archived: {_eq: false}}
      order_by: {version: desc}
      limit: 1
    ) {
      id
    }
  }
}

mutation TriggerLatest {
  create_flow_run(input: {flow_id: "xxx"}) {
    id
  }
}
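Alternatively, I guess the same thing could be done from Python with the Client (rough sketch, reusing the query above):
from prefect.client import Client

client = Client()
result = client.graphql(
    """
    query GetLatest {
      project(where: {name: {_eq: "project-test"}}) {
        flows(
          where: {name: {_eq: "flow-test"}, archived: {_eq: false}}
          order_by: {version: desc}
          limit: 1
        ) {
          id
        }
      }
    }
    """
)
# pull the latest flow id out of the result, then trigger a run of it
flow_id = result.data.project[0].flows[0].id
run_id = client.create_flow_run(flow_id=flow_id)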
Daniel Manson
09/15/2021, 8:01 PM
Abhas P
09/15/2021, 9:25 PM
Nadav
09/15/2021, 9:45 PM
Jason Kim
09/15/2021, 11:07 PM
Jacob Blanco
09/16/2021, 12:30 AM
pg_terminate_backend: it would be nice if the task getting cancelled would be able to call pg_terminate_backend.
Jacob Blanco
09/16/2021, 12:35 AM
Martim Lobao
09/16/2021, 12:54 AM
CA Lee
09/16/2021, 3:30 AM
Zac Chien
09/16/2021, 7:56 AM
Zac Chien
09/16/2021, 7:56 AM
Zach Angell
09/16/2021, 1:25 PM
Zac Chien
09/17/2021, 8:58 AM
We have a task wit-labeling which accepts a list of label-name as input; we also have a python function consisting of three steps (boto3 S3 and Athena calls) for a single labeling logic. In wit-labeling, threading is used for speed-up purposes; meanwhile we want to know the details of each labeling step, including for debugging when a following job doesn't work properly.
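Roughly, the current shape is this (sketch; helper names, pool size and the step bodies are placeholders):
import prefect
from prefect import task
from multiprocessing.dummy import Pool as ThreadPool

def label_one(label_name, logger):
    # three steps per label: read from S3 with boto3, run an Athena query, post-process
    logger.info(f"start labeling {label_name}")
    ...
    logger.info(f"finished labeling {label_name}")

@task
def wit_labeling(label_names):
    logger = prefect.context.get("logger")
    # threads are only used to speed up independent labels
    with ThreadPool(10) as pool:
        pool.map(lambda name: label_one(name, logger), label_names)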
Now we are in the development stage and we found that we rely heavily on the logs. 🥺
Zach Angell
09/20/2021, 5:15 PM
wit-labeling?
Jessica Smith
09/21/2021, 4:05 PM
Zach Angell
09/21/2021, 4:23 PM
Jessica Smith
09/21/2021, 4:25 PM
with ThreadPool(20) as pool:
    results = pool.map(run_extract_process, records)

logger.info(f"Starting extract for {v}")

from multiprocessing.dummy import Pool as ThreadPool

# if it's not during a backend flow run, don't emit
if not context.get("running_with_backend"):
    return

with prefect.context({"running_with_backend": True}):
    logger.info(f"Starting extract for {v}")
Zach Angell
09/21/2021, 8:35 PM
prefect.context is threadsafe (thread-local), so the context is lost when running in a thread pool. You can pass prefect.context to run_extract_process as an arg and then re-initialize it in run_extract_process:
def run_extract_process(record, prefect_context):
    with prefect.context(prefect_context):
        # ... everything else
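Putting it together, something like this (rough sketch; it reuses run_extract_process and records from your snippets):
import prefect
from multiprocessing.dummy import Pool as ThreadPool

def run_extract_process(record, prefect_context):
    # re-initialize the flow-run context inside the worker thread
    with prefect.context(prefect_context):
        logger = prefect.context.get("logger")
        logger.info(f"Starting extract for {record}")
        # ... everything else

# inside the task: capture the context once, then hand it to every worker
context_copy = prefect.context.to_dict()
with ThreadPool(20) as pool:
    results = pool.map(
        lambda record: run_extract_process(record, context_copy),
        records,
    )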
Jessica Smith
09/22/2021, 2:21 PM
Zach Angell
09/22/2021, 2:36 PM
Jessica Smith
09/22/2021, 2:37 PM