こんにちは、ますのです。
ISMS認証取得を進めるにあたり「BCP対策」なるものがありました。
その際に「安否確認の仕組み」をしっかり作りましょうねというお話を頂いた次第。
ちなみに言われるがままに作った安否確認の運用は以下の通り。
安否確認:現在の運用方法
- 安否確認ツールに緊急連絡先(メールアドレス)を登録してもらう(入社時)
- 緊急連絡先(メールアドレス)の定期的な棚卸しの実施
- 地震発生時に管理者が「震度」を確認し規定以上の場合は通知する
- 受信者はメールを確認して状況を報告し記録する
この運用は腐りそう…。
誰が幸せになるんだろうか…。
ということである程度簡易になるように出来ないか模索した内容とプログラムをメモっていきます。
安否確認ツール:運用改善(案)
まずメールアドレスの登録や、メールアドレスへの通知はやめたいです。
日頃から目にしていないものに連絡が飛んだところで誰も見ないのです。
安否確認ツール:改良する箇所
- 震度6以上が発生した際は地域関係なく全て自動通知する
- 1時間毎にプログラムを定期実行する
- 直近1時間で地震情報が更新されていれば通知を行う
- 通知先はSlackの専用チャンネルにする
- 従業員は安否の返答でSlackにスタンプや返信をする
- 人数が増えた際はGoogleアンケートとかのURL送って回答してもらうようにすれば良し
Slackは入社時に必ず登録するので各自の手間が減ります。
通知対応もシステムで判定するようにすれば管理者は気楽です。
システム導入するような規模でなければこれで十分かと感じます。
通知対応もシステムで判定するようにすれば管理者は気楽です。
システム導入するような規模でなければこれで十分かと感じます。
Pythonで地震情報を取得してSlackへ通知を行う
今回利用させて頂くのは「P2P地震情報」のJSON /API v2になります。
信頼性を説明する場面を考慮すると国が管理している元データを利用したかったのが本音。
気象庁が発行しているxmlデータやjsonデータで試していましたがわたしの力及ばず、今回は開発スピード重視で対応です。
Python初心者が作成しましたので生暖かい目で眺めて頂ければ幸いです。
こちらのコードは参考までに。
利用したときに発生したあらゆる事象は自己責任にてご対応くださいませ。
こちらのコードは参考までに。
利用したときに発生したあらゆる事象は自己責任にてご対応くださいませ。
import datetime
import requests
import json
import slackweb
# 地震確認:基準時間設定
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, 'JST')
now = datetime.datetime.now(JST)
hour_ago = now - datetime.timedelta(hours=1)
base_time = hour_ago.strftime('%Y/%m/%d %H:%M:%S') # 2021/11/04 17:37:28
# payload:検索条件
limit = 1
min_scale = 55 # 最大震度の下限。10(震度1)、20(震度2)、30(震度3)、40(震度4)、45(震度5弱)、50(震度5強)、55(震度6弱)、60(震度6強)、70(震度7)。
if min_scale < 45:
min_intensity = '震度1~4'
elif min_scale < 55:
min_intensity = '震度5'
elif min_scale < 70: min_intensity = '震度6' elif min_scale >= 70:
min_intensity = '震度7'
# 地震情報:概要抽出
url = "https://api.p2pquake.net/v2/jma/quake"
payload = {"limit":limit, "min_scale":min_scale}
r = requests.get(url, params=payload)
data = json.loads(r.text)
# 地震情報:詳細抽出
info_url = url + '/' + data[0]['id']
info_r = requests.get(info_url)
info_data = json.loads(info_r.text)
quake_time = info_data['earthquake']['time']
name = info_data['earthquake']['hypocenter']['name']
maxscale = info_data['earthquake']['maxScale']
# 震度判定
if maxscale < 45:
intensity = '震度4以下'
elif maxscale < 55:
intensity = '震度5'
elif maxscale < 70: intensity = '震度6' elif maxscale >= 70:
intensity = '震度7以上'
# (min_scale)以上に該当する都道府県を出力
region = []
for i in info_data['points']:
if i['scale'] >= min_scale:
region.append(i['pref'])
# 重複する都道府県を1つに処理
region = list(dict.fromkeys(region))
str_region = "\n".join(region)
# slack設定
slack = slackweb.Slack(url = "https://hooks.slack.com/services/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")
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "<!channel> %s以上の地震を観測しました。状況についてお知らせください。" % (min_intensity)
}
},
{
"type": "header",
"text": {
"type": "plain_text",
"text": "【最大%s】の地震を観測" % (intensity)
}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*発生時間*\n%s" % (quake_time)
},
{
"type": "mrkdwn",
"text": "*震源地域*\n%s(%s)" % (name, intensity)
},
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*%s以上の都道府県*\n%s" % (min_intensity, str_region)
}
},
{
"type": "header",
"text": {
"type": "plain_text",
"text": "従業員の皆様へお願い"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": ":one: 揺れがおさまるまで身の安全を確保し、落ち着いて行動しましょう!\n\n:two: ケガ等が無いか *「24時間以内 :clock3:」* にスタンプや返信で反応をお願いします:bow:"
}
},
{
"type": "divider"
},
{
"type": "divider"
}
]
# Slack通知:基準時間より最新の地震計測であれば通知
print("前回のチェック時間")
print(base_time)
print("直近発生した地震の時間")
print(quake_time)
if base_time < quake_time:
print('実行時間の1時間前よりも大きい時間=更新されている=通知')
slack.notify(text="地震発生!ケガ等が無いかお知らせください!", blocks=blocks)
elif base_time > quake_time:
print('実行時間の1時間前よりも小さい時間=更新されていない=処理終了')
動作確認
実行すると以下のような通知が届きます。
1時間毎に定期実行を行う
あとは作成したPythonが実行出来る環境に配置し、cronなどで定期実行を行います。
わたしはAWSのLambda環境へ配置し、EventBridgeのcron設定を利用しました。
「requests」や「slackweb」のモジュールを追加する必要があったのですが、地味に詰まりました。
- スケジュール式: cron(00 0/1 * * ? *)
- 毎時0分にPythonを実行する
もうちょっとやりたかったこと
今回出来なかったこととして備忘録です。
- 気象庁の元データを利用した処理にしたかった(データの信頼性、可用性の観点)
- 震度判定の分岐はもう少し見やすい書き方あったのでは?
- Slackへ通知する際に「ボタン」を追加したかった
- ボタンで「無事」「ケガ」「その他」を選択してもらえばその投稿に返信出来ないかな…。
ある程度人数が増えてきたらお金をかけて専用ツールを利用するなんて選択肢もありますね。
スタンプや返信だけで把握が難しくなってきたらアンケートURLを入れたり、状況によって考えていきたい所存。AWS Lambda×Pythonの勉強含めて作ってみました。
拙いコードですが誰かの参考になれば良いなと思う次第です。
スタンプや返信だけで把握が難しくなってきたらアンケートURLを入れたり、状況によって考えていきたい所存。AWS Lambda×Pythonの勉強含めて作ってみました。
拙いコードですが誰かの参考になれば良いなと思う次第です。
参考情報
Slack通知を作る時は Slack App を使ってみよう
【Lambda】import requests が使えるようにする【python】