Sukanta Roy
08/08/2024, 6:18 PMMarvin
08/08/2024, 6:18 PMprefect.yamlprefect.yamlprefect.yamlyaml
name: async-flow-deployment
prefect-version: 2.13.1
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/flow
deployments:
  - name: async_flow_deploy
    entrypoint: path/to/your/flow.py:your_async_flow_function
    work_pool:
      name: your-work-poolprefectsh
prefect deployprefect.yamlSukanta Roy
08/08/2024, 6:23 PMMarvin
08/08/2024, 6:23 PMpython
from prefect import flow, task
@task
async def async_task():
    return 1
@flow
async def async_flow():
    task_result = await async_task()
    return task_result + 1
if __name__ == "__main__":
    # A quick way to run your async flow
    import asyncio
    asyncio.run(async_flow())prefect.yamlyaml
name: async-flow-deployment
prefect-version: 2.13.1
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/flow
deployments:
  - name: async_flow_deploy
    entrypoint: path/to/your/flow.py:async_flow
    work_pool:
      name: your-work-poolsh
prefect deployasyncio.run()prefect deploySukanta Roy
08/08/2024, 6:31 PMMarvin
08/08/2024, 6:31 PMprefect.yamlpython
from prefect import flow, task
@task
async def async_task(x):
    return x
@flow
async def async_flow(x: int):
    task_result = await async_task(x)
    return task_result + 1
if __name__ == "__main__":
    import asyncio
    asyncio.run(async_flow(5))  # Example parameter valueprefect.yamlyaml
name: async-flow-deployment
prefect-version: 2.13.1
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/flow
deployments:
  - name: async_flow_deploy
    entrypoint: path/to/your/flow.py:async_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolsh
prefect deploySukanta Roy
08/08/2024, 7:11 PMMarvin
08/08/2024, 7:11 PMSukanta Roy
08/08/2024, 7:41 PMMarvin
08/08/2024, 7:41 PMsubflow.submit()prefect.yamlpython
from prefect import flow, task
@task
async def async_task(x):
    return x
@flow(name="subflow")
async def subflow(x: int):
    task_result = await async_task(x)
    return task_result + 1
@flow(name="main_flow")
async def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        subflow_runs.append(subflow.submit(x + i))
    results = await subflow_runs
    return results
if __name__ == "__main__":
    import asyncio
    asyncio.run(main_flow(5))prefect.yamlyaml
name: main-flow-deployment
prefect-version: 2.13.1
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/main_flow
deployments:
  - name: main_flow_deploy
    entrypoint: path/to/your/main_flow.py:main_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolprefect.yamlyaml
name: subflow-deployment
prefect-version: 2.13.1
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/subflow
deployments:
  - name: subflow_deploy
    entrypoint: path/to/your/subflow.py:subflow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolsh
prefect deploy -n main_flow_deploy
prefect deploy -n subflow_deploySukanta Roy
08/08/2024, 7:47 PMMarvin
08/08/2024, 7:48 PMsubmitrun_deploymentrun_deploymentrun_deploymentprefect.yamlprefect.yamlpython
from prefect import flow, task
@task
async def async_task(x):
    return x
@flow(name="subflow")
async def subflow(x: int):
    task_result = await async_task(x)
    return task_result + 1
if __name__ == "__main__":
    import asyncio
    asyncio.run(subflow(5))python
from prefect import flow
from prefect.deployments import run_deployment
@flow(name="main_flow")
def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        subflow_run = run_deployment(name="subflow-deployment", parameters={"x": x + i})
        subflow_runs.append(subflow_run)
    return subflow_runs
if __name__ == "__main__":
    main_flow(5)prefect.yamlyaml
name: main-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/main_flow
deployments:
  - name: main_flow_deploy
    entrypoint: path/to/your/main_flow.py:main_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolprefect.yamlyaml
name: subflow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/subflow
deployments:
  - name: subflow_deploy
    entrypoint: path/to/your/subflow.py:subflow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolsh
prefect deploy -n main_flow_deploy
prefect deploy -n subflow_deployrun_deploymentSukanta Roy
08/09/2024, 6:31 AMMarvin
08/09/2024, 6:31 AMmain_flowrun_deploymentpython
from prefect import flow
from prefect.deployments import run_deployment
@flow(name="main_flow")
def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        # This triggers the subflow asynchronously and immediately returns
        subflow_run = run_deployment(name="subflow-deployment", parameters={"x": x + i})
        subflow_runs.append(subflow_run)
    return subflow_runs
if __name__ == "__main__":
    main_flow(5)run_deploymentpython
from prefect import flow
from prefect.deployments import run_deployment
import time
@flow(name="main_flow")
def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        # Trigger the subflow
        subflow_run = run_deployment(name="subflow-deployment", parameters={"x": x + i})
        
        # Polling mechanism to wait for the subflow to complete
        while subflow_run.state.name not in ["Completed", "Failed", "Cancelled"]:
            time.sleep(10)  # Poll every 10 seconds
            subflow_run.refresh()
        
        subflow_runs.append(subflow_run)
    return subflow_runs
if __name__ == "__main__":
    main_flow(5)time.sleepsubflow_run.refreshmain_flowSukanta Roy
08/09/2024, 6:38 AMrun_deploymentMarvin
08/09/2024, 6:38 AMrun_deploymentprefect.yamlsubflow.pypython
from prefect import flow, task
@task
async def async_task(x):
    return x
@flow
async def subflow(x: int):
    task_result = await async_task(x)
    return task_result + 1
if __name__ == "__main__":
    import asyncio
    asyncio.run(subflow(5))subflow/prefect.yamlyaml
name: subflow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/your/subflow
deployments:
  - name: subflow_deploy  # This is the deployment name
    entrypoint: path/to/your/subflow.py:subflow
    parameters:
      x: 5
    work_pool:
      name: your-work-poolrun_deploymentmain_flow.pypython
from prefect import flow
from prefect.deployments import run_deployment
@flow
def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        # Use the deployment name specified in the subflow's prefect.yaml
        subflow_run = run_deployment(name="subflow_deploy", parameters={"x": x + i})
        subflow_runs.append(subflow_run)
    return subflow_runs
if __name__ == "__main__":
    main_flow(5)"subflow_deploy"run_deploymentsubflow/prefect.yamlnamerun_deploymentprefect.yamlrun_deploymentSukanta Roy
08/09/2024, 7:24 AMrun_deploymentrun_deploymentEncountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1780, in read_deployment_by_name
    response = await self._client.get(f"/deployments/name/{name}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1801, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 344, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 167, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect-server-2.dev-prefect.svc.cluster.local:4200/api/deployments/name/prefect-poc>'
Response: {'detail': 'Not Found'}Marvin
08/09/2024, 7:24 AMHow doesI did usework? Do you need prefect cloud for that or self hosted prefect server can also be used?run_deployment
run_deploymentSukanta Roy
08/09/2024, 7:25 AMrun_deploymentrun_deploymentFile "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1780, in read_deployment_by_name
    response = await self._client.get(f"/deployments/name/{name}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1801, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 344, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 167, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect-server-2.dev-prefect.svc.cluster.local:4200/api/deployments/name/prefect-poc>'
Response: {'detail': 'Not Found'}Marvin
08/09/2024, 7:25 AMHow doesI did usework? Do you need prefect cloud for that or self hosted prefect server can also be used?run_deployment
run_deploymentSukanta Roy
08/09/2024, 7:26 AMrun_deploymentprefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect-server-local:4200/api/deployments/name/prefect-poc>'
Response: {'detail': 'Not Found'}Marvin
08/09/2024, 7:27 AMrun_deploymentrun_deploymentrun_deploymentprefect.yamlPREFECT_API_URLsh
export PREFECT_API_URL="<http://prefect-server-local:4200/api>"sh
prefect deployment lssh
export PREFECT_API_URL="<http://prefect-server-local:4200/api>"sh
echo $PREFECT_API_URLsh
prefect deploy -n subflow_deploy
prefect deploy -n main_flow_deployrun_deploymentpython
from prefect import flow
from prefect.deployments import run_deployment
@flow
def main_flow(x: int):
    subflow_runs = []
    for i in range(3):
        subflow_run = run_deployment(name="subflow_deploy", parameters={"x": x + i})
        subflow_runs.append(subflow_run)
    return subflow_runs
if __name__ == "__main__":
    main_flow(5)namerun_deploymentprefect.yamlPREFECT_API_URLrun_deploymentprefect.yamlprefect deployment ls404 Not FoundSukanta Roy
08/09/2024, 7:37 AMMarvin
08/09/2024, 7:37 AMflow_run_namepython
from prefect import flow
@flow(flow_run_name="custom-static-name")
def my_flow(name: str):
    print(name)
if __name__ == "__main__":
    my_flow(name="Hello, Marvin!")python
import datetime
from prefect import flow
def generate_flow_run_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-nice-day"
@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str):
    print(name)
if __name__ == "__main__":
    my_flow(name="Hello, Marvin!")python
from prefect import flow
from prefect.runtime import flow_run
def generate_flow_run_name():
    flow_name = flow_run.flow_name
    parameters = flow_run.parameters
    name = parameters["name"]
    return f"{flow_name}-run-with-{name}"
@flow(flow_run_name=generate_flow_run_name)
def my_flow(name: str):
    print(name)
if __name__ == "__main__":
    my_flow(name="Marvin")flow_runflow_run_nameflow_runflow_run_nameSukanta Roy
08/09/2024, 12:36 PMMarvin
08/09/2024, 12:36 PMprefect.yamlprefect_docker.deployments.steps.build_docker_imageprefect.yamlrequirements.txtprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.run_shell_script:
      id: get-commit-hash
      script: git rev-parse --short HEAD
      stream_output: false
  - prefect_docker.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: "{{ get-commit-hash.stdout }}"
      dockerfile: path/to/your/Dockerfile
      build_args:
        REQUIREMENTS_FILE: "path/to/your/requirements.txt"
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-poolrun_shell_scriptbuild_docker_imageimage_nametagdockerfilebuild_argsREQUIREMENTS_FILEREQUIREMENTS_FILEDockerfile
ARG REQUIREMENTS_FILE
FROM python:3.9-slim
# Use the build argument
COPY ${REQUIREMENTS_FILE} /app/requirements.txt
WORKDIR /app
RUN pip install -r requirements.txt
COPY . /app
CMD ["python", "-m", "your_main_module"]prefect.yamlprefect_docker.deployments.steps.build_docker_imagebuild_argsbuild_docker_imageSukanta Roy
08/19/2024, 10:47 AMMarvin
08/19/2024, 10:48 AMprefect.yamlprefect.yamlprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-pool
      job_variables:
        resources:
          limits:
            memory: "512Mi"
            cpu: "0.5"
          requests:
            memory: "256Mi"
            cpu: "0.25"work_pooljob_variablesresourceslimitsrequestsprefect.yamlyaml
...
work_pool:
  name: your-k8s-work-pool
  job_variables:
    resources:
      limits:
        memory: "512Mi"
        cpu: "0.5"
      requests:
        memory: "256Mi"
        cpu: "0.25"
...requirements.txtDockerfile
ARG REQUIREMENTS_FILE
FROM python:3.9-slim
# Use the build argument
COPY ${REQUIREMENTS_FILE} /app/requirements.txt
WORKDIR /app
RUN pip install -r requirements.txt
COPY . /app
CMD ["python", "-m", "your_main_module"]prefect.yamlSukanta Roy
08/19/2024, 11:31 AMMarvin
08/19/2024, 11:32 AMjob_variablesprefect.yamlprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: your-work-pool
      job_variables:
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.25"
          limits:
            memory: "512Mi"
            cpu: "0.5"work_pooljob_variablesresourcesrequestslimitsprefect.yamlSukanta Roy
08/20/2024, 6:32 AMdefaultMarvin
08/20/2024, 6:33 AMprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: custom-work-pool
      work_queue_name: high-memory-tasks  # Specify custom work queue
      job_variables:
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.25"
          limits:
            memory: "512Mi"
            cpu: "0.5"Sukanta Roy
08/20/2024, 6:44 AMSukanta Roy
08/20/2024, 11:05 AMMarvin
08/20/2024, 11:05 AMprefect.yamlyaml
name: high-memory-job
prefect-version: 2.16.9
deployments:
  - name: high_memory_deploy
    entrypoint: path/to/flow.py:high_memory_flow
    work_pool:
      name: custom-work-pool
      work_queue_name: high-memory-tasks
    job_variables:
      resources:
        requests:
          memory: "512Mi"
          cpu: "0.5"
        limits:
          memory: "1024Mi"
          cpu: "1"yaml
name: high-cpu-job
prefect-version: 2.16.9
deployments:
  - name: high_cpu_deploy
    entrypoint: path/to/flow.py:high_cpu_flow
    work_pool:
      name: custom-work-pool
      work_queue_name: high-cpu-tasks
    job_variables:
      resources:
        requests:
          memory: "256Mi"
          cpu: "1"
        limits:
          memory: "512Mi"
          cpu: "2"sh
prefect worker start -p custom-work-pool -q high-memory-taskssh
prefect worker start -p custom-work-pool -q high-cpu-tasksSukanta Roy
08/22/2024, 10:34 AMMarvin
08/22/2024, 10:35 AMsh
   pip install prefect-daskpython
   from prefect import flow, task
   from prefect_dask import DaskTaskRunner
   @task
   def process_data(x):
       return x * 2
   @flow(task_runner=DaskTaskRunner)
   def my_flow(data):
       results = process_data.map(data)
       return results
   if __name__ == "__main__":
       data = list(range(10))
       my_flow(data)python
   from dask.distributed import Client
   client = Client("<tcp://scheduler-address:port>")
   @flow(task_runner=DaskTaskRunner(cluster=client))
   def my_flow(data):
       results = process_data.map(data)
       return resultssh
   pip install prefect-raypython
   from prefect import flow, task
   from prefect_ray import RayTaskRunner
   @task
   def process_data(x):
       return x * 2
   @flow(task_runner=RayTaskRunner)
   def my_flow(data):
       results = process_data.map(data)
       return results
   if __name__ == "__main__":
       data = list(range(10))
       my_flow(data)python
   import ray
   ray.init(address="auto")
   @flow(task_runner=RayTaskRunner(address="auto"))
   def my_flow(data):
       results = process_data.map(data)
       return resultsSukanta Roy
09/03/2024, 6:32 AMEncountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/app/src/google_ads/main.py", line 81, in task_read_stream_data
    run_deployment(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper
    return call()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments/deployments.py", line 217, in run_deployment
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2000, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1814, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 344, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 167, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<http://prefect-server-2.dev-prefect.svc.cluster.local:4200/api/flow_runs/2e0663bf-be5e-48c1-9f6c-f919e4cb156c>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500>Marvin
09/03/2024, 6:32 AMHey , In which cases prefect server can raise this error within a flow run?```Encountered exception during execution: Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2148, in orchestrate_task_run result = await call.aresult() File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult return await asyncio.wrap_future(self.future) File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync result = self.fn(*self.args, **self.kwargs) File "/app/src/google_ads/main.py", line 81, in task_read_stream_data run_deployment( File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper return call() File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 432, in call return self.result() File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result return self.future.result(timeout=timeout) File "/usr/local
Sukanta Roy
09/03/2024, 6:33 AMEncountered exception during execution:
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<http://prefect-server-2.dev-prefect.svc.cluster.local:4200/api/flow_runs/2e0663bf-be5e-48c1-9f6c-f919e4cb156c>'
Response: {'exception_message': 'Internal Server Error'}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500>Marvin
09/03/2024, 6:33 AM500 Internal Server Errorpython
from prefect import flow, task
from prefect.exceptions import PrefectHTTPStatusError
@task(retries=0)
def failing_task():
    raise ValueError("Simulated task failure")
@flow()
def my_flow():
    try:
        failing_task()
    except PrefectHTTPStatusError as e:
        print(f"Prefect error occurred: {e}")
if __name__ == "__main__":
    my_flow()retry_delay_secondsSukanta Roy
09/03/2024, 6:35 AMMarvin
09/03/2024, 6:36 AMprefect.yamljob_variablesprefect.yamlprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: custom-work-pool
      work_queue_name: custom-work-queue
      job_variables:
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.25"
          limits:
            memory: "512Mi"
            cpu: "0.5"work_poolnamework_queue_namejob_variablesresourcesrequestsmemorycpulimitsmemorycpush
prefect deploy -q custom-work-queue -p custom-work-poolsh
kubectl describe pod <pod-name>requestslimitsprefect.yamlresourcesjob_variablesprefect.yamlSukanta Roy
09/03/2024, 6:39 AMMarvin
09/03/2024, 6:39 AMprefect.yamljob_variablesprefect.yamlprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: custom-work-pool
      work_queue_name: custom-work-queue
      job_variables:
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.25"
          limits:
            memory: "512Mi"
            cpu: "0.5"sh
   prefect deploy -q custom-work-queue -p custom-work-poolsh
   prefect worker start -p custom-work-pool -q custom-work-queuesh
   kubectl describe pod <pod-name>requestslimitsjob_variablesjob_variablespython
from prefect import flow, task
from prefect.infrastructure.kubernetes import KubernetesJob
k8s_job = KubernetesJob(
    image="my-repo/my-image:latest",
    job_cpu_request="0.25",
    job_cpu_limit="0.5",
    job_memory_request="256Mi",
    job_memory_limit="512Mi"
)
@task
def my_task(x):
    return x * 2
@flow(infrastructure=k8s_job)
def my_flow(x):
    result = my_task(x)
    return result
if __name__ == "__main__":
    my_flow(5)Sukanta Roy
09/03/2024, 10:19 AMError: failed to create containerd task: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "prefect": executable file not found in $PATH: unknownMarvin
09/03/2024, 10:20 AMprefect$PATHDockerfile
   FROM python:3.9-slim
   # Install Prefect
   RUN pip install prefect==2.16.9
   # Copy your flow code
   COPY . /app
   WORKDIR /app
   # Set the entrypoint to Prefect
   ENTRYPOINT ["prefect"]sh
   docker build -t my-repo/my-image:latest .
   docker push my-repo/my-image:latestprefect.yamlyaml
   name: my-flow-deployment
   prefect-version: 2.16.9
   build:
     - prefect.deployments.steps.build_docker_image:
         requires: prefect-docker>=0.4.0
         image_name: my-repo/my-image
         tag: latest
         dockerfile: path/to/your/Dockerfile
         push: true
   deployments:
     - name: my_flow_deploy
       entrypoint: path/to/your/flow.py:my_flow
       parameters:
         x: 5  # Example parameter value
       work_pool:
         name: custom-work-pool
         work_queue_name: custom-work-queue
         job_variables:
           image: my-repo/my-image:latest
           command: "python -m prefect.engine"job_variablesprefect.yamlyaml
   job_variables:
     command: "prefect run flow --name my_flow"sh
   docker run -it --rm my-repo/my-image:latest prefect --helpsh
   kubectl logs <pod-name>sh
   kubectl describe pod <pod-name>Dockerfile
FROM python:3.9-slim
# Install Prefect
RUN pip install prefect==2.16.9
# Copy your flow code
COPY . /app
WORKDIR /app
# Set the entrypoint to Prefect
ENTRYPOINT ["prefect"]prefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    work_pool:
      name: custom-work-pool
      work_queue_name: custom-work-queue
      job_variables:
        image: my-repo/my-image:latest
        command: "prefect flow run --name my_flow"Sukanta Roy
09/04/2024, 8:34 AMprefect deploy --prefect-file ... --allMarvin
09/04/2024, 8:34 AMprefect deployprefect deployprefect deploysh
   deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
   echo "$deploy_output"deploy_outputsh
   deployment_name=$(echo "$deploy_output" | grep -oP "Deployment name: \K\S+")curlsh
   deployment_info=$(curl -X GET "http://<prefect-server-url>/api/deployments/name/$deployment_name")
   deployment_id=$(echo "$deployment_info" | jq -r '.id')
   flow_id=$(echo "$deployment_info" | jq -r '.flow_id')sh
   curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
     -d '{
       "flow_id": "'"$flow_id"'",
       "deployment_id": "'"$deployment_id"'"
     }'sh
#!/bin/bash
# Deploy the flow and capture the output
deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
echo "$deploy_output"
# Extract the deployment name from the output
deployment_name=$(echo "$deploy_output" | grep -oP "Deployment name: \K\S+")
# Query Prefect Server API for deployment information
deployment_info=$(curl -X GET "http://<prefect-server-url>/api/deployments/name/$deployment_name")
deployment_id=$(echo "$deployment_info" | jq -r '.id')
flow_id=$(echo "$deployment_info" | jq -r '.flow_id')
# Register flow and deployment IDs via API
curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
  -d '{
    "flow_id": "'"$flow_id"'",
    "deployment_id": "'"$deployment_id"'"
  }'<prefect-server-url><metadata-api-url>jqapt-get install jqyum install jqbrew install jqSukanta Roy
09/04/2024, 9:57 AMDeployment name:Marvin
09/04/2024, 9:58 AMprefect deployDeployment name:prefect deployprefect deployment lsprefect deploysh
#!/bin/bash
# Assuming you have jq installed for JSON parsing
set -e
# Step 1: Deploy the flow and capture the output
deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
echo "$deploy_output"
# Step 2: Use the Prefect CLI to list deployments
# Filter the deployments that match the deployed flow name or recent creation time
deployment_list=$(prefect deployment ls --json)
# Step 3: Parse the deployment list to find the relevant deployment IDs
# Assuming you have specific details to filter by (e.g., flow name)
flow_name="my-flow-name"
# Use jq to filter deployments for the specific flow name
deployment_info=$(echo "$deployment_list" | jq -c --arg flow_name "$flow_name" '[.[] | select(.flow_name == $flow_name)]')
# Select the latest deployment (if multiple, adjust filters if needed)
deployment_id=$(echo "$deployment_info" | jq -r '.[-1].id')
flow_id=$(echo "$deployment_info" | jq -r '.[-1].flow_id')
# Step 4: Register flow and deployment IDs via API
curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
  -d '{
    "flow_id": "'"$flow_id"'",
    "deployment_id": "'"$deployment_id"'"
  }'sh
   deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
   echo "$deploy_output"sh
   deployment_list=$(prefect deployment ls --json)jqsh
   deployment_info=$(echo "$deployment_list" | jq -c --arg flow_name "$flow_name" '[.[] | select(.flow_name == $flow_name)]')
   deployment_id=$(echo "$deployment_info" | jq -r '.[-1].id')
   flow_id=$(echo "$deployment_info" | jq -r '.[-1].flow_id')sh
   curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
     -d '{
       "flow_id": "'"$flow_id"'",
       "deployment_id": "'"$deployment_id"'"
     }'Sukanta Roy
09/04/2024, 12:54 PMMarvin
09/04/2024, 12:55 PMprefect deployment inspectsh
prefect deployment inspect <deployment-id>sh
   prefect deployment lssh
   prefect deployment inspect 2e0663bf-be5e-48c1-9f6c-f919e4cb156csh
$ prefect deployment inspect 2e0663bf-be5e-48c1-9f6c-f919e4cb156c
{
  "id": "2e0663bf-be5e-48c1-9f6c-f919e4cb156c",
  "name": "my_flow_deployment",
  "flow_name": "my_flow",
  "parameters": {
    "x": 5
  },
  "tags": [],
  "description": null,
  "schedule": null,
  "created_at": "2023-03-04T12:34:56.000Z",
  ...
}sh
#!/bin/bash
# Deploy the flow and capture the output
deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
echo "$deploy_output"
# Extract the deployment ID from the output (if possible)
# Assuming you had the deployment name or extracted it
deployment_id=$(echo "$deploy_output" | grep -oP "Deployment ID: \K\S+")
# Alternatively, fetch all deployments and find the latest one
deployment_id=$(prefect deployment ls --json | jq -r '.[-1].id')
# Fetch the deployment details
deployment_details=$(prefect deployment inspect "$deployment_id")
echo "$deployment_details"
# Register the deployment details via API
flow_id=$(echo "$deployment_details" | jq -r '.flow_id')
curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
  -d '{
    "flow_id": "'"$flow_id"'",
    "deployment_id": "'"$deployment_id"'"
  }'prefect deployment inspect <deployment-id>Sukanta Roy
09/04/2024, 12:57 PMSukanta Roy
09/04/2024, 12:58 PMMarvin
09/04/2024, 12:58 PMprefect deployment inspect<flow-name>/<deployment-name>prefect deployment inspectprefect deployment lsprefect deployment inspectsh
   prefect deployment ls┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
   ┃ Name                           ┃ ID                                   ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
   │ my-flow/my-deployment          │ 2e0663bf-be5e-48c1-9f6c-f919e4cb156c │
   └────────────────────────────────┴──────────────────────────────────────┘<flow-name>/<deployment-name>prefect deployment inspectsh
   prefect deployment inspect my-flow/my-deploymentprefect deployment lsjqsh
#!/bin/bash
# Step 1: Deploy the flow and capture the output
deploy_output=$(prefect deploy --prefect-file path/to/prefect.yaml --all)
echo "$deploy_output"
# Step 2: Use the Prefect CLI to list deployments and find the latest one
deployment_list=$(prefect deployment ls --json)
# Extract flow name and deployment name using jq
latest_deployment=$(echo "$deployment_list" | jq -r '.[-1]')
flow_name=$(echo "$latest_deployment" | jq -r '.flow_name')
deployment_name=$(echo "$latest_deployment" | jq -r '.name')
# Step 3: Fetch deployment details
full_deployment_name="$flow_name/$deployment_name"
deployment_details=$(prefect deployment inspect "$full_deployment_name")
echo "$deployment_details"
# Extract flow ID
flow_id=$(echo "$deployment_details" | jq -r '.flow_id')
# Step 4: Register flow and deployment IDs via API
curl -X POST "http://<metadata-api-url>/register" -H "Content-Type: application/json" \
  -d '{
    "flow_id": "'"$flow_id"'",
    "deployment_id": "'"$deployment_name"'"
  }'<flow-name>/<deployment-name>prefect deployment inspectprefect deployment lsprefect deployment inspect <flow-name>/<deployment-name>Sukanta Roy
09/04/2024, 1:15 PMMarvin
09/04/2024, 1:16 PMprefect.yamlcomponent_typeflow_typeprefect.yamlyaml
name: my-flow-deployment
prefect-version: 2.16.9
build:
  - prefect.deployments.steps.build_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: my-repo/my-image
      tag: latest
      dockerfile: path/to/your/Dockerfile
      push: true
deployments:
  - name: my_flow_deploy
    entrypoint: path/to/your/flow.py:my_flow
    parameters:
      x: 5  # Example parameter value
    tags: ["component_type:analytics", "flow_type:batch"]
    description: "Deployment for analytics batch processing"
    work_pool:
      name: custom-work-pool
      work_queue_name: custom-work-queue
      job_variables:
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.25"
          limits:
            memory: "512Mi"
            cpu: "0.5"prefect deployment inspectsh
deployment_details=$(prefect deployment inspect my-flow/my-flow-deploy)
echo "$deployment_details"
# Extract component_type and flow_type from tags
component_type=$(echo "$deployment_details" | grep -oP 'component_type:\K[^"]*')
flow_type=$(echo "$deployment_details" | grep -oP 'flow_type:\K[^"]*')
echo "Component Type: $component_type"
echo "Flow Type: $flow_type"yaml
...
description: '{"component_type": "analytics", "flow_type": "batch"}'
...sh
deployment_details=$(prefect deployment inspect my-flow/my-flow-deploy --output-json)
echo "$deployment_details"
# Extract metadata from the description field
description=$(echo "$deployment_details" | jq -r '.description')
custom_metadata=$(echo "$description" | jq -r '.')
component_type=$(echo "$custom_metadata" | jq -r '.component_type')
flow_type=$(echo "$custom_metadata" | jq -r '.flow_type')
echo "Component Type: $component_type"
echo "Flow Type: $flow_type"prefect.yamlSukanta Roy
09/16/2024, 11:39 AMMarvin
09/16/2024, 11:39 AMmain.pymain.pymain.pypython
from prefect import flow
@flow
def my_flow(x: int):
    print(f"Running with parameter x={x}")
if __name__ == "__main__":
    my_flow(x=5)python
from prefect.deployments import Deployment
from main import my_flow
# Define the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    parameters={"x": 5},
)
# Apply the deployment
deployment.apply()python
import os
from prefect.deployments import Deployment
from main import my_flow
# Define the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    parameters={"x": 5},
)
# Apply the deployment
deployment.apply()
# Optionally, you can print deployment ID or other details
print(f"Deployment '{deployment.name}' applied.")deploy.py.github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Deploy Prefect Flow
      run: |
        python deploy.pyJenkinsfilegroovy
pipeline {
    agent any
    stages {
        stage('Checkout') {
            steps {
                git '<https://your-repo-url.git>'
            }
        }
        stage('Setup Python') {
            steps {
                sh 'python3 -m venv venv'
                sh './venv/bin/pip install --upgrade pip'
                sh './venv/bin/pip install -r requirements.txt'
            }
        }
        stage('Deploy Prefect Flow') {
            steps {
                sh './venv/bin/python deploy.py'
            }
        }
    }
}Sukanta Roy
09/19/2024, 1:57 PMraise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': 'Default Param 1', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': 'Default Param 2', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, 'infrastructure_document_id': None, 'description': 'Sample flow 1 for POC', 'path': NonMarvin
09/19/2024, 1:57 PMExplain what is this error, I am using prefect-3 server.``` raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.cause prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/' Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': 'Default Param 1', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': 'Default Param 2', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': '
Sukanta Roy
09/19/2024, 1:58 PMprefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': 'Default Param 1', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': 'Default Param 2', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, 'infrastructure_document_id': None, 'description': 'Sample flow 1 for POC', 'path': NonMarvin
09/19/2024, 1:58 PMExplain what is this error, I am using prefect-3 server.```prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/' Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': 'Default Param 1', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': 'Default Param 2', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, '
Sukanta Roy
09/19/2024, 1:59 PMprefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': '', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': '', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, 'infrastructure_document_id': None, 'description': '', 'path': NonMarvin
09/19/2024, 1:59 PMExplain what is this error, I am using prefect-3 server.```prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/' Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'param_1': {'default': '', 'position': 0, 'title': 'param_1', 'type': 'string'}, 'param_2': {'default': '', 'position': 1, 'title': 'param_2', 'type': 'string'}}, 'definitions': {}}, 'parameters': {'param_1': 'Hello', 'param_2': 'World'}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, 'infrastructure_document_id': None, 'description':
Sukanta Roy
09/19/2024, 2:17 PMprefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/' Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'concurrency_limit'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'sample_flow_1', 'flow_id': '<masked>', 'schedules': [], 'concurrency_limit': None, 'enforce_parameter_schema': True, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {}, 'definitions': {}}, 'parameters': {}, 'tags': ['sample_flow_1', 'poc'], 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/app'}}], 'work_queue_name': 'default', 'work_pool_name': 'dev', 'storage_document_id': None, 'infrastructure_document_id': None, 'description': '', 'path': NonMarvin
09/19/2024, 2:18 PMPrefectHTTPStatusError: Client error '422 Unprocessable Entity'422 Unprocessable Entityconcurrency_limitjson
{
  "loc": [
    "body",
    "concurrency_limit"
  ],
  "msg": "extra fields not permitted",
  "type": "value_error.extra"
}concurrency_limitconcurrency_limitconcurrency_limitpython
from prefect.deployments import Deployment
from main import my_flow
# Define the deployment without concurrency_limit
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="sample_flow_1",
    work_pool_name="dev",
    parameters={},
    tags=["sample_flow_1", "poc"],
    pull_steps=[{"prefect.deployments.steps.set_working_directory": {"directory": "/app"}}],
    work_queue_name="default",
    description=""
)
# Apply the deployment
deployment.apply()422 Unprocessable Entityconcurrency_limitconcurrency_limitSukanta Roy
09/20/2024, 11:44 AMMarvin
09/20/2024, 11:45 AM@flowmain.pypython
from prefect import flow, task
class MyFlow:
    def __init__(self, x):
        self.x = x
    @task
    def process(self):
        print(f"Processing value: {self.x}")
        return self.x * 2
    @flow
    def run(self):
        result = self.process()
        print(f"Result: {result}")
if __name__ == "__main__":
    my_flow_instance = MyFlow(x=5)
    my_flow_instance.run()deploy.pypython
from prefect.deployments import Deployment
from main import MyFlow
# Instantiate the class
my_flow_instance = MyFlow(x=5)
# Define the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow_instance.run,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    parameters={},  # Assuming no parameters are needed
)
# Apply the deployment
deployment.apply().github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Deploy Prefect Flow
      run: |
        python deploy.pyJenkinsfilegroovy
pipeline {
    agent any
    stages {
        stage('Checkout') {
            steps {
                git '<https://your-repo-url.git>'
            }
        }
        stage('Setup Python') {
            steps {
                sh 'python3 -m venv venv'
                sh './venv/bin/pip install --upgrade pip'
                sh './venv/bin/pip install -r requirements.txt'
            }
        }
        stage('Deploy Prefect Flow') {
            steps {
                sh './venv/bin/python deploy.py'
            }
        }
    }
}Deployment.build_from_flowSukanta Roy
09/20/2024, 2:25 PMMarvin
09/20/2024, 2:26 PMprefect.yamlDockerfile
   FROM python:3.9-slim
   # Install Prefect
   RUN pip install prefect==2.16.9
   # Copy your flow code
   COPY . /app
   WORKDIR /app
   # Install additional dependencies
   RUN pip install -r requirements.txt
   # Set the entrypoint to Prefect
   ENTRYPOINT ["prefect"]deploy.pypython
   from prefect.deployments import Deployment
   from prefect.infrastructure.docker import DockerContainer
   from main import my_flow  # Import your flow
   # Define the deployment including the custom Dockerfile
   deployment = Deployment.build_from_flow(
       flow=my_flow,
       name="my_deployment",
       work_pool_name="custom-work-pool",
       infrastructure=DockerContainer(
           image="my-repo/my-image:latest",  # Replace with your Docker image name
           image_build_kwargs={
               "path": ".",
               "dockerfile": "Dockerfile"
           },
           # Push the docker image after it's built
           build=True,
           push=True,
           registry_url="your-registry-url",  # Replace with your docker registry URL
           registry_username="your-username",  # Replace with your docker registry username
           registry_password="your-password"  # Replace with your docker registry password
       )
   )
   # Apply the deployment
   deployment.apply().github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Deploy Prefect Flow
      env:
        DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
        DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
      run: |
        docker login -u $DOCKERHUB_USERNAME -p $DOCKERHUB_PASSWORD
        python deploy.pyJenkinsfileMarvin
09/20/2024, 2:26 PMSukanta Roy
09/20/2024, 2:38 PMMarvin
09/20/2024, 2:39 PMDockerContainerAWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYdeploy.pypython
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from main import my_flow  # Import your flow
# AWS ECR registry URL and repository name
ecr_registry = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
repository_name = "my-repo/my-image"
# Full image name including ECR registry
image_name = f"{ecr_registry}/{repository_name}:latest"
# Define the deployment including the custom Dockerfile
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    infrastructure=DockerContainer(
        image=image_name,
        image_build_kwargs={
            "path": ".",
            "dockerfile": "Dockerfile"
        },
        # Push the docker image after it's built
        build=True,
        push=True,
        registry_url=ecr_registry,
        registry_username="AWS",  # AWS CLI uses 'AWS' as the username for ECR
        # It's better to use environment variables for sensitive information
        registry_password=os.getenv("AWS_SECRET_ACCESS_KEY")
    )
)
# Apply the deployment
deployment.apply()
print(f"Deployment '{deployment.name}' applied.").github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Login to ECR
      run: |
        aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    - name: Deploy Prefect Flow
      run: |
        python deploy.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}Marvin
09/20/2024, 2:39 PMSukanta Roy
09/20/2024, 2:44 PMMarvin
09/20/2024, 2:45 PMPREFECT_API_URLPREFECT_API_URLdeploy.pypython
import os
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from main import my_flow  # Import your flow
# Set the API URL (replace with your Prefect server URL)
os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
# AWS ECR registry URL and repository name
ecr_registry = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
repository_name = "my-repo/my-image"
# Full image name including ECR registry
image_name = f"{ecr_registry}/{repository_name}:latest"
# Define the deployment including the custom Dockerfile
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    infrastructure=DockerContainer(
        image=image_name,
        image_build_kwargs={
            "path": ".",
            "dockerfile": "Dockerfile"
        },
        # Push the docker image after it's built
        build=True,
        push=True,
        registry_url=ecr_registry,
        registry_username="AWS",  # AWS CLI uses 'AWS' as the username for ECR
        # It's better to use environment variables for sensitive information
        registry_password=os.getenv("AWS_SECRET_ACCESS_KEY")
    )
)
# Apply the deployment
deployment.apply()
print(f"Deployment '{deployment.name}' applied.")PREFECT_API_URL.github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Set Prefect API URL
      run: echo "PREFECT_API_URL=<http://127.0.0.1:4200/api>" >> $GITHUB_ENV
    - name: Login to ECR
      run: |
        aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    - name: Deploy Prefect Flow
      run: |
        python deploy.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}JenkinsfileMarvin
09/20/2024, 2:45 PMPREFECT_API_URLSukanta Roy
09/20/2024, 3:22 PMMarvin
09/20/2024, 3:23 PMjob_variablesDeployment.build_from_flowjob_variablesDeployment.build_from_flowjob_variablesjob_variablesmain.pypython
from prefect import flow, task
@task
def my_task(x):
    print(f"Processing value: {x}")
    return x * 2
@flow
def my_flow(x: int):
    result = my_task(x)
    print(f"Result: {result}")
if __name__ == "__main__":
    my_flow(x=5)deploy.pypython
import os
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from main import my_flow  # Import your flow
# Set the API URL (replace with your Prefect server URL)
os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
# AWS ECR registry URL and repository name
ecr_registry = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
repository_name = "my-repo/my-image"
# Full image name including ECR registry
image_name = f"{ecr_registry}/{repository_name}:latest"
# Define the deployment including the custom Dockerfile and job_variables
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my_deployment",
    work_pool_name="custom-work-pool",
    infrastructure=DockerContainer(
        image=image_name,
        image_build_kwargs={
            "path": ".",
            "dockerfile": "Dockerfile"
        },
        # Push the docker image after it's built
        build=True,
        push=True,
        registry_url=ecr_registry,
        registry_username="AWS",  # AWS CLI uses 'AWS' as the username for ECR
        registry_password=os.getenv("AWS_SECRET_ACCESS_KEY")
    ),
    job_variables={
        "env": {
            "EXECUTION_ENVIRONMENT": os.environ.get("EXECUTION_ENVIRONMENT", "development"),
            "MY_CONFIG": os.environ.get("MY_CONFIG", "default_value")
        },
        "resources": {
            "requests": {
                "memory": "256Mi",
                "cpu": "0.25"
            },
            "limits": {
                "memory": "512Mi",
                "cpu": "0.5"
            }
        }
    }
)
# Apply the deployment
deployment.apply()
print(f"Deployment '{deployment.name}' applied.")flownamework_pool_nameinfrastructurejob_variables.github/workflows/deploy.ymlMarvin
09/20/2024, 3:23 PMjob_variablesSukanta Roy
09/20/2024, 3:35 PMSukanta Roy
09/20/2024, 3:35 PMMarvin
09/20/2024, 3:35 PMflow.deployDockerfileDeploymentImageflow.deployDeploymentImageDockerfile
FROM python:3.9-slim
# Install Prefect
RUN pip install prefect==2.16.9
# Copy your flow code
COPY . /app
WORKDIR /app
# Install additional dependencies if any
RUN pip install -r requirements.txt
# Set the entrypoint to Prefect
ENTRYPOINT ["prefect"]main.pypython
from prefect import flow
@flow(log_prints=True)
def my_flow(x: int):
    print(f"Running with parameter x={x}")
if __name__ == "__main__":
    my_flow(x=5)flow.deploypython
from prefect import flow
from prefect.deployments import DeploymentImage
@flow(log_prints=True)
def my_flow(x: int):
    print(f"Running with parameter x={x}")
if __name__ == "__main__":
    my_flow.deploy(
        name="my-custom-dockerfile-deployment",
        work_pool_name="my-docker-pool",
        image=DeploymentImage(
            name="my_image",
            tag="latest",
            dockerfile="Dockerfile"
        ),
        build=True,  # Ensure the image is built
        push=True,   # Push the image to the registry
        registry_url="<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>",  # Replace with your AWS ECR registry URL
        registry_username="AWS",  # AWS CLI uses 'AWS' as the username for ECR
        registry_password="<AWS_SECRET_ACCESS_KEY>"  # Replace with your AWS secret access key
    ).github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Set Prefect API URL
      run: echo "PREFECT_API_URL=<http://127.0.0.1:4200/api>" >> $GITHUB_ENV
    - name: Login to ECR
      run: |
        aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    - name: Deploy Prefect Flow
      run: |
        python main.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        PREFECT_API_URL: "<http://127.0.0.1:4200/api>"flow.deployDeploymentImageMarvin
09/20/2024, 3:35 PMflow.deploySukanta Roy
09/23/2024, 7:27 AMfetch_stream_data.deploy(
        name=f"raw-google-ads-fetch-{stream}",
        parameters={
            "account_id": None,
            "start_date": None,
            "end_date": None,
            "stream_name": stream,
            "kafka_topic": None,
            "batch_id": None 
        },
        work_pool_name="dataplatform-dev",
        work_queue_name="default",
        image=DeploymentImage(
            name=f"{AWS_ECR_URL}/{AWS_ECR_REPOSITORY_NAME}",
            tag=f"raw-google-ads-fetch-{stream}",
            dockerfile="Dockerfile"
        ),
        build=True,
        push=True
    )pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /appMarvin
09/23/2024, 7:28 AMpullflow.deploypull_stepsprefect.yamlpullpullpull_stepsdeploy.pypython
import os
from prefect import flow
from prefect.deployments import DeploymentImage
@flow(log_prints=True)
def fetch_stream_data(account_id, start_date, end_date, stream_name, kafka_topic, batch_id):
    print(f"Fetching data for stream: {stream_name}")
if __name__ == "__main__":
    # AWS ECR config
    AWS_ECR_URL = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
    AWS_ECR_REPOSITORY_NAME = "my-repo"
    # Set Prefect API URL if required
    os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
    fetch_stream_data.deploy(
        name=f"raw-google-ads-fetch-{stream}",
        parameters={
            "account_id": None,
            "start_date": None,
            "end_date": None,
            "stream_name": stream,
            "kafka_topic": None,
            "batch_id": None 
        },
        work_pool_name="dataplatform-dev",
        work_queue_name="default",
        image=DeploymentImage(
            name=f"{AWS_ECR_URL}/{AWS_ECR_REPOSITORY_NAME}",
            tag=f"raw-google-ads-fetch-{stream}",
            dockerfile="Dockerfile"
        ),
        build=True,
        push=True,
        pull_steps=[
            {"prefect.deployments.steps.set_working_directory": {"directory": "/app"}}
        ]
    )pull_stepspullprefect.yaml.github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Set Prefect API URL
      run: echo "PREFECT_API_URL=<http://127.0.0.1:4200/api>" >> $GITHUB_ENV
    - name: Login to ECR
      run: |
        aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    - name: Deploy Prefect Flow
      run: |
        python deploy.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        PREFECT_API_URL: "<http://127.0.0.1:4200/api>"pull_stepspull_stepsflow.deploypull_stepsSukanta Roy
09/23/2024, 7:29 AMMarvin
09/23/2024, 7:30 AMflow.deploypull_stepspullDeployment.build_from_flowpull_stepsDeployment.build_from_flowpull_stepspull_stepspython
import os
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from main import fetch_stream_data  # Make sure to import your flow
# Set the API URL (replace with your Prefect server URL)
os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
# AWS ECR registry URL and repository name
AWS_ECR_URL = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
AWS_ECR_REPOSITORY_NAME = "my-repo"
# Full image name including ECR registry
image_name = f"{AWS_ECR_URL}/{AWS_ECR_REPOSITORY_NAME}:raw-google-ads-fetch-{stream}"
# Define the deployment including the custom Dockerfile and pull_steps
deployment = Deployment.build_from_flow(
    flow=fetch_stream_data,
    name=f"raw-google-ads-fetch-{stream}",
    parameters={
        "account_id": None,
        "start_date": None,
        "end_date": None,
        "stream_name": stream,
        "kafka_topic": None,
        "batch_id": None 
    },
    work_pool_name="dataplatform-dev",
    work_queue_name="default",
    infrastructure=DockerContainer(
        image=image_name,
        image_build_kwargs={
            "path": ".",
            "dockerfile": "Dockerfile"
        },
        # Push the docker image after it's built
        build=True,
        push=True,
        registry_url=AWS_ECR_URL,
        registry_username="AWS",  # AWS CLI uses 'AWS' as the username for ECR
        registry_password=os.getenv("AWS_SECRET_ACCESS_KEY")
    ),
    pull=[
        {
          "prefect.deployments.steps.set_working_directory": {
            "directory": "/app"
          }
        }
    ]
)
# Apply the deployment
deployment.apply()
print(f"Deployment '{deployment.name}' applied.").github/workflows/deploy.ymlyaml
name: Deploy Prefect Flow
on: [push]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.x'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install prefect
        pip install -r requirements.txt
    - name: Set Prefect API URL
      run: echo "PREFECT_API_URL=<http://127.0.0.1:4200/api>" >> $GITHUB_ENV
    - name: Login to ECR
      run: |
        aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    - name: Deploy Prefect Flow
      run: |
        python deploy.py
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        PREFECT_API_URL: "<http://127.0.0.1:4200/api>"Deployment.build_from_flowpull_stepsDeployment.build_from_flowpull_stepsMarvin
09/23/2024, 7:30 AMSukanta Roy
09/23/2024, 7:34 AMMarvin
09/23/2024, 7:34 AMflow.deploy()pull_stepspullflow.deploy()Dockerfile
FROM python:3.9-slim
# Install Prefect
RUN pip install prefect==2.16.9
# Copy your flow code and set working dir
COPY . /app
WORKDIR /app
# Install additional dependencies
RUN pip install -r requirements.txt
# Set the entrypoint to Prefect
ENTRYPOINT ["prefect"]python
from prefect import flow
@flow(log_prints=True)
def fetch_stream_data(account_id, start_date, end_date, stream_name, kafka_topic, batch_id):
    print(f"Fetching data for stream: {stream_name}")
if __name__ == "__main__":
    fetch_stream_data(x=5)python
import os
from prefect import flow
from prefect.deployments import DeploymentImage
# Assuming 'fetch_stream_data' is defined in main.py
@flow(log_prints=True)
def fetch_stream_data(account_id, start_date, end_date, stream_name, kafka_topic, batch_id):
    print(f"Fetching data for stream: {stream_name}")
if __name__ == "__main__":
    # AWS ECR config
    AWS_ECR_URL = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
    AWS_ECR_REPOSITORY_NAME = "my-repo"
    # Set Prefect API URL if required
    os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
    fetch_stream_data.deploy(
        name=f"raw-google-ads-fetch-{stream}",
        parameters={
            "account_id": None,
            "start_date": None,
            "end_date": None,
            "stream_name": stream,
            "kafka_topic": None,
            "batch_id": None 
        },
        work_pool_name="dataplatform-dev",
        work_queue_name="default",
        image=DeploymentImage(
            name=f"{AWS_ECR_URL}/{AWS_ECR_REPOSITORY_NAME}",
            tag=f"raw-google-ads-fetch-{stream}",
            dockerfile="Dockerfile"
        ),
        build=True,
        push=True
    )pull_stepsflow.deploy()Sukanta Roy
09/23/2024, 7:41 AMMarvin
09/23/2024, 7:42 AMDeploymentTriggerTypesTriggerTypesDeploymentTriggerTypesTriggerTypesDeploymentTriggerTypesTriggerTypesDeploymentTriggerTypesflow.deployflow.deployflow.deployDeploymentEventTriggermain.pypython
from prefect import flow
@flow(log_prints=True)
def fetch_stream_data(account_id, start_date, end_date, stream_name, kafka_topic, batch_id):
    print(f"Fetching data for stream: {stream_name}")
if __name__ == "__main__":
    fetch_stream_data(x=5)deploy.pypython
import os
from prefect import flow
from prefect.deployments import DeploymentImage
from prefect.events import DeploymentEventTrigger
# Define the flow
@flow(log_prints=True)
def fetch_stream_data(account_id, start_date, end_date, stream_name, kafka_topic, batch_id):
    print(f"Fetching data for stream: {stream_name}")
if __name__ == "__main__":
    # AWS ECR config
    AWS_ECR_URL = "<aws-account-number>.dkr.ecr.<region>.<http://amazonaws.com|amazonaws.com>"
    AWS_ECR_REPOSITORY_NAME = "my-repo"
    # Set Prefect API URL if required
    os.environ['PREFECT_API_URL'] = "<http://127.0.0.1:4200/api>"
    # Define the trigger
    event_trigger = DeploymentEventTrigger(
        enabled=True,
        match={"prefect.resource.id": "my-external-resource"},
        expect=["external.resource.pinged"],
        parameters={
            "param_1": "{{ event }}",
        }
    )
    # Deploy the flow
    fetch_stream_data.deploy(
        name=f"raw-google-ads-fetch-{stream}",
        parameters={
            "account_id": None,
            "start_date": None,
            "end_date": None,
            "stream_name": stream,
            "kafka_topic": None,
            "batch_id": None 
        },
        work_pool_name="dataplatform-dev",
        work_queue_name="default",
        image=DeploymentImage(
            name=f"{AWS_ECR_URL}/{AWS_ECR_REPOSITORY_NAME}",
            tag=f"raw-google-ads-fetch-{stream}",
            dockerfile="Dockerfile"
        ),
        build=True,
        push=True,
        triggers=[event_trigger]
    )DeploymentEventTriggerdeploytriggersflow.deployMarvin
09/23/2024, 7:42 AMSukanta Roy
09/25/2024, 2:00 PMMarvin
09/25/2024, 2:00 PMSukanta Roy
09/25/2024, 2:01 PMSukanta Roy
09/25/2024, 2:01 PMMarvin
09/25/2024, 2:02 PMdefinitions:
  work_pools:
    my_docker_work_pool: &my_docker_work_pool
      name: my-docker-work-pool
      work_queue_name: default
  schedules:
    every_ten_minutes: &every_10_minutes
      interval: 600
deployments:
  - name: deployment-1
    entrypoint: flows/hello.py:my_flow
    schedule: *every_10_minutes
    work_pool: *my_docker_work_pool
    parameters:
      number: 42
      message: "Don't panic!"
  - name: deployment-2
    entrypoint: flows/goodbye.py:my_other_flow
    work_pool: *my_docker_work_pool&*deployments/
  ├── flow1_deployment.yaml
  ├── flow2_deployment.yaml
  └── flow3_deployment.yamlprefect deployment apply deployments/flow1_deployment.yaml
prefect deployment apply deployments/flow2_deployment.yaml
prefect deployment apply deployments/flow3_deployment.yamlprefect deployment create -n my_deployment -p path/to/flow.py -q my_work_queue
prefect deployment build path/to/flow.py:flow_func --name my_deployment --output path/to/deployment.yamldeployment.yamldeployments:
  - name: example-deployment
    entrypoint: flows/example.py:example_flow
    parameters:
      param1: value1
      param2: value2Sukanta Roy
09/27/2024, 10:17 AMMarvin
09/27/2024, 10:18 AMsh
pip install prefectget_deploymentPrefectClientpython
from prefect.client import get_client
async def fetch_deployment_details(deployment_name_or_id: str):
    async with get_client() as client:
        # Assuming `deployment_name_or_id` is the deployment name or ID
        deployment = await client.read_deployment_by_id(deployment_name_or_id)
        return deployment
# Usage example
import asyncio
deployment_details = asyncio.run(fetch_deployment_details("<your_deployment_name_or_id>"))
print(deployment_details)<your_deployment_name_or_id>Sukanta Roy
09/30/2024, 12:52 PMMarvin
09/30/2024, 12:52 PMpython
from prefect import flow, task
@task
def task_a():
    print("Task A is running")
    return "Result from A"
@task
def task_b():
    print("Task B is running")
    return "Result from B"
@task
def orchestrator_task():
    result_a = task_a.submit()  # Submit task A
    result_b = task_b.submit()  # Submit task B
    # Wait for results if needed
    return result_a.result(), result_b.result()
@flow
def orchestrator_flow():
    result = orchestrator_task()
    print(result)
if __name__ == "__main__":
    orchestrator_flow()orchestrator_tasktask_atask_bpython
from prefect import flow, task
@task
def print_hello():
    print("Hello from the nested flow!")
@flow
def nested_flow():
    print_hello()
@flow
def main_flow():
    nested_flow()
if __name__ == "__main__":
    main_flow()Sukanta Roy
10/04/2024, 1:44 PMdeployments:
- name: google_sheet_sync
  version: 0.0.1
  tags: ["google_sheet"]
  description: Flow responsible for fetching Google Sheet data
  schedule: null
  entrypoint: entrypoint.py:google_sheet_sync_flow
  work_pool: *common_work_pool19:11:58.380 | DEBUG   | asyncio - Using selector: KqueueSelector
19:11:58.381 | DEBUG   | asyncio - Using selector: KqueueSelector
19:11:58.396 | DEBUG   | httpx - load_ssl_context verify='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem' cert=None trust_env=True http2=False
19:11:58.396 | DEBUG   | httpx - load_verify_locations cafile='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem'
19:11:58.401 | DEBUG   | httpx - load_ssl_context verify='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem' cert=None trust_env=True http2=False
19:11:58.401 | DEBUG   | httpx - load_verify_locations cafile='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem'
19:11:58.406 | DEBUG   | httpx - load_ssl_context verify='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem' cert=None trust_env=True http2=False
19:11:58.407 | DEBUG   | httpx - load_verify_locations cafile='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem'
Expecting value: line 1 column 1 (char 0)Marvin
10/04/2024, 1:44 PMI have a very simple prefect deployment as follows:
deployments:
- name: google_sheet_sync
  version: 0.0.1
  tags: ["google_sheet"]
  description: Flow responsible for fetching Google Sheet data
  schedule: null
  entrypoint: entrypoint.py:google_sheet_sync_flow
  work_pool: *common_work_poolSukanta Roy
10/04/2024, 1:45 PMdeployments:
- name: google_sheet_sync
  version: 0.0.1
  tags: ["google_sheet"]
  description: Flow responsible for fetching Google Sheet data
  schedule: null
  entrypoint: entrypoint.py:google_sheet_sync_flow
  work_pool: *common_work_pool19:11:58.406 | DEBUG   | httpx - load_ssl_context verify='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem' cert=None trust_env=True http2=False
19:11:58.407 | DEBUG   | httpx - load_verify_locations cafile='/Users/uditchaudhary/.local/share/virtualenvs/google_sheet_sync--f7YjGLn/lib/python3.11/site-packages/certifi/cacert.pem'
Expecting value: line 1 column 1 (char 0)Marvin
10/04/2024, 1:45 PMExpecting value: line 1 column 1 (char 0)prefect.yamlentrypointwork_poolpython
import prefect
prefect.context.config.logging.level = "DEBUG"Sukanta Roy
10/09/2024, 10:53 AMMarvin
10/09/2024, 10:53 AMcancel_flow_runpython
import anyio
from prefect.client import get_client
from prefect.states import StateType
async def cancel_flow_run(flow_run_id: str):
    async with get_client() as client:
        print(f"Cancelling flow run with ID '{flow_run_id}'")
        
        # Retrieve the current state of the flow run
        flow_run = await client.read_flow_run(flow_run_id)
        
        # Set the state to "CANCELLED"
        state_updates = {"name": "Cancelled", "type": StateType.CANCELLED}
        new_state = flow_run.state.copy(update=state_updates)
        
        # Force the cancellation of the flow run
        await client.set_flow_run_state(flow_run_id, new_state, force=True)
# Example usage
flow_run_id = "<your_flow_run_id>"
anyio.run(cancel_flow_run, flow_run_id)set_flow_run_stateforce=Trueflow_run_idSukanta Roy
10/09/2024, 11:03 AMMarvin
10/09/2024, 11:04 AMflow_run_idpython
import anyio
from prefect.client import get_client
async def get_task_run_ids_for_flow(flow_run_id: str):
    async with get_client() as client:
        task_runs = await client.read_task_runs(flow_run_id=flow_run_id)
        task_run_ids = [task_run.id for task_run in task_runs]
        return task_run_ids
# Example usage
flow_run_id = "<your_flow_run_id>"
task_run_ids = anyio.run(get_task_run_ids_for_flow, flow_run_id)
print(task_run_ids)flow_run_ididSukanta Roy
10/09/2024, 1:50 PMMarvin
10/09/2024, 1:50 PMflow_run_idrun_deploymentpython
import anyio
from prefect.client import get_client
async def trigger_flow_run(deployment_id: str, parameters: dict = None):
    async with get_client() as client:
        response = await client.create_flow_run_from_deployment(
            deployment_id=deployment_id,
            parameters=parameters
        )
        
        flow_run_id = response.id
        print(f"Flow run triggered. Flow Run ID: {flow_run_id}")
        return flow_run_id
# Example usage
deployment_id = "<your_deployment_id>"
params = {"param1": "value1", "param2": "value2"}  # replace with actual parameters
flow_run_id = anyio.run(trigger_flow_run, deployment_id, params)flow_run_id"<your_deployment_id>"paramsSukanta Roy
10/09/2024, 2:42 PMMarvin
10/09/2024, 2:43 PMon_failurepython
from prefect import flow, task
from prefect.states import Failed
@task
def may_fail():
    raise ValueError("This task fails!")
@flow
def main_flow():
    try:
        may_fail()
    except Exception as e:
        return Failed(message=f"Flow failed due to error: {str(e)}")
if __name__ == "__main__":
    main_flow()retriesSukanta Roy
10/09/2024, 4:26 PMMarvin
10/09/2024, 4:26 PMflow_run_idpython
import anyio
import time
from prefect.client import get_client
from prefect.states import StateType
# Function that polls for the flow run status
async def poll_flow_run_status(flow_run_id: str, poll_interval: int = 10):
    async with get_client() as client:
        while True:
            # Fetch the current state of the flow run
            flow_run = await client.read_flow_run(flow_run_id)
            # Check the state of the flow run
            if flow_run.state.type in {StateType.COMPLETED, StateType.FAILED}:
                print(f"Flow run has completed. State: {flow_run.state.type}")
                return flow_run.state.type
            else:
                print(f"Flow run is still in state: {flow_run.state.type}. Polling again in {poll_interval} seconds...")
            
            # Wait for the specified poll interval before checking again
            await anyio.sleep(poll_interval)
# Example usage
flow_run_id = "<your_flow_run_id>"
anyio.run(poll_flow_run_status, flow_run_id, poll_interval=15)StateType.COMPLETEDStateType.FAILEDpoll_intervalpoll_intervalSukanta Roy
10/09/2024, 4:32 PMMarvin
10/09/2024, 4:33 PMrun_deploymentpython
import asyncio
import time
from prefect.client import get_client
from prefect.states import StateType
# Synchronous wrapper around async function
def poll_flow_run_status_sync(flow_run_id: str, poll_interval: int = 10):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(poll_flow_run_status(flow_run_id, poll_interval))
    loop.close()
# Async function definition remains the same
async def poll_flow_run_status(flow_run_id: str, poll_interval: int):
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.type in {StateType.COMPLETED, StateType.FAILED}:
                print(f"Flow run has completed. State: {flow_run.state.type}")
                return flow_run.state.type
            else:
                print(f"Flow run is in state: {flow_run.state.type}. Polling in {poll_interval} seconds...")
                await asyncio.sleep(poll_interval)
# Example usage for synchronous polling
flow_run_id = "<your_flow_run_id>"
poll_flow_run_status_sync(flow_run_id, poll_interval=15)poll_flow_run_statusasyncio.runSukanta Roy
10/09/2024, 6:39 PMMarvin
10/09/2024, 6:39 PMtask.submit.result().result()python
from prefect import flow, task
@task
def example_task(number):
    return number * number
@flow
def concurrent_flow():
    # Submit tasks asynchronously
    task_results = [example_task.submit(i) for i in range(5)]
    # Gather results using the .result() method
    results = [task.result() for task in task_results]
    # Print the results
    print(f"Task results: {results}")
    return results
if __name__ == "__main__":
    concurrent_flow()task.submit().result().result().result().result()Sukanta Roy
10/10/2024, 2:05 PMMarvin
10/10/2024, 2:06 PM/flow_runspython
import httpx
# Prefect API URL
PREFECT_API_URL = "http://<your-prefect-server-url>/api"
# Example deployment ID
deployment_id = "<your_deployment_id>"
# Optional: Parameters for your flow
params = {
    "some_param": "value"
}
# API request to create a flow run
response = <http://httpx.post|httpx.post>(
    f"{PREFECT_API_URL}/flow_runs",
    json={
        "deployment_id": deployment_id,
        "parameters": params
    },
    headers={
        "Authorization": "Bearer <your_api_token>"
    }
)
# Check response
if response.status_code == 200:
    flow_run_info = response.json()
    flow_run_id = flow_run_info.get("id")
    print(f"Flow run created with ID: {flow_run_id}")
else:
    print(f"Failed to create flow run: {response.text}")<your-prefect-server-url><your_deployment_id>Sukanta Roy
10/10/2024, 2:55 PMMarvin
10/10/2024, 2:55 PMdeployment_iddeployment_idflow_iddeployment_idflow_idflow_idflow_idSukanta Roy
10/10/2024, 2:57 PM{
  "name": "sample_flow_triggered_from_api",
  "flow_id": "ff7d061f-1829-488a-8471-89f029bfbcb1",
  "deployment_id": "8375e081-179d-4000-8d07-298f6c0f35d7",
  "state": {
    "type": "SCHEDULED"
  },
  "parameters": {
    "param_1": "Hello",
    "param_2": "World"
  }
}Sukanta Roy
10/10/2024, 2:58 PM{
  "name": "sample_flow_triggered_from_api",
  "flow_id": "ff7d061f-1829-488a-8471-89f029bfbcb1",
  "deployment_id": "8375e081-179d-4000-8d07-298f6c0f35d7",
  "state": {
    "type": "SCHEDULED"
  },
  "parameters": {
    "param_1": "Hello",
    "param_2": "World"
  }
}Marvin
10/10/2024, 2:58 PMscheduled_start_timeSukanta Roy
10/17/2024, 10:16 AMMarvin
10/17/2024, 10:17 AMSukanta Roy
11/19/2024, 4:20 PMMarvin
11/19/2024, 4:21 PMprefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not availableSukanta Roy
11/21/2024, 2:34 PMMarvin
11/21/2024, 2:34 PMSukanta Roy
11/27/2024, 1:14 PMMarvin
11/27/2024, 1:14 PMname@taskpython
from prefect import flow, task
# Define a task with a custom name
@task(name="Custom Task Name")
def example_task(x):
    return x * x
@flow
def example_flow():
    result = example_task(5)
    print(f'Result: {result}')
if __name__ == "__main__":
    example_flow()Sukanta Roy
11/27/2024, 1:20 PMMarvin
11/27/2024, 1:20 PMtask_run_namepython
from prefect import flow, task
@task(task_run_name="Tracking Task - {request_id}")
def tracking_task(request_id):
    # Simulate tracking logic
    print(f"Tracking request ID: {request_id}")
@flow
def tracking_flow():
    request_ids = ["req-1", "req-2", "req-3"]
    for req_id in request_ids:
        tracking_task.submit(req_id)
if __name__ == "__main__":
    tracking_flow()task_run_name@tasktask_run_name="Tracking Task - {request_id}"request_idrequest_idSukanta Roy
11/27/2024, 1:27 PM@task(task_run_name="Tracking Task - {request_id}")Marvin
11/27/2024, 1:27 PMtask_run_namerequest_idpython
from prefect import flow, task
@task(task_run_name=lambda **kwargs: f"Tracking Task - {kwargs['request_id']}")
def tracking_task(request_id):
    # Simulate tracking logic
    print(f"Tracking request ID: {request_id}")
@flow
def tracking_flow():
    request_ids = ["req-1", "req-2", "req-3"]
    for req_id in request_ids:
        tracking_task.submit(req_id)
if __name__ == "__main__":
    tracking_flow()kwargsrequest_idkwargsSukanta Roy
03/26/2025, 8:34 AMMarvin
03/26/2025, 8:34 AM@task@task@taskpython
from prefect import task
import time
def track_execution_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Task {func.__name__} executed in {execution_time:.2f} seconds")
        return result
    return wrapper
@task
@track_execution_time
def my_task(x):
    # Simulate some work with sleep
    time.sleep(x)
    return x
# Usage in a flow
from prefect import flow
@flow
def my_flow():
    my_task(2)
my_flow()@task@taskSukanta Roy
03/26/2025, 8:36 AMMarvin
03/26/2025, 8:36 AM@task@taskpython
from prefect import task
import time
def track_execution_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Task {func.__name__} executed in {execution_time:.2f} seconds")
        return result
    return wrapper
@track_execution_time
@task
def my_task(x):
    # Simulate some work with sleep
    time.sleep(x)
    return x
# Usage in a flow
from prefect import flow
@flow
def my_flow():
    my_task(2)
my_flow()@task@track_execution_timeSukanta Roy
03/26/2025, 8:52 AMMarvin
03/26/2025, 8:52 AMloggingget_run_loggerget_run_loggerpython
from prefect import flow, task, get_run_logger
@task
def my_task():
    logger = get_run_logger()
    <http://logger.info|logger.info>("This is an info log from a task.")
    logger.debug("This is a debug log from a task.")
@flow
def my_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("This is an info log from the flow.")
    my_task()
my_flow()get_run_logger