Evidlo
11/13/2024, 1:44 AMtask1
is triggered on new blobs matching the pattern l0/file*.nc
and task2
is triggered on new blobs matching pattern l1/file*.nc
Marvin
11/13/2024, 1:45 AMpython
from prefect import flow, task
from prefect.events import DeploymentEventTrigger
@task
def task1():
print("Task 1 triggered by a new file in l0!")
@task
def task2():
print("Task 2 triggered by a new file in l1!")
@flow
def my_flow():
# These tasks could be conditional based on the triggers you set
task1()
task2()
# Define a deployment with custom triggers
deployment_trigger_task1 = DeploymentEventTrigger(
event_type="Microsoft.Storage.BlobCreated",
subject_filters=[{"subject": "l0/file*.nc"}]
)
deployment_trigger_task2 = DeploymentEventTrigger(
event_type="Microsoft.Storage.BlobCreated",
subject_filters=[{"subject": "l1/file*.nc"}]
)
my_flow.with_options(
on_event=[deployment_trigger_task1, deployment_trigger_task2]
)
Configuring Azure:
- Create an Azure Event Grid subscription for Blob Storage that listens for BlobCreated
events.
- Route these events to Prefect Cloud/Server, where the DeploymentEventTrigger
logic will be executed based on the filters.
For an in-depth guide on event-driven automations, please refer to these links:
- Prefect 3.x Event Automations
- Custom Triggers
While Prefect takes care of scheduling, reacting to events requires configuration in Azure, linking Prefect to Azure via webhooks or similar integrations. As always, make sure your configurations match the exact need of the Azure environment and Prefect's deployment setup. Now, let’s hope those blobs are friendly!Evidlo
11/13/2024, 1:50 AMMarvin
11/13/2024, 1:50 AMEvidlo
11/13/2024, 2:04 AMflow0
is triggered on new blobs matching the pattern l0/file*.nc
and flow1
is triggered on new blobs matching pattern l1/file*.nc
Marvin
11/13/2024, 2:04 AMpython
from prefect import flow
from prefect.events import DeploymentEventTrigger
@flow
def flow0():
print("Flow0 started due to a new file at l0!")
@flow
def flow1():
print("Flow1 started due to a new file at l1!")
# Define triggers for the flows based on Azure Blob Storage events
trigger_flow0 = DeploymentEventTrigger(
event_type="Microsoft.Storage.BlobCreated",
subject_filters=[{"subject_pattern": "l0/file*.nc"}]
)
trigger_flow1 = DeploymentEventTrigger(
event_type="Microsoft.Storage.BlobCreated",
subject_filters=[{"subject_pattern": "l1/file*.nc"}]
)
# Assign the triggers to flows
flow0_deployment = flow0.with_options(
on_event=[trigger_flow0]
)
flow1_deployment = flow1.with_options(
on_event=[trigger_flow1]
)
### Steps to Setup:
1. Azure Setup:
- Configure Azure Blob Storage to send BlobCreated
events to an Event Grid.
- Ensure Event Grid is connected to Prefect's API endpoint, where your Prefect workflows will listen for events.
2. Prefect Configuration:
- Create Prefect deployments for both flow0
and flow1
using Prefect Cloud or Prefect Server.
- Set the deployment to use the appropriate event triggers as defined.
3. Handling Triggers:
- Prefect's DeploymentEventTrigger will react to the specified blob creation events using the subject pattern for filtering.
For additional setup details, you can refer to the Prefect documentation on automating triggers with Prefect 3.x.
This should give you a starting point to leverage Prefect's capabilities to react to events in an Azure environment. Just imagine, deploying flows while you sip your coffee, as the blobs... blob along.Evidlo
11/13/2024, 2:06 AMflow0
is triggered on new blobs matching the pattern l0/file*.nc
and flow1
is triggered on new blobs matching pattern l1/file*.nc
. The flows should receive the blob name, print it, and create a new blob with filename <http://output.nc|output.nc>
Marvin
11/13/2024, 2:06 AMpython
from prefect import flow, task
from prefect_azure import BlobStorage
@task
def process_blob(blob_name, container_client):
print(f"Processing blob: {blob_name}")
# Create a new blob in Azure Storage with '<http://output.nc|output.nc>' as the name
new_blob_content = f"Processed content of {blob_name}"
new_blob_name = "<http://output.nc|output.nc>"
container_client.upload_blob(name=new_blob_name, data=new_blob_content, overwrite=True)
print(f"Created new blob: {new_blob_name}")
@flow
def flow0(blob_name):
storage_account = "your_storage_account_name"
container_name = "your_container_name"
container_client = BlobStorage(storage_account, container_name)
process_blob(blob_name, container_client)
@flow
def flow1(blob_name):
storage_account = "your_storage_account_name"
container_name = "your_container_name"
container_client = BlobStorage(storage_account, container_name)
process_blob(blob_name, container_client)
# Define triggers for the flows based on Azure Blob Storage events
trigger_flow0 = {
"event_type": "Microsoft.Storage.BlobCreated",
"subject_filters": [{"subject_pattern": "l0/file*.nc"}]
}
trigger_flow1 = {
"event_type": "Microsoft.Storage.BlobCreated",
"subject_filters": [{"subject_pattern": "l1/file*.nc"}]
}
# Assign triggers (these would be configured in the Prefect deployment)
flow0_deployment = flow0.with_options(
on_event=[trigger_flow0]
)
flow1_deployment = flow1.with_options(
on_event=[trigger_flow1]
)
### Key Steps:
1. Prefect and Azure Integration: Ensure prefect-azure
is installed and your Azure credentials are configured. You might need additional set up for BlobStorage access in Prefect. Check out the Prefect Azure docs for setup help: Prefect Azure Integration.
2. Azure Client Setup: Instantiate a container client using BlobStorage
to interact with the Azure Blob Storage API.
3. Flow Logic: Each flow prints the blob name and creates a new <http://output.nc|output.nc>
file within the same Azure container. Replace your_storage_account_name
and your_container_name
with your Azure details.
Remember that this setup requires proper event routing from Azure Blob Storage to Prefect. Happy flowing! And who knew blobs could be such a source of excitement?Evidlo
11/13/2024, 2:13 AMon_event
argument documnted?Marvin
11/13/2024, 2:13 AMon_event
argument, used to specify triggers for Prefect flow deployments, doesn't appear to have detailed documentation in the direct search results. It's possible that while there are references to event-driven capabilities and how they can be structured in Prefect, the specific use of on_event
may not be explicitly outlined in the available resources.
These event-driven functions often tie into Prefect's automation and deployment configurations, which focus on reacting to events like changes in blob storage or other cloud-based events through YAML configurations or deployment settings.
For more precise guidance, visiting Prefect's documentation on Event Automations and Triggers could be beneficial, as it covers the mechanisms to set up flows triggered by external events. Also, reaching out in their community forums could provide firsthand insights or examples on the latest practices! It's like trying to find the mystery event at an automated party—only the cool kids at Prefect might have the exact address.Nate
11/13/2024, 3:05 AM