把抓到数据存起来——爬虫绝配mongodb

mongodb

分布式
松散数据结构(json)
查询语言强大

文档

你可以看做是一个dict,dict里面还可以嵌套dict,例如:

{“name”: “alan”, score_list: {“chinese”: 90, “english”: 80}}
集合
一组文档,就是一堆dict。

数据库

多个集合组成数据库

安装

官方安装方法

该如何把抓取到的数据存入mongodb

  • 把抓到的数据写成你想要的dict形式
  • insert到指定的书架上
  • 没了。。。

    增删查改例子 python2版本

    需要安装pymongo
1
pip install pymongo

mongo_api.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
import pymongo
import sys
import unittest
reload(sys)
sys.setdefaultencoding('utf-8')
class MongoAPI(object):
def __init__(self, db_ip, db_port, db_name, table_name):
self.db_ip = db_ip
self.db_port = db_port
self.db_name = db_name
self.table_name = table_name
self.conn = pymongo.MongoClient(host=self.db_ip, port=self.db_port)
self.db = self.conn[self.db_name]
self.table = self.db[self.table_name]
def get_one(self, query):
return self.table.find_one(query, projection={"_id": False})
def get_all(self, query):
return self.table.find(query)
def add(self, kv_dict):
return self.table.insert(kv_dict)
def delete(self, query):
return self.table.delete_many(query)
def check_exist(self, query):
ret = self.get(query)
return len(ret) > 0
# 如果没有 会新建
def update(self, query, kv_dict):
ret = self.table.update_many(
query,
{
"$set": kv_dict,
}
)
if not ret.matched_count or ret.matched_count == 0:
self.add(kv_dict)
elif ret.matched_count and ret.matched_count > 1:
self.delete(query)
self.add(kv_dict)
class DBAPITest(unittest.TestCase):
def setUp(self):
self.db_api = MongoAPI("127.0.0.1", # 图书馆大楼地址
27017, # 图书馆门牌号
"test", # 一号图书室
"test_table") # 第一排书架
def test(self):
db_api = self.db_api
db_api.add({"url": "test_url", "k": "v"})
self.assertEqual(db_api.get_one({"url": "test_url"})["k"], "v")
db_api.update({"url": "test_url"}, {"url_update": "url_update"})
ob = db_api.get_one({"url": "test_url"})
self.assertEqual(ob["url_update"], "url_update")
db_api.delete({"url": "test_url"})
self.assertEqual(db_api.get_one({"url": "test_url"}), None)
if __name__ == '__main__':
unittest.main()