jcozar
01/27/2021, 5:01 PM
I am using the ECSRun run configuration for the run_config argument in the Flow. If I use the env argument to provide the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, it works! However, I don't want to put my credentials in the source code. I am trying to use the task_definition_arn argument, but I am not sure if it is the correct way, because the image of the task should be the Flow Docker image. Can you give me any tip or advice? Thank you very much!
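For reference, a minimal sketch of the two setups being compared, assuming Prefect 0.14-style imports; the image, ARN, and flow name below are placeholders, not values from the question:

from prefect import Flow
from prefect.run_configs import ECSRun

# Works, but bakes static credentials into the flow's source code:
run_config_with_env = ECSRun(
    image="my-registry/my-flow:latest",
    env={"AWS_ACCESS_KEY_ID": "...", "AWS_SECRET_ACCESS_KEY": "..."},
)

# The alternative being asked about: reuse a task definition that is
# registered outside of Prefect instead of passing credentials via env.
run_config_with_arn = ECSRun(
    task_definition_arn="arn:aws:ecs:region:account:task-definition/my-flow:1"
)

with Flow("ecs-example", run_config=run_config_with_arn) as flow:
    ...

Another option (an assumption on my part, not something confirmed in this message) is ECSRun's task_role_arn / execution_role_arn arguments, which let the ECS task assume an IAM role so no static keys appear in the code at all.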
Josh Greenhalgh
01/27/2021, 5:24 PM
p1_res = extract_raw_from_api.map(
asset_location=asset_locations,
extract_date=unmapped(extract_date),
extract_period_days=unmapped(extract_period_days),
)
p2_res = process_2.map(p1_res)
p3_res = process_3.map(p2_res)
Then as soon as task 1 from p1_res is done, the corresponding task should start in process_2?
As it stands, no process_2 tasks start until many of the extract_raw_from_api tasks are already complete...
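A hedged aside, not part of the original message: as I understand it, the default LocalExecutor runs mapped pipelines breadth-first, while a Dask-based executor lets mapped children proceed depth-first. A minimal sketch, assuming the tasks above are built inside a flow object named flow:

from prefect.executors import DaskExecutor

# Attach a Dask-based executor so each mapped child can continue into
# process_2 / process_3 as soon as its own upstream result is ready.
flow.executor = DaskExecutor()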
Felipe Saldana
01/27/2021, 5:31 PM
Sean Talia
01/27/2021, 5:40 PM
I'm using the DbtShellTask to execute a dbt project under the hood, and am not supplying the necessary ENV vars that dbt needs to run. If I have Prefect run my flow that calls this shell task, the only failure message I see is the very last line of what dbt spits out, i.e.
[2021-01-27 16:34:13+0000] ERROR - prefect.run_dbt | Command failed with exit code 2
[2021-01-27 16:34:13+0000] ERROR - prefect.run_dbt | Could not run dbt
this is obviously not terribly informative, and I'd like for prefect to forward to me the entirety of the error message that dbt generates, which is:
Encountered an error while reading profiles:
ERROR Runtime Error
Compilation Error
Could not render {{ env_var('DB_USERNAME') }}: Env var required but not provided: 'DB_USERNAME'
Defined profiles:
- default
For more information on configuring profiles, please consult the dbt docs:
https://docs.getdbt.com/docs/configure-your-profile
Encountered an error:
Runtime Error
Could not run dbt
What kind of additional steps need to be taken in order for me to have Prefect forward all the lines of output rather than just the final one? It's not clear to me where or how I would need to adjust the default Prefect logger to set this up.
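For reference, a minimal sketch of the options that usually control this, assuming a Prefect 0.14.x version where DbtShellTask inherits ShellTask's flags; exact behavior varies by version:

from prefect.tasks.dbt import DbtShellTask

# return_all keeps every line of command output instead of only the last
# one; log_stderr also logs the command's stderr when it fails.
run_dbt = DbtShellTask(
    profile_name="default",
    return_all=True,
    log_stderr=True,
)

Whether this surfaces everything dbt prints still depends on how the failure is raised, so treat it as a starting point rather than a guaranteed fix.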
Nikul
01/27/2021, 5:51 PM
Daniel Black
01/27/2021, 6:42 PM
Marco Petrazzoli
01/27/2021, 7:51 PM
Matthew Blau
01/27/2021, 9:01 PM
ValueError: Your docker image failed to build! Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
The full traceback is
Step 12/22 : WORKDIR /app
---> Running in b820b907ef61
Removing intermediate container b820b907ef61
---> fee83981b109
Step 13/22 : RUN groupadd --gid ${GID} ${USERNAME} && useradd --uid ${UID} --gid ${GID} --shell /bin/bash -M ${USERNAME} && chown -R ${USERNAME} /app && chgrp -R ${USERNAME} /app
---> Running in 7f6c8b47c9e3
groupadd: invalid group ID 'rainbow'
Removing intermediate container 7f6c8b47c9e3
The command '/bin/sh -c groupadd --gid ${GID} ${USERNAME} && useradd --uid ${UID} --gid ${GID} --shell /bin/bash -M ${USERNAME} && chown -R ${USERNAME} /app && chgrp -R ${USERNAME} /app' returned a non-zero code: 3
Traceback (most recent call last):
File "integration.py", line 11, in <module>
from change_password import *
File "/home/mblau/projects/experian-integration/change_password.py", line 5, in <module>
from integration import modify_config
File "/home/mblau/projects/experian-integration/integration.py", line 338, in <module>
flow.register(project_name="test")
File "/home/mblau/.local/lib/python3.8/site-packages/prefect/core/flow.py", line 1665, in register
registered_flow = client.register(
File "/home/mblau/.local/lib/python3.8/site-packages/prefect/client/client.py", line 782, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/home/mblau/.local/lib/python3.8/site-packages/prefect/core/flow.py", line 1451, in serialize
storage = self.storage.build() # type: Optional[Storage]
File "/home/mblau/.local/lib/python3.8/site-packages/prefect/storage/docker.py", line 360, in build
self._build_image(push=push)
File "/home/mblau/.local/lib/python3.8/site-packages/prefect/storage/docker.py", line 427, in _build_image
raise ValueError(
ValueError: Your docker image failed to build! Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
What am I doing wrong?
matta
01/27/2021, 10:03 PM
Peyton Runyan
01/27/2021, 10:36 PM
I'm running into an issue with prefect server start. I'll post the logs in the thread.
Marco Petrazzoli
01/27/2021, 11:10 PM
Marco Petrazzoli
01/27/2021, 11:12 PM
def say_hello():
    container = CreateContainer(
        image_name="nonexistentname",
    )
    StartContainer(container_id=container, docker_server_url="arandomtext")
    WaitOnContainer(container_id=container)
and it's always successful. Even with correct data I don't see my image executed.
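For comparison, a hedged sketch of how these Docker tasks are usually wired up: the task instances are created once and then called inside a Flow context so they actually execute (the image and flow names below are placeholders):

from prefect import Flow
from prefect.tasks.docker import CreateContainer, StartContainer, WaitOnContainer

# Create the task instances once...
create = CreateContainer(image_name="prefecthq/prefect:latest")
start = StartContainer()
wait = WaitOnContainer()

# ...then call them inside a Flow context; calls made outside a flow
# only instantiate the tasks and never run them.
with Flow("docker-tasks-example") as flow:
    container_id = create()
    started = start(container_id=container_id)
    status_code = wait(container_id=container_id, upstream_tasks=[started])

flow.run()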
Arnulfo Soriano
01/27/2021, 11:26 PM
Fina Silva-Santisteban
01/27/2021, 11:55 PM
docker-compose build app, then docker-compose run app.
• Prefect Agent runs flows. The Local Agent runs flows with the local setup. This is not feasible for production since we’d need to manually make sure that whichever machine is hosting the flow has all dependencies installed. Better: run flows within a docker container. This way we don’t have to worry about dependencies, since every machine will simply be running the container. To run flows in containers we need to use the Docker Agent.
• The Docker Agent DOES NOT REQUIRE DOCKER STORAGE, but can work with Docker Storage if desired. The Docker Agent can run with a locally available image (image="example/image-name:with-tag"). If Docker Storage is used, you can provide a url (registry_url) to where your image is hosted, e.g. Docker Hub, and that url is used to get the image.
• I should be able to run the Docker Agent with the image I had previously created with docker-compose. (It doesn't seem to be able to find the image, even though I specify the absolute path.)
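For reference, a minimal sketch of the setup described in the last two bullets, assuming a Prefect 0.14-style run config; the flow name and image tag are placeholders:

from prefect import Flow
from prefect.run_configs import DockerRun

with Flow("docker-agent-example") as flow:
    ...

# Point the run at an image that is already available on the agent's
# host (or pullable from a registry).
flow.run_config = DockerRun(image="example/image-name:with-tag")

The agent itself is started separately (prefect agent docker start in recent versions), and the flow's storage is a separate choice from the image the container runs, which may be where the "can't find the image" confusion comes from.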
Martin Shin
01/28/2021, 2:20 AM
I'm getting This tenant already has the maximum number of flows. I don't see any mention of an upper limit on flows? Any help will be appreciated!
01/28/2021, 10:03 AM
Henrik Väisänen
01/28/2021, 11:39 AM
Nikul
01/28/2021, 12:24 PM
from prefect import task, Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from dask_kubernetes import KubeCluster, make_pod_spec

@task
def get_mean():
    import dask.array as da
    array = da.ones((1000, 1000, 1000))
    return array.mean().compute()

@task
def output_value(value):
    print(value)

with Flow("Static Dask Cluster Example") as flow:
    res = get_mean()
    output_value(res)

flow.run_config = KubernetesRun(labels=["dev"], image="my-image")

def get_cluster(n):
    pod_spec = make_pod_spec(image="my-image",
                             memory_limit='1G', memory_request='1G',
                             cpu_limit=1, cpu_request=1)
    return KubeCluster(pod_spec, n_workers=n)

flow.executor = DaskExecutor(cluster_class=get_cluster, cluster_kwargs={'n': 2})
flow.storage = GCS(bucket="my-bucket")
flow.register(project_name="tutorial")
However, I get a strange error:
[2021-01-28 12:11:41+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: AttributeError("can't set attribute")
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/opt/conda/lib/python3.8/site-packages/prefect/executors/dask.py", line 211, in start
with self.cluster_class(**self.cluster_kwargs) as cluster: # type: ignore
File "k8_flow.py", line 33, in get_cluster
File "/opt/conda/lib/python3.8/site-packages/dask_kubernetes/core.py", line 414, in __init__
super().__init__(**self.kwargs)
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 274, in __init__
super().__init__(
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 69, in __init__
self.name = str(uuid.uuid4())[:8]
AttributeError: can't set attribute
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 651, in close_clusters
if cluster.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
Exception ignored in: <function Cluster.__del__ at 0x7f99f6b89e50>
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 113, in __del__
if self.status != Status.closed:
AttributeError: 'KubeCluster' object has no attribute 'status'
This is running on a KubernetesAgent on GKE with the RBAC permissions mentioned in the documentation.
The docker images I'm using are running dask-kubernetes==0.11.0, distributed==2020.12.0, prefect==0.14.5.
Thank you in advance.
ale
01/28/2021, 2:07 PM
Maurits de Ruiter
01/28/2021, 2:09 PM
Matthew Blau
01/28/2021, 2:42 PM
I'm getting Unexpected error: ValueError('Could not infer an active Flow context.') and I am struggling to find relevant information on how best to debug this. I thank you all in advance for helping!
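For what it's worth, a hedged illustration of the most common trigger for that error: calling a task outside of a Flow context (the task and flow names here are placeholders):

from prefect import task, Flow

@task
def say_hi():
    print("hi")

# Calling the task at module level, outside any flow, raises
# ValueError("Could not infer an active Flow context."):
# say_hi()

with Flow("context-example") as flow:
    say_hi()  # inside the context, the call just adds the task to the flow

flow.run()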
Marc Lipoff
01/28/2021, 2:54 PM
Peter Roelants
01/28/2021, 3:14 PM
Should I handle this with a task triggered on any_failed?
I'm not sure which is the most idiomatic way of dealing with this failure, since updating a database or sending a message to Kafka is itself prone to failure (and "negative engineering" to deal with that failure). Does Prefect have an opinion/guidelines on how to deal with this?
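For reference, a minimal sketch of the trigger-based pattern being asked about; the task bodies and flow name are placeholders, and state handlers are the other common route:

from prefect import task, Flow
from prefect.triggers import any_failed

@task
def do_work():
    ...

@task(trigger=any_failed)
def record_failure():
    # e.g. update a database row or publish a Kafka message here;
    # this task only runs when an upstream task has failed.
    ...

with Flow("failure-handling-example") as flow:
    work = do_work()
    record_failure(upstream_tasks=[work])

One wrinkle with this approach: when nothing fails, the handler task ends in a TriggerFailed state, so flows often also call flow.set_reference_tasks([work]) so the run's final state reflects the real work rather than the handler.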
Levi Leal
01/28/2021, 3:17 PM
Manuel Mourato
01/28/2021, 5:04 PM
BK Lau
01/28/2021, 5:05 PM
Is there a model that one can export out from Prefect? I intended to use it to auto-generate Python code from it.
Sean Talia
01/28/2021, 5:30 PM
Is any one of the ways of providing a task definition (task_definition_arn, task_definition_path, task_definition) to the flow considered to be canonical? I'm trying to weigh the pros and cons of having some other process like Terraform handle management of the task definition (thereby only leaving the ARN to be supplied to Prefect) vs. using Prefect to handle that.
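For context, a hedged sketch of the three options being weighed; the ARN, path, and dict contents are placeholders:

from prefect.run_configs import ECSRun

# 1. Reuse a task definition managed elsewhere (e.g. by Terraform):
from_arn = ECSRun(task_definition_arn="arn:aws:ecs:region:account:task-definition/my-flow:1")

# 2. Point at a task definition template stored as a file (local or remote path):
from_path = ECSRun(task_definition_path="s3://my-bucket/task-definition.yaml")

# 3. Define the task definition inline as a dict:
from_dict = ECSRun(task_definition={"containerDefinitions": [{"name": "flow"}]})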
Josh Greenhalgh
01/28/2021, 7:09 PM
raaid
01/28/2021, 9:39 PM
Felipe Saldana
01/28/2021, 9:57 PM
I'm getting Unexpected error: TypeError("Object of type 'TestTask' is not JSON serializable",). TestTask is a custom task type I created, and the run method returns a custom return type I created as well.