import { insertData2Dataset } from '@/service/core/dataset/data/controller';
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constants';
import { pushGenerateVectorUsage } from '@/service/support/wallet/usage/push';
import { checkTeamAiPointsAndLock } from './utils';
import { addMinutes } from 'date-fns';
import { addLog } from '@fastgpt/service/common/system/log';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import {
  deleteDatasetDataVector,
  insertDatasetDataVector
} from '@fastgpt/service/common/vectorStore/controller';
import { getEmbeddingModel } from '@fastgpt/service/core/ai/model';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
import { DatasetTrainingSchemaType } from '@fastgpt/global/core/dataset/type';
import { Document } from '@fastgpt/service/common/mongo';

/** A claimed training record (Mongoose document) as handed to the worker helpers below. */
type TrainingDataType = Document &
  Omit<
    DatasetTrainingSchemaType &
      Required<{
        _id: string;
      }>,
    never
  >;

/**
 * Releases one slot of the vector queue (never going below 0).
 * @returns `true` when no workers remain active after the release.
 */
const reduceQueue = () => {
  global.vectorQueueLen = global.vectorQueueLen > 0 ? global.vectorQueueLen - 1 : 0;

  return global.vectorQueueLen === 0;
};

/**
 * Releases the current worker's slot, then starts another worker —
 * immediately, or after `delay` milliseconds when `delay` is non-zero.
 */
const reduceQueueAndReturn = (delay = 0) => {
  reduceQueue();

  if (delay) {
    setTimeout(() => {
      generateVector();
    }, delay);
  } else {
    generateVector();
  }
};

/**
 * Embedding (vector index) generation queue.
 *
 * Each call acts as one independent worker; at most `systemEnv.vectorMaxProcess`
 * (default 10) workers run at once, counted in `global.vectorQueueLen`.
 * A worker claims one `chunk`-mode training record whose lock is older than
 * 3 minutes and whose `retryCount` is still >= 0. It embeds the record, pushes
 * usage, then releases its slot and starts the next worker. Failures retry
 * after 1 second; the claim already consumed one retry.
 */
export async function generateVector(): Promise<void> {
  // `||` on purpose: an unset or 0 config value falls back to 10.
  const max = global.systemEnv?.vectorMaxProcess || 10;
  if (global.vectorQueueLen >= max) return;
  global.vectorQueueLen++;

  const start = Date.now();

  // get training data
  const { data, done = false, error = false } = await (async () => {
    try {
      // Claim one record in a single update: refresh its lock and consume one retry.
      const data = await MongoDatasetTraining.findOneAndUpdate(
        {
          mode: TrainingModeEnum.chunk,
          retryCount: { $gte: 0 },
          lockTime: { $lte: addMinutes(new Date(), -3) }
        },
        {
          lockTime: new Date(),
          $inc: { retryCount: -1 }
        }
      );

      // task preemption
      if (!data) {
        return { done: true };
      }
      return { data };
    } catch (error) {
      addLog.error(`Get Training Data error`, error);
      return { error: true };
    }
  })();

  // Must run before the `!data` check: on error `data` is undefined too, so
  // checking `done || !data` first made this branch unreachable and the
  // worker silently stopped instead of retrying.
  if (error) {
    addLog.error(`[Vector Queue] Error`, { error });
    // Delay the retry so a persistent DB failure does not spin in a hot loop.
    return reduceQueueAndReturn(1000);
  }

  if (done || !data) {
    if (reduceQueue()) {
      addLog.info(`[Vector Queue] Done`);
    }
    return;
  }

  // Everything from here runs inside try so that any throw (including from the
  // balance check) still releases this worker's queue slot.
  try {
    // auth balance
    if (!(await checkTeamAiPointsAndLock(data.teamId))) {
      return reduceQueueAndReturn();
    }

    addLog.info(`[Vector Queue] Start`);

    // A record with `dataId` re-embeds existing data; otherwise it is new data.
    const { tokens } = data.dataId
      ? await rebuildData({ trainingData: data })
      : await insertData({ trainingData: data });

    // push usage
    pushGenerateVectorUsage({
      teamId: data.teamId,
      tmbId: data.tmbId,
      inputTokens: tokens,
      model: data.model,
      billId: data.billId
    });

    addLog.info(`[Vector Queue] Finish`, {
      time: Date.now() - start
    });

    return reduceQueueAndReturn();
  } catch (err) {
    addLog.error(`[Vector Queue] Error`, err);
    return reduceQueueAndReturn(1000);
  }
}

/**
 * Re-embeds every index of an existing dataset_data record (`trainingData.dataId`).
 *
 * Steps:
 * 1. Queues the next record flagged `rebuilding` in the same team/dataset, so the rebuild chain continues.
 * 2. Inserts a new vector for each index.
 * 3. In one Mongo session: saves the new vector ids, deletes the training record, and deletes the old vectors.
 *
 * NOTE(review): if step 3 fails, the vectors inserted in step 2 are not cleaned up here.
 *
 * @returns the total embedding tokens used.
 * @throws Error('Not data') when the data record no longer exists (the training record is deleted first).
 */
const rebuildData = async ({ trainingData }: { trainingData: TrainingDataType }) => {
  // find data
  const mongoData = await MongoDatasetData.findById(
    trainingData.dataId,
    'indexes teamId datasetId collectionId'
  );

  if (!mongoData) {
    await trainingData.deleteOne();
    throw new Error('Not data');
  }

  // Captured before the ids are overwritten below; removed after the Mongo update commits.
  const deleteVectorIdList = mongoData.indexes.map((index) => index.dataId);

  // Find next rebuilding data to insert training queue
  await mongoSessionRun(async (session) => {
    // get new mongoData insert to training
    const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
      {
        rebuilding: true,
        teamId: mongoData.teamId,
        datasetId: mongoData.datasetId
      },
      {
        $unset: {
          rebuilding: null
        },
        updateTime: new Date()
      },
      { session }
    ).select({
      _id: 1,
      collectionId: 1
    });

    if (newRebuildingData) {
      await MongoDatasetTraining.create(
        [
          {
            teamId: mongoData.teamId,
            tmbId: trainingData.tmbId,
            datasetId: mongoData.datasetId,
            collectionId: newRebuildingData.collectionId,
            billId: trainingData.billId,
            mode: TrainingModeEnum.chunk,
            model: trainingData.model,
            dataId: newRebuildingData._id,
            // retry budget for the queued record (decremented on each claim)
            retryCount: 50
          }
        ],
        { session, ordered: true }
      );
    }
  });

  // update vector, update dataset_data rebuilding status, delete data from training
  // 1. Insert new vector to dataset_data
  const updateResult = await Promise.all(
    mongoData.indexes.map(async (index, i) => {
      const result = await insertDatasetDataVector({
        query: index.text,
        model: getEmbeddingModel(trainingData.model),
        teamId: mongoData.teamId,
        datasetId: mongoData.datasetId,
        collectionId: mongoData.collectionId
      });
      mongoData.indexes[i].dataId = result.insertId;
      return result;
    })
  );
  const tokens = updateResult.reduce((acc, cur) => acc + cur.tokens, 0);

  await mongoSessionRun(async (session) => {
    // 2. Ensure that the training data is deleted after the Mongo update is successful
    await mongoData.save({ session });
    // 3. Delete the training data
    await trainingData.deleteOne({ session });
    // 4. Delete old vector (skipped when there is nothing to delete, so no
    //    delete is issued with an empty id list)
    if (deleteVectorIdList.length > 0) {
      await deleteDatasetDataVector({
        teamId: mongoData.teamId,
        idList: deleteVectorIdList
      });
    }
  });

  return { tokens };
};

/**
 * Inserts a new dataset_data record built from the training record
 * (via `insertData2Dataset`). It deletes the training record in the same Mongo session.
 * @returns the embedding tokens reported by `insertData2Dataset`.
 */
const insertData = async ({ trainingData }: { trainingData: TrainingDataType }) => {
  return mongoSessionRun(async (session) => {
    // insert new data to dataset
    const { tokens } = await insertData2Dataset({
      teamId: trainingData.teamId,
      tmbId: trainingData.tmbId,
      datasetId: trainingData.datasetId,
      collectionId: trainingData.collectionId,
      q: trainingData.q,
      a: trainingData.a,
      chunkIndex: trainingData.chunkIndex,
      indexes: trainingData.indexes,
      embeddingModel: trainingData.model,
      session
    });

    // delete data from training
    await trainingData.deleteOne({ session });

    return { tokens };
  });
};