William Smith
08/24/2020, 9:16 AM

Trever Mock
08/24/2020, 1:15 PM

Richard Hughes
08/24/2020, 3:42 PM

Kyle McEntush
08/24/2020, 4:19 PM

Kyle McEntush
08/24/2020, 7:30 PM

I'm building my flow with flow.add_task() and task.set_upstream(). Specifically, I want to make sure that my triggers depend only on the tasks I think they depend on. For example, in my pipeline (image attached), I want the next task that comes off of the valid_unit_reducer to be triggered by any_successful on the valid_unit_reducer, and not by any_successful on the invalid_unit_reducer. Maybe this is the default behavior in Prefect, but my current understanding is that triggers relate to all tasks, not just the upstream task immediately before it.
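(A minimal sketch of the default behavior, assuming 0.13-era Prefect: a trigger is evaluated only against the states of that task's own upstream dependencies, so any_successful on a downstream task never sees invalid_unit_reducer unless it is actually upstream. The task bodies here are hypothetical stand-ins for the pipeline in the screenshot.)

from prefect import Flow, task
from prefect.triggers import any_successful

@task
def valid_unit_reducer(units):
    return units

@task
def invalid_unit_reducer(units):
    return units

@task(trigger=any_successful)
def next_task(units):
    # The trigger is checked against next_task's own upstream states only,
    # i.e. valid_unit_reducer; invalid_unit_reducer never enters the check.
    return units

with Flow("trigger-sketch") as flow:
    valid = valid_unit_reducer([1, 2, 3])
    invalid = invalid_unit_reducer([4, 5])
    next_task(valid)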
Minakshi
08/25/2020, 12:50 AM

ModuleNotFoundError: No module named 'dask.system'
while importing:

  File "/Users/mkorad/PycharmProjects/altruistic-armadillo/src/**", line 1, in <module>
    from prefect import task, Flow, Parameter
Any idea about this error?
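(Likely an outdated dask rather than a Prefect problem; offered as a guess, since dask.system only exists in newer dask releases. A quick check:)

import dask

print(dask.__version__)
# If this version is old, upgrading should restore the missing module:
#   pip install -U dask distributed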
Alfie
08/25/2020, 4:15 AM

Robin
08/25/2020, 8:42 AM

I set up an EKS cluster using pulumi and created an agent locally using prefect agent start kubernetes --token <token_id> --label k8s.
It seems like the kubernetes agent is running correctly (see attached images). However, when I submit a flow that has the same label k8s, it does not execute (see image)…
1. How do I make sure the kubernetes agent is set up properly?
2. Which tests does Prefect Cloud already perform by itself to make sure that an agent is set up properly?
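(For question 1, one thing worth checking on the flow side: an agent only picks up flow runs whose labels it can satisfy, so the flow itself must carry the k8s label. A rough sketch, assuming a 0.13-era environment-based setup; the flow and project names are hypothetical:)

from prefect import Flow
from prefect.environments import LocalEnvironment

with Flow("my-flow") as flow:
    ...  # tasks go here

# The flow's labels must be satisfiable by the agent's labels
# (the agent above was started with --label k8s).
flow.environment = LocalEnvironment(labels=["k8s"])
flow.register(project_name="my-project")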
William Smith
08/25/2020, 10:11 AM

Manuel Mourato
08/25/2020, 2:10 PMfrom prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from datetime import timedelta
from default_task_handler import tasks_notifications_handler
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
a=ShellTask(max_retries=3, retry_delay=timedelta(minutes=60), timeout=1800,
state_handlers=[tasks_notifications_handler],checkpoint=True, result=LocalResult(dir="/home/my-user/weekly_execution"),
command="ls /home/my-user/")
a.run()
The task runs, and the weekly_execution directory is created, but nothing is persisted.
What am I doing wrong?
Is it mandatory that the task be part of a flow?
Thank you
UPDATE
Indeed if I run the task inside a flow, checkpointing works.
Is there a way to do it for individual tasks?
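(One workaround sketch, not from the thread: checkpointing is driven by the flow/task runners, so a bare task.run() is just a function call and persists nothing, but the Result object can be driven directly. This reuses the a task from the snippet above and assumes the 0.13-era Result interface; the file name is made up:)

from prefect.engine.results import LocalResult

result = LocalResult(dir="/home/my-user/weekly_execution",
                     location="ls_output.prefect")
output = a.run()      # plain call: no runner, so no automatic checkpointing
result.write(output)  # persist the return value manually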
William Smith
08/25/2020, 2:30 PM

Lukas
08/25/2020, 3:38 PM

[2020-08-25 15:34:14] DEBUG - prefect.CloudFlowRunner | Flow 'Fetch-Authors': start_time has not been reached; ending run.

The flow is Submitted for execution, but Fargate basically shuts down and stops the task. Any idea why this could happen? I've run this flow a lot of times before and never experienced this.

Jason Oban
08/25/2020, 5:01 PM

Kyle McEntush
08/25/2020, 6:26 PM

My validate_unit tasks all get TriggerFailed. My invalid_unit_reducer is set to trigger for any failures. Is there a way I can set it to trigger for any failures that aren't a trigger failure, but rather a hard fail? In my setup, a trigger failure should be treated differently than a true failure.
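(A sketch of a custom trigger along those lines, assuming the 0.13-era trigger interface, a callable receiving the upstream states, and relying on TriggerFailed being a subclass of Failed:)

from prefect import task
from prefect.engine import signals
from prefect.engine.state import Failed, TriggerFailed

def any_hard_failed(upstream_states):
    # upstream_states maps upstream edges to their final states.
    hard_failures = [
        s for s in upstream_states.values()
        if isinstance(s, Failed) and not isinstance(s, TriggerFailed)
    ]
    if not hard_failures:
        raise signals.TRIGGERFAIL("No upstream task hard-failed.")
    return True

@task(trigger=any_hard_failed)
def invalid_unit_reducer(units):
    return units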
Kyle McEntush
08/25/2020, 6:26 PM

Marwan Sarieddine
08/25/2020, 7:39 PMstatus = client.get_flow_run_info(flow_run_id)
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 990, in get_flow_run_info
result = self.graphql(query).data.flow_run_by_pk # type: ignore
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 281, in graphql
retry_on_api_error=retry_on_api_error,
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 237, in post
retry_on_api_error=retry_on_api_error,
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _request
token = self.get_auth_token()
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 503, in get_auth_token
self._refresh_access_token()
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 630, in _refresh_access_token
token=self._refresh_token,
File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['refresh_token'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
Marwan Sarieddine
08/25/2020, 8:19 PM

An Hoang
08/25/2020, 11:07 PM

Each Task object has a version and a hash that is a combination of the version of the current Task object and all Tasks before it. The output file will contain the final/flow's hash. Any ideas on how to approach this differently?
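(One way to sketch the chained hash; all names here are hypothetical, and the point is just folding every upstream hash into the current task's digest so the final hash fingerprints the whole chain:)

import hashlib

def task_hash(task_version, upstream_hashes):
    # Combine this task's version with the hashes of all upstream tasks
    # into one deterministic digest.
    h = hashlib.sha256(task_version.encode())
    for up in sorted(upstream_hashes):
        h.update(up.encode())
    return h.hexdigest()

a = task_hash("v1", [])
b = task_hash("v2", [a])
flow_hash = task_hash("v3", [a, b])  # what the output file would carry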
Riley Hun
08/26/2020, 1:19 AM

Howard Cornwell
08/26/2020, 2:12 PM

I tried setting PREFECT__SERVER__DATABASE__HOST, but it doesn’t appear to work. Thanks
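(Quick sanity check, assuming the usual PREFECT__SECTION__KEY convention maps the variable onto prefect.config:)

import prefect

# If PREFECT__SERVER__DATABASE__HOST was visible to the process at import
# time, the value should surface here:
print(prefect.config.server.database.host)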
Slackbot
08/26/2020, 4:38 PM

Paweł
08/26/2020, 6:11 PM

josh
08/26/2020, 7:47 PM

0.13.4 has been released and here are a few notable changes:
📚 New databricks task library task (I couldn’t find the brick emoji)
🕵️ Custom YAML for k8s agents
🔗 Coupled versioning for Core / Server / UI
A big thank you to our contributors who helped out with this release! Full changelog:

Rob Fowler
08/26/2020, 10:34 PM

Rob Fowler
08/26/2020, 10:42 PMfrom prefect import Flow, task, Parameter
from flows.freq import request
@task(name="Creating an order")
def create_order(opts, user_agent, auth, js):
return request(opts, 'POST', f"{opts.try_url}/orders", user_agent, auth, js)
def create_service_requests(delivery_type):
return {'dt': delivery_type}
with Flow("azure_subscription_change_smacc") as flow:
opts = Parameter('opts')
user_agent = Parameter('user_agent')
auth = Parameter('auth')
regular_order_id_js = create_service_requests("Regular")
accelerated_order_id_js = create_service_requests("Accelerated")
regular_result = create_order(opts, user_agent, auth, regular_order_id_js)
accelerated_result = create_order(opts, user_agent, auth, accelerated_order_id_js)
if __name__ == '__main__':
state = flow.run(opts={}, user_agent="blah", auth="blahauth")
Rob Fowler
08/26/2020, 10:44 PM

Rob Fowler
08/26/2020, 11:12 PM

def check(state):
    print(f"regular result: {state.result[regular_result].result}")
Bob Colner
08/27/2020, 3:17 AM

After upgrading from 0.11.2 to 0.13.4, my flow is failing with an error related to 'Cloud', which is strange since I'm not using Prefect Cloud (or the local server/UI). Looks like it is related to the Slack notifications. Any ideas? (full logs posted in the thread)

Alfie
08/27/2020, 3:58 AM

Sandeep Aggarwal
08/27/2020, 1:32 PM

The delete_flow_run mutation is clashing with Hasura's auto-generated mutation schemas.
I am self-hosting Prefect and, as part of a data retention policy, need to clean up old data objects. I am using the mutation below to clean up old flow/task runs:
mutation($created_before: timestamptz) {
  delete_flow_run(where: {created: {_lt: $created_before}}) {
    affected_rows
  }
  delete_flow_run_state(where: {created: {_lt: $created_before}}) {
    affected_rows
  }
  delete_log(where: {created: {_lt: $created_before}}) {
    affected_rows
  }
  delete_task_run(where: {created: {_lt: $created_before}}) {
    affected_rows
  }
  delete_task_run_state(where: {created: {_lt: $created_before}}) {
    affected_rows
  }
}
The request fails with the error below:
2020-08-27T12:40:17.586Z {"message":"Unknown argument \"where\" on field \"delete_flow_run\" of type \"Mutation\".","locations":[{"line":3,"column":37}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}
2020-08-27T12:40:17.586Z {"message":"Cannot query field \"affected_rows\" on type \"success_payload\".","locations":[{"line":7,"column":9}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}
2020-08-27T12:40:17.586Z {"message":"Field \"delete_flow_run\" argument \"input\" of type \"delete_flow_run_input!\" is required, but it was not provided.","locations":[{"line":3,"column":21}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}
When I remove delete_flow_run from the above mutation, everything works fine.
Dylan
08/27/2020, 1:54 PM

affected_rows does look like something from Hasura's schema, as our mutations only have success and error. Does affected_rows appear in the schema of the Interactive API?
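(Going by the validation errors above, an input argument of type delete_flow_run_input! and a success_payload return type, the server-side mutation presumably expects something closer to the sketch below; the exact field names inside the input object are an assumption:)

from prefect import Client

client = Client()
client.graphql(
    """
    mutation($flow_run_id: UUID!) {
      delete_flow_run(input: {flow_run_id: $flow_run_id}) {
        success
      }
    }
    """,
    variables={"flow_run_id": "<flow-run-id>"},
)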
Sandeep Aggarwal
08/27/2020, 2:00 PM

Dylan
08/27/2020, 2:07 PM

Sandeep Aggarwal
08/27/2020, 2:10 PM

Dylan
08/27/2020, 2:10 PM

Sandeep Aggarwal
08/27/2020, 2:14 PM

Dylan
08/27/2020, 2:15 PM

Sandeep Aggarwal
08/27/2020, 2:16 PM

Dylan
08/27/2020, 2:16 PM