Huw Ringer
10/23/2021, 11:01 PM
I'm running tasks with the map method: process_tasks.map(flow_description, start_task_id)
My problem is that if one of the mapped tasks fails, the others still run regardless, and I'd like the whole flow to be cancelled programmatically/automatically when that happens. I think the CancelFlowRun API method should be able to do this if I invoke it just before the failed task enters the failed state (via a raised exception). I'm struggling to figure out how to call this thing though, and was hoping someone might be able to help, please. If I try the following:
prefect.tasks.prefect.flow_run_cancel.CancelFlowRun.run(context.get("flow_run_id"))
my Python engine says it can't find the flow_run_cancel method in any imported Prefect module, and I've already imported task, Flow, Parameter, context, Task, tasks (for good measure). I'm clearly doing something very wrong and don't know enough to figure out what it is. Would be very grateful for any help or suggestions to put me out of my misery. Thanks in advance!
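A minimal sketch of one way to wire this up, assuming Prefect 1.x where CancelFlowRun is exported from prefect.tasks.prefect (the try/except body below is illustrative, not the actual task):

import prefect
from prefect import task
from prefect.tasks.prefect import CancelFlowRun

# Instantiate the task once; calling .run() uses it like a plain function
cancel_flow_run = CancelFlowRun()

@task
def process_tasks(flow_description, start_task_id):
    try:
        ...  # real work goes here
    except Exception:
        # Cancel the whole flow run, then re-raise so this task still fails
        cancel_flow_run.run(flow_run_id=prefect.context.get("flow_run_id"))
        raise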
haf
10/24/2021, 7:37 AM
Gabi Pi
10/24/2021, 12:17 PM
I am trying to run a RunNamespacedJob task over Kubernetes. I am using the following code:
storage = S3(bucket="gavriel-test", stored_as_script=True)
kubernetes_run_conf = KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    },
    labels=["prefect-poc-k8s-agent"],
)
body = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {'name': 'echo'},
    'spec': {
        'template': {
            'spec': {
                'containers': [
                    {
                        'name': 'echo',
                        'image': 'alpine:3.7',
                        'command': ['sh', '-c', 'echo Hello!!! && sleep 10'],
                    }
                ]
            }
        },
        'backoffLimit': 4,
    },
}
with Flow("kubernetes-CreateNamespacedJob", run_config=kubernetes_run_conf, storage=storage) as flow:
    job = RunNamespacedJob(body=body, namespace="prefect", delete_job_after_completion=False)()
But I keep getting the following error:
Error during execution of task: MaxRetryError("HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8c8adeb650>: Failed to establish a new connection: [Errno 111] Connection refused'))")
Do you have any idea how to resolve this?
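One hedged guess at the cause: a refused connection to localhost:80 usually means the Kubernetes client never loaded a valid cluster configuration. The Prefect 1.x Kubernetes tasks take a kubernetes_api_key_secret argument, and passing None makes the task fall back to in-cluster config and then to a local kubeconfig, so a sketch of that variant (assuming the flow pod has RBAC permission to create Jobs) would be:

# Sketch: kubernetes_api_key_secret=None tells the task to try in-cluster
# config first instead of looking up a KUBERNETES_API_KEY secret
job = RunNamespacedJob(
    body=body,
    namespace="prefect",
    delete_job_after_completion=False,
    kubernetes_api_key_secret=None,
)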
Royzac
10/24/2021, 11:08 PM
Tony Yun
10/25/2021, 3:09 AM
Lukáš Polák
10/25/2021, 7:32 AM
Giovanni Giacco
10/25/2021, 10:15 AM
Richard Hughes
10/25/2021, 2:07 PM
Philip MacMenamin
10/25/2021, 2:50 PM
If a task does raise signals.FAIL("Didn't work"), I do not want the flow to proceed. However, other downstream tasks run despite the exception being raised. Is this expected behavior?
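A short sketch of the usual explanation, assuming Prefect 1.x semantics: only tasks that are actually downstream of the failing task get skipped, while tasks with no edge to it keep running, so the fix is to make the dependency explicit (task names here are placeholders):

from prefect import task, Flow
from prefect.engine import signals

@task
def check():
    raise signals.FAIL("Didn't work")

@task
def rest_of_flow():
    ...

with Flow("fail-fast") as flow:
    c = check()
    # rest_of_flow is only skipped because of this explicit upstream edge
    rest_of_flow(upstream_tasks=[c])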
jcozar
10/25/2021, 4:07 PM
Kevin
10/25/2021, 4:20 PM
Ihor Bondartcov
10/25/2021, 5:26 PM
Tony Yun
10/25/2021, 6:56 PM
I want to add an always_run/all_finished trigger to this task, but I can't simply add it to the end of the parameters. Because this is not a @task-style definition, I don't know how to:
deletePod = kpod.DeleteNamespacedPod(kubernetes_api_key_secret=None)
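A small sketch, assuming Prefect 1.x and keeping the kpod alias from the question: every Task subclass accepts the base Task keyword arguments in its constructor, so the trigger goes in alongside the task's own options:

from prefect.triggers import all_finished

# trigger is a base Task init kwarg, so imperative-style tasks take it
# in the constructor rather than via the @task decorator
deletePod = kpod.DeleteNamespacedPod(
    kubernetes_api_key_secret=None,
    trigger=all_finished,  # run even when upstream tasks fail
)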
Steve s
10/25/2021, 7:56 PM
I'm using create_flow_run.map(...) to run a single flow multiple times with different parameters. I have that working, but unfortunately I need the created flows to run in a specific order. Is there a way to force that behavior?
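One hedged way to get ordering (the flow name and parameters below are placeholders): drop the map and chain unmapped create_flow_run / wait_for_flow_run calls, so each child run only starts after the previous one finishes:

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

param_sets = [{"x": 1}, {"x": 2}, {"x": 3}]  # placeholder parameters

with Flow("ordered-runs") as flow:
    previous = None
    for params in param_sets:
        # The explicit upstream edge forces the runs into sequence
        run_id = create_flow_run(
            flow_name="child-flow",  # placeholder name
            parameters=params,
            upstream_tasks=[previous] if previous is not None else None,
        )
        previous = wait_for_flow_run(run_id)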
KhTan
10/25/2021, 11:37 PM
davzucky
10/26/2021, 12:40 AM
I start the agent with orion agent start, and I set the following env variables to connect to the Orion server:
PREFECT_ORION_API_HOST: orion_server
PREFECT_ORION_API_PORT: 4200
Why does the agent still try to create a database, in this case using SQLite, on the agent?
davzucky
10/26/2021, 12:44 AM
Tara
10/26/2021, 4:30 AM
Gabi Pi
10/26/2021, 6:53 AM
"You could override the automatic version promotion to unarchive and enable old versions (for example, for A/B testing)" How can I do that?
Jai Deo
10/26/2021, 8:02 AM
Thomas Furmston
10/26/2021, 8:46 AM
Eric Feldman
10/26/2021, 8:56 AM
Is there a way to wait for a flow run to finish, other than polling get_flow_run_info until the state is finished?
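A minimal polling sketch, assuming the Prefect 1.x Client (the flow run id is a placeholder); the task-library wait_for_flow_run task wraps this same kind of loop:

import time
from prefect.client import Client

client = Client()
flow_run_id = "your-flow-run-id"  # placeholder
# Poll the backend until the run reaches a finished state
info = client.get_flow_run_info(flow_run_id)
while not info.state.is_finished():
    time.sleep(5)
    info = client.get_flow_run_info(flow_run_id)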
Will List
10/26/2021, 10:13 AM
Milly gupta
10/26/2021, 11:53 AM
Toprak Nihat Deniz Öztürk
10/26/2021, 12:15 PM
Eric Feldman
10/26/2021, 3:04 PM
def fetch_result(data):
    # data?
    raise

class GetData(Task):
    def run(self):
        return {'data': 7}

recipe_flow = StartFlowRun(flow_name="batch", project_name="proj", wait=True)

with Flow(name='schedule') as sched:
    recipe_flow.set_upstream(GetData(), key='parameters')
    FunctionTask(fetch_result)(data=recipe_flow)
But the only thing I get as data is a prefect.engine.signals.SUCCESS object. Calling data.state isn't really helpful, and from data.state.result I get the prefect.engine.signals.SUCCESS all over again.
Is there any way to get the real tasks/flow run id/data of the inner flow?
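A hedged sketch of the usual workaround in Prefect 1.x: StartFlowRun only hands back a state signal, whereas create_flow_run returns the child run's id, which get_task_run_result can use to fetch a task's actual return value (the task slug is a placeholder, and the child task must persist its result):

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result

with Flow(name="schedule") as sched:
    run_id = create_flow_run(flow_name="batch", project_name="proj")
    # "get_data-1" is a placeholder slug for the task inside "batch"
    data = get_task_run_result(run_id, task_slug="get_data-1")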
Josh
10/26/2021, 3:51 PM
I can run my docker image with docker run --env FOO=BAR <my_docker_image>. I would like to be able to execute a flow on this image with a Docker agent by calling the flow with the parameters
{
    "FOO": "BAR"
}
Is this possible with Prefect? Or is there a Prefect idiom for such a concept?
Use cases for changing environment variables on container create/flow execution would be:
1. Defining which customer code path we want to trigger (database, configs, special methods and permissions)
2. Setting how we want to persist data (which database type to use, how to cache)
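A hedged sketch of the closest Prefect 1.x idiom: environment variables live on the flow's run config rather than on Parameters (the image name and values are placeholders):

from prefect import Flow
from prefect.run_configs import DockerRun

with Flow("customer-flow") as flow:
    ...

# env is baked into the run config the Docker agent uses for the container
flow.run_config = DockerRun(image="my_docker_image", env={"FOO": "BAR"})

The run config can also be overridden per run, e.g. Client().create_flow_run(..., run_config=DockerRun(env={"FOO": "BAZ"})), which gets closer to the per-customer use case.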
haf
10/26/2021, 5:39 PM
I don't want to use @task(result=...), because I then have to specify the result type to be either local or GCS. How do you handle this?
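One hedged pattern for this (the bucket name and environment variable are placeholders): pick the Result at flow-definition time from the environment, so the same task code uses LocalResult locally and GCSResult when deployed:

import os
from prefect import task
from prefect.engine.results import GCSResult, LocalResult

# One Result object chosen by environment, shared by the tasks
result = (
    GCSResult(bucket="my-results-bucket")  # placeholder bucket
    if os.environ.get("PREFECT_ENV") == "prod"  # placeholder env var
    else LocalResult()
)

@task(result=result)
def my_task():
    ...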
Dominic Pham
10/26/2021, 6:57 PM
@task
def query_that_will_return_a_list() -> list:
    ...

@task
def scrapy_api_call_chunks(title_list):
    loop_payload = prefect.context.get("task_loop_count", 0)
    title_list_grouper = list(grouper(title_list, 10))
    if loop_payload <= len(title_list_grouper):
        # Each loop will be an iteration of 10 titles. # of loops * 10 will
        # result in the total number of titles looped over so far
        raise LOOP(message='Running the next 10 items in job titles list')
    scraper_class = Scraper()
    scraper_class.instantiate_web_scraper(title_list_grouper[loop_payload - 2])
I feel like I don't fully understand how to utilize LOOP in the context of passing information to another function inside the task.
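A hedged sketch of the documented LOOP pattern (do_scrape is a hypothetical stand-in for the Scraper call): task_loop_count starts at 1 on the first pass, and task_loop_result is how data travels from one iteration to the next:

import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def scrape_in_chunks(titles, chunk_size=10):
    loop_count = prefect.context.get("task_loop_count", 1)
    results = prefect.context.get("task_loop_result", [])
    start = (loop_count - 1) * chunk_size
    chunk = titles[start:start + chunk_size]
    results.append(do_scrape(chunk))  # do_scrape is hypothetical
    if start + chunk_size < len(titles):
        # Raising LOOP re-runs the task with task_loop_result = results
        raise LOOP(message=f"Processed {start + len(chunk)} titles", result=results)
    return results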
Samuel Hinton
10/26/2021, 10:43 PM
Samuel Hinton
10/26/2021, 10:43 PM
Kevin Kho
10/26/2021, 10:46 PM
Samuel Hinton
10/26/2021, 10:49 PM
Kevin Kho
10/26/2021, 10:52 PM
Samuel Hinton
10/26/2021, 10:54 PM
Kevin Kho
10/26/2021, 11:00 PM
Samuel Hinton
10/26/2021, 11:02 PM
Kevin Kho
10/26/2021, 11:12 PM
You can use Client().graphql() to hit the API. The state handler would be applied on the scheduled -> running transition.
In this state handler, use the Client to query for the number of flows running:
query {
  flow(where: {
    name: {_eq: "..."},
    project: {name: {_eq: "bristech"}},
    archived: {_eq: false}
  }) {
    name
    project {
      name
    }
    archived
    flow_runs(where: {state: {_eq: "Running"}}) {
      id
      state
    }
  }
}
And then use Python to count the number.
If the number of flows is at the concurrency you want for that flow, then just return SUCCESS.
Samuel Hinton
10/26/2021, 11:14 PM
Kevin Kho
10/26/2021, 11:15 PM
Samuel Hinton
10/26/2021, 11:15 PM
Kevin Kho
10/26/2021, 11:16 PM
Samuel Hinton
10/26/2021, 11:37 PM
Kevin Kho
10/27/2021, 1:12 AM
Anna Geller
10/27/2021, 9:29 AM
"I would want to say 'Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one'" There is also a UI feature to clear late runs with one click.
Samuel Hinton
10/27/2021, 9:54 AM
Kevin Mullins
10/27/2021, 5:50 PM
Kevin Kho
10/27/2021, 5:52 PM
Kevin Mullins
10/27/2021, 5:53 PM
Samuel Hinton
10/27/2021, 5:55 PM
def state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():
        client = Client()
        now = dt.now(tz=pytz.UTC)
        result = client.graphql(
            """{
                flow(where: {
                    archived: {_eq: false},
                    name: {_eq: "%s"}
                }) {
                    name
                    archived
                    flow_runs (where: {
                        state: {_in: ["Scheduled", "Retrying", "Running"]},
                        scheduled_start_time: {_lte: "%s"}
                    }) {
                        scheduled_start_time
                        start_time
                        name
                        state
                        id
                    }
                }
            }"""
            % (flow.name, now.isoformat())
        )
        # These flow runs will be everything that's scheduled to start in the
        # past and might have built up.
        flow_runs = result["data"]["flow"][0]["flow_runs"]
        # I don't want to run another task if:
        # 1. There's already a flow in the running state
        # 2. If there are multiple scheduled, only the latest one should be run
        any_running = any([f["state"] == "Running" for f in flow_runs])
        if any_running:
            return Cancelled("Existing tasks are already running")
        scheduled = [f for f in flow_runs if f["state"] in ("Pending", "Scheduled")]
        if len(scheduled) > 1:
            last_scheduled_time = max(
                [dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled]
            )
            this_flow_run = None
            # How do I get the flow run id? It doesn't seem to be in the Flow,
            # and not in the State either
            pass
Kevin Kho
10/29/2021, 2:09 PM
You can use prefect.context.get("flow_run_id") inside it
Samuel Hinton
10/29/2021, 2:10 PM
Would flow.context.get(…) work too, or is prefect.context going to do some scope matching or similar to know what's going on?
Kevin Kho
10/29/2021, 2:19 PM
Samuel Hinton
10/29/2021, 2:23 PM
I found flow_run_context inside Flow itself, but it's never persisted as an attribute, apologies
Kevin Kho
10/29/2021, 2:42 PM
Samuel Hinton
10/29/2021, 2:55 PM
Kevin Kho
10/29/2021, 2:57 PM
Samuel Hinton
10/29/2021, 2:59 PM
import datetime
from datetime import timedelta, datetime as dt
import json
import os
import gc
from typing import Optional

import pytz
import requests
import pandas as pd

import prefect
from prefect import task, Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from prefect.engine.signals import SKIP
from prefect.engine.state import Cancelled, State
from prefect.client import Client


def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():
        client = Client()
        now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)
        # Replacing microseconds because the GraphQL API can't always handle
        # that many decimals
        result = client.graphql(
            """{
                flow(where: {
                    archived: {_eq: false},
                    name: {_eq: "%s"}
                }) {
                    name
                    archived
                    flow_runs (where: {
                        state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]},
                        scheduled_start_time: {_lte: "%s"}
                    }) {
                        scheduled_start_time
                        start_time
                        name
                        state
                        id
                    }
                }
            }"""
            % (flow.name, now.isoformat())  # Sorry for the % operator, but those {} make format() a pain
        )
        # These flow runs will be everything that's scheduled to start in the
        # past and might have built up.
        logger = prefect.context.get("logger")
        # This might fail if GraphQL can't find anything, but I haven't seen
        # that in practice
        flow_runs = result["data"]["flow"][0]["flow_runs"]
        # I don't want to run another task if there's already more than one
        # flow running. For me, I'm happy to have two running at once, as API
        # issues mean we can get timeouts and hangs that don't terminate
        # easily. For other use cases, I'd generally say to cancel if there's
        # any running.
        num_running = sum([1 if f["state"] in ("Running", "Retrying") else 0 for f in flow_runs])
        if num_running > 1:
            msg = "Existing tasks are already running"
            logger.info(msg)
            return Cancelled(msg)
        # And if there are multiple scheduled, only the latest one should be run
        scheduled = [
            f for f in flow_runs if f["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
        ]
        if len(scheduled) > 1:
            last_scheduled_time = max(
                [dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled]
            )
            this_flow_run_id = prefect.context.get("flow_run_id")
            matching_runs = [f for f in scheduled if f["id"] == this_flow_run_id]
            if not matching_runs:
                logger.info(f"Current id is {this_flow_run_id}")
                logger.info(f"Flow runs are: {scheduled}")
                return Cancelled("Nope")
            this_run = matching_runs[0]
            this_run_time = dt.fromisoformat(this_run["scheduled_start_time"])
            if this_run_time != last_scheduled_time:
                msg = "Multiple scheduled tasks, this is not the last one"
                logger.info(msg)
                return Cancelled(msg)
    return new_state
This will allow a max of two concurrent running jobs (you can easily change this to one), and if multiple jobs are scheduled (i.e. your agent was down for a while and is now back up), only the last one will execute and the others will log and cancel. Not all the imports are necessary; I haven't filtered out the imports from the other handlers and common functions in the file.
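For completeness, a minimal sketch of how a handler like this is attached (the flow name is a placeholder): state handlers are passed to the Flow constructor.

from prefect import Flow

with Flow("my-flow", state_handlers=[concurrent_handler]) as flow:
    ...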
Kevin Mullins
11/02/2021, 2:35 PM
Kevin Kho
11/02/2021, 2:36 PM
Samuel Hinton
11/02/2021, 3:01 PM
Kevin Kho
11/02/2021, 3:04 PM