Wolfgang Kerzendorf
03/06/2020, 5:49 PM

Manuel Aristarán
03/06/2020, 6:51 PM
… Flow can have?

Manuel Aristarán
03/06/2020, 6:53 PM
prefect seems to ignore the 3rd declared `Parameter`:
with Flow("Salesforce to Cassandra") as f:
    tap_config = Parameter("tap_config")
    tap_catalog = Parameter("tap_catalog")
    target_config = Parameter("target_config")
    salesforce_result = run_salesforce_tap(config=tap_config, catalog=tap_catalog)
    result = run_cassandra_target(config=target_config, input=salesforce_result)
Manuel Aristarán
03/06/2020, 6:54 PM
{<Parameter: tap_catalog>, <Parameter: tap_config>}
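A hedged aside on the snippet above: flow.parameters() only reports Parameters that end up wired into the flow as task dependencies, so one thing worth checking (purely a guess from the code shown) is that run_cassandra_target is really a @task-decorated callable and that target_config reaches it inside the Flow context. A minimal sketch with stubbed task bodies:

from prefect import Flow, Parameter, task

@task
def run_salesforce_tap(config, catalog):
    return {"rows": []}  # stub

@task
def run_cassandra_target(config, input):
    return "done"  # stub

with Flow("Salesforce to Cassandra") as f:
    tap_config = Parameter("tap_config")
    tap_catalog = Parameter("tap_catalog")
    target_config = Parameter("target_config")
    salesforce_result = run_salesforce_tap(config=tap_config, catalog=tap_catalog)
    result = run_cassandra_target(config=target_config, input=salesforce_result)

# With both callables decorated as tasks, all three Parameters are reported here.
print(f.parameters())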
Manuel Aristarán
03/06/2020, 9:24 PM
Can a task return another task? My use case is creating a docker.CreateContainer, but I need to build the command parameter.
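A sketch of one way around this, rather than having a task return another task: build the command in an upstream task and pass it to CreateContainer at runtime inside the Flow context. This assumes CreateContainer accepts command as a run-time argument (worth checking against the signature in your Prefect version); the image name and command below are made up.

from prefect import Flow, Parameter, task
from prefect.tasks.docker import CreateContainer

create_container = CreateContainer(image_name="my-image:latest")  # hypothetical image

@task
def build_command(config_path):
    # assemble the container command from Parameters / upstream results
    return ["python", "run.py", "--config", config_path]

with Flow("build-command-example") as flow:
    config_path = Parameter("config_path", default="/tmp/config.json")
    command = build_command(config_path)
    container_id = create_container(command=command)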
Manuel Aristarán
03/06/2020, 11:48 PM

Christian
03/08/2020, 4:17 PM

Bernhard
03/09/2020, 10:41 AM

John Ramirez
03/09/2020, 4:37 PM

John Ramirez
03/09/2020, 6:30 PM
Traceback (most recent call last):
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 186, in _validate_json
json.dumps(value)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "main.py", line 176, in <module>
main()
File "main.py", line 163, in main
labels=[args.env],
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/core/flow.py", line 1412, in register
no_url=no_url,
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/client/client.py", line 616, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/core/flow.py", line 1209, in serialize
serialized = schema(exclude=["storage"]).dump(self)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 556, in dump
result = self._serialize(processed_obj, many=many)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 520, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 316, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 220, in _serialize
return super()._serialize(value, attr, obj, **kwargs)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 571, in _serialize
return schema.dump(nested_obj, many=many)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 556, in dump
result = self._serialize(processed_obj, many=many)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 516, in _serialize
for d in typing.cast(typing.Iterable[_T], obj)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 516, in <listcomp>
for d in typing.cast(typing.Iterable[_T], obj)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 520, in _serialize
value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 316, in serialize
return self._serialize(value, attr, obj, **kwargs)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 181, in _serialize
self._validate_json(value)
File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 188, in _validate_json
raise ValidationError("Value is not JSON-compatible")
marshmallow.exceptions.ValidationError: Value is not JSON-compatible
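Reading the traceback, flow.register() is JSON-validating the serialized flow and tripping over a raw datetime, most likely a Parameter (or task argument) default. One hedged workaround is to keep the default as an ISO-8601 string and parse it back into a datetime inside a task; the names below (start_date, parse_start_date) are illustrative only.

from datetime import datetime
from prefect import Flow, Parameter, task

@task
def parse_start_date(start_date: str) -> datetime:
    # convert the JSON-friendly string back into a datetime at run time
    return datetime.fromisoformat(start_date)

with Flow("register-friendly-defaults") as flow:
    start_date = Parameter("start_date", default="2020-03-09T00:00:00")
    start_dt = parse_start_date(start_date)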
John Ramirez
03/09/2020, 6:30 PM

Laura Lorenz (she/her)
03/09/2020, 10:09 PM

Arsenii
03/10/2020, 1:53 AM
DaskExecutor can be used for parallelization between tasks inside a flow, but what about parallelization between flows themselves? I see that there's a DaskKubernetes environment that spawns pods for flows, each with a temporary Dask cluster inside, which makes sense to me on the surface, but Kubernetes is not currently an option for us.
Would setting up something like `FargateEnvironment`/`Fargate Agent` bring significant improvements compared to, say, a regular DockerAgent? If a flow is run as a Fargate Task with a specified remote DaskExecutor, where does it actually "run" the flow? Does it make more sense to have a dedicated remote Dask cluster somewhere, or to start up a local one for each flow?
Thanks again for all the help!
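For the "dedicated remote Dask cluster" option in Arsenii's question, a sketch of how a flow could be pointed at a long-running cluster using the Prefect 0.9.x-era API (RemoteEnvironment wrapping DaskExecutor); the scheduler address is a placeholder:

from prefect import Flow
from prefect.environments import RemoteEnvironment

flow = Flow("shared-dask-cluster-example")
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler.internal:8786"},  # hypothetical address
)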
Mark McDonald
03/10/2020, 5:26 PM

Braun Reyes
03/10/2020, 10:05 PM
@task(trigger=<boolean based off value of parameter>)
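Braun's fragment reads like "run this task only when a parameter is true". A hedged sketch of how that is usually expressed in the 0.9.x-era API: triggers react to upstream task states rather than values, so branching on a Parameter tends to go through the control-flow tasks instead. Task and parameter names here are made up.

from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import ifelse

@task
def check_flag(enabled: bool) -> bool:
    return enabled

@task
def do_work():
    print("doing the optional work")

@task
def skip_work():
    print("skipping")

with Flow("branch-on-parameter") as flow:
    enabled = Parameter("enabled", default=True)
    ifelse(check_flag(enabled), do_work(), skip_work())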
Braun Reyes
03/10/2020, 10:05 PM

Braun Reyes
03/10/2020, 10:06 PM

Amit Singh
03/11/2020, 11:40 AM

Jeff Brainerd
03/11/2020, 3:14 PM

Cab Maddux
03/11/2020, 6:18 PM
Can I set flow.environment = KubernetesJobEnvironment(job_spec_file='...') prior to flow.register() and have the k8s agent use my job_spec_file to create the job? My job spec file is basically just the example here: https://docs.prefect.io/cloud/execution/k8s_job_environment.html#examples
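A sketch of the pattern Cab describes, assuming the 2020-era Prefect Cloud API: attach a KubernetesJobEnvironment pointing at a custom job spec before registering, so the Kubernetes agent launches flow runs from it. The file path, registry URL, and project name are placeholders.

from prefect import Flow
from prefect.environments import KubernetesJobEnvironment
from prefect.environments.storage import Docker

flow = Flow("k8s-job-spec-example")
flow.environment = KubernetesJobEnvironment(job_spec_file="path/to/job_spec.yaml")
flow.storage = Docker(registry_url="my-registry.example.com")  # hypothetical registry
flow.register(project_name="my-project")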
John Ramirez
03/11/2020, 7:40 PM
… switch and ifelse
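Since switch came up alongside ifelse: a quick sketch of the multi-branch form (0.9.x-era control-flow tasks, with made-up task names). switch routes execution based on the value returned by a condition task, and unselected branches end up Skipped.

from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import switch

@task
def choose_env(env: str) -> str:
    return env

@task
def deploy_staging():
    print("deploying to staging")

@task
def deploy_prod():
    print("deploying to prod")

with Flow("switch-example") as flow:
    env = Parameter("env", default="staging")
    switch(choose_env(env), {"staging": deploy_staging(), "prod": deploy_prod()})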
Scott Zelenka
03/11/2020, 9:03 PM

Jeff Brainerd
03/11/2020, 11:08 PM

Mark Williams
03/12/2020, 9:30 PM

Thomas La Piana
03/13/2020, 9:56 AM

bardovv
03/13/2020, 1:02 PM

Kostas Chalikias
03/13/2020, 1:10 PM

Scott Zelenka
03/13/2020, 2:29 PM
… cache or checkpoint, where you could specify how long to persist the output of a given step in the pipeline to disk. So when you re-run the entire pipeline locally while debugging, it would simply read in the cached data rather than re-compute each step that previously completed successfully. It was also nice in production, as it allowed commonly used tasks to share their output with other pipelines without needing to be re-computed.
It seems Prefect has a concept of output caching, but it only stores this in memory for local debugging runs, which is useless for this use case of iterating on logic changes and re-running the entire pipeline again.
https://docs.prefect.io/core/concepts/persistence.html#output-caching
There's mention in this Slack channel to 'use Prefect Cloud', but I cannot find any tutorials or examples of how to accomplish this, so I'm looking for guidance.
How would you use cache in Prefect Cloud to speed up the debugging iteration process of a local Flow?
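One hedged answer to Scott's question, assuming the 2020-era caching API: tag the expensive tasks with cache_for (and optionally a cache_validator), and when the flow runs against Prefect Cloud the resulting Cached state is stored in the backend, so re-runs within the window reuse the output instead of recomputing it. Task names and the one-hour window below are illustrative.

from datetime import timedelta
from prefect import Flow, task
from prefect.engine.cache_validators import all_inputs

@task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
def expensive_step(x):
    # pretend this is the slow computation we don't want to repeat while debugging
    return x * 2

@task
def downstream(y):
    print(y)

with Flow("cached-debugging") as flow:
    downstream(expensive_step(21))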
Nathan Molby
03/13/2020, 2:41 PM

Nathan Molby
03/13/2020, 2:57 PM

Nathan Molby
03/13/2020, 2:57 PM

Zachary Hughes
03/13/2020, 2:58 PM
… httpx, but if you prefer vanilla requests you can run that in a threadpool!
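Nathan's original question is missing from this export, so as a loose sketch of Zachary's suggestion: fan blocking requests calls out across a thread pool. The URLs are placeholders.

from concurrent.futures import ThreadPoolExecutor

import requests

urls = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]

def fetch(url: str) -> int:
    # ordinary blocking call; the pool provides the concurrency
    return requests.get(url, timeout=10).status_code

with ThreadPoolExecutor(max_workers=4) as pool:
    for url, status in zip(urls, pool.map(fetch, urls)):
        print(url, status)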
Nathan Molby
03/13/2020, 3:02 PM

Zachary Hughes
03/13/2020, 3:08 PM