perf: generate queue

This commit is contained in:
archer
2023-05-27 04:38:00 +08:00
parent f05b12975c
commit 741381ecb0
19 changed files with 288 additions and 265 deletions

View File

@@ -7,49 +7,61 @@ import { modelServiceToolMap } from '../utils/chat';
import { ChatRoleEnum } from '@/constants/chat';
import { BillTypeEnum } from '@/constants/user';
import { pushDataToKb } from '@/pages/api/openapi/kb/pushData';
import { TrainingTypeEnum } from '@/constants/plugin';
import { ERROR_ENUM } from '../errorCode';
// 每次最多选 1 组
const listLen = 1;
export async function generateQA(): Promise<any> {
const maxProcess = Number(process.env.QA_MAX_PROCESS || 10);
if (global.qaQueueLen >= maxProcess) return;
global.qaQueueLen++;
let trainingId = '';
let userId = '';
export async function generateQA(trainingId: string): Promise<any> {
try {
// 找出一个需要生成的 dataItem (4分钟锁)
const data = await TrainingData.findOneAndUpdate(
{
_id: trainingId,
lockTime: { $lte: Date.now() - 4 * 60 * 1000 }
mode: TrainingTypeEnum.qa,
lockTime: { $lte: new Date(Date.now() - 2 * 60 * 1000) }
},
{
lockTime: new Date()
}
);
).select({
_id: 1,
userId: 1,
kbId: 1,
prompt: 1,
q: 1
});
if (!data || data.qaList.length === 0) {
await TrainingData.findOneAndDelete({
_id: trainingId,
qaList: [],
vectorList: []
});
/* 无待生成的任务 */
if (!data) {
global.qaQueueLen--;
!global.qaQueueLen && console.log(`没有需要【QA】的数据`);
return;
}
const qaList: string[] = data.qaList.slice(-listLen);
trainingId = data._id;
userId = String(data.userId);
const kbId = String(data.kbId);
// 余额校验并获取 openapi Key
const { userOpenAiKey, systemAuthKey } = await getApiKey({
model: OpenAiChatEnum.GPT35,
userId: data.userId,
userId,
type: 'training'
});
console.log(`正在生成一组QA, 包含 ${qaList.length} 组文本。ID: ${data._id}`);
console.log(`正在生成一组QA。ID: ${trainingId}`);
const startTime = Date.now();
// 请求 chatgpt 获取回答
const response = await Promise.all(
qaList.map((text) =>
[data.q].map((text) =>
modelServiceToolMap[OpenAiChatEnum.GPT35]
.chatCompletion({
apiKey: userOpenAiKey || systemAuthKey,
@@ -100,24 +112,19 @@ A2:
// 创建 向量生成 队列
pushDataToKb({
kbId: data.kbId,
kbId,
data: responseList,
userId: data.userId
userId,
mode: TrainingTypeEnum.index
});
// 删除 QA 队列。如果小于 n 条,整个数据删掉。 如果大于 n 条,仅删数组后 n 个
if (data.vectorList.length <= listLen) {
await TrainingData.findByIdAndDelete(data._id);
} else {
await TrainingData.findByIdAndUpdate(data._id, {
qaList: data.qaList.slice(0, -listLen),
lockTime: new Date('2000/1/1')
});
}
// delete data from training
await TrainingData.findByIdAndDelete(data._id);
console.log('生成QA成功time:', `${(Date.now() - startTime) / 1000}s`);
generateQA(trainingId);
global.qaQueueLen--;
generateQA();
} catch (err: any) {
// log
if (err?.response) {
@@ -130,25 +137,28 @@ A2:
// openai 账号异常或者账号余额不足,删除任务
if (openaiError2[err?.response?.data?.error?.type] || err === ERROR_ENUM.insufficientQuota) {
console.log('余额不足,删除向量生成任务');
await TrainingData.findByIdAndDelete(trainingId);
return;
await TrainingData.deleteMany({
userId
});
return generateQA();
}
// unlock
global.qaQueueLen--;
await TrainingData.findByIdAndUpdate(trainingId, {
lockTime: new Date('2000/1/1')
});
// 频率限制
if (err?.response?.statusText === 'Too Many Requests') {
console.log('生成向量次数限制,30s后尝试');
console.log('生成向量次数限制,20s后尝试');
return setTimeout(() => {
generateQA(trainingId);
}, 30000);
generateQA();
}, 20000);
}
setTimeout(() => {
generateQA(trainingId);
generateQA();
}, 1000);
}
}

View File

@@ -3,104 +3,109 @@ import { insertKbItem, PgClient } from '@/service/pg';
import { openaiEmbedding } from '@/pages/api/openapi/plugin/openaiEmbedding';
import { TrainingData } from '../models/trainingData';
import { ERROR_ENUM } from '../errorCode';
// 每次最多选 5 组
const listLen = 5;
import { TrainingTypeEnum } from '@/constants/plugin';
/* 索引生成队列。每导入一次,就是一个单独的线程 */
export async function generateVector(trainingId: string): Promise<any> {
export async function generateVector(): Promise<any> {
const maxProcess = Number(process.env.VECTOR_MAX_PROCESS || 10);
if (global.vectorQueueLen >= maxProcess) return;
global.vectorQueueLen++;
let trainingId = '';
let userId = '';
try {
// 找出一个需要生成的 dataItem (2分钟锁)
const data = await TrainingData.findOneAndUpdate(
{
_id: trainingId,
lockTime: { $lte: Date.now() - 2 * 60 * 1000 }
mode: TrainingTypeEnum.index,
lockTime: { $lte: new Date(Date.now() - 2 * 60 * 1000) }
},
{
lockTime: new Date()
}
);
).select({
_id: 1,
userId: 1,
kbId: 1,
q: 1,
a: 1
});
/* 无待生成的任务 */
if (!data) {
await TrainingData.findOneAndDelete({
_id: trainingId,
qaList: [],
vectorList: []
});
global.vectorQueueLen--;
!global.vectorQueueLen && console.log(`没有需要【索引】的数据`);
return;
}
const userId = String(data.userId);
trainingId = data._id;
userId = String(data.userId);
const kbId = String(data.kbId);
const dataItems: { q: string; a: string }[] = data.vectorList.slice(-listLen).map((item) => ({
q: item.q,
a: item.a
}));
const dataItems = [
{
q: data.q,
a: data.a
}
];
// 过滤重复的 qa 内容
const searchRes = await Promise.allSettled(
dataItems.map(async ({ q, a = '' }) => {
if (!q) {
return Promise.reject('q为空');
}
// const searchRes = await Promise.allSettled(
// dataItems.map(async ({ q, a = '' }) => {
// if (!q) {
// return Promise.reject('q为空');
// }
q = q.replace(/\\n/g, '\n');
a = a.replace(/\\n/g, '\n');
// q = q.replace(/\\n/g, '\n');
// a = a.replace(/\\n/g, '\n');
// Exactly the same data, not push
try {
const count = await PgClient.count('modelData', {
where: [['user_id', userId], 'AND', ['kb_id', kbId], 'AND', ['q', q], 'AND', ['a', a]]
});
if (count > 0) {
return Promise.reject('已经存在');
}
} catch (error) {
error;
}
return Promise.resolve({
q,
a
});
})
);
const filterData = searchRes
.filter((item) => item.status === 'fulfilled')
.map<{ q: string; a: string }>((item: any) => item.value);
// // Exactly the same data, not push
// try {
// const count = await PgClient.count('modelData', {
// where: [['user_id', userId], 'AND', ['kb_id', kbId], 'AND', ['q', q], 'AND', ['a', a]]
// });
if (filterData.length > 0) {
// 生成词向量
const vectors = await openaiEmbedding({
input: filterData.map((item) => item.q),
userId,
type: 'training'
});
// if (count > 0) {
// return Promise.reject('已经存在');
// }
// } catch (error) {
// error;
// }
// return Promise.resolve({
// q,
// a
// });
// })
// );
// const filterData = searchRes
// .filter((item) => item.status === 'fulfilled')
// .map<{ q: string; a: string }>((item: any) => item.value);
// 生成结果插入到 pg
await insertKbItem({
userId,
kbId,
data: vectors.map((vector, i) => ({
q: filterData[i].q,
a: filterData[i].a,
vector
}))
});
}
// 生成词向量
const vectors = await openaiEmbedding({
input: dataItems.map((item) => item.q),
userId,
type: 'training'
});
// 删除 mongo 训练队列. 如果小于 n 条,整个数据删掉。 如果大于 n 条,仅删数组后 n 个
if (data.vectorList.length <= listLen) {
await TrainingData.findByIdAndDelete(trainingId);
console.log(`全部向量生成完毕: ${trainingId}`);
} else {
await TrainingData.findByIdAndUpdate(trainingId, {
vectorList: data.vectorList.slice(0, -listLen),
lockTime: new Date('2000/1/1')
});
console.log(`生成向量成功: ${trainingId}`);
generateVector(trainingId);
}
// 生成结果插入到 pg
await insertKbItem({
userId,
kbId,
data: vectors.map((vector, i) => ({
q: dataItems[i].q,
a: dataItems[i].a,
vector
}))
});
// delete data from training
await TrainingData.findByIdAndDelete(data._id);
console.log(`生成向量成功: ${data._id}`);
global.vectorQueueLen--;
generateVector();
} catch (err: any) {
// log
if (err?.response) {
@@ -113,25 +118,28 @@ export async function generateVector(trainingId: string): Promise<any> {
// openai 账号异常或者账号余额不足,删除任务
if (openaiError2[err?.response?.data?.error?.type] || err === ERROR_ENUM.insufficientQuota) {
console.log('余额不足,删除向量生成任务');
await TrainingData.findByIdAndDelete(trainingId);
return;
await TrainingData.deleteMany({
userId
});
return generateVector();
}
// unlock
global.vectorQueueLen--;
await TrainingData.findByIdAndUpdate(trainingId, {
lockTime: new Date('2000/1/1')
});
// 频率限制
if (err?.response?.statusText === 'Too Many Requests') {
console.log('生成向量次数限制,30s后尝试');
console.log('生成向量次数限制,20s后尝试');
return setTimeout(() => {
generateVector(trainingId);
}, 30000);
generateVector();
}, 20000);
}
setTimeout(() => {
generateVector(trainingId);
generateVector();
}, 1000);
}
}

View File

@@ -1,9 +1,9 @@
/* 模型的知识库 */
import { Schema, model, models, Model as MongoModel } from 'mongoose';
import { TrainingDataSchema as TrainingDateType } from '@/types/mongoSchema';
import { TrainingTypeMap } from '@/constants/plugin';
// pgList and vectorList, Only one of them will work
const TrainingDataSchema = new Schema({
userId: {
type: Schema.Types.ObjectId,
@@ -19,18 +19,27 @@ const TrainingDataSchema = new Schema({
type: Date,
default: () => new Date('2000/1/1')
},
vectorList: {
type: [{ q: String, a: String }],
default: []
mode: {
type: String,
enum: Object.keys(TrainingTypeMap),
required: true
},
prompt: {
// 拆分时的提示词
type: String,
default: ''
},
qaList: {
type: [String],
default: []
q: {
// 如果是
type: String,
default: ''
},
a: {
type: String,
default: ''
},
vectorList: {
type: Object
}
});

View File

@@ -1,8 +1,7 @@
import mongoose from 'mongoose';
import { generateQA } from './events/generateQA';
import { generateVector } from './events/generateVector';
import tunnel from 'tunnel';
import { TrainingData } from './mongo';
import { startQueue } from './utils/tools';
/**
* 连接 MongoDB 数据库
@@ -38,7 +37,10 @@ export async function connectToDatabase(): Promise<void> {
});
}
startTrain();
global.qaQueueLen = 0;
global.vectorQueueLen = 0;
startQueue();
// 5 分钟后解锁不正常的数据,并触发开始训练
setTimeout(async () => {
await TrainingData.updateMany(
@@ -49,24 +51,10 @@ export async function connectToDatabase(): Promise<void> {
lockTime: new Date('2000/1/1')
}
);
startTrain();
startQueue();
}, 5 * 60 * 1000);
}
async function startTrain() {
const qa = await TrainingData.find({
qaList: { $exists: true, $ne: [] }
});
qa.map((item) => generateQA(String(item._id)));
const vector = await TrainingData.find({
vectorList: { $exists: true, $ne: [] }
});
vector.map((item) => generateVector(String(item._id)));
}
export * from './models/authCode';
export * from './models/chat';
export * from './models/model';

View File

@@ -14,8 +14,8 @@ export const connectPg = async () => {
password: process.env.PG_PASSWORD,
database: process.env.PG_DB_NAME,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000
idleTimeoutMillis: 60000,
connectionTimeoutMillis: 20000
});
global.pgClient.on('error', (err) => {

View File

@@ -45,7 +45,7 @@ export const jsonRes = <T = any>(
} else if (openaiError[error?.response?.statusText]) {
msg = openaiError[error.response.statusText];
}
console.log(error);
console.log(error?.message || error);
}
res.json({

View File

@@ -2,6 +2,8 @@ import type { NextApiResponse, NextApiHandler, NextApiRequest } from 'next';
import NextCors from 'nextjs-cors';
import crypto from 'crypto';
import jwt from 'jsonwebtoken';
import { generateQA } from '../events/generateQA';
import { generateVector } from '../events/generateVector';
/* 密码加密 */
export const hashPassword = (psw: string) => {
@@ -45,7 +47,7 @@ export function withNextCors(handler: NextApiHandler): NextApiHandler {
req: NextApiRequest,
res: NextApiResponse
) {
const methods = ['GET', 'HEAD', 'PUT', 'PATCH', 'POST', 'DELETE'];
const methods = ['GET', 'HEAD', 'PUT', 'PATCH', 'POST', 'DELETE'];
const origin = req.headers.origin;
await NextCors(req, res, {
methods,
@@ -56,3 +58,15 @@ export function withNextCors(handler: NextApiHandler): NextApiHandler {
return handler(req, res);
};
}
export const startQueue = () => {
const qaMax = Number(process.env.QA_MAX_PROCESS || 10);
const vectorMax = Number(process.env.VECTOR_MAX_PROCESS || 10);
for (let i = 0; i < qaMax; i++) {
generateQA();
}
for (let i = 0; i < vectorMax; i++) {
generateVector();
}
};