training queue
This commit is contained in:
@@ -1,107 +1,137 @@
|
||||
import { getApiKey } from '../utils/auth';
|
||||
import { openaiError2 } from '../errorCode';
|
||||
import { PgClient } from '@/service/pg';
|
||||
import { getErrText } from '@/utils/tools';
|
||||
import { insertKbItem, PgClient } from '@/service/pg';
|
||||
import { openaiEmbedding } from '@/pages/api/openapi/plugin/openaiEmbedding';
|
||||
import { TrainingData } from '../models/trainingData';
|
||||
import { ERROR_ENUM } from '../errorCode';
|
||||
|
||||
export async function generateVector(next = false): Promise<any> {
|
||||
if (process.env.queueTask !== '1') {
|
||||
try {
|
||||
fetch(process.env.parentUrl || '');
|
||||
} catch (error) {
|
||||
console.log('parentUrl fetch error', error);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (global.generatingVector && !next) return;
|
||||
|
||||
global.generatingVector = true;
|
||||
let dataId = null;
|
||||
// 每次最多选 5 组
|
||||
const listLen = 5;
|
||||
|
||||
/* 索引生成队列。每导入一次,就是一个单独的线程 */
|
||||
export async function generateVector(trainingId: string): Promise<any> {
|
||||
try {
|
||||
// 从找出一个 status = waiting 的数据
|
||||
const searchRes = await PgClient.select('modelData', {
|
||||
fields: ['id', 'q', 'user_id'],
|
||||
where: [['status', 'waiting']],
|
||||
limit: 1
|
||||
});
|
||||
|
||||
if (searchRes.rowCount === 0) {
|
||||
console.log('没有需要生成 【向量】 的数据');
|
||||
global.generatingVector = false;
|
||||
return;
|
||||
}
|
||||
|
||||
const dataItem: { id: string; q: string; userId: string } = {
|
||||
id: searchRes.rows[0].id,
|
||||
q: searchRes.rows[0].q,
|
||||
userId: searchRes.rows[0].user_id
|
||||
};
|
||||
|
||||
dataId = dataItem.id;
|
||||
|
||||
// 获取 openapi Key
|
||||
try {
|
||||
await getApiKey({ model: 'gpt-3.5-turbo', userId: dataItem.userId });
|
||||
} catch (err: any) {
|
||||
await PgClient.delete('modelData', {
|
||||
where: [['id', dataId]]
|
||||
});
|
||||
getErrText(err, '获取 OpenAi Key 失败');
|
||||
return generateVector(true);
|
||||
}
|
||||
|
||||
// 生成词向量
|
||||
const vectors = await openaiEmbedding({
|
||||
input: [dataItem.q],
|
||||
userId: dataItem.userId
|
||||
});
|
||||
|
||||
// 更新 pg 向量和状态数据
|
||||
await PgClient.update('modelData', {
|
||||
values: [
|
||||
{ key: 'vector', value: `[${vectors[0]}]` },
|
||||
{ key: 'status', value: `ready` }
|
||||
],
|
||||
where: [['id', dataId]]
|
||||
});
|
||||
|
||||
console.log(`生成向量成功: ${dataItem.id}`);
|
||||
|
||||
generateVector(true);
|
||||
} catch (error: any) {
|
||||
// log
|
||||
if (error?.response) {
|
||||
console.log('openai error: 生成向量错误');
|
||||
console.log(error.response?.status, error.response?.statusText, error.response?.data);
|
||||
} else {
|
||||
console.log('生成向量错误:', error);
|
||||
}
|
||||
|
||||
// 没有余额或者凭证错误时,拒绝任务
|
||||
if (dataId && openaiError2[error?.response?.data?.error?.type]) {
|
||||
console.log('删除向量生成任务记录');
|
||||
try {
|
||||
await PgClient.delete('modelData', {
|
||||
where: [['id', dataId]]
|
||||
});
|
||||
} catch (error) {
|
||||
error;
|
||||
// 找出一个需要生成的 dataItem (2分钟锁)
|
||||
const data = await TrainingData.findOneAndUpdate(
|
||||
{
|
||||
_id: trainingId,
|
||||
lockTime: { $lte: Date.now() - 2 * 60 * 1000 }
|
||||
},
|
||||
{
|
||||
lockTime: new Date()
|
||||
}
|
||||
generateVector(true);
|
||||
);
|
||||
|
||||
if (!data) {
|
||||
await TrainingData.findOneAndDelete({
|
||||
_id: trainingId,
|
||||
qaList: [],
|
||||
vectorList: []
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (error?.response?.statusText === 'Too Many Requests') {
|
||||
console.log('生成向量次数限制,1分钟后尝试');
|
||||
// 限制次数,1分钟后再试
|
||||
setTimeout(() => {
|
||||
generateVector(true);
|
||||
}, 60000);
|
||||
|
||||
const userId = String(data.userId);
|
||||
const kbId = String(data.kbId);
|
||||
|
||||
const dataItems: { q: string; a: string }[] = data.vectorList.slice(-listLen).map((item) => ({
|
||||
q: item.q,
|
||||
a: item.a
|
||||
}));
|
||||
|
||||
// 过滤重复的 qa 内容
|
||||
const searchRes = await Promise.allSettled(
|
||||
dataItems.map(async ({ q, a = '' }) => {
|
||||
if (!q) {
|
||||
return Promise.reject('q为空');
|
||||
}
|
||||
|
||||
q = q.replace(/\\n/g, '\n');
|
||||
a = a.replace(/\\n/g, '\n');
|
||||
|
||||
// Exactly the same data, not push
|
||||
try {
|
||||
const count = await PgClient.count('modelData', {
|
||||
where: [['user_id', userId], 'AND', ['kb_id', kbId], 'AND', ['q', q], 'AND', ['a', a]]
|
||||
});
|
||||
if (count > 0) {
|
||||
return Promise.reject('已经存在');
|
||||
}
|
||||
} catch (error) {
|
||||
error;
|
||||
}
|
||||
return Promise.resolve({
|
||||
q,
|
||||
a
|
||||
});
|
||||
})
|
||||
);
|
||||
const filterData = searchRes
|
||||
.filter((item) => item.status === 'fulfilled')
|
||||
.map<{ q: string; a: string }>((item: any) => item.value);
|
||||
|
||||
if (filterData.length > 0) {
|
||||
// 生成词向量
|
||||
const vectors = await openaiEmbedding({
|
||||
input: filterData.map((item) => item.q),
|
||||
userId,
|
||||
type: 'training'
|
||||
});
|
||||
|
||||
// 生成结果插入到 pg
|
||||
await insertKbItem({
|
||||
userId,
|
||||
kbId,
|
||||
data: vectors.map((vector, i) => ({
|
||||
q: filterData[i].q,
|
||||
a: filterData[i].a,
|
||||
vector
|
||||
}))
|
||||
});
|
||||
}
|
||||
|
||||
// 删除 mongo 训练队列. 如果小于 n 条,整个数据删掉。 如果大于 n 条,仅删数组后 n 个
|
||||
if (data.vectorList.length <= listLen) {
|
||||
await TrainingData.findByIdAndDelete(trainingId);
|
||||
console.log(`全部向量生成完毕: ${trainingId}`);
|
||||
} else {
|
||||
await TrainingData.findByIdAndUpdate(trainingId, {
|
||||
vectorList: data.vectorList.slice(0, -listLen),
|
||||
lockTime: new Date('2000/1/1')
|
||||
});
|
||||
console.log(`生成向量成功: ${trainingId}`);
|
||||
generateVector(trainingId);
|
||||
}
|
||||
} catch (err: any) {
|
||||
// log
|
||||
if (err?.response) {
|
||||
console.log('openai error: 生成向量错误');
|
||||
console.log(err.response?.status, err.response?.statusText, err.response?.data);
|
||||
} else {
|
||||
console.log('生成向量错误:', err);
|
||||
}
|
||||
|
||||
// openai 账号异常或者账号余额不足,删除任务
|
||||
if (openaiError2[err?.response?.data?.error?.type] || err === ERROR_ENUM.insufficientQuota) {
|
||||
console.log('余额不足,删除向量生成任务');
|
||||
await TrainingData.findByIdAndDelete(trainingId);
|
||||
return;
|
||||
}
|
||||
|
||||
// unlock
|
||||
await TrainingData.findByIdAndUpdate(trainingId, {
|
||||
lockTime: new Date('2000/1/1')
|
||||
});
|
||||
|
||||
// 频率限制
|
||||
if (err?.response?.statusText === 'Too Many Requests') {
|
||||
console.log('生成向量次数限制,30s后尝试');
|
||||
return setTimeout(() => {
|
||||
generateVector(trainingId);
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
generateVector(true);
|
||||
generateVector(trainingId);
|
||||
}, 1000);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user