Omar Hamad
12/07/2023, 4:46 PM
Marvin
12/07/2023, 4:47 PM
pip install -U "prefect". Ensure that you have Prefect 2.x, which is the latest major version.
2. Prefect API Connection: Depending on whether you're self-hosting or using Prefect Cloud, you'll need to ensure your environment is connected to the Prefect API. If you're self-hosting, start the Prefect webserver and related services. If using Prefect Cloud, sign up and log in using the CLI with prefect cloud login.
3. Docker Container Setup:
a. Create a Dockerfile for your Python script. This Dockerfile should install Python alongside any dependencies your script requires, and copy your script into the image.
b. Build the Docker image from your Dockerfile with a command similar to: docker build -t my-prefect-image .
4. API URL Configuration: Docker containers need a way to communicate with the Prefect API. When running on Linux, you can set PREFECT_API_URL=http://localhost:4200/api and use the --network="host" option.
5. Running Your Prefect Flow: You can run your Prefect flow inside the Docker container by making sure the PREFECT_API_URL environment variable is set correctly as described above.
Here's an overall approach:
- Define a Prefect flow in a Python script which you want to run. This can be a simple script with tasks and flow definition using Prefect’s API.
- Create a Dockerfile that:
  - Sets up the necessary Python environment.
  - Installs Prefect if not already included in a custom base image.
  - Copies the Python script into the image.
  - Specifies the command to run the Prefect flow when the container starts.
- Build the Docker image.
- Run the Docker container with your image, making sure to set up the PREFECT_API_URL and any necessary networking, especially if you are hosting Prefect Server locally or using a specific instance of Prefect Cloud. If running on Linux, remember to pass --network="host" to give the container access to the host network.
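As a rough sketch of the last step, the docker run invocation can be assembled in Python. The helper name docker_run_command and its parameters are hypothetical, made up for this illustration; the image name and URL are the ones used above, and PREFECT_API_KEY is only needed when talking to Prefect Cloud.

```python
import shlex
from typing import List, Optional


def docker_run_command(
    image: str,
    api_url: str,
    api_key: Optional[str] = None,
    host_network: bool = False,
) -> List[str]:
    """Build the argument list for `docker run` (hypothetical helper)."""
    cmd = ["docker", "run"]
    if host_network:
        # On Linux, host networking lets localhost inside the container
        # reach a Prefect server running on the host.
        cmd += ["--network", "host"]
    # Tell Prefect inside the container where the API lives.
    cmd += ["-e", f"PREFECT_API_URL={api_url}"]
    if api_key:
        # Assumption: only required for Prefect Cloud.
        cmd += ["-e", f"PREFECT_API_KEY={api_key}"]
    cmd.append(image)
    return cmd


if __name__ == "__main__":
    local = docker_run_command(
        "my-prefect-image", "http://localhost:4200/api", host_network=True
    )
    # Print a copy-pasteable shell command.
    print(shlex.join(local))
```

The returned list can be passed to subprocess.run as-is, or printed with shlex.join and run by hand.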
If you have a specific Prefect environment or certain requirements around how the worker should operate, additional configuration may be necessary. If you encounter any issues or have more detailed requirements, please let me know so I can provide further assistance.
Nate
12/07/2023, 6:10 PM
from time import sleep

from prefect import flow, serve


@flow
def subflow():
    pass


@flow(log_prints=True)
def foo(msg: str = "Hello marvin") -> str:
    print(msg)
    subflow()
    sleep(30)


if __name__ == '__main__':
    # serves the flow
    serve(foo.to_deployment(__file__), limit=1)
i can do
FROM prefecthq/prefect:2-python3.11
COPY ./simple_serve.py /app/simple_serve.py
WORKDIR /app
CMD ["python", "simple_serve.py"]
Nate
12/07/2023, 6:12 PM
docker run to give it a PREFECT_API_KEY and url, so it can receive / execute flow runs for that deployment. you can also pass more flow.to_deployment() deployments to serve if you want
Omar Hamad
12/07/2023, 7:39 PM
Omar Hamad
12/07/2023, 7:57 PM
Omar Hamad
12/07/2023, 7:57 PM
@flow
def batch_update_products():
    netsuite_products = get_netsuite_products()
    netsuite_products_mapped = map_netsuite_to_hubspot_for_update(netsuite_products)
    update_hubspot_products(netsuite_products_mapped, netsuite_products)


@flow
def batch_create_products():
    logger = get_run_logger()
    products = get_netsuite_products()
    products = map_netsuite_to_hubspot_for_create(products)
    hubspot_client = hubspot.Client.create()
    try:
        input = BatchInputSimplePublicObjectInputForCreate(inputs=products)
        hubspot_client.crm.products.batch_api.create(input)
    except ApiException as e:
        logger.error("Exception when calling ProductsApi->batch_create: %s\n" % e)
Nate
12/07/2023, 8:22 PM
"you can also pass more flow.to_deployment() deployments to serve if you want" was exactly this
from prefect import flow, serve
...

@flow
def batch_update_products():
    netsuite_products = get_netsuite_products()
    netsuite_products_mapped = map_netsuite_to_hubspot_for_update(netsuite_products)
    update_hubspot_products(netsuite_products_mapped, netsuite_products)


@flow
def batch_create_products():
    logger = get_run_logger()
    products = get_netsuite_products()
    products = map_netsuite_to_hubspot_for_create(products)
    hubspot_client = hubspot.Client.create()
    try:
        input = BatchInputSimplePublicObjectInputForCreate(inputs=products)
        hubspot_client.crm.products.batch_api.create(input)
    except ApiException as e:
        logger.error("Exception when calling ProductsApi->batch_create: %s\n" % e)


if __name__ == "__main__":
    serve(
        batch_update_products.to_deployment("update"),
        batch_create_products.to_deployment("create")
    )
Omar Hamad
12/08/2023, 5:09 PM
batch_update_products.to_deployment("update"),
only in the code? because every time i rebuild and rerun the docker container i gotta set it in the ui and i don't want that, i took a look at the documentation but no clue tbh
Nate
12/08/2023, 5:16 PM
to_deployment accepts all the standard deployment scheduling kwargs like interval, cron, etc
Omar Hamad
12/08/2023, 5:17 PM
Omar Hamad
12/08/2023, 5:17 PM
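For the scheduling question above, one way to keep schedules in code is a small table of keyword arguments that gets unpacked into to_deployment. This is only a sketch: the SCHEDULES table, the schedule_kwargs helper, and the particular cron and interval values are hypothetical and chosen for illustration. The serve call is left as a comment because it needs Prefect and the flows from the thread.

```python
from datetime import timedelta

# Hypothetical schedule table, keyed by deployment name.
# Keeping it in the script means a rebuilt image re-applies the same
# schedules, with nothing to set by hand in the UI.
SCHEDULES = {
    "update": {"cron": "0 * * * *"},              # top of every hour
    "create": {"interval": timedelta(hours=6)},   # every six hours
}


def schedule_kwargs(name: str) -> dict:
    """Return a copy of the scheduling kwargs for a deployment name."""
    return dict(SCHEDULES.get(name, {}))


# Usage with the flows from the thread (requires Prefect):
#
# serve(
#     batch_update_products.to_deployment("update", **schedule_kwargs("update")),
#     batch_create_products.to_deployment("create", **schedule_kwargs("create")),
# )
```

For a single deployment the table is not needed; passing cron="0 * * * *" directly to to_deployment does the same thing.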