Python自定义主从分布式架构

简介:

2000元阿里云代金券免费领取,2核4G云服务器仅664元/3年,新老用户都有优惠,立即抢购>>>


阿里云采购季(云主机223元/3年)活动入口:请点击进入>>>,


阿里云学生服务器(9.5元/月)购买入口:请点击进入>>>,

0、环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

1、图:

wKiom1R-e2ry4GwEAAEVjb9davM954.jpg

2、代码:

(1)、中心节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 中心节点(主要功能是分配任务)
 
import  SocketServer, socket, Queue
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
CenterClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #中心节点用于发送网络消息的socket
TaskQueue  =  Queue.Queue()  #任务队列
 
#获取任务队列
def  GetTaskQueue():
     for  in  range ( 1 11 ):
         TaskQueue.put( str (i))
 
#CenterServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print (data)
         
         if  data.startswith( 'wait' ):   
             vec  =  data.split( ':' )
             if  len (vec) ! =  3 :
                 print ( 'Error: len(vec) != 3' )
             else :
                 nodeIP  =  vec[ 1 ]
                 nodeListenPort  =  vec[ 2 ]
                 nodeID  =  nodeIP  +  ':'  +  nodeListenPort
                 if  not  TaskQueue.empty():
                     task  =  TaskQueue.get()
                     print ( 'send task '  +  task  +  ' to '  +  nodeID)
                     CenterClient.sendto( 'task:'  +  task, (nodeIP,  int (nodeListenPort)))
                 else :
                     print ( 'TaskQueue is empty!' )
 
GetTaskQueue()   #获取任务队列
 
CenterServer  =  SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print ( 'Listen port '  +  str (CenterListenPort)  +  ' ...' )
CenterServer.serve_forever()

(2)、任务节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#encoding=utf-8
#author: walker
#date: 2014-12-03
#summary: 任务节点(请求/接收/执行任务)
 
import  time, socket, SocketServer
from  apscheduler.scheduler  import  Scheduler
 
CenterIP  =  '127.0.0.1'     #中心节点IP
CenterListenPort  =  9999    #中心节点监听端口
NodeIP  =  socket.gethostbyname(socket.gethostname())    #任务节点自身IP
NodeClient  =  socket.socket(socket.AF_INET, socket.SOCK_DGRAM)     #任务节点用于发送网络消息的socket
 
#任务:发送网络信息
def  jobSendNetMsg():
     msg  =  ''
     if  NodeServer.TaskState  = =  'wait' :
         msg  =  'wait:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     elif  NodeServer.TaskState  = =  'exec' :
         msg  =  'exec:'  +  NodeIP  +  ':'  +  str (NodeListenPort)
     
     print (msg)
     NodeClient.sendto(msg, (CenterIP, CenterListenPort)) 
 
#添加并启动定时任务
def  InitTimer():
     sched  =  Scheduler()
     sched.add_interval_job(jobSendNetMsg, seconds = 1 )
     sched.start()
     
#执行任务
def  ExecTask(task):
     print ( 'ExecTask '  +  task  +  ' ...' )
     time.sleep( 2 )
     print ( 'ExecTask '  +  task  +  ' over' )
 
#NodeServer的回调函数,在接受到udp报文是触发
class  MyUDPHandler(SocketServer.BaseRequestHandler):
     def  handle( self ):
         data  =  self .request[ 0 ].strip()
         socket  =  self .request[ 1 ]
         print ( 'recv data: '  +  data)
         
         if  data.startswith( 'task' ):
             vec  =  data.split( ':' )
             if  len (vec) ! =  2 :
                 print ( 'Error: len(vec) != 2' )
             else :
                 task  =  vec[ 1 ]
                 self .server.TaskState  =  'exec'
                 ExecTask(task)
                 self .server.TaskState  =  'wait'
 
InitTimer()
                 
NodeServer  =  SocketServer.UDPServer(('',  0 ), MyUDPHandler)
NodeServer.TaskState  =  'wait'  #(exec/wait)
NodeListenPort  =  NodeServer.server_address[ 1 ]
print ( 'NodeListenPort:'  +  str (NodeListenPort))
NodeServer.serve_forever()


*** walker * 2014-12-03 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1585826如需转载请自行联系原作者

RQSLT
相关文章
|
5天前
|
Python
解决GNURadio自定义Python OOT块-导入块时报错问题
解决GNURadio自定义Python OOT块-导入块时报错问题
30 0
|
5天前
|
存储 开发者 Python
Python中的collections模块与UserDict:用户自定义字典详解
【4月更文挑战第2天】在Python中,`collections.UserDict`是用于创建自定义字典行为的基类,它提供了一个可扩展的接口。通过继承`UserDict`,可以轻松添加或修改字典功能,如在`__init__`和`__setitem__`等方法中插入自定义逻辑。使用`UserDict`有助于保持代码可读性和可维护性,而不是直接继承内置的`dict`。例如,可以创建一个`LoggingDict`类,在设置键值对时记录操作。这样,开发者可以根据具体需求定制字典行为,同时保持对字典内部管理的抽象。
|
5天前
|
消息中间件 监控 API
在Python中如何实现微服务架构,及相关的服务间通信方案?
Python微服务架构涉及服务划分、注册发现、通信协议选择(如HTTP、gRPC、消息队列)及服务间通信实现。每个服务应自治,有独立数据库和部署流程,并需考虑容错(如分布式事务、重试、熔断)和监控日志。API网关用于请求管理和路由。实际操作需根据需求和技术栈调整,并关注服务拆分和数据一致性。
26 5
|
5天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
2天前
|
iOS开发 Python
mac:python安装路径,带你全面解析Python框架体系架构view篇
mac:python安装路径,带你全面解析Python框架体系架构view篇
|
2天前
|
数据采集 数据挖掘 关系型数据库
Excel计算函数(计算机二级)(1),2024年最新2024Python架构面试指南
Excel计算函数(计算机二级)(1),2024年最新2024Python架构面试指南
|
5天前
|
负载均衡 NoSQL 关系型数据库
深入浅出Redis(六):Redis的主从架构与主从复制原理
深入浅出Redis(六):Redis的主从架构与主从复制原理
|
5天前
|
存储 程序员 Python
Python中自定义类实例化数组的艺术
Python中自定义类实例化数组的艺术
11 1
|
5天前
|
运维 负载均衡 监控
软件体系结构 - 关系数据库(3)主从架构
【4月更文挑战第26天】软件体系结构 - 关系数据库(3)主从架构
31 0
|
5天前
|
JSON API 数据格式
Python测试架构requests-mock
【4月更文挑战第19天】
10 1
http://www.vxiaotou.com