0x00 Preface

Using the official API makes it easy to get the data you want, but applying for API access from within China is difficult, so crawling without the API is a problem that genuinely needs solving. The highly-starred, unthrottled crawlers on GitHub have all been blocked by Twitter at some point.

What I am sharing here is an early version I wrote as practice when I first started writing crawlers, so the implementation is fairly rough.

0x01 Analysis

Given a user's ID, the script crawls that user's tweets automatically every day, which in effect monitors the account (a small scheduling sketch follows the note below). The code is easy to read and you can adapt it to your own needs; it is shared further down.

  • The code crawls tweets by screen name: under the hood it first resolves the screen name to the numeric user ID and then crawls by that ID. Because the numeric ID is hard to come by in everyday use, the script takes a screen name, but you can also change it to crawl by ID directly.
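
The listing in 0x03 only does a single pass over the user list; the "every day" part has to be scheduled. Below is a minimal sketch of one way to do it, assuming the loop replaces the entry point at the bottom of the same script (where main() and the time module already exist); cron or Windows Task Scheduler work just as well.

# A minimal sketch, not part of the original script: rerun main() once per day.
# It assumes it replaces the entry point at the bottom of the script, where
# main() is defined and the time module is already imported.
if __name__ == "__main__":
    while True:
        main()
        time.sleep(24 * 60 * 60)  # wait 24 hours before the next pass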

How to find the header values: open the Twitter user page in your browser, press F12 to open the developer tools, then press F5 to refresh and inspect the requests.
(Screenshots omitted: the browser developer tools (F12) view used to copy the header values.)
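
Once you have located the four values in DevTools, they get pasted into the headers dict in the code below. As a small sketch of one way to keep them out of the source file, you could read them from environment variables instead; the variable names here are made up.

# A sketch, not part of the original script: read the four header values from
# environment variables instead of hardcoding them in the source. The names
# TW_COOKIE, TW_UA, TW_CSRF and TW_AUTH are hypothetical; export them with the
# values copied from the DevTools request.
import os

headers = {
    "cookie": os.environ["TW_COOKIE"],
    "user-agent": os.environ["TW_UA"],
    "x-csrf-token": os.environ["TW_CSRF"],
    "authorization": os.environ["TW_AUTH"],  # typically the "Bearer ..." value
}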

0x03 Code

# -*- coding: utf-8 -*-
# @Time : 2020/10/21 16:43
# @Author : yuchuan
# @File : crawl_tweets_by_id.py
# @Software : PyCharm
import requests
import json
import urllib.parse
import pandas as pd
from datetime import datetime, timedelta
import time
# getuserID resolves the numeric user ID from the screen name, so it does not have to be looked up manually
def main():
        # Build the request headers so the script looks like a logged-in browser session.
        # Every ******************* placeholder below must be replaced with your own value.
        headers = {
            "cookie": '*******************',
            "user-agent": "*******************",
            "x-csrf-token": "*******************",
            "authorization": "*******************",
        }
        # The CSV is expected to have a 'username' column of screen names.
        # You could also change this to crawl by a known numeric ID directly.
        username = pd.read_csv('*******************.csv')
        for i in range(0, len(username['username'])):
            print(str(username['username'][i]))
            csv_username=str(username['username'][i])
            userID=getuserID(csv_username,headers)
            twitterURL = "https://twitter.com/i/api/2/timeline/profile/" + userID + ".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&userId=" + userID + "&ext=mediaStats%2ChighlightedLabel"
            flag=True
            content = []
            full_content=[]
            # flag indicates whether the fetched tweets are still from the current day;
            # once it turns False, stop paging.
            while (flag):
                # Fetch the current page of the timeline
                response = connect(headers, twitterURL)
                # formatRes converts the raw response bytes into a dict
                responseJson = formatRes(response.content)
                # Extract every tweet in this page: text, time, retweets, likes, etc.
                content = parsetweets(responseJson)
                # Accumulate this page's tweets into the full list
                full_content.extend(content)
                # Build the URL for the next page of tweets (via the pagination cursor)
                twitterURL = getNewURL(responseJson,userID)
                flag=CtrlFlag(content)
            print("------------------------------------------------------------------------------------------------\n------------------------------------------------------------------------------------------------")
            # Keep only the tweets posted today
            everydaytweet=todaytweet(full_content)
            # Save the result to CSV, one file per user
            saveData(everydaytweet, csv_username)
            time.sleep(30)

# Getting today's tweets: the original idea was to sort inside the CSV and slice out today's rows,
# but sorting by time there did not work out; filtering today's tweets from the list first and then
# saving them to CSV does the job.
def CtrlFlag(content):
    # Returns False once none of the tweets on the current page were posted today (UTC),
    # which stops the paging loop in main().
    if len(content) == 0:
        # No tweets on this page: stop instead of paging forever.
        return False
    flag = True
    today = (todaytime() + timedelta(hours=-8)).strftime("%Y-%m-%d")
    count = 0
    for i in range(0, len(content)):
        if content[i][0][0:10] not in str(today):
            count = count + 1
        if count == len(content):
            flag = False
    return flag
def getuserID(username,headers):
    connectURL = "https://twitter.com/i/api/graphql/jMaTS-_Ea8vh9rpKggJbCQ/UserByScreenName?variables=%7B%22screen_name%22%3A%22" + username + "%22%2C%22withHighlightedLabel%22%3Atrue%7D"
    print(connectURL)
    response=connect(headers,connectURL)
    responseJson= formatRes(response.content)
    # print(responseJson)
    data=responseJson['data']['user']
    # print(data)
    userID=find('rest_id',data)
    return userID

def todaytweet(full_content):
    content=[]
    # todaytime() returns local (Hong Kong, UTC+8) time; subtract 8 hours to get UTC
    # so it matches the created_at timestamps of the crawled tweets.
    today = (todaytime() + timedelta(hours=-8)).strftime("%Y-%m-%d")
    for i in range(0, len(full_content)):
        if full_content[i][0][0:10] in str(today):
            content.append(full_content[i])
    return content

# Parse every tweet in the page: text, timestamp, likes, quotes, replies and retweets;
# created_at is reformatted to YYYY-MM-DD HH:MM:SS.
def parsetweets(responseJson):
    tweets = responseJson['globalObjects']['tweets']
    full_text = findAll('full_text', tweets)
    created_at = findAll('created_at', tweets)
    favorite_count = findAll('favorite_count', tweets)
    quote_count = findAll('quote_count', tweets)
    reply_count = findAll('reply_count', tweets)
    retweet_count = findAll('retweet_count', tweets)
    time1 = []
    time2 = []
    for i in range(0, len(created_at)):
        # Tweets come down from Twitter already in UTC, so everything is kept in UTC:
        # local (Hong Kong) time minus 8 hours, US Eastern time plus 5 hours.
        time1.append(datetime.strptime(created_at[i], "%a %b %d %H:%M:%S +0000 %Y"))
        time2.append(datetime.strftime(time1[i], '%Y-%m-%d %H:%M:%S'))  # datetime -> str
    tweetData = []
    for i in range(0,len(full_text)):
        tweetData.append([time2[i],full_text[i],favorite_count[i],quote_count[i],reply_count[i],retweet_count[i]])
    return tweetData

# Current date and time as a datetime object; callers convert it to str where needed.
def todaytime():
    today=datetime.today()
    return today

# Save the tweets to a CSV file; each user gets their own file, prefixed with the date
# (adjust the D:/twitterdata/ output path to your own machine).
def saveData(content,filename):
    filetime = todaytime().strftime('%Y%m%d')
    filename=filetime+" "+filename
    filepath = 'D:/twitterdata/'+filename+'.csv'
    name=['Time', 'Tweet','Favorite','Quote','Reply','Retweet']
    Data=pd.DataFrame(columns=name,data=content)
    Data.to_csv(filepath,encoding='utf-8-sig')

# Look up a key in nested dicts and return its value.
def find(target, dictData, notFound='not found'):
    queue = [dictData]
    while len(queue) > 0:
        data = queue.pop()
        for key, value in data.items():
            if key == target: return value
            elif type(value) == dict: queue.append(value)
    return notFound
# Collect the value of `target` from every tweet entry in the tweets dict.
def findAll(target, dictData):
    result = []
    for key, values in dictData.items():
        content = values[target]
        result.append(content)
    return result

# Extract the pagination cursor from the response and build the URL for the next page.
def getNewURL(responseJson, userID):
    responseJsonCursor1 = responseJson['timeline']['instructions'][0]['addEntries']['entries'][-1]  # last entry in the list; a dict containing the cursor
    cursorASCII=find('cursor',responseJsonCursor1)
    cursorASCII2 = find('value', cursorASCII)
    cursor=urllib.parse.quote(cursorASCII2)
    newURL="https://twitter.com/i/api/2/timeline/profile/"+userID+".json?include_profile_interstitial_type=1&include_blocking=1&include_blocked_by=1&include_followed_by=1&include_want_retweets=1&include_mute_edge=1&include_can_dm=1&include_can_media_tag=1&skip_status=1&cards_platform=Web-12&include_cards=1&include_ext_alt_text=true&include_quote_count=true&include_reply_count=1&tweet_mode=extended&include_entities=true&include_user_entities=true&include_ext_media_color=true&include_ext_media_availability=true&send_error_codes=true&simple_quoted_tweet=true&include_tweet_replies=false&count=20&cursor="+cursor+"&userId="+userID+"&ext=mediaStats%2ChighlightedLabel"
    return newURL

# Decode the response body: bytes -> str, then json.loads() parses the str into a dict.
def formatRes(res):
    strRes = str(res, 'utf-8')
    dictRes = json.loads(strRes)
    return dictRes

# Fetch the page through a local proxy; the proxy settings need to match your own setup.
def connect(headers, twitterURL):
    proxies = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
    response = requests.get(twitterURL, headers=headers, proxies=proxies)
    return response

if __name__ == "__main__":
    main()
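
For reference, the input CSV that pd.read_csv() loads only needs a username column with one screen name per row, for example (the account names below are just placeholders):

username
jack
TwitterDev

The results end up under D:/twitterdata/ as one file per user, named <YYYYMMDD> <username>.csv, with the columns Time, Tweet, Favorite, Quote, Reply and Retweet.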

0x04 Closing Remarks

I have started a WeChat official account where I share the small problems and new findings I run into along the research road. You are welcome to follow it and leave me a message!
Let's keep pushing forward and cracking hard problems together~

