Marvin
09/14/2020, 3:34 AMRizky Eko Putra
09/14/2020, 6:39 AMMatias Godoy
09/14/2020, 7:01 AMMarvin
09/14/2020, 7:01 AMVitaly Shulgin
09/14/2020, 7:07 AMJoe Lyman
09/14/2020, 8:28 AMMarvin
09/14/2020, 8:28 AMDaniel Manson
09/14/2020, 9:13 AMMarvin
09/14/2020, 9:13 AMRichard Hughes
09/14/2020, 11:56 AMMarvin
09/14/2020, 11:56 AMShaun Cutts
09/14/2020, 1:23 PMvalue
as a property which loads and caches the object when called.
However, the first thing I thought of is that I should use an S3Result
. But this doesn’t seem to be the right sort of object, as it just takes a bucket, not a key. It seems more of a “result storage location” than a Result
to me. I’m wondering why it has an isa
relation to Result
. Maybe I’m not understanding the object hierarchy properly, and/or I should be trying to achieve my goal differently? [Update had overlooked that base Result
takes location
. Still S3Result
has no ability to load results on demand, except via read
, which takes a new location.]
A related question is how I should trigger an already-created flow, that lives on the prefect server, with a new runner? Client.create_flow_run
allows me to create a new run, but not a dictionary of initial states. It would seem that I am abusing flows a little… Perhaps I need to create initial tasks that take a serialized specification for where their “real” inputs live, that can be passed in parameters. Thinking about it, this seems more the intended way I should be doing things, but it also feels that I am duplicating what Prefect is doing somehow. Guidance on what the best practice should be would be greatly appreciated.Marko Herkaliuk
09/14/2020, 2:35 PMMarvin
09/14/2020, 2:35 PMManuel Mourato
09/14/2020, 2:58 PMCould not infer an active Flow context.
1) I have task1, which outputs a result that is a list , say : ["a","z"]
2) Now, I have a task2, that depends on the result of task1, but should only receive the first element of the result list as an input
Example:
@task
def replace_str(x):
return x.replace("a","A")
@task
def create_str_list():
return ["a","b","c"]
flow= Flow("test")
flow.set_dependencies(create_str_list)
--> THIS IS WHAT I WANT TO DO
flow.set_dependencies(replace_str, keyword_tasks={"x": create_str_list[0]})
Is this possible? Thank youMaxwell Dylla
09/14/2020, 6:33 PMMarvin
09/14/2020, 6:33 PMTom Augspurger
09/14/2020, 8:02 PM$ prefect run flow --name=terraclimate --project=pangeo-forge
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/bin/prefect", line 8, in <module>
sys.exit(cli())
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/cli/run.py", line 125, in flow
no_url=no_url,
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/cli/run.py", line 377, in _run_flow
flow_id=flow_id, parameters={**file_params, **string_params}, run_name=run_name
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/client/client.py", line 964, in create_flow_run
res = self.graphql(create_mutation, variables=dict(input=inputs))
File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
https://prefect-community.slack.com/archives/CL09KU1K7/p1596043664390000 looks similar, and seemed to resolve itself.
Any debugging suggestions?Chandra Manginipalli
09/14/2020, 8:10 PMsark
09/15/2020, 3:29 AMFailed to load and execute Flow's environment: ValueError('unsupported pickle protocol: 5',)
prefect and cloudpickle versions of local and remote are the same
$ pipenv graph
...
prefect==0.13.6
- click [required: >=7.0, installed: 7.1.2]
- cloudpickle [required: >=0.6.0, installed: 1.6.0]
...
(prefect-env) [prefect@ghpr-dev-prefect-usc1-a-1 ~]$ prefect version
0.13.6
(prefect-env) [prefect@ghpr-dev-prefect-usc1-a-1 ~]$ pip show cloudpickle
Name: cloudpickle
Version: 1.6.0
Summary: Extended pickling support for Python objects
Home-page: <https://github.com/cloudpipe/cloudpickle>
Author: Cloudpipe
Author-email: <mailto:cloudpipe@googlegroups.com|cloudpipe@googlegroups.com>
License: BSD 3-Clause License
Location: /opt/prefect/prefect-env/lib/python3.6/site-packages
Requires:
Erick Rodriguez
09/15/2020, 3:49 AMMarvin
09/15/2020, 3:49 AMsark
09/15/2020, 3:58 AMFailed to load and execute Flow's environment: TypeError('an integer is required (got type bytes)')
sark
09/15/2020, 4:25 AMUnexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
httplib_response = self._make_request(
File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 392, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/lib/python3.8/http/client.py", line 1255, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/lib/python3.8/http/client.py", line 1301, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.8/http/client.py", line 1250, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.8/http/client.py", line 1010, in _send_output
self.send(msg)
File "/usr/local/lib/python3.8/http/client.py", line 950, in send
self.connect()
File "/usr/local/lib/python3.8/site-packages/docker/transport/unixconn.py", line 43, in connect
sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory
i’m using GCS storage with LocalEnvironmentEugene Smal
09/15/2020, 7:04 AMMarvin
09/15/2020, 7:04 AMsark
09/15/2020, 7:15 AMdef container(p):
return CreateContainer(image_name="image", command=f"--p={p}")
start = StartContainer()
with Flow("flow") as flow:
p = Parameter('param', required=True)
c = container(p)
start_container = start(container_id=c)
i get <Parameter: p>
interpolated for p
instead of the actual value for p
but if i make container
a @task
it also gives weird errors i think because CreateContainer
is itself already a task?Rob Fowler
09/15/2020, 8:41 AMunmapped
for the constant parameters, MIND BLOWN!Rodrigo Neves
09/15/2020, 9:50 AMMarvin
09/15/2020, 9:50 AM