Dylan
06/11/2020, 5:41 PM
Nazeer Hussain
06/11/2020, 6:04 PM
Nazeer Hussain
06/11/2020, 6:06 PM
Chris Vrooman
06/11/2020, 7:03 PM
jorwoods
06/11/2020, 9:49 PM
target kwarg to the task caches the result as expected. I will try to contribute back to the documentation soon, now that I have a better understanding of that versus a Result's location.
Here is a modified version of the toy example I used before. This time I am trying to get it to forcibly ignore the cache and recompute, but it still treats the task's cache as valid, despite a short cache_for and a validator of never_use. I know that cache_for and cache_validator in Task are deprecated, so I also tried specifying the validator on the LocalResult, as you will see in the example.
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.executors import DaskExecutor
from prefect.engine.cache_validators import all_parameters, never_use
import datetime
delta = datetime.timedelta(seconds=5)
lr = LocalResult(validators=[never_use])
@task(log_stdout=True, checkpoint=True,
      target='{flow_name}-{task_name}-{parameters[x]}-{parameters[y]}.pkl',
      cache_validator=never_use, cache_for=delta)
def add(x, y):
    print(f'add ran with {x} {y}')
    try:
        return sum(x) + y
    except TypeError:
        return x + y
with Flow('iterated map', result=lr) as flow:
    y = unmapped(Parameter('y', default=8))
    x = Parameter('x', default=[1, 2, 3])
    mapped_result = add.map(x, y=y)
    out = add(mapped_result, y)
if __name__ == "__main__":
    flow.run(executor=DaskExecutor(),)
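To see which file name the target template in this example produces, here is a minimal sketch that fills it in with Python's str.format. It assumes Prefect formats target with values from prefect.context (flow_name, task_name, parameters); the context dictionary below is a hypothetical stand-in for what a run with the default parameters would supply.

```python
# Target template copied from the @task decorator above.
TARGET = "{flow_name}-{task_name}-{parameters[x]}-{parameters[y]}.pkl"

# Hypothetical context values for a run with the flow's default parameters.
context = {
    "flow_name": "iterated map",
    "task_name": "add",
    "parameters": {"x": [1, 2, 3], "y": 8},
}

# str.format resolves "{parameters[x]}" as context["parameters"]["x"].
target_path = TARGET.format(**context)
print(target_path)  # iterated map-add-[1, 2, 3]-8.pkl
```

If that assumption holds, nothing in the template varies per mapped element: parameters[x] is the whole flow parameter, not the item a mapped child receives, so every mapped add call and the final add call resolve to the same file name.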
Howard Cornwell
06/12/2020, 8:26 AM
import prefect
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from random import random
@task
def make_data():
    return list(range(5))
@task
def randomly_raise(x):
    x = random()
    if x <= 0.5:
        raise Exception()
with Flow("test retries", result=PrefectResult()) as flow:
    data = make_data()
    randomly_raise.map(data)
When the above runs, it creates 5 mapped tasks, each of which fails at random with a 50% chance. When there are failed runs, I can click re-run on each failed task and get it to re-run individually and succeed. But re-running the entire flow never attempts the mapped tasks again. Here's what's logged:
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise': task is mapped, but run will proceed so children are generated."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise': Handling state change from Mapped to Mapped"
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[0]': Starting task run..."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[0]': task is already finished."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[0]': finished task run for task with final state: 'Success'"
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[1]': Starting task run..."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[1]': task is already finished."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[1]': finished task run for task with final state: 'Success'"
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[2]': Starting task run..."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[2]': task is already finished."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[2]': finished task run for task with final state: 'Failed'"
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[3]': Starting task run..."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[3]': task is already finished."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[3]': finished task run for task with final state: 'Failed'"
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[4]': Starting task run..."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,DEBUG,"Task 'randomly_raise[4]': task is already finished."
June 12th 2020,9:22:35am ,prefect.CloudTaskRunner,INFO,"Task 'randomly_raise[4]': finished task run for task with final state: 'Failed'"
Is there a way to re-run a whole group of mapped tasks?
Nazeer Hussain
06/12/2020, 9:09 AM
Nazeer Hussain
06/12/2020, 9:10 AM
Nazeer Hussain
06/12/2020, 9:11 AM
Nazeer Hussain
06/12/2020, 9:11 AM
Nazeer Hussain
06/12/2020, 9:11 AM
Nazeer Hussain
06/12/2020, 2:51 PM
Nazeer Hussain
06/12/2020, 2:51 PM
Ben Davison
06/12/2020, 4:19 PM
python my_flow.py with the register method to respect that?
Aaron Y
06/12/2020, 5:57 PM
Dan DiPasquo
06/12/2020, 6:39 PM
Unexpected error: Forbidden("POST <https://storage.googleapis.com/upload/storage/v1/b>...
I don't see any logging about the GCP_CREDENTIALS being retrieved, as I would if I were explicitly using a Secret task, so it's a bit opaque.
Dan DiPasquo
06/12/2020, 9:31 PM
Digo
06/15/2020, 7:13 AM
Tomas Thornquist
06/15/2020, 7:23 AM
emre
06/15/2020, 10:13 AM
FargateTaskEnvironment seems to only support Docker as the underlying flow storage. Is that the case, and if so, are there any plans to support S3 storage as well?
Howard Cornwell
06/15/2020, 10:37 AM
graphql_1 | ERROR: State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
graphql_1 |
graphql_1 | GraphQL request:6:7
graphql_1 | 5 | status
graphql_1 | 6 | id
graphql_1 | | ^
graphql_1 | 7 | }
graphql_1 | Traceback (most recent call last):
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 668, in complete_value_catching_error
graphql_1 | return_type, field_nodes, info, path, result
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
graphql_1 | raise result
graphql_1 | File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
graphql_1 | task_run_id=state_input["task_run_id"], state=state,
graphql_1 | File "/prefect-server/src/prefect_server/api/states.py", line 91, in set_task_run_state
graphql_1 | f"State update failed for task run ID {task_run_id}: provided "
graphql_1 | ValueError: State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
graphql_1 |
graphql_1 | The above exception was the direct cause of the following exception:
graphql_1 |
graphql_1 | Traceback (most recent call last):
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 668, in complete_value_catching_error
graphql_1 | return_type, field_nodes, info, path, result
graphql_1 | File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
graphql_1 | raise result
graphql_1 | File "/prefect-server/src/prefect_server/graphql/states.py", line 73, in set_state
graphql_1 | task_run_id=state_input["task_run_id"], state=state,
graphql_1 | File "/prefect-server/src/prefect_server/api/states.py", line 91, in set_task_run_state
graphql_1 | f"State update failed for task run ID {task_run_id}: provided "
graphql_1 | graphql.error.graphql_error.GraphQLError: State update failed for task run ID 191a019d-4e0a-4692-a05a-823659c3f535: provided a running state but associated flow run 635f983d-8082-49c3-9dcd-af694e400ced is not in a running state.
graphql_1 |
graphql_1 | GraphQL request:6:7
graphql_1 | 5 | status
graphql_1 | 6 | id
graphql_1 | | ^
graphql_1 | 7 | }
Darragh
06/15/2020, 10:59 AM
itay livni
06/15/2020, 2:08 PM
LocalExecutor using a FargateAgent. The error I am getting is:
Error while deploying flow: InvalidParameterException("An error occurred (InvalidParameterException) when calling the RunTask operation: Network Configuration must be provided when networkMode 'awsvpc' is specified.")
The FargateAgent NetworkMode is configured like this:
networkMode={
    "awsvpcConfiguration": {
        "assignPublicIp": "ENABLED",
        "subnets": ["subnet-****"],
        "securityGroups": ["sg-****"]
    }
},
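For reference, the ECS API itself splits these settings across two boto3 calls: register_task_definition takes networkMode as a plain string such as "awsvpc", while run_task takes the awsvpcConfiguration mapping under a separate networkConfiguration key. The sketch below only builds the two keyword dictionaries and makes no AWS request. The helper name is hypothetical, and whether FargateAgent forwards keyword arguments with these exact names to boto3 is an assumption to check against the agent's documentation.

```python
def build_ecs_network_kwargs(subnets, security_groups, assign_public_ip=True):
    """Hypothetical helper: split awsvpc settings between the two ECS API calls.

    Returns (task_definition_kwargs, run_task_kwargs); no AWS request is made.
    """
    # RegisterTaskDefinition: networkMode is a plain string such as "awsvpc".
    task_definition_kwargs = {"networkMode": "awsvpc"}
    # RunTask: subnets, security groups and the public-IP flag belong under
    # networkConfiguration -> awsvpcConfiguration.
    run_task_kwargs = {
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": list(subnets),
                "securityGroups": list(security_groups),
                "assignPublicIp": "ENABLED" if assign_public_ip else "DISABLED",
            }
        }
    }
    return task_definition_kwargs, run_task_kwargs


td_kwargs, rt_kwargs = build_ecs_network_kwargs(["subnet-****"], ["sg-****"])
print(td_kwargs)
print(rt_kwargs)
```

With boto3 these would be passed as ecs_client.register_task_definition(..., **td_kwargs) and ecs_client.run_task(..., **rt_kwargs), alongside the other arguments each call requires.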
Any ideas? Thanks.
jeff sadler
06/15/2020, 3:17 PM
Rafal
06/15/2020, 4:07 PM
raise AuthorizationError("No agent API token provided.")
prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
I get this error even though the server is up and running. Any ideas?
Ben Davison
06/15/2020, 4:34 PM
Rafal
06/15/2020, 4:46 PM
Unexpected error: ImageNotFound(HTTPError('404 Client Error: Not Found for url: <http+docker://localhost/v1.40/containers/create>'))
This error occurs when I try to run my local Docker image. I tried changing docker_server_url, but nothing worked. Error: Unexpected error: DockerException('Invalid bind address format: /var/run/docker.sock'). Any ideas?
Jackson Maxfield Brown
06/15/2020, 6:07 PM
ids = [some_iterable_of_data_ids_to_retrieve]
with Flow("example") as flow:  # Flow requires a name; "example" is a placeholder
    big_mixed_result = get_data.map(ids)
    filtered_result = filter_data(big_mixed_result)
    processed = process_data.map(filtered_result)
I run the flow using a distributed.LocalCluster / DaskExecutor, and when it hits the process_data task, I get:
"UserWarning: Large object ... detected in task graph... consider using Client.scatter"
Prefect / Dask tries to continue, but fails and restarts the Flow after the workers hit their memory limit.
What confuses me is that I am doing a map operation on the task, so I wouldn't expect any large object to be transferred between workers. I would have assumed that the map call only sends each small item, but I guess that's not the case?
One idea I was considering was using a dask.bag instead of a List[Dict], to split the memory across workers maybe? I don't really know; any and all ideas welcome.
Adam Kelleher
06/15/2020, 9:13 PM
Dan Kerrigan
06/15/2020, 9:43 PM
Dan Kerrigan
06/15/2020, 9:43 PM
import time
from prefect import Client, task, Flow
client = Client()
@task
def dummy_task():
    return 42
with Flow("meaning of life") as flow:
    life_meaning = dummy_task()
flow_id = flow.register()
flow_run_id = client.create_flow_run(flow_id)
state = client.get_flow_run_info(flow_run_id).state
while not state.is_finished():
    print(f"{state.message} Sleeping 5.")
    time.sleep(5)
    state = client.get_flow_run_info(flow_run_id).state
print(f"Flow finished. {state.message}")
assert state.is_successful()
print(f"Result is None? {state.result is None}")
Result check: OK
Flow: <http://localhost:8080/flow/015ce980-762a-48cf-a864-1f9c5bafb2a8>
Flow run scheduled. Sleeping 5.
Flow run scheduled. Sleeping 5.
Flow finished. All reference tasks succeeded.
Result is None? True
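The polling loop in the script above has no upper bound on how long it waits. Below is a minimal sketch of the same loop factored into a helper with a timeout. The name wait_until_finished is hypothetical, and get_state is assumed to be any zero-argument callable returning an object with is_finished() and message, such as lambda: client.get_flow_run_info(flow_run_id).state. Fake states stand in for the server so the sketch runs on its own.

```python
import time
from types import SimpleNamespace


def wait_until_finished(get_state, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll get_state() until the returned state reports is_finished().

    Returns the finished state, or raises TimeoutError once timeout_seconds
    have elapsed without the state finishing.
    """
    deadline = time.monotonic() + timeout_seconds
    state = get_state()
    while not state.is_finished():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Not finished after {timeout_seconds}s: {state.message}")
        print(f"{state.message} Sleeping {poll_seconds}.")
        time.sleep(poll_seconds)
        state = get_state()
    return state


def make_fake_states(n_unfinished):
    """Hypothetical stand-in for the server: n unfinished polls, then finished."""
    flags = iter([False] * n_unfinished + [True])

    def get_state():
        done = next(flags)
        message = "All reference tasks succeeded." if done else "Flow run scheduled."
        return SimpleNamespace(is_finished=lambda: done, message=message)

    return get_state


final_state = wait_until_finished(make_fake_states(2), poll_seconds=0.0, timeout_seconds=5.0)
print(final_state.message)
```

Using time.monotonic for the deadline keeps the timeout unaffected by wall-clock adjustments while the loop sleeps.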
Kyle Moon-Wright
06/15/2020, 10:25 PM
task_run of your flow. In this case, we can do something like this:
from prefect import Client
c = Client()
# Getting task runs
flow_run = c.get_flow_run_info("YOUR_FLOW_RUN_ID")
task_runs = flow_run.task_runs
# Grabbing the location of a task's result
# (in this case the first task in the list)
location = task_runs[0].state._result.location
# Grabbing the result of a specific task (based on location)
task_runs[0].state._result.read(location)
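The same two steps can be applied to every task run in the flow run. A minimal sketch, assuming each task run object has an id and a state exposing the private _result attribute with location and read() as in the snippet above, and that read() returns an object whose value holds the data. task_run_values is a hypothetical helper name; task runs without a stored location are skipped, and fake objects replace the server so the sketch runs standalone.

```python
from types import SimpleNamespace


def task_run_values(task_runs):
    """Return {task_run_id: value} for each task run with a stored result location.

    Uses the private `state._result` attribute from the snippet above, which is
    not a public API and may differ between Prefect versions.
    """
    values = {}
    for task_run in task_runs:
        result = getattr(task_run.state, "_result", None)
        location = getattr(result, "location", None)
        if location is None:
            continue  # no stored result for this task run
        values[task_run.id] = result.read(location).value
    return values


# Fake objects standing in for c.get_flow_run_info(...).task_runs, so the
# sketch runs without a Prefect server. FakeResult is hypothetical.
class FakeResult:
    def __init__(self, location, store):
        self.location = location
        self._store = store

    def read(self, location):
        # Mimics a read that returns an object carrying the loaded value.
        return SimpleNamespace(value=self._store[location])


store = {"results/add-1.pkl": 42}
fake_task_runs = [
    SimpleNamespace(id="run-a", state=SimpleNamespace(_result=FakeResult("results/add-1.pkl", store))),
    SimpleNamespace(id="run-b", state=SimpleNamespace(_result=FakeResult(None, store))),
]
print(task_run_values(fake_task_runs))  # {'run-a': 42}
```

Against a real server this would be called as task_run_values(c.get_flow_run_info("YOUR_FLOW_RUN_ID").task_runs).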
I hope that makes sense; let me know if we can clarify this to suit your provided flow.
Dan Kerrigan
06/16/2020, 12:36 PM