mdserver-web/plugins/gogs/index.py

685 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
import time
import os
import sys
import re
sys.path.append("/usr/local/lib/python2.7/site-packages")
import psutil
sys.path.append(os.getcwd() + "/class/core")
import public
app_debug = False
if public.getOs() == 'darwin':
app_debug = True
def getPluginName():
return 'gogs'
def getPluginDir():
return public.getPluginDir() + '/' + getPluginName()
sys.path.append(getPluginDir() + "/class")
import mysql
def getServerDir():
return public.getServerDir() + '/' + getPluginName()
def getInitDFile():
if app_debug:
return '/tmp/' + getPluginName()
return '/etc/init.d/' + getPluginName()
def getArgs():
args = sys.argv[2:]
tmp = {}
args_len = len(args)
if args_len == 1:
t = args[0].strip('{').strip('}')
t = t.split(':', 1)
tmp[t[0]] = t[1]
elif args_len > 1:
for i in range(len(args)):
t = args[i].split(':', 1)
tmp[t[0]] = t[1]
return tmp
def getInitdConfTpl():
path = getPluginDir() + "/init.d/gogs.tpl"
return path
def getInitdConf():
path = getServerDir() + "/init.d/gogs"
return path
def getConf():
path = getServerDir() + "/custom/conf/app.ini"
return path
def getConfTpl():
path = getPluginDir() + "/conf/app.ini"
return path
def status():
data = public.execShell(
"ps -ef|grep " + getPluginName() + " |grep -v grep | grep -v python | awk '{print $2}'")
if data[0] == '':
return 'stop'
return 'start'
def getHomeDir():
if public.isAppleSystem():
user = public.execShell(
"who | sed -n '2, 1p' |awk '{print $1}'")[0].strip()
return '/Users/' + user
else:
return '/root'
def getRunUser():
if public.isAppleSystem():
user = public.execShell(
"who | sed -n '2, 1p' |awk '{print $1}'")[0].strip()
return user
else:
return 'root'
__SR = '''#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
export USER=%s
export HOME=%s && ''' % ( getRunUser(), getHomeDir())
def contentReplace(content):
service_path = public.getServerDir()
content = content.replace('{$ROOT_PATH}', public.getRootDir())
content = content.replace('{$SERVER_PATH}', service_path)
content = content.replace('{$RUN_USER}', getRunUser())
content = content.replace('{$HOME_DIR}', getHomeDir())
return content
def initDreplace():
file_tpl = getInitdConfTpl()
service_path = public.getServerDir()
initD_path = getServerDir() + '/init.d'
if not os.path.exists(initD_path):
os.mkdir(initD_path)
file_bin = initD_path + '/' + getPluginName()
if not os.path.exists(file_bin):
content = public.readFile(file_tpl)
content = contentReplace(content)
public.writeFile(file_bin, content)
public.execShell('chmod +x ' + file_bin)
conf_bin = getConf()
if not os.path.exists(conf_bin):
public.execShell('mkdir -p ' + getServerDir() + '/custom/conf')
conf_tpl = getConfTpl()
content = public.readFile(conf_tpl)
content = contentReplace(content)
public.writeFile(conf_bin, content)
log_path = getServerDir() + '/log'
if not os.path.exists(log_path):
os.mkdir(log_path)
return file_bin
def getRootUrl():
content = public.readFile(getConf())
rep = 'ROOT_URL\s*=\s*(.*)'
tmp = re.search(rep, content)
if not tmp:
return ''
return tmp.groups()[0]
def getSshPort():
content = public.readFile(getConf())
rep = 'SSH_PORT\s*=\s*(.*)'
tmp = re.search(rep, content)
if not tmp:
return ''
return tmp.groups()[0]
def getRootPath():
content = public.readFile(getConf())
rep = 'ROOT\s*=\s*(.*)'
tmp = re.search(rep, content)
if not tmp:
return ''
return tmp.groups()[0]
def getDbConfValue():
content = public.readFile(getConf())
rep_scope = "\[database\](.*?)\["
tmp = re.findall(rep_scope, content, re.S)
rep = '(\w*)\s*=\s*(.*)'
tmp = re.findall(rep, tmp[0])
r = {}
for x in range(len(tmp)):
k = tmp[x][0]
v = tmp[x][1]
r[k] = v
return r
def pMysqlDb():
conf = getDbConfValue()
host = conf['HOST'].split(':')
conn = mysql.mysql()
conn.setHost(host[0])
conn.setUser(conf['USER'])
conn.setPwd(conf['PASSWD'])
conn.setPort(int(host[1]))
conn.setDb(conf['NAME'])
return conn
def isSqlError(mysqlMsg):
# 检测数据库执行错误
_mysqlMsg = str(mysqlMsg)
# print _mysqlMsg
if "MySQLdb" in _mysqlMsg:
return public.returnData(False, 'MySQLdb组件缺失! <br>进入SSH命令行输入 pip install mysql-python')
if "2002," in _mysqlMsg:
return public.returnData(False, '数据库连接失败,请检查数据库服务是否启动!')
if "using password:" in _mysqlMsg:
return public.returnData(False, '数据库管理密码错误!')
if "Connection refused" in _mysqlMsg:
return public.returnData(False, '数据库连接失败,请检查数据库服务是否启动!')
if "1133," in _mysqlMsg:
return public.returnData(False, '数据库用户不存在!')
if "1007," in _mysqlMsg:
return public.returnData(False, '数据库已经存在!')
if "1044," in _mysqlMsg:
return public.returnData(False, mysqlMsg[1])
if "2003," in _mysqlMsg:
return public.returnData(False, mysqlMsg[1])
return public.returnData(True, 'OK')
def start():
is_frist = True
conf_bin = getConf()
if os.path.exists(conf_bin):
is_frist = False
file = initDreplace()
if is_frist:
return "第一次启动Gogs,默认使用MySQL连接!<br>可以在配置文件中重新设置,再启动!"
conn = pMysqlDb()
list_table = conn.query('show tables')
data = isSqlError(list_table)
if not data['status']:
return data['msg']
data = public.execShell(__SR + file + ' start')
if data[1] == '':
return 'ok'
return data[0]
def stop():
file = initDreplace()
data = public.execShell(__SR + file + ' stop')
if data[1] == '':
return 'ok'
return data[1]
def restart():
file = initDreplace()
data = public.execShell(__SR + file + ' restart')
if data[1] == '':
return 'ok'
return data[1]
def reload():
file = initDreplace()
data = public.execShell(__SR + file + ' reload')
if data[1] == '':
return 'ok'
return data[1]
def initdStatus():
if not app_debug:
os_name = public.getOs()
if os_name == 'darwin':
return "Apple Computer does not support"
initd_bin = getInitDFile()
if os.path.exists(initd_bin):
return 'ok'
return 'fail'
def initdInstall():
import shutil
if not app_debug:
os_name = public.getOs()
if os_name == 'darwin':
return "Apple Computer does not support"
mem_bin = initDreplace()
initd_bin = getInitDFile()
shutil.copyfile(mem_bin, initd_bin)
public.execShell('chmod +x ' + initd_bin)
return 'ok'
def initdUinstall():
if not app_debug:
os_name = public.getOs()
if os_name == 'darwin':
return "Apple Computer does not support"
initd_bin = getInitDFile()
os.remove(initd_bin)
return 'ok'
def runLog():
log_path = getServerDir() + '/log/gogs.log'
return log_path
def postReceiveLog():
log_path = getServerDir() + '/log/hooks/post-receive.log'
return log_path
def getGogsConf():
gets = [
{'name': 'DOMAIN', 'type': -1, 'ps': '服务器域名'},
{'name': 'ROOT_URL', 'type': -1, 'ps': '公开的完整URL路径'},
{'name': 'HTTP_ADDR', 'type': -1, 'ps': '应用HTTP监听地址'},
{'name': 'HTTP_PORT', 'type': -1, 'ps': '应用 HTTP 监听端口号'},
{'name': 'START_SSH_SERVER', 'type': 2, 'ps': '启动内置SSH服务器'},
{'name': 'SSH_PORT', 'type': -1, 'ps': 'SSH 端口号'},
{'name': 'REQUIRE_SIGNIN_VIEW', 'type': 2, 'ps': '强制登录浏览'},
{'name': 'ENABLE_CAPTCHA', 'type': 2, 'ps': '启用验证码服务'},
{'name': 'DISABLE_REGISTRATION', 'type': 2, 'ps': '禁止注册,只能由管理员创建帐号'},
{'name': 'ENABLE_NOTIFY_MAIL', 'type': 2, 'ps': '是否开启邮件通知'},
{'name': 'FORCE_PRIVATE', 'type': 2, 'ps': '强制要求所有新建的仓库都是私有'},
{'name': 'SHOW_FOOTER_BRANDING', 'type': 2, 'ps': 'Gogs推广信息'},
{'name': 'SHOW_FOOTER_VERSION', 'type': 2, 'ps': 'Gogs版本信息'},
{'name': 'SHOW_FOOTER_TEMPLATE_LOAD_TIME', 'type': 2, 'ps': 'Gogs模板加载时间'},
]
conf = public.readFile(getConf())
result = []
for g in gets:
rep = g['name'] + '\s*=\s*(.*)'
tmp = re.search(rep, conf)
if not tmp:
continue
g['value'] = tmp.groups()[0]
result.append(g)
return public.getJson(result)
def submitGogsConf():
gets = ['DOMAIN',
'ROOT_URL',
'HTTP_ADDR',
'HTTP_PORT',
'START_SSH_SERVER',
'SSH_PORT',
'REQUIRE_SIGNIN_VIEW',
'FORCE_PRIVATE',
'ENABLE_CAPTCHA',
'DISABLE_REGISTRATION',
'ENABLE_NOTIFY_MAIL',
'SHOW_FOOTER_BRANDING',
'SHOW_FOOTER_VERSION',
'SHOW_FOOTER_TEMPLATE_LOAD_TIME']
args = getArgs()
filename = getConf()
conf = public.readFile(filename)
for g in gets:
if g in args:
rep = g + '\s*=\s*(.*)'
val = g + ' = ' + args[g]
conf = re.sub(rep, val, conf)
public.writeFile(filename, conf)
reload()
return public.returnJson(True, '设置成功')
def userList():
import math
args = getArgs()
page = 1
page_size = 10
search = ''
if 'page' in args:
page = int(args['page'])
if 'page_size' in args:
page_size = int(args['page_size'])
if 'search' in args:
search = args['search']
data = {}
data['root_url'] = getRootUrl()
pm = pMysqlDb()
start = (page - 1) * page_size
list_count = pm.query('select count(id) as num from user')
count = list_count[0][0]
list_data = pm.query(
'select id,name,email from user order by id desc limit ' + str(start) + ',' + str(page_size))
page_info = {'count': count, 'p': page,
'row': page_size, 'tojs': 'gogsUserList'}
data['list'] = public.getPage(page_info)
data['page'] = page
data['page_size'] = page_size
data['page_count'] = int(math.ceil(count / page_size))
data['data'] = list_data
return public.returnJson(True, 'OK', data)
def getAllUserProject(user, search=''):
path = getRootPath() + '/' + user
dlist = []
if os.path.exists(path):
for filename in os.listdir(path):
tmp = {}
filePath = path + '/' + filename
if os.path.isdir(filePath):
if search == '':
tmp['name'] = filename.replace('.git', '')
dlist.append(tmp)
else:
if filename.find(search) != -1:
tmp['name'] = filename.replace('.git', '')
dlist.append(tmp)
return dlist
def checkProjectListIsHasScript(user, data):
path = getRootPath() + '/' + user
for x in range(len(data)):
name = data[x]['name'] + '.git'
path_tmp = path + '/' + name + '/custom_hooks/post-receive'
if os.path.exists(path_tmp):
data[x]['has_hook'] = True
else:
data[x]['has_hook'] = False
return data
def userProjectList():
import math
args = getArgs()
# print args
page = 1
page_size = 5
search = ''
if not 'name' in args:
return public.returnJson(False, '缺少参数name')
if 'page' in args:
page = int(args['page'])
if 'page_size' in args:
page_size = int(args['page_size'])
if 'search' in args:
search = args['search']
data = {}
ulist = getAllUserProject(args['name'])
dlist_sum = len(ulist)
start = (page - 1) * page_size
ret_data = ulist[start:start + page_size]
ret_data = checkProjectListIsHasScript(args['name'], ret_data)
data['root_url'] = getRootUrl()
data['data'] = ret_data
data['args'] = args
data['list'] = public.getPage(
{'count': dlist_sum, 'p': page, 'row': page_size, 'tojs': 'userProjectList'})
return public.returnJson(True, 'OK', data)
def projectScriptEdit():
args = getArgs()
if not 'user' in args:
return public.returnJson(True, 'username missing')
if not 'name' in args:
return public.returnJson(True, 'project name missing')
user = args['user']
name = args['name'] + '.git'
post_receive = getRootPath() + '/' + user + '/' + name + \
'/custom_hooks/post-receive'
if os.path.exists(post_receive):
return public.returnJson(True, 'OK', {'path': post_receive})
else:
return public.returnJson(False, 'file does not exist')
def projectScriptLoad():
args = getArgs()
if not 'user' in args:
return public.returnJson(True, 'username missing')
if not 'name' in args:
return public.returnJson(True, 'project name missing')
user = args['user']
name = args['name'] + '.git'
path = getRootPath() + '/' + user + '/' + name
post_receive_tpl = getPluginDir() + '/hook/post-receive.tpl'
post_receive = path + '/custom_hooks/post-receive'
if not os.path.exists(path + '/custom_hooks'):
public.execShell('mkdir -p ' + path + '/custom_hooks')
pct_content = public.readFile(post_receive_tpl)
pct_content = pct_content.replace('{$PATH}', path + '/custom_hooks')
public.writeFile(post_receive, pct_content)
public.execShell('chmod 777 ' + post_receive)
commit_tpl = getPluginDir() + '/hook/commit.tpl'
commit = path + '/custom_hooks/commit'
codeDir = public.getRootDir() + '/git'
cc_content = public.readFile(commit_tpl)
sshUrl = 'ssh://127.0.0.1:' + getSshPort()
cc_content = cc_content.replace('{$GITROOTURL}', sshUrl)
cc_content = cc_content.replace('{$CODE_DIR}', codeDir)
cc_content = cc_content.replace('{$USERNAME}', user)
cc_content = cc_content.replace('{$PROJECT}', args['name'])
cc_content = cc_content.replace('{$WEB_ROOT}', public.getWwwDir())
public.writeFile(commit, cc_content)
public.execShell('chmod 777 ' + commit)
return 'ok'
def projectScriptUnload():
args = getArgs()
if not 'user' in args:
return public.returnJson(True, 'username missing')
if not 'name' in args:
return public.returnJson(True, 'project name missing')
user = args['user']
name = args['name'] + '.git'
post_receive = getRootPath() + '/' + user + '/' + name + \
'/custom_hooks/post-receive'
public.execShell('rm -f ' + post_receive)
commit = getRootPath() + '/' + user + '/' + name + \
'/custom_hooks/commit'
public.execShell('rm -f ' + commit)
return 'ok'
def projectScriptDebug():
args = getArgs()
if not 'user' in args:
return public.returnJson(True, 'username missing')
if not 'name' in args:
return public.returnJson(True, 'project name missing')
user = args['user']
name = args['name'] + '.git'
commit_log = getRootPath() + '/' + user + '/' + name + \
'/custom_hooks/sh.log'
data = {}
if os.path.exists(commit_log):
data['status'] = True
data['path'] = commit_log
else:
data['status'] = False
data['msg'] = '没有日志文件'
return public.getJson(data)
def gogsEdit():
data = {}
data['post_receive'] = getPluginDir() + '/hook/post-receive.tpl'
data['commit'] = getPluginDir() + '/hook/commit.tpl'
return public.getJson(data)
def getRsaPublic():
path = getHomeDir()
path += '/.ssh/id_rsa.pub'
content = public.readFile(path)
data = {}
data['public'] = content
return public.getJson(data)
def getTotalStatistics():
st = status()
data = {}
if st == 'start':
pm = pMysqlDb()
list_count = pm.query('select count(id) as num from repository')
count = list_count[0][0]
data['status'] = True
data['count'] = count
data['ver'] = public.readFile(getServerDir() + '/version.pl').strip()
return public.returnJson(True, 'ok', data)
else:
data['status'] = False
data['count'] = 0
return public.returnJson(False, 'fail', data)
if __name__ == "__main__":
func = sys.argv[1]
if func == 'status':
print status()
elif func == 'start':
print start()
elif func == 'stop':
print stop()
elif func == 'restart':
print restart()
elif func == 'reload':
print reload()
elif func == 'initd_status':
print initdStatus()
elif func == 'initd_install':
print initdInstall()
elif func == 'initd_uninstall':
print initdUinstall()
elif func == 'run_log':
print runLog()
elif func == 'post_receive_log':
print postReceiveLog()
elif func == 'conf':
print getConf()
elif func == 'init_conf':
print getInitdConf()
elif func == 'get_gogs_conf':
print getGogsConf()
elif func == 'submit_gogs_conf':
print submitGogsConf()
elif func == 'user_list':
print userList()
elif func == 'user_project_list':
print userProjectList()
elif func == 'project_script_edit':
print projectScriptEdit()
elif func == 'project_script_load':
print projectScriptLoad()
elif func == 'project_script_unload':
print projectScriptUnload()
elif func == 'project_script_debug':
print projectScriptDebug()
elif func == 'gogs_edit':
print gogsEdit()
elif func == 'get_rsa_public':
print getRsaPublic()
elif func == 'get_total_statistics':
print getTotalStatistics()
else:
print 'fail'