【Ambari】Python调用Rest API 获取YARN HA状态信息并发送钉钉告警
import requests
import json
import sys
# 钉钉机器人的Webhook地址
DINGDING_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token=YOUR_ACCESS_TOKEN'
# 获取YARN HA状态的Ambari Rest API
AMBARI_API_URL = 'http://<ambari-server-ip>:8080/api/v1/clusters/<cluster-name>/services/YARN/components/YARN_RESOURCEMANAGER?fields=RootServiceComponents/properties/rm_web_service_address'
# 获取YARN HA状态的API
YARN_HA_STATUS_API = 'http://<rm1-ip>:8088/ws/v1/cluster/info'
# 发送钉钉机器人消息
def send_dingding_message(message):
headers = {'Content-Type': 'application/json', 'Charset': 'UTF-8'}
data = {
"msgtype": "text",
"text": {
"content": message
}
}
response = requests.post(DINGDING_WEBHOOK, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print('消息已发送至钉钉')
else:
print(f'消息发送失败, 状态码: {response.status_code}')
# 获取YARN HA状态并发送消息
def get_yarn_ha_status_and_send_message():
try:
# 获取YARN HA状态
response = requests.get(YARN_HA_STATUS_API)
if response.status_code == 200:
yarn_ha_status = response.json()
# 判断YARN HA状态是否正常
if yarn_ha_status['clusterInfo']['haState'] == 'ACTIVE':
message = 'YARN HA状态正常,当前状态为ACTIVE。'
else:
message = f'警告!YARN HA状态异常,当前状态为{yarn_ha_status["clusterInfo"]["haState"]}。'
# 发送钉钉消息
send_dingding_message(message)
else:
print(f'获取YARN HA状态失败, 状态码: {response.status_code}')
except Exception as e:
print(f'发生错误: {e}')
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = exc_tb.tb_frame.f_code.co_filename
lineno = exc_tb.tb_lineno
print(f'错误位置: {fname}, 行: {lineno}')
# 主函数
if __name__ == '__main__':
get_yarn_ha_status_and_send_message()
在这个代码实例中,我们首先定义了钉钉机器人的Webhook地址和需要访问的Ambari API地址。然后定义了send_dingding_message
函数来发送消息到钉钉机器人,并在get_yarn_ha_status_and_send_message
函数中实现了获取YARN HA状态并根据状态发送消息的逻辑。如果状态异常,将发送消息到钉钉群。这个例子展示了如何通过Python调用API并根据响应结果发送消息的简单方法。
评论已关闭