• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Pytest操作间件

武飞扬头像
aduocd
帮助1

目录

1.背景

2.中间件

1)zookeeper

2)kafka

3)elasticsearch

3.参考资料


1.背景

最近的脚本中需要使用Python操作中间件(zookeeper/ kafka/ elastichsearch),之前没有使用过,所以度娘上到处查资料,这里记录一下常用方法,方便以后使用,也希望其他人遇到时能方便查找

2.中间件

1)zookeeper

lib version
zookeeper 3.7.0
kazoo 2.8.0
  1.  
    # 连接
  2.  
    try:
  3.  
    zk = KazooClient(hosts=['127.0.0.1:2181'])
  4.  
    zk.start()
  5.  
    # 获取节点
  6.  
    # znode_path 节点。例如,"/"
  7.  
    result = zk.get_children(znode_path)
  8.  
    zk.stop()
  9.  
    except KazooTimeoutError as e:
  10.  
    print(e.args[0])

2)kafka

lib version
kafka 2.3.0
zookeeper 3.4.5
kafka-python 2.0.2
  •  连接
client = KafkaAdminClient(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'])
  • 查询TOPIC列表
topics_list = client.list_topics()
  • 查询TOPIC详情
topic_dict = client.describe_topics()
  • 生产消息
  1.  
    # 连接
  2.  
    producer = KafkaProducer(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], retries=3, api_version=(0, 10, 2))
  3.  
     
  4.  
    # 发送多条消息
  5.  
    for i in range(0, 5):
  6.  
    k = bytes("k" str(i), encoding='utf-8')
  7.  
    v = bytes("v" str(i), encoding='utf-8')
  8.  
    producer.send(topic_name, key=k, value=v)
  9.  
     
  10.  
    # 刷新
  11.  
    producer.flush()
  12.  
     
  13.  
    # 关闭连接
  14.  
    producer.close()
  • 消费消息
  1.  
    # 连接
  2.  
    # group_id随机产生一串英文 数字的字符串即可
  3.  
     
  4.  
    consumer = KafkaConsumer(topic_name, group_id=group_id, bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], consumer_timeout_ms=1000)
  5.  
     
  6.  
    # 订阅消息
  7.  
    consumer.subscribe(topics=[topic_name])
  8.  
     
  9.  
    tp = TopicPartition(topic_name, 0)
  10.  
     
  11.  
    # 消费到最后一条消息则退出,否则代码中会一直等待
  12.  
    for message in consumer:
  13.  
    if message.offset == consumer.end_offsets([tp])[tp] - 1:
  14.  
    break
  15.  
     
  16.  
    # 关闭连接
  17.  
    consumer.close()
学新通

3)elasticsearch

lib version
ElasticSearch 7.4.2& 7.6.0
elasticsearch 7.6.0
  • 连接
conn = Elasticsearch(["http://127.0.0.1:9200"], http_auth=('elastic', '123456'))
  • 测试连接
conn.ping()
  • 索引的操作
  1.  
    # 判断索引是否存在
  2.  
    conn.indics.exists(index_name)
  3.  
     
  4.  
    # 获取索引信息
  5.  
    conn.indics.get(index_name)
  6.  
     
  7.  
    # 写入数据
  8.  
    for i in range(0, 5):
  9.  
    data = {
  10.  
    "key": "k" str(i),
  11.  
    "value": i
  12.  
    }
  13.  
     
  14.  
    conn.index(index=index_name, body=data)
  15.  
     
  16.  
    # 查询数据
  17.  
    1
  18.  
    body = {
  19.  
    'query': {
  20.  
    'prefix': {
  21.  
    'key.keyword': 'k' # 匹配前缀
  22.  
    }
  23.  
    },
  24.  
     
  25.  
    'size': 10
  26.  
    }
  27.  
     
  28.  
    filter_path = ['hits.hits._source.key',
  29.  
    'hits.hits._source.value'] # 展示出的字段
  30.  
    result = conn.search(index=index_name, filter_path=filter_path, body=body)
  31.  
     
  32.  
    2
  33.  
    body = {
  34.  
    'query': {
  35.  
    'term': { # 匹配整个字串
  36.  
    'key.keyword': 'k1'
  37.  
    }
  38.  
    },
  39.  
     
  40.  
    'size': 10
  41.  
    }
  42.  
     
  43.  
    result = conn.search(index=index_name, filter_path=filter_path, body=body)
  44.  
     
  45.  
    # result:找到(True)/ 未找到(False)
  46.  
     
  47.  
     
  48.  
    # 修改数据
  49.  
    body = {
  50.  
    'doc': {
  51.  
    'key': 'k2',
  52.  
    'value': 2
  53.  
    }
  54.  
    }
  55.  
     
  56.  
    conn.update(index=index_name, id=1, body=body)
  57.  
     
  58.  
    # 删除数据
  59.  
    all_data = conn.search(index=index_name)
  60.  
    hits = all_data['hits']['hits']
  61.  
    for item in hits:
  62.  
    conn.delete(index=index_name, id=item['_id'])
  63.  
     
  64.  
    # 批量写入数据
  65.  
    data_list = []
  66.  
    for i in range(100):
  67.  
    body = {
  68.  
    '_op_type': 'create',
  69.  
    '_index': index_name,
  70.  
    '_type': 'doc',
  71.  
    '_id': str(i),
  72.  
    '_source': {'key': 'k' str(i), 'value': i}
  73.  
    }
  74.  
     
  75.  
    data_list.append(body)
  76.  
    result = helpers.buld(conn, data_list)
  77.  
     
  78.  
    # 一种批量修改数据
  79.  
    data_list = []
  80.  
    for i in range(100):
  81.  
    body = {
  82.  
    '_op_type': 'index',
  83.  
    '_index': index_name,
  84.  
    '_type': 'doc',
  85.  
    '_id': str(i),
  86.  
    '_source': {'key': 'k' str(i), 'value': int(i 1)}
  87.  
    }
  88.  
     
  89.  
    data_list.append(body)
  90.  
    result = helpers.buld(conn, data_list)
  91.  
     
  92.  
     
  93.  
    # 另一种批量修改数据
  94.  
    data_list = []
  95.  
    for i in range(100):
  96.  
    body = {
  97.  
    '_op_type': 'update',
  98.  
    '_index': index_name,
  99.  
    '_type': 'doc',
  100.  
    '_id': str(i),
  101.  
    '_source': {'key': 'k' str(i), 'value': int(i 2)}
  102.  
    }
  103.  
     
  104.  
    data_list.append(body)
  105.  
    result = helpers.buld(conn, data_list)
  106.  
     
  107.  
    # 批量删除数据
  108.  
    data_list = []
  109.  
    for i in range(100):
  110.  
    body = {
  111.  
    '_op_type': 'delete',
  112.  
    '_index': index_name,
  113.  
    '_type': 'doc',
  114.  
    '_id': str(i),
  115.  
    '_source': {'key': 'k' str(i), 'value': int(i 2)}
  116.  
    }
  117.  
     
  118.  
    data_list.append(body)
  119.  
    result = helpers.buld(conn, data_list)
  120.  
     
学新通

3.参考资料

python操作kafka - 是阿凯啊 - 博客园 

Python3操作Kafka - 慕夏一缕风 - 博客园 

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjbgke
系列文章
更多 icon
同类精品
更多 icon
继续加载