Matthias Busch
05/16/2020, 7:18 PM
Nate Atkins
05/16/2020, 8:02 PM
with Flow("Existing") as flow:
    file_b = file_builder("b.data")
    file_b.trigger = upstream_dependency(source_fpaths=["a.data"])
Proposed:
with Flow("Proposed") as flow:
    file_b = file_builder("b.data", trigger=upstream_dependency(source_fpaths=["a.data"]))
Example Trigger:
from pathlib import Path
from typing import Dict, List
from toolz import curry
from prefect.engine import signals
from prefect.triggers import _get_all_states_as_set
@curry
def upstream_dependency(
    upstream_states: Dict["core.Edge", "state.State"],
    source_fpaths: List[Path] = None,
) -> bool:
    if not all(s.is_successful() for s in _get_all_states_as_set(upstream_states)):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    # All upstream tasks succeeded (or were skipped). Check whether any file dependency requires this task to run.
    # file_dependency_skipper is a project-specific helper (not shown).
    run_required = file_dependency_skipper(source_fpaths)
    if run_required:
        return True
    else:
        raise signals.SKIP("All dependencies are up to date.")
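The trigger delegates the actual decision to `file_dependency_skipper`, whose body isn't shown. A minimal sketch of the kind of check it might perform, assuming a make-style rule (rebuild when the target is missing or any source file is newer than it). `needs_rebuild` and its `target` argument are hypothetical and not part of Prefect:

```python
from pathlib import Path
from typing import Iterable, Union

PathLike = Union[str, Path]


def needs_rebuild(target: PathLike, sources: Iterable[PathLike]) -> bool:
    """Hypothetical make-style check: True if `target` must be rebuilt.

    The target is stale if it does not exist, or if any source file has a
    newer modification time than the target. A missing source file raises
    FileNotFoundError.
    """
    target = Path(target)
    if not target.exists():
        return True
    target_mtime = target.stat().st_mtime
    return any(Path(src).stat().st_mtime > target_mtime for src in sources)
```

Inside the trigger this would be called as something like `needs_rebuild("b.data", source_fpaths)`, which means the trigger would also need to know the target path.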
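I hacked this together with the following change to Task.bind in prefect.core.task:
def bind(
    self,
    *args: Any,
    mapped: bool = False,
    upstream_tasks: Iterable[Any] = None,
    trigger: Callable[[Dict["core.Edge", "state.State"]], bool] = None,
    flow: "Flow" = None,
    **kwargs: Any
) -> "Task":
    # Only override the trigger when one is passed explicitly,
    # so the default (or decorator-supplied) trigger is not reset to None.
    if trigger is not None:
        self.trigger = trigger
    # ... rest of bind() unchanged ...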
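Assigning `self.trigger = trigger` unconditionally would reset the task's trigger to `None` whenever `bind()` is called without one, overriding the default or decorator-supplied trigger. A minimal, Prefect-free sketch of a guarded per-call override; `ToyTask` and `default_trigger` are illustrative stand-ins, not Prefect classes:

```python
from typing import Any, Callable, Dict, Optional

Trigger = Callable[[Dict[Any, Any]], bool]


def default_trigger(upstream_states: Dict[Any, Any]) -> bool:
    # Stand-in for an "all upstream succeeded" rule: every value is truthy.
    return all(upstream_states.values())


class ToyTask:
    def __init__(self, trigger: Optional[Trigger] = None) -> None:
        # Fall back to the default when no trigger is given at construction.
        self.trigger = trigger or default_trigger

    def bind(self, *args: Any, trigger: Optional[Trigger] = None, **kwargs: Any) -> "ToyTask":
        # Only override when the caller explicitly passes a trigger.
        if trigger is not None:
            self.trigger = trigger
        return self
```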
If I build the task by subclassing Task instead of using the @task decorator, I can pass the trigger. I'm not sure whether we want to dump all the task-construction parameters into bind(), as the list is pretty short now; upstream_tasks is the most similar one already in the signature of bind.
itay livni
05/16/2020, 9:19 PM
S3Result and receiving a
botocore.errorfactory.NoSuchKey: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist
Upon further research, this error can mean almost anything, including a permissions error (I tried different buckets and settings). The credentials are stored as AWS_CREDENTIALS in Prefect Cloud, with config.toml set to use Cloud secrets:
[cloud]
use_local_secrets = false
Switching back to the result_handler argument with the S3Result subclass did work, but combining result_handler with target does not. Is there something different in the way credentials are handled between result and result_handler?
The new Prefect is really nice!
Brad
05/17/2020, 12:45 AM
log_stdout, it looks like maybe some references were changed from result to value here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L938
Nate Atkins
05/17/2020, 1:22 AM
target parameter on tasks. This, in conjunction with the addition of the upstream edges to the trigger signature, almost got me the ability to rebuild the current task's target if the upstream was updated. I did a little trickery: if the current task's target needs to be rebuilt, I delete the target cache file and then raise a RETRY signal. When the task retries, it can't find the cache file, so it runs the task.
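The delete-then-retry trick relies on the behaviour described above: when the target file exists the task uses the cached result, and when it is missing the task runs. A stdlib-only sketch of that read-or-compute logic, assuming a pickled file as the cache; `run_with_target` and `invalidate` are hypothetical helpers, not Prefect's implementation:

```python
import pickle
from pathlib import Path
from typing import Any, Callable, Tuple


def run_with_target(target: Path, compute: Callable[[], Any]) -> Tuple[Any, bool]:
    """Return (value, was_cached).

    If `target` exists, load its contents without calling `compute`;
    otherwise call `compute`, write the result to `target`, and return it.
    """
    if target.exists():
        with target.open("rb") as f:
            return pickle.load(f), True
    value = compute()
    with target.open("wb") as f:
        pickle.dump(value, f)
    return value, False


def invalidate(target: Path) -> None:
    # Deleting the cached file forces the next run to recompute.
    if target.exists():
        target.unlink()
```

Calling `invalidate` before a retry mirrors deleting the target cache file and raising RETRY: the second attempt finds no file and recomputes.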
The only problem I have is this: if the upstream task didn't run and update, and the current task doesn't need to run, what do I raise or return from the trigger to get the task to use the cached results?
True: The task will run
False: The flow will fail
SUCCESS: No cached results to pass on to the next task.
Nate Atkins
05/17/2020, 5:52 PM
cached state to purple #800080. Now that I'm using a bunch of cached stuff with the new target parameter, I want to know whether things were run or used the cached results. I find the success and cached green colors hard to distinguish.
Matthias
05/18/2020, 9:28 AM
ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'message': "{'_result': {'type': ['Unsupported value: LocalResult']}}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
cache_for=self.task.cache_for,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
version=version,
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 226, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': "{'_result': {'type': ['Unsupported value: LocalResult']}}", 'locations': [{'line': 4, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Florian K. (He/Him)
05/18/2020, 4:21 PM
prefect server start
Pulling postgres ... pulling from library/postgres
Pulling hasura ... pulling from hasura/graphql-engine
Pulling graphql ... pulling from prefecthq/server
Pulling scheduler ... pulling from prefecthq/server
Pulling apollo ... pulling from prefecthq/apollo
Pulling ui ... pulling from prefecthq/ui
ERROR: for postgres no matching manifest for windows/amd64 10.0.17763 in the manifest list entries
ERROR: for ui image operating system "linux" cannot be used on this platform
ERROR: for scheduler image operating system "linux" cannot be used on this platform
ERROR: for graphql image operating system "linux" cannot be used on this platform
ERROR: for apollo image operating system "linux" cannot be used on this platform
ERROR: for hasura image operating system "linux" cannot be used on this platform
ERROR: image operating system "linux" cannot be used on this platform
Exception caught; killing services (press ctrl-C to force)
Has anyone else had similar issues before?
Thanks!
P.S.: The Docker installation looks good; at least the Hello World image works flawlessly.
docker-compose --version
docker-compose version 1.25.5, build 8a1c60f6
(RodeoEnv) PS C:\windows\system32> docker --version
Docker version 19.03.5, build 2ee0c57608
(RodeoEnv) PS C:\windows\system32>
Will Milner
05/18/2020, 4:38 PM
secrets argument in the Prefect Docker storage object? I tried doing
flow.storage = Docker(....,secrets=["SECRET_NAME"])
But when I try running the flow, it fails to start and I get this error message from Apollo:
{
"message": "Cannot query field \"secret_value\" on type \"Query\".",
"locations": [
{
"line": 2,
"column": 9
}
],
"extensions": {
"code": "GRAPHQL_VALIDATION_FAILED"
}
}
I want to avoid passing my secret values to my Docker images as environment variables, so I was hoping the secrets argument would solve this for me.
Zach
05/18/2020, 7:44 PM
Matthias Busch
05/18/2020, 8:12 PM
Noah Nethery
05/18/2020, 8:14 PM
from prefect import task, Flow, Parameter
from prefect import Client
@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))
    return True
@task
def print_completed(status):
    if status:
        print('Success')
with Flow("Simple-Flow") as flow:
    name = Parameter('name')
    task1 = say_hello(name)
    print_completed(task1)
client = Client()
flow.register(project_name="Hello K8s")
Nicolas Michel
05/19/2020, 7:44 AM
Azuma
05/19/2020, 2:15 PM
Alex Cano
05/19/2020, 3:33 PM
Azuma
05/19/2020, 4:07 PM
Barry Roszak
05/19/2020, 4:15 PM
Questionnaire
05/19/2020, 4:26 PM
Zach
05/19/2020, 4:45 PM
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 135, in <module>
flows = cloudpickle_deserialization_check(flow_file_path)
File "/opt/prefect/healthcheck.py", line 40, in cloudpickle_deserialization_check
flows.append(cloudpickle.load(f))
ModuleNotFoundError: No module named 'prefect.engine.results.prefect_result'
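The healthcheck fails while unpickling the flow inside the image, so the import that matters is the one in the image's Python environment, not the local one. A small stdlib sketch for checking whether a dotted module path is importable in whichever environment runs it; `module_available` is a hypothetical helper:

```python
import importlib.util


def module_available(dotted_name: str) -> bool:
    """Return True if `dotted_name` can be found by the import system.

    find_spec imports the parent packages of a dotted name, so a missing
    parent raises ModuleNotFoundError; treat that as "not available" too.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        return False
```

Running `module_available("prefect.engine.results.prefect_result")` both locally and inside the image would show whether the two environments differ, for example because the image has a different Prefect version from the one that pickled the flow.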
Zach
05/19/2020, 4:47 PM
prefect_result module does exist in the Prefect package code that I have installed, so why is it saying it doesn't? I can't see how my code would cause this to break, since all the errors seem to be coming from the Prefect package code.
Zach
05/19/2020, 4:49 PM
Marwan Sarieddine
05/19/2020, 5:29 PM
0.10.7 and I am getting the below error in the logs:
19 May 2020,01:03:00 agent INFO Submitted for execution: Job prefect-job-d7e49926
19 May 2020,01:03:26 prefect.CloudFlowRunner INFO Beginning Flow run for 'Run a Prefect Flow in Docker'
19 May 2020,01:03:27 prefect.CloudFlowRunner INFO Starting flow run.
19 May 2020,01:03:27 prefect.CloudTaskRunner INFO Task 'PullImage': Starting task run...
19 May 2020,01:03:27 prefect.CloudTaskRunner ERROR Unexpected error: DockerException("Error while fetching server API version: HTTPConnectionPool(host='localhost', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8768ed7c10>: Failed to establish a new connection: [Errno 111] Connection refused'))")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
(self._dns_host, self.port), self.timeout, **extra_kw
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
raise err
File "/usr/local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
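The traceback boils down to a refused TCP connection to localhost:2375, the address the Docker client was trying to reach. A stdlib sketch for checking that directly from inside the job's container; `port_open` is a hypothetical helper, and the host and port simply mirror the log:

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts and DNS failures.
        return False
```

If `port_open("localhost", 2375)` returns False inside the pod, nothing is listening at that address in the job's environment.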
It seems that localhost:2375 (the Docker daemon address) is not reachable for some reason. Has anyone faced this issue before?
Sanjay Patel
05/19/2020, 7:12 PM
changes = generator_task.map(
    sim,
    unmapped(x),
    unmapped(y),
    unmapped(z),
    **kwargs  # need this unmapped
)
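One way to keep every keyword argument unmapped is to wrap each value before unpacking, i.e. `**{k: unmapped(v) for k, v in kwargs.items()}`. To show the idea without Prefect installed, the sketch below uses a hypothetical `Unmapped` marker and a toy `toy_map` that broadcasts wrapped values. In a real flow you would use `prefect.unmapped` with `task.map` instead:

```python
from typing import Any, Callable, Dict, List


class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: marks a value to broadcast."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call `fn` once per element of the mapped (non-Unmapped) arguments.

    Unmapped arguments are passed unchanged to every call.
    """
    all_args = list(args) + list(kwargs.values())
    lengths = {len(a) for a in all_args if not isinstance(a, Unmapped)}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    n = lengths.pop()

    def pick(a: Any, i: int) -> Any:
        return a.value if isinstance(a, Unmapped) else a[i]

    return [
        fn(*(pick(a, i) for a in args), **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


def generator_fn(sim: int, x: int, scale: int = 1, offset: int = 0) -> int:
    return (sim + x) * scale + offset


kwargs: Dict[str, Any] = {"scale": 10, "offset": 1}
# Wrap every keyword value so none of them is mapped over.
changes = toy_map(generator_fn, [1, 2, 3], Unmapped(5), **{k: Unmapped(v) for k, v in kwargs.items()})
```

Applied to the original call, this becomes `generator_task.map(sim, unmapped(x), unmapped(y), unmapped(z), **{k: unmapped(v) for k, v in kwargs.items()})`.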
hakki cankaya
05/19/2020, 7:43 PM
hakki cankaya
05/19/2020, 7:53 PM
Dan DiPasquo
05/19/2020, 8:03 PM
TIMESTAMP LEVEL MESSAGE
2020-05-19T18:58:29.732839+00:00 INFO Submitted for execution: Job prefect-job-35b1423b
2020-05-19T19:04:28.107917+00:00 INFO Beginning Flow run for 'compute_**_flow'
2020-05-19T19:04:28.240831+00:00 INFO Starting flow run.
2020-05-19T19:04:28.241095+00:00 DEBUG Flow 'compute_**_flow': Handling state change from Scheduled to Running
2020-05-19T19:05:12.954233+00:00 INFO Task 'compute_**_task': Starting task run...
2020-05-19T19:05:12.95458+00:00 DEBUG Task 'compute_**_task': Handling state change from Pending to Running
2020-05-19T19:05:13.210047+00:00 DEBUG Task 'compute_**_task': Calling task.run() method...
2020-05-19T19:22:07.863766+00:00 INFO Beginning Flow run for 'compute_**_flow'
2020-05-19T19:22:08.577243+00:00 INFO Task 'compute_**_task': Starting task run...
2020-05-19T19:22:08.578027+00:00 DEBUG Task 'compute_**_task': task is already running.
2020-05-19T19:22:08.59477+00:00 INFO Task 'compute_**_task': finished task run for task with final state: 'Running'
2020-05-19T19:24:34.702197+00:00 ERROR Marked "Failed" by a Zombie Killer process.
2020-05-19T19:39:33.646426+00:00 INFO Rescheduled by a Lazarus process. This is attempt 1.
2020-05-19T19:39:56.331103+00:00 INFO Submitted for execution: Job prefect-job-21a433ec
2020-05-19T19:42:33.754737+00:00 INFO Beginning Flow run for 'compute_**_flow'
2020-05-19T19:42:33.869824+00:00 INFO Starting flow run.
...
Marwan Sarieddine
05/19/2020, 11:12 PM
AttributeError: 'FunctionTask' object has no attribute 'result'
Wondering if you have encountered this before. One thing to note: if the flow is empty, it runs successfully. I am using Prefect version 0.10.7.
19 May 2020,07:027 agent INFO Submitted for execution: Job prefect-job-d3fc6dc5
19 May 2020,07:02:12 prefect.CloudFlowRunner INFO Beginning Flow run for 'Static Dask Cluster'
19 May 2020,07:02:12 prefect.CloudFlowRunner INFO Starting flow run.
19 May 2020,07:02:12 prefect.CloudFlowRunner DEBUG Flow 'Static Dask Cluster': Handling state change from Scheduled to Running
19 May 2020,07:02:12 prefect.CloudFlowRunner ERROR Unexpected error: AttributeError("'FunctionTask' object has no attribute 'result'")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 465, in get_flow_run_state
for t in final_tasks
File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 196, in wait
return self.client.gather(futures)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1931, in gather
asynchronous=asynchronous,
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 347, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 331, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1790, in _gather
raise exception.with_traceback(traceback)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 569, in run_task
default_handler = task.result_handler or self.flow.result_handler
AttributeError: 'FunctionTask' object has no attribute 'result'
19 May 2020,07:02:12 prefect.CloudFlowRunner DEBUG Flow 'Static Dask Cluster': Handling state change from Running to Failed
Jacques Jamieson
05/20/2020, 3:49 AM
Azuma
05/20/2020, 7:01 AM
Sandeep Aggarwal
05/20/2020, 1:47 PM
TypeError: Object of type 'Parameter' is not JSON serializable
Below is a sample snippet:
with Flow("sample flow") as sample_flow:
    param = Parameter("task_param")
    FlowRunTask(flow_name="next flow", parameters={"task_param": param})()