专栏名称: 大数据挖掘DT数据分析
实战数据资源提供。数据实力派社区,手把手带你玩各种数据分析,涵盖数据分析工具使用,数据挖掘算法原理与案例,机器学习,R语言,Python编程,爬虫。如需发布广告请联系: hai299014
目录
相关文章推荐
51好读  ›  专栏  ›  大数据挖掘DT数据分析

用python抓取摩拜单车API数据并做可视化分析(源码)

大数据挖掘DT数据分析  · 公众号  · 大数据  · 2017-04-19 08:04

正文

请到「今天看啥」查看全文



这是之前发的一个朋友圈的视频截图,可以看到在营门口附近有一个尖,在那里其实车是停住的,但是GPS轨迹显示短时间内在附近攒动,甚至攒动到很远,又回到那个位置。



这样的数据对于数据分析来讲根本没法用,我差点就放弃了。


随着微信小程序的火爆,摩拜单车也在第一时间出了小程序。我一看就笑了,不错,又给我来了一个数据源,试试。用Packet Capture抓了一次数据后很容易确定API。抓取后爬取了两三天的数据,发现出现了转机,数据符合正常的单车的轨迹。


剩下事情,就是提高爬虫的效率了。



其他尝试

有时候直接分析APP的源代码会很方便的找到API入口,将摩拜的Android端的APP进行反编译,但发现里面除了一些资源文件有用外,其他的文件都是用奇虎360的混淆器加壳的。网上有文章分析如何进行脱壳,但我没有太多时间去钻研,也就算了。



摩拜单车的API之所以很容易抓取和分析,很大程度上来讲是由于API设计的太简陋:

  • 仅使用http请求,使得很容易进行抓包分析

  • 在这些API中都没有对request进行一些加密,使得自己的服务很容易被人利用。

  • 另外微信小程序也是泄露API的一个重要来源,毕竟在APP中request请求可以通过native代码进行加密然后在发出,但在小程序中似乎还没有这样的功能。

如果大家有兴趣,可以试着看一下小蓝单车APP的request,他们使用https请求,对数据的request进行了加密,要抓取到他们的数据难度会增加非常多。

当然了,如果摩拜单车官方并不care数据的事情的话,这样的API设计也是ok的。



声明:
此爬虫仅用于学习、研究用途,请不要用于非法用途。任何由此引发的法律纠纷自行负责。

没耐心看文章的请直接:


关注公众号datadw 后回复“摩拜”获取完整源码


目录结构

\analysis - jupyter做数据分析
\influx-importer - 导入到influxdb,但之前没怎么弄好
\modules - 代理模块
\web - 实时图形化显示模块,当时只是为了学一下react而已,效果请见这里
crawler.py - 爬虫核心代码
importToDb.py - 导入到postgres数据库中进行分析
sql.sql - 创建表的sql
start.sh - 持续运行的脚本

思路
核心代码放在crawler.py中,数据首先存储在sqlite3数据库中,然后去重复后导出到csv文件中以节约空间。

摩拜单车的API返回的是一个正方形区域中的单车,我只要按照一块一块的区域移动就能抓取到整个大区域的数据。

left,top,right,bottom定义了抓取的范围,目前是成都市绕城高速之内以及南至南湖的正方形区域。offset定义了抓取的间隔,现在以0.002为基准,在DigitalOcean 5$的服务器上能够15分钟内抓取一次。

def start(self):
left = 30.7828453209
top = 103.9213455517
right = 30.4781772402
bottom = 104.2178123382

offset = 0.002

if os.path.isfile(self.db_name):
os.remove(self.db_name)

try:
with sqlite3.connect(self.db_name) as c:
c.execute('''CREATE TABLE mobike
(Time DATETIME, bikeIds VARCHAR(12), bikeType TINYINT,distId INTEGER,distNum TINYINT, type TINYINT, x DOUBLE, y DOUBLE)''')
except Exception as ex:
pass

然后就启动了250个线程,至于你要问我为什么没有用协程,哼哼~~我当时没学~~~其实是可以的,说不定效率更高。

由于抓取后需要对数据进行去重,以便消除小正方形区域之间重复的部分,最后的group_data正是做这个事情。

executor = ThreadPoolExecutor(max_workers=250)
print("Start")
self.total = 0
lat_range = np.arange(left, right, -offset)
for lat in lat_range:
lon_range = np.arange(top, bottom, offset)
for lon in lon_range:
self.total += 1
executor.submit(self.get_nearby_bikes, (lat, lon))

executor.shutdown()
self.group_data()

最核心的API代码在这里。小程序的API接口,搞几个变量就可以了,十分简单。

def get_nearby_bikes(self, args):
try:
url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"

payload = "latitude=%s&longitude=%s&errMsg=getMapCenterLocation" % (args[0], args[1])

headers = {
'charset': "utf-8",
'platform': "4",
"referer":"https://servicewechat.com/wx40f112341ae33edb/1/",
'content-type': "application/x-www-form-urlencoded",
'user-agent': "MicroMessenger/6.5.4.1000 NetType/WIFI Language/zh_CN",
'host': "mwx.mobike.com",
'connection': "Keep-Alive",
'accept-encoding': "gzip",
'cache-control': "no-cache"
}

self.request(headers, payload, args, url)
except Exception as ex:
print(ex)

最后你可能要问频繁的抓取IP没有被封么?其实摩拜单车是有IP的访问速度限制的,只不过破解之道非常简单,就是用大量的代理。

我是有一个代理池,每天基本上有8000以上的代理。在ProxyProvider中直接获取到这个代理池然后提供一个pick函数用于随机选取得分前50的代理。请注意,我的代理池是每小时更新的,但是代码中提供的jsonblob的代理列表仅仅是一个样例,过段时间后应该大部分都作废了。

在这里用到一个代理得分的机制。我并不是直接随机选择代理,而是将代理按照得分高低进行排序。每一次成功的请求将加分,而出错的请求将减分。这样一会儿就能选出速度、质量最佳的代理。如果有需要还可以存下来下次继续用。

class ProxyProvider:
def __init__(self, min_proxies=200):
self._bad_proxies = {}
self._minProxies = min_proxies
self.lock = threading.RLock()

self.get_list()

def get_list(self):
logger.debug("Getting proxy list")
r = requests.get("https://jsonblob.com/31bf2dc8-00e6-11e7-a0ba-e39b7fdbe78b", timeout=10)
proxies = ujson.decode(r.text)
logger.debug("Got %s proxies", len(proxies))
self._proxies = list(map(lambda p: Proxy(p), proxies))

def pick(self):
with self.lock:
self._proxies.sort(key = lambda p: p.score, reverse=True)
proxy_len = len(self._proxies)
max_range = 50 if proxy_len > 50 else proxy_len
proxy = self._proxies[random.randrange(1, max_range)]
proxy.used()
return proxy

在实际使用中,通过proxyProvider.pick()选择代理,然后使用。如果代理出现任何问题,则直接用proxy.fatal_error()降低评分,这样后续就不会选择到这个代理了。

def request(self, headers, payload, args, url):
while True:
proxy = self.proxyProvider.pick()
try:
response = requests.request(
"POST", url, data=payload, headers=headers,
proxies={"https": proxy.url},
timeout=5,verify=False
)

with self.lock:
with sqlite3.connect(self.db_name) as c:
try:
print(response.text)
decoded = ujson.decode(response.text)['object']
self.done += 1
for x in decoded:
c.execute("INSERT INTO mobike VALUES (%d,'%s',%d,%d,%s,%s,%f,%f)" % (
int(time.time()) * 1000, x['bikeIds'], int(x['biketype']), int(x['distId']),
x['distNum'], x['type'], x['distX'],
x['distY']))

timespend = datetime.datetime.now() - self.start_time
percent = self.done / self.total
total = timespend / percent
print(args, self.done, percent * 100, self.done / timespend.total_seconds() * 60, total,
total - timespend)
except Exception as ex:
print(ex)
break
except Exception as ex:
proxy.fatal_error()







请到「今天看啥」查看全文