python借助zookeeper实现分布式服务(二)1-22

我重新思考了分布式服务的分工与合作,梳理出分布式系统中的三个角色,并据此重写了上一篇的代码。

众所周知,分布式系统中一般有三个角色:master、worker和client。

1.master
master监视新的worker和task,并将task分配给可用的worker;若某个worker丢失,则把曾经分配给它的task重新分配给其他worker。此外,master自身也需要保证高可用。
2.worker
worker先在系统中注册自己,以便master能够给它分配任务;随后监视分配给自己的新任务,一旦有任务到达就开始执行。
3.client
客户端创建新任务并提交给系统,然后监控对应task的状态

详细的逻辑来讲是这样的:

python借助zookeeper实现分布式服务(二)1-22
#1.多个master候选进程竞选master,竞选成功者成为master并创建/rpc/workers、/rpc/tasks、/rpc/assign,竞选失败者监控/rpc/master,随时准备重新竞选master
create -e /rpc/master "host:port"
create /rpc/workers ""
create /rpc/tasks ""
create /rpc/assign ""
ls -w /rpc/master
stat -w /rpc/master 
#2.worker在zk中注册自己(在/rpc/workers下创建一个临时子节点),同时创建属于自己的分配节点/rpc/assign/<worker>,并监控master分配给自己的任务
create -e /rpc/workers/worker1.example.com "worker1.example.com:2224"
create /rpc/assign/worker1.example.com ""
ls -w /rpc/assign/worker1.example.com 
#3.client在tasks下创建有序task-子节点,并监控task-的状态
create -s /rpc/tasks/task- "cmd"
ls -w /rpc/tasks/task-0000000000 
#4.master查看新task,获取可用的worker节点,将任务分配给master之外的worker
ls -w /rpc/workers
ls -w /rpc/tasks
create /rpc/assign/worker1.example.com/task-0000000000 ""
#5.worker监控分配给自己的task,发现有分配给自己的task时执行task,执行完修改task状态
ls /rpc/assign/worker1.example.com
create /rpc/tasks/task-0000000000/status "done"
#6.client收到通知,获取到task的状态
get /rpc/tasks/task-0000000000/status
get /rpc/tasks/task-0000000000
python借助zookeeper实现分布式服务(二)1-22

 


下面看python代码

zk_master.py

python借助zookeeper实现分布式服务(二)1-22
# -*- coding:utf-8 -*-
# @Time    : 2020-07-22 14:12
# @Author  : wangbin
import json
import random
import socket
import sys
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError


class ZKMaster(object):
    """Master role: win the /rpc/master election, track workers, hand out tasks.

    Every process running this class takes part in the election, but only the
    one that created ``/rpc/master`` (``self.is_master`` is True) assigns or
    cleans up tasks. The others stay on standby and retry the election each
    time the ``/rpc/master`` watch fires.

    NOTE(review): tasks already assigned to a worker that later disappears are
    not reassigned; nothing here reacts to a worker leaving beyond refreshing
    ``self._workers``.
    """

    # Shared namespaces used by master, workers and clients.
    _PATHS = ('/rpc', '/rpc/workers', '/rpc/tasks', '/rpc/assign')

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()
        # Latest snapshot of registered workers: dicts decoded from the JSON
        # each worker stores in its /rpc/workers/<host> node.
        self._workers = []
        # True only while this process won the last election attempt.
        self.is_master = False

    def register_zk(self, event=None):
        """Run (or re-run) the master election.

        Creates the ephemeral ``/rpc/master`` node holding this process's
        host/port. It is also the watch callback for ``/rpc/master``: when the
        node changes (e.g. the current master's session ends and the node is
        deleted), every standby tries to create it again and only one succeeds.

        :param event: kazoo watch event when invoked as a watcher, else None.
        """
        # ensure_path is idempotent; creating the shared namespaces up front
        # keeps the get_children() calls in get_works/watch_tasks from failing.
        for path in self._PATHS:
            self.zk.ensure_path(path)
        value = json.dumps({'host': self.host, 'port': self.port})
        try:
            self.zk.create('/rpc/master', value.encode(), ephemeral=True)
        except NodeExistsError as e:
            # Another process is master: stay on standby.
            print(e)
            self.is_master = False
        else:
            self.is_master = True
            print('became master: %s' % value)
            if event is not None:
                # Fail-over win: pick up tasks that queued while no master ran.
                self.watch_tasks()
        # Watches are one-shot, so re-arm it for the next election round.
        self.zk.exists('/rpc/master', watch=self.register_zk)

    def do(self):
        """Start watching workers and tasks, then idle, printing the worker list."""
        self.get_works()
        self.watch_tasks()
        while True:
            print(self._workers)
            time.sleep(3)

    def get_works(self, event=None):
        """Rebuild ``self._workers`` from the children of /rpc/workers.

        Also the child watch callback for /rpc/workers, so the list is
        refreshed whenever a worker joins or leaves.

        :param event: kazoo watch event when invoked as a watcher, else None.
        """
        from kazoo.exceptions import NoNodeError

        names = self.zk.get_children('/rpc/workers', watch=self.get_works)
        workers = []
        for name in names:
            try:
                data = self.zk.get('/rpc/workers/' + name)[0]
            except NoNodeError:
                # The worker's ephemeral node vanished between the two calls.
                continue
            if data:
                workers.append(json.loads(data.decode()))
        # Replace the list in one assignment instead of clearing and refilling
        # it, so a concurrent reader never sees a transient empty list.
        self._workers = workers
        if event is not None and self.is_master:
            # A newly joined worker can take tasks left unassigned earlier
            # because no worker was available.
            self.watch_tasks()

    def watch_tasks(self, event=None):
        """Assign unscheduled tasks and delete finished ones.

        Child watch callback for /rpc/tasks, and also a watch callback on each
        task's ``status`` node, so a worker writing ``done`` triggers cleanup
        right away instead of waiting for the next new task. Standbys only
        re-arm the /rpc/tasks watch and do nothing else.

        :param event: kazoo watch event when invoked as a watcher, else None.
        """
        from kazoo.exceptions import NoNodeError

        tasks = self.zk.get_children('/rpc/tasks', watch=self.watch_tasks)
        if not self.is_master:
            return
        for task in tasks:
            task_path = '/rpc/tasks/' + task
            status_path = task_path + '/status'
            try:
                # An exists() watch fires on creation, data change and deletion
                # of the status node, so every status transition rescans.
                stat = self.zk.exists(status_path, watch=self.watch_tasks)
                if stat is None:
                    # No status yet: the task has never been scheduled.
                    if not self._workers:
                        print('no available worker for task=' + task)
                        continue
                    worker = random.choice(self._workers)
                    self.assign(worker['host'], task)
                else:
                    data, _stat = self.zk.get(status_path)
                    if data == b'done':
                        self.zk.delete(task_path, recursive=True)
                        print('delete done task=' + task)
            except NoNodeError:
                # The task was removed concurrently; nothing left to do for it.
                continue

    def assign(self, worker, task):
        """Schedule ``task`` on ``worker``.

        Marks ``/rpc/tasks/<task>/status`` as ``schedule`` first, then creates
        ``/rpc/assign/<worker>/<task>`` for the worker to pick up.

        :param worker: worker host name, i.e. the child name under /rpc/assign.
        :param task: task node name under /rpc/tasks (e.g. ``task-0000000000``).
        """
        print('task=%s schedule to %s' % (task, worker))
        try:
            self.zk.create('/rpc/tasks/' + task + '/status', b'schedule')
        except NodeExistsError:
            # An overlapping scan already scheduled this task.
            print('task=%s already scheduled' % task)
            return
        # makepath: the worker might not have created its inbox node yet.
        self.zk.create('/rpc/assign/{worker}/{task}'.format(worker=worker, task=task),
                       value=b'', makepath=True)


if __name__ == '__main__':
    # usage: python zk_master.py <host> <port>
    if len(sys.argv) < 3:
        sys.exit('usage: python zk_master.py <host> <port>')
    host = sys.argv[1]
    port = sys.argv[2]
    m = ZKMaster(host, int(port))
    try:
        m.register_zk()
        m.do()
    finally:
        # Ending the session deletes our ephemeral /rpc/master node at once,
        # so a standby can take over without waiting for the session timeout.
        m.zk.stop()
        m.zk.close()
python借助zookeeper实现分布式服务(二)1-22

zk_worker.py

python借助zookeeper实现分布式服务(二)1-22
# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:50
# @Author  : wangbin
import json
import socket
import sys
import time

from kazoo.client import KazooClient

# worker将自己注册到zk
from kazoo.exceptions import NodeExistsError


class ZKWorker(object):
    """Worker role: register under /rpc/workers and run the tasks the master
    places under /rpc/assign/<host>.
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.zk = None  # KazooClient, created by register_zk()

    def register_zk(self):
        """Connect to ZooKeeper and announce this worker.

        The inbox /rpc/assign/<host> is created *before* the ephemeral
        /rpc/workers/<host> node. That way the master never sees this worker
        before there is a place to put its tasks. The ephemeral node only
        lives as long as this ZooKeeper session.
        """
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()
        self.zk.ensure_path('/rpc/workers')
        self.zk.ensure_path('/rpc/assign/{name}'.format(name=self.host))
        value = json.dumps({'host': self.host, 'port': self.port})
        self.zk.create('/rpc/workers/{name}'.format(name=self.host), value.encode(), ephemeral=True)

    def wait_assign(self, event=None):
        """Run every task currently assigned to this worker and re-arm the watch.

        This is also the child watch callback for /rpc/assign/<host>. zkCli
        equivalent:
            create /rpc/assign/worker1.example.com ""
            ls -w /rpc/assign/worker1.example.com

        :param event: kazoo watch event when invoked as a watcher, else None.
        """
        inbox = '/rpc/assign/{name}'.format(name=self.host)
        # Idempotent, so running it on every watch event is harmless.
        self.zk.ensure_path(inbox)
        tasks = self.zk.get_children(inbox, watch=self.wait_assign)
        print(tasks)
        for task in tasks:
            self.do_task(task)

    def do_task(self, task):
        """Execute ``task`` (simulated by a 2-second sleep), report it, clear it."""
        print('ready to do ' + task)
        time.sleep(2)
        self.update_task_status(task)
        self.delete_assign(task)

    def update_task_status(self, task):
        """Mark ``task`` finished by writing ``done`` to /rpc/tasks/<task>/status."""
        from kazoo.exceptions import NoNodeError

        try:
            self.zk.set('/rpc/tasks/{task}/status'.format(task=task), b'done')
        except NoNodeError:
            # Status node missing: the task was already cleaned up, or it was
            # assigned without the master's "schedule" step.
            print('task=%s has no status node, skip update' % task)
            return
        print('task=%s is done! ' % task)

    def delete_assign(self, task):
        """Remove ``task`` from this worker's inbox /rpc/assign/<host>."""
        from kazoo.exceptions import NoNodeError

        try:
            self.zk.delete('/rpc/assign/{host}/{task}'.format(host=self.host, task=task))
        except NoNodeError:
            # Already removed, e.g. when the initial wait_assign() call and a
            # watch-triggered one processed the same task.
            print('assign task=%s already removed' % task)
            return
        print('delete done assign task=' + task)


if __name__ == '__main__':
    # usage: python zk_worker.py <host> <port>
    if len(sys.argv) < 3:
        sys.exit('usage: python zk_worker.py <host> <port>')
    host = sys.argv[1]
    port = sys.argv[2]
    worker = ZKWorker(host, int(port))
    try:
        worker.register_zk()
        worker.wait_assign()
        while True:
            print('i am worker!wait for task')
            time.sleep(3)
    finally:
        # Ending the session removes /rpc/workers/<host> immediately instead
        # of after the session timeout.
        if worker.zk is not None:
            worker.zk.stop()
            worker.zk.close()
python借助zookeeper实现分布式服务(二)1-22

zk_client.py

python借助zookeeper实现分布式服务(二)1-22
# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:53
# @Author  : wangbin

from kazoo.client import KazooClient


# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient(object):
    """Client role: submit tasks as sequential nodes under /rpc/tasks."""

    def __init__(self):
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()

    def add_task(self, cmd=b'cmd'):
        """Submit one task and return the path of the created task node.

        :param cmd: payload stored as the task node's data. A ``str`` is
            UTF-8 encoded. Defaults to ``b'cmd'``.
        :return: full path of the new node (e.g. ``/rpc/tasks/task-0000000000``),
            so the caller can watch ``<path>/status`` for the result.
        """
        if isinstance(cmd, str):
            cmd = cmd.encode('utf-8')
        self.zk.ensure_path('/rpc/tasks')
        # sequence=True makes ZooKeeper append a monotonically increasing suffix.
        task = self.zk.create('/rpc/tasks/task-', value=cmd, sequence=True)
        print(task)
        return task


if __name__ == '__main__':
    client = ZKClient()
    try:
        client.add_task()
    finally:
        # Close the ZooKeeper session cleanly instead of abandoning it at exit.
        client.zk.stop()
        client.zk.close()
上一篇:[C++]C++类基本语法


下一篇:Visual Studio Code for .Net Framework