Daniel Lomartra
10/22/2024, 6:14 PMDylan
Daniel Lomartra
10/22/2024, 6:56 PM2.19.8
. Result of running the deployment in the UI is attached as image.
Python file for flow tasks:
from prefect import flow, get_run_logger, deploy
from prefect.deployments import DeploymentImage
from prefect import task
@task(log_prints=True)
def child_task():
print("I am a child task")
@task(log_prints=True)
def parent_task():
print("I am a parent task")
return child_task()
@flow()
def test_flow_nested_task_mre():
return parent_task()
if __name__ == "__main__":
deployment_container_image = DeploymentImage(
name="{registry remove for confidentiality}/prefect-test-flow-nested-task-mre",
tag="latest",
dockerfile="Dockerfile",
platform="linux/amd64",
)
test_flow_nested_task_mre.deploy(
name=f"test-flow-nested-task-mre-primary",
work_pool_name="default",
image=deployment_container_image,
)
Dockerfile:
FROM prefecthq/prefect:2.20.10-python3.11
COPY requirements.txt /opt/prefect/prefect/requirements.txt
# Hotfix for oscrypto openssl bug (<https://community.snowflake.com/s/question/0D5Do000010NAVzKAO/hi-snowflake-teami-am-facing-issues-while-connecting-to-snowflake-using-snowflake-connector-with-oscryptoerrorslibrarynotfounderror-error-detecting-the-version-of-libcrypto>)
RUN python -m pip install oscrypto@git+https://github.com/wbond/oscrypto.git@d5f3437ed24257895ae1edd9e503cfb352e635a8
RUN python -m pip install -v -r /opt/prefect/prefect/requirements.txt
COPY . /opt/prefect/prefect/
WORKDIR /opt/prefect/prefect/
Requirements.txt
prefect>=2.17.1,<3.0.0
prefect-dbt>=0.4.1,<1.0.0
prefect-snowflake>=0.27.3,<1.0.0
dbt-core>=1.8.6,<2.0.0
dbt-snowflake>=1.8.3,<2.0.0
pygit2>=1.14.1,<2.0.0
azure-identity>=1.16.0,<2.0.0
azure-keyvault-secrets>=4.8.0,<5.0.0
msal>=1.28.0,<2.0.0
snowflake-connector-python>=3.8.1,<4.0.0
html-to-json>=2.0.0,<3.0.0
cryptography>=42.0.5,<43.0.0
black>=24.4.0,<25.0.0
Result in the UI:Daniel Lomartra
10/22/2024, 7:27 PM2.20.10
and still got the same resultDylan
Dylan
Dylan
Dylan
Dylan
Dylan
rom prefect import flow, task
@task(log_prints=True)
def child_task():
print("I am a child task")
return 1
@task(log_prints=True)
def parent_task():
print("I am a parent task")
result = child_task()
print(result)
return result
@flow()
def test_flow_nested_task_mre():
result = parent_task()
return result
if __name__ == "__main__":
test_flow_nested_task_mre()
Ilse Van Nieuwenhuyse
10/29/2024, 4:29 PMDaniel Lomartra
11/04/2024, 3:33 PMDylan
Daniel Lomartra
11/05/2024, 6:19 PMtest_arg_2
the UI renders it as a nested task again.
from prefect import flow, task, deploy
from prefect.docker.docker_image import DockerImage
@task(log_prints=True)
def child_task():
print("I am a child task")
return 1
@task(log_prints=True)
def parent_task(test_arg, test_arg_2):
print("I am a parent task")
result = child_task()
print(result)
return result
@flow()
def test_flow_nested_task_mre():
result = parent_task(
test_arg="hello",
test_arg_2="world",
)
return result
if __name__ == "__main__":
custom_docker_image = DockerImage(
name="[REDACTED]/test-nested-tasks-mre",
tag="latest",
dockerfile="Dockerfile",
)
flow_deployment = test_flow_nested_task_mre.to_deployment(
name=f"test-nested-tasks-mre",
)
deploy(
flow_deployment,
work_pool_name="kubernetes",
image=custom_docker_image,
)
Daniel Lomartra
11/05/2024, 6:21 PMDylan
Daniel Lomartra
11/06/2024, 5:07 PMDylan
Janet Carson
11/08/2024, 1:00 AM