JV
03/30/2023, 10:37 AM
from prefect.engine.state import FAIL
from prefect.tasks.control_flow import ifelse
from prefect.utilities.tasks import defaults_from_attrs
Environment details:
Prefect Version: 2.8.7
Do we need to install any additional packages?
JV
03/30/2023, 10:39 AM
from prefect import Task
from jira import JIRA
from prefect import Flow, task
from prefect.engine.state import FAIL
from prefect.tasks.control_flow import ifelse
from prefect.utilities.tasks import defaults_from_attrs
class JiraTask(Task):
    """Task that creates a Jira issue when the object handed to ``run`` reports failure.

    The connection settings and issue fields given to the constructor are
    stored on the instance and serve as defaults for ``run``. Any of them can
    be overridden for a single call by passing the matching keyword argument.

    Args:
        url: Address of the Jira server, passed to ``JIRA(server=...)``.
        username: User name for basic authentication.
        password: Password for basic authentication.
        project: Key of the Jira project the issue is created in.
        issue_type: Name of the issue type (for example ``"Bug"``).
        summary: Summary line of the created issue.
        description: Description text of the created issue.
        **kwargs: Forwarded unchanged to ``Task.__init__``.
    """

    # Names that ``run`` accepts as per-call overrides of the instance defaults.
    _OVERRIDABLE = (
        "url",
        "username",
        "password",
        "project",
        "issue_type",
        "summary",
        "description",
    )

    def __init__(
        self,
        url,
        username,
        password,
        project,
        issue_type,
        summary,
        description,
        **kwargs,
    ):
        self.url = url
        self.username = username
        self.password = password
        self.project = project
        self.issue_type = issue_type
        self.summary = summary
        self.description = description
        super().__init__(**kwargs)

    @defaults_from_attrs(*_OVERRIDABLE)
    def run(self, task, **kwargs):
        """Create a Jira issue if ``task.is_failed()`` is true; otherwise do nothing.

        Args:
            task: Object exposing ``is_failed()``.
                NOTE(review): presumably a state-like object; confirm what
                Prefect actually passes to this parameter at run time.
            **kwargs: Optional per-call overrides for any name listed in
                ``_OVERRIDABLE``. A missing or ``None`` value falls back to
                the value stored on the instance.

        Returns:
            Always ``None``.
        """
        if not task.is_failed():
            return None

        # Resolve every setting once: an explicit per-call value wins, else the
        # constructor value. Previously the keyword arguments were accepted but
        # never read, so overrides had no effect.
        settings = {}
        for name in self._OVERRIDABLE:
            override = kwargs.get(name)
            settings[name] = getattr(self, name) if override is None else override

        jira = JIRA(
            server=settings["url"],
            basic_auth=(settings["username"], settings["password"]),
        )
        issue_dict = {
            "project": {"key": settings["project"]},
            "summary": settings["summary"],
            "description": settings["description"],
            "issuetype": {"name": settings["issue_type"]},
        }
        jira.create_issue(fields=issue_dict)
        return None
# Module-level JiraTask instance used by the flow to report a failure as a "Bug" issue.
# NOTE(review): username/password look like placeholders; supply real
# credentials from a secret store rather than from source.
# NOTE(review): the {task_name}/{flow_name}/{error_message} placeholders in
# the description are passed through literally unless something formats them
# before the issue is created. Confirm the intended substitution.
jira_task = JiraTask(
    # Plain address: the surrounding "<...>" was chat link markup, not part of the URL.
    url="https://jira4project.atlassian.net",
    username="username",
    password="password",
    project="project",
    issue_type="Bug",
    summary="A task in my flow failed",
    description="Task {task_name} in flow {flow_name} failed with error: {error_message}",
)
@task
def task1():
    """Placeholder first task: always raises ``FAIL`` with the message "Task 1 failed"."""
    # Real task logic would go here.
    # NOTE(review): this file imports FAIL from prefect.engine.state; verify
    # that path, since the signal may live in prefect.engine.signals instead.
    raise FAIL("Task 1 failed")
@task
def task2():
    """Placeholder second task: performs no work and returns ``None``."""
    # Real task logic would go here.
    return None
# Assemble the "Jira-Flow" flow: run task1, then branch on its result between
# task2 and the Jira reporting task.
with Flow("Jira-Flow") as flow:
    res1 = task1()
    # NOTE(review): confirm that ifelse accepts an `upstream_tasks` keyword, and
    # that jira_task (whose run method requires a `task` argument) is given
    # one. Neither is evident from this file.
    res2 = ifelse(res1, task2, jira_task, upstream_tasks=[res1])

# Execute the flow immediately when this module is run.
flow.run()
Zanie
JV
03/30/2023, 2:28 PM
Zanie
Zanie
JV
03/30/2023, 3:31 PM