lolafect/lolafect/slack.py
2022-12-30 11:35:31 +01:00

44 lines
1.4 KiB
Python

import json
from prefect.core import Task
import requests
class SendSlackMessageTask(Task):
    """Prefect task that delivers a text message to a Slack webhook."""

    def __init__(self, *args, **kwargs):
        # Nothing task-specific to configure: forward everything to Task.
        super().__init__(*args, **kwargs)

    def run(self, webhook_url: str, text_to_send: str) -> None:
        """
        Hand the task inputs over to send_message_to_slack_channel.

        :param webhook_url: the URL of the Slack webhook that should receive
            the message.
        :param text_to_send: the text to send to the Slack channel.
        :return: None
        """
        send_message_to_slack_channel(
            webhook_url=webhook_url,
            text_to_send=text_to_send,
        )
def send_message_to_slack_channel(
    webhook_url: str, text_to_send: str, timeout: float = 30.0
) -> None:
    """
    Send an HTTP POST to a Slack webhook to deliver a message in a channel.

    The message is sent as a JSON body of the form {"text": text_to_send}.
    An error is raised if the reply does not carry a 200 status code.

    :param webhook_url: the URL of the Slack webhook that should receive the
        message.
    :param text_to_send: the text to send to the Slack channel.
    :param timeout: seconds to wait for the webhook to answer before giving
        up. Forwarded to requests.post as its timeout argument.
    :return: None
    :raises ValueError: if the response status code is not 200.
    :raises requests.exceptions.Timeout: if no answer arrives within
        ``timeout`` seconds.
    """
    # Without an explicit timeout, requests waits indefinitely, which would
    # leave the calling task hanging if the endpoint never answers.
    response = requests.post(
        webhook_url,
        data=json.dumps({"text": text_to_send}),
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )

    if response.status_code != 200:
        raise ValueError(
            "Request to slack returned an error %s, the response is:\n%s"
            % (response.status_code, response.text)
        )