Felix Vadan
09/25/2023, 6:05 PM
Marvin
09/25/2023, 6:06 PM
Marvin
09/25/2023, 6:06 PM
Felix Vadan
09/25/2023, 6:07 PM
Marvin
09/25/2023, 6:08 PM
```python
from prefect import Flow, Parameter, Task, artifacts


class CheckCount(Task):
    def run(self, count: int, threshold: int) -> bool:
        # Store the count as an artifact.
        # NOTE: `artifacts.set` is kept as posted; confirm that your Prefect
        # version actually provides it before relying on this call.
        artifacts.set("count", count)
        # Notify only when the count is below the threshold
        return count < threshold


class SendNotification(Task):
    def run(self, should_notify: bool):
        if should_notify:
            # Send a Slack notification
            print("Sending Slack notification...")
        else:
            print("Count is at or above threshold, not sending notification.")


# Initialize tasks
check_count = CheckCount()
send_notification = SendNotification()

# Define flow
with Flow("My Flow") as flow:
    count = Parameter("count", default=10)  # This would be your actual count
    # Call the task instances (not .run) so the flow graph is built here
    should_notify = check_count(count, threshold=5)
    send_notification(should_notify)

# Run flow
flow.run()
```
In this example, the CheckCount task stores the count as an artifact. The SendNotification task sends a Slack notification if the count is below the threshold. A later flow run could then read the stored count and decide whether to send a notification, although that retrieval step is not shown here.
Remember, this is just a simple example. Your actual implementation may be more complex, depending on your needs. But that's the basic idea.
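The threshold decision described above can be tried without Prefect or Slack. Here is a minimal plain-Python sketch of it. The names `should_notify`, `notify_if_needed`, and the `send` callback are hypothetical and not part of Prefect; `send` stands in for whatever would actually post to Slack.

```python
from typing import Callable, List


def should_notify(count: int, threshold: int) -> bool:
    """Return True only when the count is strictly below the threshold."""
    return count < threshold


def notify_if_needed(count: int, threshold: int, send: Callable[[str], None]) -> bool:
    """Call `send` with a message when the count is below the threshold.

    `send` is a hypothetical stand-in for a real Slack client call.
    Returns whether a notification was sent.
    """
    if should_notify(count, threshold):
        send(f"Count {count} is below threshold {threshold}")
        return True
    return False


# Usage: collect messages in a list instead of posting them anywhere
sent: List[str] = []
notify_if_needed(3, 5, sent.append)   # below threshold, one message recorded
notify_if_needed(10, 5, sent.append)  # at or above threshold, nothing recorded
```

Passing the sender in as an argument keeps the decision logic separate from the delivery mechanism, so it can be checked in isolation.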
I hope this helps. If not, well, I've been wrong before. It's one of the few things I'm good at.
Felix Vadan
09/25/2023, 6:08 PM
Felix Vadan
09/25/2023, 6:36 PM
Marvin
09/25/2023, 6:37 PM
```python
import os

from prefect import Flow, Parameter, Task


class CheckCount(Task):
    def run(self, count: int, threshold: int) -> bool:
        # Check if the count is below the threshold
        return count < threshold


class SendNotification(Task):
    def run(self, should_notify: bool):
        if should_notify:
            # Send a Slack notification
            print("Sending Slack notification...")
        else:
            print("Count is at or above threshold, not sending notification.")


class UpdateCount(Task):
    def run(self, should_notify: bool, count: int):
        if should_notify:
            # Update the count in the external storage
            with open("count.txt", "w") as f:
                f.write(str(count))


# Initialize tasks
check_count = CheckCount()
send_notification = SendNotification()
update_count = UpdateCount()

# Define flow
with Flow("My Flow") as flow:
    count = Parameter("count", default=10)  # This would be your actual count from an external source
    threshold = Parameter("threshold", default=5)
    # Call the task instances (not .run) so the flow graph is built here
    should_notify = check_count(count, threshold)
    send_notification(should_notify)
    update_count(should_notify, count)

# Read the previously stored count, falling back to 10 on the first run
if os.path.exists("count.txt"):
    with open("count.txt") as f:
        stored_count = int(f.read())
else:
    stored_count = 10

# Run flow
flow.run(parameters={"count": stored_count})
```
In this example, the CheckCount task checks if the count is below a threshold. The SendNotification task sends a Slack notification if the count is below the threshold. The UpdateCount task updates the count in external storage (a text file in this case) if a notification was sent. The next time your flow runs, it reads the count from the external storage.
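The read, check, and update cycle described above can also be sketched in plain Python, which makes it easy to simulate several runs. This is only an illustration: `load_count`, `save_count`, and `run_once` are hypothetical helper names, and a temporary directory stands in for wherever count.txt would really live.

```python
import os
import tempfile


def load_count(path: str, default: int = 10) -> int:
    """Read the stored count, or return `default` if no file exists yet."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return int(f.read().strip())


def save_count(path: str, count: int) -> None:
    """Overwrite the stored count."""
    with open(path, "w") as f:
        f.write(str(count))


def run_once(path: str, threshold: int = 5) -> bool:
    """One simulated run: read the count, check it, store it if notifying."""
    count = load_count(path)
    should_notify = count < threshold
    if should_notify:
        print("Sending Slack notification...")
        save_count(path, count)
    return should_notify


# Usage: simulate two runs against a throwaway file
with tempfile.TemporaryDirectory() as tmp:
    state_path = os.path.join(tmp, "count.txt")
    first = run_once(state_path)   # no file yet, so the default of 10 is used
    save_count(state_path, 3)      # pretend something external lowered the count
    second = run_once(state_path)  # reads 3, which is below the threshold of 5
```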
I hope this helps. If not, well, I'm used to being wrong. It's one of the few things I'm good at.
Felix Vadan
09/25/2023, 6:38 PM
Marvin
09/25/2023, 6:38 PM
```python
import os

from prefect import Flow, Parameter, task


@task
def get_count():
    if os.path.exists('count.txt'):
        with open('count.txt', 'r') as f:
            return int(f.read())
    else:
        return 10


@task
def check_count(count: int, threshold: int):
    return count < threshold


@task
def send_notification(should_notify: bool):
    if should_notify:
        print("Sending Slack notification...")


@task
def update_count(should_notify: bool, count: int):
    if should_notify:
        with open('count.txt', 'w') as f:
            f.write(str(count))


with Flow("My Flow") as flow:
    count = get_count()
    threshold = Parameter('threshold', default=5)
    should_notify = check_count(count, threshold)
    send_notification(should_notify)
    update_count(should_notify, count)

flow.run()
```
In this example, the get_count task reads the count from external storage (a text file in this case). The check_count task checks if the count is below a threshold. The send_notification task sends a Slack notification if the count is below the threshold. The update_count task updates the count in the external storage if a notification was sent.
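Because both the read and the update depend on the same text file, a missing, empty, or half-written file would break the next run. The following is a sketch of a more defensive version, not something from the answer above. The helper names `read_count_safely` and `write_count_atomically` are hypothetical; the write relies on `os.replace`, which swaps the file into place in one step on POSIX systems.

```python
import os
import tempfile


def read_count_safely(path: str, default: int = 10) -> int:
    """Return the stored count, or `default` if the file is missing or invalid."""
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return default


def write_count_atomically(path: str, count: int) -> None:
    """Write to a temp file in the same directory, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(count))
        # Replace the target in a single step so readers never see a partial file
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if the write or the swap failed
        os.unlink(tmp_path)
        raise
```

Writing the temp file in the same directory as the target matters, since a rename across filesystems is not a single-step operation.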
I hope this helps. If not, well, it wouldn't be the first time I've failed to meet expectations.
Felix Vadan
09/25/2023, 6:39 PM
Marvin
09/25/2023, 6:40 PM
```python
from prefect import Flow, Parameter, task, artifacts


@task
def check_count(count: int, threshold: int):
    # Store the count as an artifact.
    # NOTE: the keyword arguments below are kept as posted; check the
    # signature of `create_link_artifact` in your Prefect version.
    artifacts.create_link_artifact(
        name="Count",
        location=str(count),
        kind="raw",
        data_type="integer",
    )
    return count < threshold


@task
def send_notification(should_notify: bool):
    if should_notify:
        print("Sending Slack notification...")


with Flow("My Flow") as flow:
    count = Parameter('count', default=10)  # This would be your actual count
    threshold = Parameter('threshold', default=5)
    should_notify = check_count(count, threshold)
    send_notification(should_notify)

flow.run()
```
In this example, the check_count task stores the count as a link artifact. The send_notification task sends a Slack notification if the count is below the threshold.
Please note that this will only store the count for the duration of the flow run, so a later run cannot rely on it. If you need to persist data across multiple runs, you will need to use an external storage system.
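As one illustration of what an external storage system could look like, here is a minimal sketch using Python's built-in sqlite3 module. The table name `flow_state` and the helpers `open_store`, `set_count`, and `get_count` are assumptions made for this example; any database or key-value store would serve the same purpose.

```python
import os
import sqlite3
import tempfile
from typing import Optional


def open_store(db_path: str) -> sqlite3.Connection:
    """Open the database and make sure the (hypothetical) state table exists."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS flow_state (key TEXT PRIMARY KEY, value INTEGER)"
    )
    conn.commit()
    return conn


def set_count(conn: sqlite3.Connection, key: str, count: int) -> None:
    """Insert the count, replacing any existing row with the same key."""
    conn.execute(
        "INSERT OR REPLACE INTO flow_state (key, value) VALUES (?, ?)", (key, count)
    )
    conn.commit()


def get_count(conn: sqlite3.Connection, key: str) -> Optional[int]:
    """Return the stored count, or None if nothing has been stored yet."""
    row = conn.execute(
        "SELECT value FROM flow_state WHERE key = ?", (key,)
    ).fetchone()
    return None if row is None else row[0]


# Usage: two separate connections stand in for two separate flow runs
with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "state.db")

    conn = open_store(db_path)      # "first run" stores a count
    set_count(conn, "count", 3)
    conn.close()

    conn = open_store(db_path)      # "second run" reads it back
    previous = get_count(conn, "count")
    conn.close()
```

Closing and reopening the connection is what shows the value surviving beyond a single process or run.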
I hope this helps. If not, well, it wouldn't be the first time I've failed to meet expectations.