Verun Rahimtoola
02/02/2021, 1:10 AM
Giovanni Giacco
02/02/2021, 1:35 AM
Jimmy Le
02/02/2021, 1:52 AM
Josh Pitts
02/02/2021, 1:56 AM
location
since the task argument is what makes the task unique, but that’s not available to the context object as far as I can tell. Am I missing something obvious for this use case?
Eric
02/02/2021, 3:52 AM
prefect/cli/server.py shows that the user can specify different ports when starting the Prefect server services (i.e. graphql_port='xxxx', server_port='xxxx', etc.). But when I used flow.register(project_name) in my script, prefect/core/flow.py seems to use the default port 4200 to register the flow, and the register() function has no parameter to adjust the registration port, so it raised an HTTPConnection error. Once I changed line 1666 of flow.py from client = prefect.Client() to client = prefect.Client(api_server='http://localhost:MODIFIED_PORT/'), it worked. Is this an issue here? Thank you 🙂
Jitesh Khandelwal
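Note on Eric's port question: rather than editing flow.py, one option is to build the client yourself and register through it. This is only a minimal sketch. The api_server argument is the one quoted in the message; MY_PREFECT_PORT, server_api_url and register_on_port are hypothetical names invented for illustration, and the sketch assumes Client.register accepts the flow and a project_name as in the 0.14 releases.

```python
import os


def server_api_url(host: str = "http://localhost", port: str = "") -> str:
    """Build the API URL for a Prefect Server on a non-default port.

    MY_PREFECT_PORT is a made-up variable name used only in this sketch.
    """
    port = port or os.environ.get("MY_PREFECT_PORT", "4200")
    return f"{host}:{port}/"


def register_on_port(flow, project_name: str, port: str = ""):
    """Register `flow` through a Client pointed at the chosen port."""
    # Imported lazily so the URL helper can be used without Prefect installed.
    import prefect

    client = prefect.Client(api_server=server_api_url(port=port))
    return client.register(flow, project_name=project_name)
```

With this, register_on_port(flow, "my-project", port="4300") would take the place of flow.register("my-project"), leaving the installed package untouched.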
02/02/2021, 10:49 AM
Dmitry Klionsky
02/02/2021, 11:45 AM
import datetime
import random
from typing import Any

import prefect
from prefect import task, Flow
from prefect import Parameter
from prefect.engine.signals import FAIL, LOOP, PAUSE

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_ips() -> list:
    # Randomly fail to simulate a flaky lookup
    if random.random() > 0.8:
        raise FAIL("Failed to get IPs")
    count = random.randint(2, 6)
    ips = [f"IP-{i}" for i in range(1, count)]
    print(ips)
    return ips

@task
def seq_reduce(func, init: Any, lst: list) -> Any:
    # Payload carried over from the previous LOOP iteration (empty on the first run)
    loop_payload = prefect.context.get("task_loop_result", {})
    index = loop_payload.get("index", 0)
    acc = loop_payload.get("acc", init)
    if index >= len(lst):
        return acc
    acc = func(lst[index], acc)
    raise LOOP(message=f"acc={acc} index={index}", result=dict(acc=acc, index=index+1))

def seq_map(func, lst: list) -> list:
    # Sequential map expressed as a reduce that appends to a list
    def aux(x, acc: list) -> list:
        acc.append(func(x))
        return acc
    return seq_reduce(aux, [], lst)

@task
def stop_ip(ip: str) -> None:
    print("stop_ip: " + ip)

@task
def wait_for_ip(ip: str) -> str:
    print("wait_for_ip: " + ip)
    return ip + "-new"

@task
def init_ip(ip: str) -> str:
    print("init_ip: " + ip)
    if random.random() > 0.5:
        raise PAUSE("Initing IP failed. Waiting for human input")
    return "Done"

with Flow("process_ip") as process_ip_flow:
    old_ip = Parameter("old_ip")
    stop_ip(old_ip)
    new_ip = wait_for_ip(old_ip)
    init_ip(new_ip)

def process_ip(old_ip):
    # Run the inner flow for a single IP and return the new IP
    state = process_ip_flow.run(old_ip=old_ip)
    #process_ip_flow.visualize(flow_state=state)
    return state.result[new_ip].result

with Flow("process_ips") as process_ips_flow:
    old_ips = get_ips()
    new_ips = seq_map(process_ip, old_ips)

if __name__ == "__main__":
    state = process_ips_flow.run()
    #process_ips_flow.visualize(flow_state=state)
    print(state.result[new_ips].result)
Is the above what Prefect is designed for? Are there some other approaches?
Arnoldas Bankauskas
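Note on Dmitry's snippet: the control flow of the LOOP-based seq_reduce may be easier to see in plain Python. Each raise of LOOP re-runs the task with the payload of the previous iteration, which amounts to a left fold over the list. The sketch here makes that explicit without Prefect; the Loop class and the run_looping driver are hypothetical stand-ins written for illustration, not Prefect APIs.

```python
from typing import Any, Callable, Dict, List


class Loop(Exception):
    """Stand-in for the LOOP signal; carries the payload for the next iteration."""

    def __init__(self, result: Dict[str, Any]):
        super().__init__("loop")
        self.result = result


def seq_reduce_step(func: Callable, init: Any, lst: List, payload: Dict[str, Any]) -> Any:
    """One run of the task body, with the loop payload passed in explicitly."""
    index = payload.get("index", 0)
    acc = payload.get("acc", init)
    if index >= len(lst):
        return acc
    raise Loop(dict(acc=func(lst[index], acc), index=index + 1))


def run_looping(func: Callable, init: Any, lst: List) -> Any:
    """Re-invoke the step until it returns a value instead of raising Loop."""
    payload: Dict[str, Any] = {}
    while True:
        try:
            return seq_reduce_step(func, init, lst, payload)
        except Loop as signal:
            payload = signal.result


if __name__ == "__main__":
    print(run_looping(lambda x, acc: acc + [x + "-new"], [], ["IP-1", "IP-2"]))
```

Seen this way, the pattern processes one element per iteration, strictly in order, and each step only needs the accumulator and the index from the step before it.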
02/02/2021, 12:54 PM
Samuel Hinton
02/02/2021, 1:55 PM
Matthew Blau
02/02/2021, 5:00 PM
import requests
from prefect.tasks.secrets import PrefectSecret

def post_to_slack(task, old_state, new_state):
    my_secret = PrefectSecret("SLACK_WEBHOOK_URL").run()
    if new_state.is_retrying():
        msg = "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)
        # post to the Slack webhook URL loaded from the secret
        requests.post(my_secret, json={"text": msg})
    elif new_state.is_failed():
        msg = "Task {0} failed".format(task)
        # post to the Slack webhook URL loaded from the secret
        requests.post(my_secret, json={"text": msg})
    return new_state
I specifically receive this message if I leave the code as it is above:
Exception raised while calling state handlers: ClientError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql\n\nThe following error messages were provided by the GraphQL server:\n\n GRAPHQL_VALIDATION_FAILED: Cannot query field "secret_value" on type "Query".\n\nThe GraphQL query was:\n\n query($name: String!) {\n secret_value(name: $name)\n }\n\nThe passed variables were:\n\n {"name": "SLACK_WEBHOOK_URL"}\n')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/client/secrets.py", line 137, in get
    value = secrets[self.name]
KeyError: 'SLACK_WEBHOOK_URL'
Thank you in advance!
Verun Rahimtoola
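Note on Matthew's traceback: it shows the secret lookup first missing SLACK_WEBHOOK_URL in the local secrets (the KeyError) and then failing on a secret_value query that this GraphQL server does not expose. One way to sidestep the secret lookup entirely is to read the webhook URL from the process environment inside the handler. This is a rough sketch under assumptions: using an environment variable named SLACK_WEBHOOK_URL is a choice made here, and build_message and make_slack_handler are hypothetical helpers, not part of Prefect.

```python
import os
from typing import Callable, Optional


def build_message(task, new_state) -> Optional[str]:
    """Return the Slack text for a state change, or None if nothing should be sent."""
    if new_state.is_retrying():
        return "Task {0} failed and is retrying at {1}".format(task, new_state.start_time)
    if new_state.is_failed():
        return "Task {0} failed".format(task)
    return None


def make_slack_handler(post: Callable[..., object], env_var: str = "SLACK_WEBHOOK_URL"):
    """Create a state handler that posts through `post` (for example requests.post)."""

    def post_to_slack(task, old_state, new_state):
        msg = build_message(task, new_state)
        url = os.environ.get(env_var)  # assumption: the URL is set in the run environment
        if msg is not None and url:
            post(url, json={"text": msg})
        # State handlers must return the state that the run should continue with.
        return new_state

    return post_to_slack
```

It would be attached with something like state_handlers=[make_slack_handler(requests.post)] on the task. Passing the post function in keeps the handler easy to exercise without a network call.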
02/02/2021, 5:32 PM
Jan Marais
02/02/2021, 5:43 PM
prefect agent ecs start
with any of the aws cli ways
• setting task_role_arn on either agent start or ECSRun
• setting execution_role_arn on either agent start or ECSRun
• In the task container
Any insight on the differences of these would also be appreciated.
Adam Brusselback
02/02/2021, 5:45 PM
Mitchell Bregman
02/02/2021, 8:46 PM
BK Lau
02/02/2021, 9:11 PM
Prefect Agent responsible for passing output of one task as input of another task in Prefect flow DAG since the Prefect server manages no code or input/output data. I'm trying to wrap my head around what the responsibility of the Agent is. Maybe an architecture diagram might help.
Mohammed Khan
02/02/2021, 9:16 PM
python a_flow.py
python b_flow.py
python parentflow.py
[2021-02-02 15:19:53-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'parent-flow'
[2021-02-02 15:19:53-0500] INFO - prefect.TaskRunner | Task 'Flow a': Starting task run...
BK Lau
02/02/2021, 10:34 PM
Argo Workflow, some random string is generated and appended to a workflow to disambiguate 2 workflows having the same name.
Marwan Sarieddine
02/02/2021, 11:25 PM
itay livni
02/03/2021, 12:21 AM
Verun Rahimtoola
02/03/2021, 2:26 AM
CloudTaskRunner seems to use a LocalResult to persist the results of every task... can someone point me to where this result object gets created?
Verun Rahimtoola
02/03/2021, 2:46 AM
Vitaly Shulgin
02/03/2021, 6:21 AM
prefecthq/prefect:all_extras-0.13.9 to prefecthq/prefect:0.14.5-python3.8 and now a lot of tasks are marked as failed because of State Message:
No heartbeat detected from the remote task; marking the run as failed.
Jitesh Khandelwal
02/03/2021, 8:00 AM
Laura Vaida
02/03/2021, 12:33 PM
Jan Marais
02/03/2021, 1:21 PM
Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'project\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
Using ECSAgent and a custom image in ECR. I have no problem importing from the external module inside the container or in the environment where my agent is launched or where my flow is registered from.
Samuel Hinton
02/03/2021, 1:50 PM
Samuel Hinton
02/03/2021, 1:56 PM
  _____  _____  ______ ______ ______ _____ _______
 |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|
 | |__) | |__) | |__  | |__  | |__ | |       | |
 |  ___/|  _  /|  __| |  __| |  __|| |       | |
 | |    | | \ \| |____| |    | |___| |____   | |
 |_|    |_|  \_\______|_|    |______\_____|  |_|
Thanks for using Prefect!!!
This is the official docker image for Prefect Core, intended for executing
Prefect Flows. For more information, please see the docs:
https://docs.prefect.io/core/getting_started/installation.html#docker
Samuel Hinton
02/03/2021, 2:46 PM
prefect server start locally (which launches fine and gets me to the Welcome to PREFECT SERVER ASCII splash screen). Navigating to localhost:8080 shows me connecting to the server (and pulling down the favicon and title), but the web page is empty (completely white). I can see it's got content and scripts in the HTML, but there's this error and nothing is ever made visible in the UI:
Mitchell Bregman
02/03/2021, 3:00 PM
Samuel Hinton
02/03/2021, 3:21 PM
prefect backend server
prefect server create-tenant --name default --slug default
And doing this does seem to fix the problem. Is there a way I can specify in the config options or somewhere in the docker-compose that I’d like to ensure this tenant exists on server startup?