cabinet-manage/git/repository.py

244 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
'''
@AuthorRobin
@Email: 329080237@qq.com
@Wechat: 15618110227
@File: api.py
@Date: 2021/9/25 9:08
@Description:
'''
from utils.public import *
from .base import RequestGit
from django.db import transaction
import json, base64
@try_except
def getReposTags(request, full_name):
api = "/repos/"+ full_name +"/tags"
res = RequestGit(request, api).get()
data = json.loads(res.content)
tagList = [ i['name'] for i in data ]
return tagList
def getLanguages(request, owner, repo):
api = "/repos/{}/{}/languages".format(owner, repo)
res = RequestGit(request, api).get()
data = json.loads(res.text)
tagList = [i for i in data]
return tagList
@try_except
def search(request):
args = ("id", "name", "repo_name", "userKey__username", "content", "userKey__avatar", "score", "level")
items = Project.objects.exclude(private=1).values(*args).order_by("?")[:6]
#console(items)
# api = "/repos/search?page=1&limit=6"
# res = RequestGit(request, api).get()
# print(res.status_code)
# data = json.loads(res.content)
# items = data['data']
for i in items:
i['tagList'] = getLanguages(request, i["userKey__username"], i["repo_name"])
i['stars_count'] = ProjectTriple.objects.filter(projectKey=i['id'], type=2).count()
resData = {
"items": list(items),
"total": len(items)
}
#console(resData)
return jsonData(data=resData)
class CreateRepo:
def __init__(self, request, data, owner=None, repo=None):
self.userKey = request.session.get("user_id")
self.data = data
self.owner = owner
self.repo = repo
self.request = request
def project(self):
'''本地数据库创建项目'''
groupKey = self.data.get("groupKey")
if type(self.data.get("groupKey")) == list:
groupKey = [x for x in groupKey if x is not None].pop()
payload = {
"name": self.data.get("name"),
"repo_name": self.data.get("repo_name"),
"content": self.data.get("content"),
"userKey_id": self.userKey,
"groupKey_id": groupKey,
"private": self.data.get("private")
}
Project.objects.create(**payload)
def repo_create(self):
api = "/user/repos"
payload = {
"name": self.data.get("repo_name"),
"description": self.data.get("content"),
"private": self.data.get("private"),
"license": self.data.get("license")
}
#console(data, payload)
res = RequestGit(self.request, api).post(payload=payload)
if res.status_code == 201:
print('创建仓库成功')
data = json.loads(res.text)
else:
print(res.text)
raise Exception("创建项目失败!")
return data
def team(self):
'''将圈子中所有成员拉到项目群组中'''
username = self.request.session.get("username") # 拉取协助时排除自己
groupKey = self.data.get("groupKey")
# 前端返回的是数组,所以要进行过滤处理
if type(self.data.get("groupKey")) == list:
groupKey = [x for x in groupKey if x is not None].pop()
usernameList = GroupUsers.objects.exclude(userKey__username=username).filter(groupKey=groupKey).values_list('userKey__username', flat=True)
# console(usernameList)
for i in usernameList:
api = "/repos/{}/{}/collaborators/{}".format(self.owner, self.repo, i)
RequestGit(self.request, api).put()
@transaction.atomic
def create(request):
# 记录回滚点
sp = transaction.savepoint()
try:
data = json.loads(request.body)
CreateRepo(request, data).project() # 创建本地数据
res = CreateRepo(request, data).repo_create() # 创建Gitea仓库
CreateRepo(request, data, res['owner']['login'], res['name']).team() # 创建协作团队
resData = {"owner": res['owner']['login'], "repo": res['name']}
except:
# 如以上三步操作发生异常时,回滚事务
transaction.savepoint_rollback(sp)
raise Exception("创建项目失败!")
return jsonData(data=resData)
@try_except
def getRepo(request, owner, repo):
api = "/repos/"+owner+"/"+repo
res = RequestGit(request, api).get()
if res.status_code == 200:
data = json.loads(res.text)
else:
print(res.text)
raise Exception("获取项目信息失败!")
return jsonData(data=data)
@try_except
def getBranches(request, owner, repo):
'''获取项目分支列表'''
api = "/repos/"+ owner +"/"+ repo +"/branches"
res = RequestGit(request, api).get()
if res.status_code == 200:
data = json.loads(res.text)
else:
print(res.text)
raise Exception("获取项目分支信息失败!")
resData = {
"items": data
}
return jsonData(data=resData)
@try_except
def getCommits(request, owner, repo):
data = json.loads(request.GET.get("data"))
api = "/repos/{}/{}/commits?sha={}".format(owner, repo, data['sha'])
res = RequestGit(request, api).get()
if res.status_code == 200:
total = res.headers['X-Total']
else:
print(res.text)
raise Exception("获取提交数信息失败!")
resData = {
"total": total
}
#console(resData)
return jsonData(data=resData)
@try_except
def readme(request, owner, repo, filepath):
data = json.loads(request.GET.get("data"))
api = "/repos/{}/{}/contents/{}?ref={}".format(owner, repo, filepath, data['ref'])
res = RequestGit(request, api).get()
if res.status_code == 200:
data = json.loads(res.text)
content = str(base64.b64decode(data['content']), encoding="utf-8")
else:
content = ""
#print(res.text)
#raise Exception("获取readme.md失败")
resData = {
"content": content
}
return jsonData(data=resData)
def getRepoTree(request, owner, repo):
data = json.loads(request.GET.get("data"))
ref = data.get("ref")
filepath = data.get("filepath")
if filepath:
api = "/repos/{}/{}/contents/{}?ref={}".format(owner, repo, filepath, ref)
else:
api = "/repos/{}/{}/contents?ref={}".format(owner, repo, ref)
res = RequestGit(request, api).get()
items = []
if res.status_code == 200:
data = json.loads(res.text)
items = sorted(data, key=lambda x: x["type"])
resData = {
"items": items
}
return jsonData(data=resData)
def downloadZIP(request, owner, repo):
data = json.loads(request.GET.get("data"))
ref = data.get("ref")
api = "/repos/{}/{}/archive/{}.zip".format(owner, repo, ref)
res = RequestGit(request, api).get()
if res.status_code == 200:
print("下载")
else:
print(res.text)
raise Exception("下载失败!")
return jsonData()
def lastCommit(request, owner, repo):
data = json.loads(request.GET.get("data"))
ref = data.get("ref")
api = "/repos/{}/{}/commits?sha={}&page=1&limit=1".format(owner, repo, ref)
res = RequestGit(request, api).get()
if res.status_code == 200:
data = json.loads(res.text)
committer = data[0]['commit']['author']
userInfo = User.objects.filter(username=committer["name"])
committer["avatar_url"] = userInfo[0].avatar if userInfo else None
committer["total"] = res.headers['X-Total-Count']
else:
print(res.text)
raise Exception("获取提交信息失败!")
return jsonData(data=committer)