【Twitter 舆论分析】Twitter 爬虫绕过API限制
·
0x00 前言
使用API可以很简单的获取到想要的数据,但是由于国内API的申请比较困难,所以如何绕过API直接爬虫是一个迫切需要解决的问题。Github上的点击收藏量高的不限制爬虫都已经被twitter封过了
这里分享的版本是最开始写爬虫时练手的一个版本,功能实现的比较粗糙
0x01 具体分析
实现了根据用户ID,每天自动爬取用户推文,相当于监视,代码读起来相当简单,你可以根据自己的需求进行更改,下面就分享一下代码。
- 该代码为通过
screen name
爬取推文,本质是通过screen name获取到id后爬取,因id在日常中比较难寻找所以这个代码通过screen name
即可爬取,也可自行更改为使用id爬取
headers中参数查找方法:打开twitter用户界面 按F12,再按F5刷新
0x03 代码分享
# -*- codeing =utf-8 -*-
# @Time : 2020/10/21 16:43
# @Author:yuchuan
# @File : crawl_tweets_by_id.py
# @Software : PyCharm
import requests
import json
import urllib.parse
import csv
import pandas as pd
from itertools import chain
import numpy as np
from datetime import datetime, timedelta
import time
import calendar
#getuserID可通过user screenname获取ID,不用人工去获取
def main():
#包装header,伪装成浏览器。
#*******************为需要替换部分
headers = {
"cookie": '*******************',
"user-agent": "*******************",
"x-csrf-token": "*******************",
"authorization": "*******************",
}
#也可直接获取id 爬取
username = pd.read_csv('*******************.csv')
for i in range(0, len(username['username'])):
print(str(username['username'][i]))
csv_username=str(username['username'][i])
userID=getuserID(csv_username,headers)
twitterURL = "https://twitter.com/i/api/2/timeline/profile/" + userID + ".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&userId=" + userID + "&ext=mediaStats%2ChighlightedLabel"
flag=True
content = []
full_content=[]
response = connect(headers, twitterURL)
#flag 相当于确定获取到的内容是否在当日里。flag = false 停止爬取
while (flag):
# 建立连接
response = connect(headers, twitterURL)
# formatRes修改源码的类型
responseJson = formatRes(response.content)
# 爬取每个json中 所有推文以及时间,转推,点赞等
content = parsetweets(responseJson)
# 将每个json中的内容添加到一个列表中
full_content.extend(content)
#获取下一页推文的json包
twitterURL = getNewURL(responseJson,userID)
flag=CtrlFlag(content)
# n = n - 1
print("------------------------------------------------------------------------------------------------\n------------------------------------------------------------------------------------------------")
# 提取只要当天的推文
everydaytweet=todaytweet(full_content)
# 将内容保存到CSV中,每个用户一个CSV
saveData(everydaytweet, csv_username)
time.sleep(30)
# 获取当天的推文:本来想法:直接在CSV中进行排序,然后截取当天推文。时间排序没成功:::::直接在列表中截取当天的推文,再保存在CSV文件中可以。
def CtrlFlag(content):
flag=True
time = (todaytime() + timedelta(hours=-8)).strftime("%Y-%m-%d")
count=0
for i in range(0,len(content)):
if content[i][0][0:10] not in str(time):
count=count+1
if count==len(content):
flag=False
return flag
def getuserID(username,headers):
connectURL = "https://twitter.com/i/api/graphql/jMaTS-_Ea8vh9rpKggJbCQ/UserByScreenName?variables=%7B%22screen_name%22%3A%22" + username + "%22%2C%22withHighlightedLabel%22%3Atrue%7D"
print(connectURL)
response=connect(headers,connectURL)
responseJson= formatRes(response.content)
# print(responseJson)
data=responseJson['data']['user']
# print(data)
userID=find('rest_id',data)
return userID
def todaytweet(full_content):
content=[]
#todaytime是香港时间,-8获得UTC时间,与爬取的created_at时间统一
time=(todaytime()+ timedelta(hours=-8)).strftime("%Y-%m-%d")
for i in range(0,len(full_content)):
if full_content[i][0][0:10] in str(time):
content.append(full_content[i])
return content
# 爬取推文,和时间等,对时间进行格式化****/**/**
def parsetweets(dict):
dict = dict['globalObjects']['tweets']
full_text=findAll('full_text',dict)
created_at=findAll('created_at',dict)
favorite_count=findAll('favorite_count',dict)
quote_count=findAll('quote_count',dict)
reply_count=findAll('reply_count',dict)
retweet_count=findAll('retweet_count',dict)
formatcreated_at=[]
time1=[]
time2=[]
utc_time1=[]
for i in range(0,len(created_at)):
#从twitter爬下来的时候,就是UTC时间,统一为UTC时间,将本地时间(香港)-8小时。美国时间+5小时
time1.append(datetime.strptime(created_at[i],"%a %b %d %H:%M:%S +0000 %Y"))
time2.append(datetime.strftime(time1[i],'%Y-%m-%d %H:%M:%S')) #datatime转str
tweetData = []
#tweetData = list(chain.from_iterable(zip( created_at,full_text))) # 合并两个列表
# print(tweetData)
for i in range(0,len(full_text)):
tweetData.append([time2[i],full_text[i],favorite_count[i],quote_count[i],reply_count[i],retweet_count[i]])
return tweetData
# 当前日期 20201029格式,此时时间type:datetime,调用可能需要转换成str
def todaytime():
today=datetime.today()
return today
#保存到CSV中,每个人保存在一个CSV文件中、
def saveData(content,filename):
filetime = todaytime().strftime('%y%y%m%d')
filename=filetime+" "+filename
filepath = 'D:/twitterdata/'+filename+'.csv'
name=['Time', 'Tweet','Favorite','Quote','Reply','Retweet']
Data=pd.DataFrame(columns=name,data=content)
Data.to_csv(filepath,encoding='utf-8-sig')
# 直接查找键值 find
def find(target, dictData, notFound='没找到'):
queue = [dictData]
while len(queue) > 0:
data = queue.pop()
for key, value in data.items():
if key == target: return value
elif type(value) == dict: queue.append(value)
return notFound
def findAll(target, dictData, notFound=[]):
#print(dictData)
result = []
for key, values in dictData.items():
content = values[target]
result.append(content)
#print(result)
return result
# 获取到cursor,并组成新的url
def getNewURL(responseJson,userID):
responseJsonCursor1 = responseJson['timeline']['instructions'][0]['addEntries']['entries'][-1]#这是字典,是列表中的最后一个元素
cursorASCII=find('cursor',responseJsonCursor1)
cursorASCII2 = find('value', cursorASCII)
cursor=urllib.parse.quote(cursorASCII2)
newURL="https://twitter.com/i/api/2/timeline/profile/"+userID+".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&cursor="+cursor+"&userId="+userID+"&ext=mediaStats%2ChighlightedLabel"
return newURL
#格式化获取到的json//bytes转string loads()将string读入字典中
def formatRes(res):
strRes = str(res, 'utf-8')
dictRes = json.loads(strRes)
return dictRes
#设置代理proxies,链接获取网页数据。代理部分需要自己设置
def connect(headers,twitterURL):
proxies = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890", }
response = requests.get(twitterURL,headers = headers, proxies=proxies)
return response
if __name__=="__main__": #当程序执行时
main()
0x04 一些闲话
本人创建了一个公众号,分享科研路上的小问题,新发现,欢迎关注公众号,给我留言!!!
一起奋发向上,攻克难题吧~~
更多推荐
已为社区贡献2条内容
所有评论(0)