RaspberryPi

RaspberryPi からTwitterのAPIを使って遅延情報を取得しよう

概要

前回はRaspberryPiからTwitterのAPIを使って、ツイートを検索しました。今回はTwitterAPIを使って電車の遅延情報を取得します。

使用するAPI

今回、遅延情報を取得するAPIは「Get Tweet timelines」を使います。このAPIはuser_idやscreen_nameをパラメータとして指定すると、その対象のツイートを取得できます。

自分はJR東日本の東海道方面沿線住みなので以下のTwitterのアカウントから運行情報のツイートを取得します。 JR東日本の公式アカウントです。

  • 使用するAPI: Get Tweet timelines
    https://api.twitter.com/1.1/statuses/user_timeline.json
  • 対象のTwitterアカウント: JR東日本【東海道方面】運行情報 (公式)
  • 指定するパラメータ: screen_name=@JRE_F_Tokaido

プログラムを書いていく

前回のTwitterのAPIを使ってツイートを検索するコードを流用していきます。後、前回作成したtwitter用のディレクトリ配下にresultディレクトリを追加します。

$ mkdir /home/pu/twitter/result

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import configparser
import json
from requests_oauthlib import OAuth1Session
from datetime import datetime

class TwitterApi:
  def __init__(self):
    config = configparser.ConfigParser()
    config.read("../config/twitter-api-key.conf","UTF-8")
    CK = config.get("API","CONSUMER_KEY")
    CS = config.get("API","CONSUMER_SECRET")
    AT = config.get("API","ACCESS_TOKEN")
    ATS = config.get("API","ACCESS_TOKEN_SECRET")
    self.twitter = OAuth1Session(CK, CS, AT, ATS)

  def getUserTimeLine(self,screenName,count):
    url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
    params = {
      "count" : count,
      "screen_name" : screenName
    }
    return self.twitter.get(url,params = params)

  def getText(self,response):
    if response.status_code == 200:
      response = json.loads(response.text)
      for line in response:
        if self.findText(line['text']):
          print(line['text'])
          result = self.replaceText(line['text'])
          self.writeFile(result)
          print('*******************************************')
    else:
      print("Failed: %d" % response.status_code)

  def findText(self,text):
    keyword = ["横須賀","遅延"]
    count = 0
    for i in keyword:
      if text.find(i) != -1:
        count += 1
    if count == len(keyword):
      return True
    else:
      return False

  def writeFile(self,text):
    file_path = "../result/result.txt"
    with open(file_path, mode='w') as f:
      f.write(text)

  def replaceText(self,text):
    rep = text.replace("\n","")
    print(rep.find("。"))
    print(len(rep))
    return text.replace("\n","")

if __name__ == "__main__":
  twitterapi = TwitterApi()
  response = twitterapi.getUserTimeLine("@JRE_F_Tokaido",10)
  twitterapi.getText(response)

関数などを簡単に説明していきます。

[ def init(self): ]
設定ファイルから認証情報を取得してオブジェクトを作成します。

[ def getUserTimeLine(self,screenName,count): ]
実際にtwitterのAPIを使って、引数に指定したscreenNameのアカウントのツイートをcount数取得します。戻り値として twitterのAPIの結果(Httpのレスポンス)を返します。

[ def getText(self,response): ]
引数に指定したResponseからツイートの本文を抽出します。戻り値として抽出したツイートの本文を返却します。

[ def findText(self,text): ]
引数に指定したtextから特定のキーワードから含まれているか判断して戻り値として真偽値を返却します。自分は横須賀線沿線住みなので、[“横須賀”,”遅延”]をキーワードにしました。

[ def writeFile(self,text): ]
引数に指定したtextをファイルに書き込みます。

[ def replaceText(self,text): ]
引数に指定したtextから改行(\n)を消します。

実際に実行してみる

$ python3 testapi2.py

5月30日16時59分 配信湘南新宿ライン:『遅延』湘南新宿ラインは、宇都宮線内での異音の確認の影響で、宇都宮線から横須賀線の一部列車に遅れがでています。 #湘南新宿ライン #Shonan_Shinjuku_Line https://t.co/LojidiuzAVpi@rasp-takaaki:~/twitter/result

終わりに

遅延情報の取得に成功しました。後はcronなどで定期的に実行します。
次回はcronで取得したデータをRaspberryPiから音声で出力しようと思います。