Pytest操作间件
目录
1.背景
最近的脚本中需要使用Python操作中间件(zookeeper/ kafka/ elastichsearch),之前没有使用过,所以度娘上到处查资料,这里记录一下常用方法,方便以后使用,也希望其他人遇到时能方便查找
2.中间件
1)zookeeper
lib | version |
zookeeper | 3.7.0 |
kazoo | 2.8.0 |
-
# 连接
-
try:
-
zk = KazooClient(hosts=['127.0.0.1:2181'])
-
zk.start()
-
# 获取节点
-
# znode_path 节点。例如,"/"
-
result = zk.get_children(znode_path)
-
zk.stop()
-
except KazooTimeoutError as e:
-
print(e.args[0])
2)kafka
lib | version |
kafka | 2.3.0 |
zookeeper | 3.4.5 |
kafka-python | 2.0.2 |
- 连接
client = KafkaAdminClient(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'])
- 查询TOPIC列表
topics_list = client.list_topics()
- 查询TOPIC详情
topic_dict = client.describe_topics()
- 生产消息
-
# 连接
-
producer = KafkaProducer(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], retries=3, api_version=(0, 10, 2))
-
-
# 发送多条消息
-
for i in range(0, 5):
-
k = bytes("k" str(i), encoding='utf-8')
-
v = bytes("v" str(i), encoding='utf-8')
-
producer.send(topic_name, key=k, value=v)
-
-
# 刷新
-
producer.flush()
-
-
# 关闭连接
-
producer.close()
- 消费消息
-
# 连接
-
# group_id随机产生一串英文 数字的字符串即可
-
-
consumer = KafkaConsumer(topic_name, group_id=group_id, bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], consumer_timeout_ms=1000)
-
-
# 订阅消息
-
consumer.subscribe(topics=[topic_name])
-
-
tp = TopicPartition(topic_name, 0)
-
-
# 消费到最后一条消息则退出,否则代码中会一直等待
-
for message in consumer:
-
if message.offset == consumer.end_offsets([tp])[tp] - 1:
-
break
-
-
# 关闭连接
-
consumer.close()
3)elasticsearch
lib | version |
ElasticSearch | 7.4.2& 7.6.0 |
elasticsearch | 7.6.0 |
- 连接
conn = Elasticsearch(["http://127.0.0.1:9200"], http_auth=('elastic', '123456'))
- 测试连接
conn.ping()
- 索引的操作
-
# 判断索引是否存在
-
conn.indics.exists(index_name)
-
-
# 获取索引信息
-
conn.indics.get(index_name)
-
-
# 写入数据
-
for i in range(0, 5):
-
data = {
-
"key": "k" str(i),
-
"value": i
-
}
-
-
conn.index(index=index_name, body=data)
-
-
# 查询数据
-
1)
-
body = {
-
'query': {
-
'prefix': {
-
'key.keyword': 'k' # 匹配前缀
-
}
-
},
-
-
'size': 10
-
}
-
-
filter_path = ['hits.hits._source.key',
-
'hits.hits._source.value'] # 展示出的字段
-
result = conn.search(index=index_name, filter_path=filter_path, body=body)
-
-
2)
-
body = {
-
'query': {
-
'term': { # 匹配整个字串
-
'key.keyword': 'k1'
-
}
-
},
-
-
'size': 10
-
}
-
-
result = conn.search(index=index_name, filter_path=filter_path, body=body)
-
-
# result:找到(True)/ 未找到(False)
-
-
-
# 修改数据
-
body = {
-
'doc': {
-
'key': 'k2',
-
'value': 2
-
}
-
}
-
-
conn.update(index=index_name, id=1, body=body)
-
-
# 删除数据
-
all_data = conn.search(index=index_name)
-
hits = all_data['hits']['hits']
-
for item in hits:
-
conn.delete(index=index_name, id=item['_id'])
-
-
# 批量写入数据
-
data_list = []
-
for i in range(100):
-
body = {
-
'_op_type': 'create',
-
'_index': index_name,
-
'_type': 'doc',
-
'_id': str(i),
-
'_source': {'key': 'k' str(i), 'value': i}
-
}
-
-
data_list.append(body)
-
result = helpers.buld(conn, data_list)
-
-
# 一种批量修改数据
-
data_list = []
-
for i in range(100):
-
body = {
-
'_op_type': 'index',
-
'_index': index_name,
-
'_type': 'doc',
-
'_id': str(i),
-
'_source': {'key': 'k' str(i), 'value': int(i 1)}
-
}
-
-
data_list.append(body)
-
result = helpers.buld(conn, data_list)
-
-
-
# 另一种批量修改数据
-
data_list = []
-
for i in range(100):
-
body = {
-
'_op_type': 'update',
-
'_index': index_name,
-
'_type': 'doc',
-
'_id': str(i),
-
'_source': {'key': 'k' str(i), 'value': int(i 2)}
-
}
-
-
data_list.append(body)
-
result = helpers.buld(conn, data_list)
-
-
# 批量删除数据
-
data_list = []
-
for i in range(100):
-
body = {
-
'_op_type': 'delete',
-
'_index': index_name,
-
'_type': 'doc',
-
'_id': str(i),
-
'_source': {'key': 'k' str(i), 'value': int(i 2)}
-
}
-
-
data_list.append(body)
-
result = helpers.buld(conn, data_list)
-
3.参考资料
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfjbgke
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01