发现身边很多朋友都在学习Python,而Python作为一个计算语言,很少有练习的机会,今天开放一个之前给广州客户做的消息中间件源码,读懂这套代码基本上就会应用Python,包括了:
1、websocket
2、类与对象
3、多线程
4、日志处理
5、json处理
6、时间处理
7、MySQL数据库处理
8、HTTP request 处理
9、头文件引用
一、定义配置文件 uncall_config.py
__author__ = 'ideacall' LISTEN_IP = "xx.xx.xx.xx" LISTEN_PORT = 6899 HOST= "" PORT=3306 USER='root' PASSWD='' DB='' UNCALL_HTTP_HOST = ""
二、主文件
#coding=utf8
#!/usr/bin/python
from __future__ import print_function
import struct,socket
import hashlib
import threading,random
import time
import json
import pymysql
from base64 import b64encode, b64decode
import urllib
import uncall_config
from time import ctime,sleep
SEND_BUF_SIZE = 4096 # send buf
RECV_BUF_SIZE = 4096 # recv buf
LISTEN_IP = uncall_config.LISTEN_IP #listen ip address
LISTEN_PORT = uncall_config.LISTEN_PORT #listen ip ports
HOST = uncall_config.HOST #db host
PORT = uncall_config.PORT #db port
USER = uncall_config.USER #db user
PASSWD = uncall_config.PASSWD #db pwd
DB = uncall_config.DB # db datanames
UNCALL_HTTP_HOST = uncall_config.UNCALL_HTTP_HOST #uncallcc webservices address
connectionlist = {}
webclasslist = {}
pymysql.install_as_MySQLdb()
def write_log(msg):
f=open("/tmp/cxst_socket.log","a+")
f.write(msg+"\n")
f.close()
#check number strip is 0
def str_number_strip_check(s):
if (s.startswith('01')) :
return s.strip('0')
else:
return s
#check exten login
def check_extension(extension):
rows = 0
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("select extension, status, dnd from users where extension='"+str(extension)+"'")
rows = cur.rowcount
cur.close()
conn.close()
return rows
def pushData():
global webclasslist
for webclient in webclasslist.values():
write_log(' TASK THREAD -----> %s is_login status %s %s ' %(webclient.index, webclient.is_login, webclient.extension))
if webclient.is_login > 0 :
status = get_extension_status(webclient.extension)
if status != webclient.status and len(status)>0 :
webclient.status = status
ret_data = {'Event':'STATUS','errorCode':'E0','errorMsg':'SUCCESS','data':status}
sendMessageIndex(json.dumps(ret_data),webclient.index)
pop_list_callin = get_extension_pop_callin(webclient.extension)
if len(pop_list_callin) >0 :
sendMessageIndex(json.dumps(pop_list_callin),webclient.index)
pop_list_callout = get_extension_pop_callout(webclient.extension)
if len(pop_list_callout) >0 :
sendMessageIndex(json.dumps(pop_list_callout),webclient.index)
cdr_list = get_extension_cdr(webclient.extension)
if len(cdr_list):
ret_data = {'Event':'CDR','errorCode':'E0','errorMsg':'SUCCESS','data':cdr_list}
sendMessageIndex(json.dumps(ret_data),webclient.index)
sleep(3)
def data_check():
while True:
pushData()
sleep(2)
def get_extension_status(extension):
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("SELECT d.device as TerminalId, d.extension as StaffId , FROM_UNIXTIME(d.logintime) as `Time`, u.dnd, u.`status` from asterisk.device_login as d LEFT JOIN asterisk.users as u on u.extension = d.device where d.extension ='"+extension+"'")
retlist=[]
for r in cur:
row ={'TerminalId': 'MIDEA'+str(r[0]),'StaffId' : str(r[1]), 'Time': str(r[2]), 'dnd': str(r[3]), 'status': str(r[4])}
retlist.append(row)
cur.close()
conn.close()
return retlist
def get_extension_status_all():
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("SELECT d.device as TerminalId, d.extension as StaffId , FROM_UNIXTIME(d.logintime) as `Time`, u.dnd, u.`status` from asterisk.device_login as d LEFT JOIN asterisk.users as u on u.extension = d.device")
retlist=[]
for r in cur:
row ={'TerminalId': 'MIDEA'+str(r[0]),'StaffId' : str(r[1]), 'Time': str(r[2]), 'dnd': str(r[3]), 'status': str(r[4])}
retlist.append(row)
cur.close()
conn.close()
return retlist
def get_extension_cdr(extension):
retlist =[]
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur_update = conn.cursor()
cdr_tables = "cdro_"+str(time.strftime("%Y_%-m_%-d",time.localtime()))
cur.execute("select 1 from information_schema.TABLES where table_name='"+cdr_tables+"'")
if cur.rowcount > 0 :
cur.execute("select src, dst, amaflags, billsec,disposition,uniqueid,userfield,calldate,hangup_src from asteriskcdrdb."+cdr_tables+" where (src ='"+extension+"' or dst = '"+extension+"') and analysis = '0' LIMIT 1")
for r in cur:
row ={'src': str_number_strip_check(str(r[0])),'dst' : str_number_strip_check(str(r[1])),'amaflags': str(r[2]),'billsec': str(r[3]),'disposition': str(r[4]),'uniqueid': str(r[5]),'userfield': str(r[6]),'calldate': str(r[7]),'hangup_src': str(r[8])}
retlist.append(row)
cur_update.execute("update asteriskcdrdb."+cdr_tables+" set analysis = '1' where uniqueid = '"+str(r[5])+"'")
cur_update.close()
cur.close()
conn.close()
return retlist
def check_extension_cdr(uniqueid):
retlist =[]
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cdr_tables = "cdro_"+str(time.strftime("%Y_%-m_%-d",time.localtime()))
cur.execute("select 1 from information_schema.TABLES where table_name='"+cdr_tables+"'")
if cur.rowcount > 0 :
cur.execute("select src, dst, amaflags, billsec,disposition,uniqueid,userfield,calldate,hangup_src from asteriskcdrdb."+cdr_tables+" where uniqueid = '"+uniqueid+"'")
for r in cur:
row ={'src': str_number_strip_check(str(r[0])),'dst' : str_number_strip_check(str(r[1])),'amaflags': str(r[2]),'billsec': str(r[3]),'disposition': str(r[4]),'uniqueid': str(r[5]),'userfield': str(r[6]),'calldate': str(r[7]),'hangup_src': str(r[8])}
retlist.append(row)
cur.close()
conn.close()
return retlist
def get_device_by_extension(extension):
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("select device from `asterisk`.`device_login` where extension = '"+extension+"' limit 1 ")
device = ""
for r in cur:
device = 'MIDEA'+str(r[0])
cur.close()
conn.close()
return device
#update set heart data
def update_set_heart(extension):
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("UPDATE asterisk.device_login set heart_time = UNIX_TIMESTAMP(NOW()) where extension = '" + extension +"'")
cur.close()
conn.close()
return ""
def get_extension_pop_callin(extension):
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("select activation, calla as src , callb as dst, uid, Stats, action, pjid from asteriskcdrdb.pop where callb='"+extension+"' limit 1 ")
row = ""
for r in cur:
device = get_device_by_extension(str(r[2]));
row ={'Event':'CALLIN','Time':str(r[0]),'CallId' : str_number_strip_check(str(r[1])), 'StaffId': str(r[2]), 'CallSession': str(r[3]), 'Stats': str(r[4]),'action': str(r[5]),'action': str(r[6]),'TerminalId':device}
cur.execute("DELETE from asteriskcdrdb.pop where uid = '"+str(r[3])+"'")
cur.execute("update asterisk.users set heart_times =NOW() where extension = '"+extension+"'")
cur.close()
conn.close()
return row
def get_extension_pop_callout(extension):
conn = pymysql.connect(host=HOST, port=PORT,user=USER,passwd=PASSWD,db=DB)
cur = conn.cursor()
cur.execute("select activation, calla as src , callb as dst, uid, Stats, action, pjid from asteriskcdrdb.pop where calla='"+extension+"' limit 1")
row = ""
for r in cur:
device = get_device_by_extension(str(r[1]))
row ={'Event':'CALLOUT','Time':str(r[0]),'StaffId' : str(r[1]), 'CallId': str_number_strip_check(str(r[2])), 'CallSession': str(r[3]), 'Stats': str(r[4]),'action': str(r[5]),'action': str(r[6]),'TerminalId':device}
cur.execute("DELETE from asteriskcdrdb.pop where uid = '"+str(r[3])+"'")
cur.close()
conn.close()
return row
#get http request
def http_reques_uncallcc(url):
req_header = {'User-Agent':'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
'Accept':'text/html;q=0.9,*/*;q=0.8',
'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
'Accept-Encoding':'gzip',
'Connection':'close',
'Referer':None
}
req_timeout = 3
req = urllib2.Request(url,None,req_header)
resp = urllib2.urlopen(req,None,req_timeout)
html = resp.read()
return html
def parse_data(msg):
code_length = ord(msg[1]) & 127
if code_length == 126:
masks = msg[4:8]
data = msg[8:]
elif code_length == 127:
masks = msg[10:14]
data = msg[14:]
else:
masks = msg[2:6]
data = msg[6:]
i = 0
raw_str = ''
for d in data:
raw_str += chr(ord(d) ^ ord(masks[i%4]))
i += 1
return raw_str
def sendMessage(message):
global connectionlist
for connection in connectionlist.values():
back_str = []
back_str.append('\x81')
data_length = len(message)
if data_length <= 125:
back_str.append(chr(data_length))
else:
back_str.append(chr(126))
back_str.append(chr(data_length >> 8))
back_str.append(chr(data_length & 0xFF))
back_str = "".join(back_str) + message
write_log(u'send message:' + message)
connection.send(back_str)
def sendMessageIndex(message, index):
try:
global connectionlist
back_str = []
back_str.append('\x81')
data_length = len(message)
if data_length <= 125:
back_str.append(chr(data_length))
else:
back_str.append(chr(126))
back_str.append(chr(data_length >> 8))
back_str.append(chr(data_length & 0xFF))
back_str = "".join(back_str) + message
write_log(u'send message:' + message)
connectionlist['connection'+str(index)].send(back_str)
except (ZeroDivisionError,Exception) as e:
del webclasslist[index]
connectionlist['connection'+str(index)].is_login = 0
deleteconnection(str(index))
connectionlist['connection'+str(index)].close()
write_log("EX1------->"+e)
def deleteconnection(item):
global connectionlist
global webclasslist
del webclasslist[item]
del connectionlist['connection'+item]
class WebSocket(threading.Thread):
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
def __init__(self,conn,index,name,remote, path="/"):
threading.Thread.__init__(self)
self.conn = conn
self.index = index
self.name = name
self.remote = remote
self.path = path
self.buffer = ""
self.extension = ""
self.is_login = 0
self.device =""
self.status = ""
def run(self):
write_log('Uncallcc Web Socket%s Start!' % self.index)
headers = {}
self.handshaken = False
while True:
if self.handshaken == False:
write_log('Socket%s Start Handshaken with %s!' % (self.index,self.remote))
self.buffer += bytes.decode(self.conn.recv(RECV_BUF_SIZE))
if self.buffer.find('\r\n\r\n') != -1:
header, data = self.buffer.split('\r\n\r\n', 1)
for line in header.split("\r\n")[1:]:
key, value = line.split(": ", 1)
headers[key] = value
headers["Location"] = ("ws://%s%s" %(headers["Host"], self.path))
key = headers['Sec-WebSocket-Key']
token = b64encode(hashlib.sha1(str.encode(str(key + self.GUID))).digest())
handshake="HTTP/1.1 101 Switching Protocols\r\n"\
"Upgrade: websocket\r\n"\
"Connection: Upgrade\r\n"\
"Sec-WebSocket-Accept: "+bytes.decode(token)+"\r\n"\
"WebSocket-Origin: "+str(headers["Origin"])+"\r\n"\
"WebSocket-Location: "+str(headers["Location"])+"\r\n\r\n"
self.conn.send(str.encode(str(handshake)))
self.handshaken = True
write_log('Socket%s Handshaken with %s success!' %(self.index, self.remote))
else:
try:
mm=self.conn.recv(RECV_BUF_SIZE)
msg_utf8 = parse_data(mm) #utf8
msg_unicode = msg_utf8.decode('utf-8', 'ignore') #unicode
msg_json_data = json.loads(msg_unicode)
action = msg_json_data['Event']
if self.is_login == 0:
if action!="LOGIN":
if action == 'LOGOUT': #quit
extension = msg_json_data['extension']
paaswd = msg_json_data['paaswd']
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+ extension +"&device="+devices
write_log('-----> %s action LOGOUT : %s ' %(self.index, request_url))
request = http_reques_uncallcc(request_url)
if request == "OK":
ret_data = {'Event':'LOGOUT','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'LOGOUT','errorCode':'E6','errorMsg':request}
sendMessageIndex(json.dumps(ret_data),self.index)
else :
ret_data = {'Event':'LOGIN','errorCode':'E2','errorMsg':'NO LOGIN'}
sendMessageIndex(json.dumps(ret_data),self.index)
else :
extension = msg_json_data['extension']
paaswd = msg_json_data['paaswd']
write_log('----->action start login : %s : %s ' %(self.index, paas))
if check_extension(extension) > 0 :
self.is_login=1
self.extension = extension
ret_data = {'Event':'LOGIN','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else :
ret_data = {'Event':'LOGIN','errorCode':'E4','errorMsg':'NO EXTEN'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
# dial
if action != "DIAL" and action != "QUIT" and action != "LOGOUT" and action !="HANGUP" and action != "GETSEATALL" and action !="GETSEAT" and action !="HB" and action !="SETBUSY" and action !="SETIDLE" and action !="TRANSFERCALL" and action!='CHECK_CDR' and action !="CALLIN" and action != "CALLOUT" :
ret_data = {'errorCode':'E3','errorMsg':'NO EVENT'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
if action == 'QUIT': #quit
extension = msg_json_data['StaffId']
devices_midea = msg_json_data['TerminalId']
devices = devices_midea[5:]
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+self.extension+"&device="+devices
write_log('-----> %s action QUIT : %s ' %(self.index, request_url))
request = http_reques_uncallcc(request_url)
ret_data = {'Event':'QUIT','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
self.is_login=0
deleteconnection(str(self.index))
self.conn.close()
break
if action == 'DIAL': #dial event
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/call.php?extension="+self.device+"&phone="+msg_json_data['DTMF']
request = http_reques_uncallcc(request_url)
write_log('-----> %s action DIAL : %s ' %(self.index, request_url))
if request == "OK":
ret_data = {'Event':'DIAL','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'DIAL','errorCode':'E5','errorMsg':'FAILED'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "HB": #pop event
ret_data = {'Event':'HB','errorCode':'E0','errorMsg':'SUCCESS'}
write_log('-----> %s action HB : %s ' %(self.index, extension))
update_set_heart(extension)
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "GETSEATALL" : #ALL STATUS
ret_data = {'Event':'GETSEATALL','errorCode':'E0','errorMsg':'SUCCESS','data':get_extension_status_all()}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "HANGUP": #HANGUP
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/hangup.php?extension="+self.device
request = http_reques_uncallcc(request_url)
write_log('-----> %s action HANGUP : %s ' %(self.index, request_url))
if request == "OK":
ret_data = {'Event':'HANGUP','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'HANGUP','errorCode':'E5','errorMsg':'FAILED'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "SETBUSY" : #set busy
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/setbusy.php?extension="+self.device
request = http_reques_uncallcc(request_url)
write_log('-----> %s action SETBUSY : %s ' %(self.index, request_url))
if request == "OK":
ret_data = {'Event':'SETBUSY','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'SETBUSY','errorCode':'E5','errorMsg':'FAILED'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "SETIDLE" : #set idle
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/setIdle.php?extension="+self.device
write_log('-----> %s action SETIDLE : %s ' %(self.index, request_url))
request = http_reques_uncallcc(request_url)
if request == "OK":
ret_data = {'Event':'SETIDLE','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'SETIDLE','errorCode':'E5','errorMsg':'FAILED'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "TRANSFERCALL" : #transferCall
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/transferCall.php?extension="+self.device+"&phone="+msg_json_data['Phone']
write_log('-----> %s action TRANSFERCALL : %s ' %(self.index, request_url))
request = http_reques_uncallcc(request_url)
if request == "OK":
ret_data = {'Event':'TRANSFERCALL','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
else:
ret_data = {'Event':'TRANSFERCALL','errorCode':'E5','errorMsg':'FAILED'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "CHECK_CDR":
uniquid = msg_json_data['Uniquid']
cdr_list = check_extension_cdr(uniquid)
ret_data = {'Event':'CHECK_CDR','errorCode':'E0','errorMsg':'SUCCESS','data':cdr_list}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "CALLIN":
ret_data = {'Event':'CALLIN','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
if action == "LOGOUT":
request_url = "http://"+UNCALL_HTTP_HOST+"/phpapi/deviceloout.php?exten="+self.extension+"&device="+self.device
request = http_reques_uncallcc(request_url)
write_log('-----> %s action LOGOUT : %s ' %(self.index, request_url))
ret_data = {'Event':'CALLOUT','errorCode':'E0','errorMsg':'SUCCESS'}
sendMessageIndex(json.dumps(ret_data),self.index)
self.is_login=0
deleteconnection(str(self.index))
self.conn.close()
break
except (ZeroDivisionError,Exception) as e:
ret_data = {'errorCode':'E1','errorMsg':'REQUEST ERROR'}
self.is_login=0
deleteconnection(str(self.index))
self.conn.close()
write_log("EX02---> "+str(self.index)+" :"+str(e))
break
self.buffer = ""
class WebSocketServer(object):
def __init__(self):
self.socket = None
def begin(self):
write_log('Uncallcc WebSocketServer Start!')
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, SEND_BUF_SIZE)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE)
self.socket.bind((LISTEN_IP,LISTEN_PORT))
self.socket.listen(100)
global connectionlist
global webclasslist
i=0
while True:
connection, address = self.socket.accept()
username=address[0]
newSocket = WebSocket(connection,i,username,address)
newSocket.start()
webclasslist[i]= newSocket
connectionlist['connection'+str(i)]=connection
write_log('add connection'+str(i))
i = i + 1
if __name__ == "__main__":
t = threading.Thread(target=data_check)
t.setDaemon(True)
t.start()
server = WebSocketServer()
server.begin()三、HTML websocket 测试
<!DOCTYPE html>
<html>
<head>
<title>WebSocket</title>
<style>
html, body {
font: normal 0.9em arial, helvetica;
}
#log {
width: 800px;
height: 400px;
border: 1px solid #7F9DB9;
overflow: auto;
}
#msg {
width: 800px;
}
</style>
<script>
var socket;
function init() {
var host = "ws://120.177.122.25:6899/";
try {
socket = new WebSocket(host);
socket.onopen = function (msg) {
log('Connected');
};
socket.onmessage = function (msg) {
log(msg.data);
if(msg.data=="SUCCESS"){//连接成功
}
if(msg.data.indexOf("errorCode")){//有事件交互
var json_data = JSON.parse(msg.data);
console.log(json_data['errorCode']);
if(json_data['errorCode']=="E0"){
if(json_data['Event']=="LOGIN"){//
console.log('SUCCESS LOGIN');
send_hearbeat();
}
}
}
};
socket.onclose = function (msg) {
log("Lose Connection!");
};
}
catch (ex) {
log(ex);
}
$("msg").focus();
}
function send_hearbeat()
{
socket.send("{\"Event\":\"HB\",\"Exten\":\"801\"}");
setTimeout("send_hearbeat();", 30000);
}
function send() {
var txt, msg;
txt = $("msg");
msg = txt.value;
if (!msg) {
alert("Message can not be empty");
return;
}
txt.value = "";
txt.focus();
try {
socket.send(msg);
} catch (ex) {
log(ex);
}
}
window.onbeforeunload = function () {
try {
socket.send('quit');
socket.close();
socket = null;
}
catch (ex) {
log(ex);
}
};
function $(id) {
return document.getElementById(id);
}
function log(msg) {
$("log").innerHTML += "<br>" + msg;
}
function onkey(event) {
if (event.keyCode == 13) {
send();
}
}
</script>
</head>
<body onload="init()">
<h3>Uncallcc WebSocket Demo</h3>
<br><br>
<div id="log"></div><br><br>
<input id="msg" type="textbox" onkeypress="onkey(event)"/>
<br><br>
<button onclick="send()">Send</button>
</body>
</html>
发表评论