Files
FastGPT/projects/app/src/service/events/generateVector.ts
Archer f642c9603b V4.9.4 feature (#4470)
* Training status (#4424)

* dataset data training state (#4311)

* dataset data training state

* fix

* fix ts

* fix

* fix api format

* fix

* fix

* perf: count training

* format

* fix: dataset training state (#4417)

* fix

* add test

* fix

* fix

* fix test

* fix test

* perf: training count

* count

* loading status

---------

Co-authored-by: heheer <heheer@sealos.io>

* doc

* website sync feature (#4429)

* perf: introduce BullMQ for website sync (#4403)

* perf: introduce BullMQ for website sync

* feat: new redis module

* fix: remove graceful shutdown

* perf: improve UI in dataset detail

- Updated the "change" icon SVG file.
- Modified i18n strings.
- Added new i18n string "immediate_sync".
- Improved UI in dataset detail page, including button icons and
background colors.

* refactor: Add chunkSettings to DatasetSchema

* perf: website sync ux

* env template

* fix: clean up website dataset when updating chunk settings (#4420)

* perf: check setting updated

* perf: worker concurrency

* feat: init script for website sync refactor (#4425)

* website feature doc

---------

Co-authored-by: a.e. <49438478+I-Info@users.noreply.github.com>

* pro migration (#4388) (#4433)

* pro migration

* reuse customPdfParseType

Co-authored-by: gggaaallleee <91131304+gggaaallleee@users.noreply.github.com>

* perf: remove loading ui

* feat: config chat file expired time

* Redis cache (#4436)

* perf: add Redis cache for vector counting (#4432)

* feat: cache

* perf: get cache key

---------

Co-authored-by: a.e. <49438478+I-Info@users.noreply.github.com>

* perf: mobile voice input (#4437)

* update:Mobile voice interaction (#4362)

* Add files via upload

* Add files via upload

* Update ollama.md

* Update ollama.md

* Add files via upload

* Update useSpeech.ts

* Update ChatInput.tsx

* Update useSpeech.ts

* Update ChatInput.tsx

* Update useSpeech.ts

* Update constants.ts

* Add files via upload

* Update ChatInput.tsx

* Update useSpeech.ts

* Update useSpeech.ts

* Update useSpeech.ts

* Update ChatInput.tsx

* Add files via upload

* Update common.json

* Update VoiceInput.tsx

* Update ChatInput.tsx

* Update VoiceInput.tsx

* Update useSpeech.ts

* Update useSpeech.ts

* Update common.json

* Update common.json

* Update common.json

* Update VoiceInput.tsx

* Update VoiceInput.tsx

* Update ChatInput.tsx

* Update VoiceInput.tsx

* Update ChatInput.tsx

* Update VoiceInput.tsx

* Update ChatInput.tsx

* Update useSpeech.ts

* Update common.json

* Update chat.json

* Update common.json

* Update chat.json

* Update common.json

* Update chat.json

* Update VoiceInput.tsx

* Update ChatInput.tsx

* Update useSpeech.ts

* Update VoiceInput.tsx

* speech ui

* 优化语音输入组件,调整输入框显示逻辑,修复语音输入遮罩层样式,更新画布背景透明度,增强用户交互体验。 (#4435)

* perf: mobile voice input

---------

Co-authored-by: dreamer6680 <1468683855@qq.com>

* Test completion v2 (#4438)

* add v2 completions (#4364)

* add v2 completions

* completion config

* config version

* fix

* frontend

* doc

* fix

* fix: completions v2 api

---------

Co-authored-by: heheer <heheer@sealos.io>

* package

* Test mongo log (#4443)

* feat: mongodb-log (#4426)

* perf: mongo log

* feat: completions stop reasoner

* mongo db log

---------

Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>

* update doc

* Update doc

* fix external var ui (#4444)

* action

* fix: ts (#4458)

* preview doc action

add docs preview permission

update preview action

update action

* update doc (#4460)

* update preview action

* update doc

* remove

* update

* schema

* update mq export;perf: redis cache  (#4465)

* perf: redis cache

* update mq export

* perf: website sync error tip

* add error worker

* website sync ui (#4466)

* Updated the dynamic display of the voice input pop-up (#4469)

* Update VoiceInput.tsx

* Update VoiceInput.tsx

* Update VoiceInput.tsx

* fix: voice input

---------

Co-authored-by: heheer <heheer@sealos.io>
Co-authored-by: a.e. <49438478+I-Info@users.noreply.github.com>
Co-authored-by: gggaaallleee <91131304+gggaaallleee@users.noreply.github.com>
Co-authored-by: dreamer6680 <1468683855@qq.com>
Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>
2025-04-08 12:05:04 +08:00

272 lines
7.0 KiB
TypeScript

import { insertData2Dataset } from '@/service/core/dataset/data/controller';
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constants';
import { pushGenerateVectorUsage } from '@/service/support/wallet/usage/push';
import { checkTeamAiPointsAndLock } from './utils';
import { addMinutes } from 'date-fns';
import { addLog } from '@fastgpt/service/common/system/log';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import {
deleteDatasetDataVector,
insertDatasetDataVector
} from '@fastgpt/service/common/vectorStore/controller';
import { getEmbeddingModel } from '@fastgpt/service/core/ai/model';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
import { DatasetTrainingSchemaType } from '@fastgpt/global/core/dataset/type';
import { Document } from '@fastgpt/service/common/mongo';
import { getErrText } from '@fastgpt/global/common/error/utils';
/**
 * Release one slot of the global vector queue counter.
 * The counter never drops below zero.
 *
 * @returns true when the counter is zero after the release
 */
const reduceQueue = () => {
  if (global.vectorQueueLen > 0) {
    global.vectorQueueLen -= 1;
  } else {
    // Covers zero, negative and unset counters alike.
    global.vectorQueueLen = 0;
  }
  return global.vectorQueueLen === 0;
};
/**
 * Release the current queue slot, then start the next `generateVector` run.
 *
 * @param delay milliseconds to wait before the next run; a falsy value
 *              (the default 0) starts it immediately
 */
const reduceQueueAndReturn = (delay = 0) => {
  reduceQueue();

  // No delay requested: continue right away.
  if (!delay) {
    generateVector();
    return;
  }

  setTimeout(() => {
    generateVector();
  }, delay);
};
/**
 * Index generation queue. Each import is a separate thread.
 *
 * One call claims at most one chunk-mode training record, processes it
 * (rebuild when the record carries a `dataId`, insert otherwise), releases its
 * queue slot and schedules the next call through `reduceQueueAndReturn`.
 * The call is a no-op when `global.vectorQueueLen` has already reached the
 * configured maximum.
 *
 * @returns a promise that settles once this run has finished and the next run
 *          (if any) has been scheduled
 */
export async function generateVector(): Promise<any> {
  // Limit the number of concurrently running workers (defaults to 10).
  const max = global.systemEnv?.vectorMaxProcess || 10;
  if (global.vectorQueueLen >= max) return;
  global.vectorQueueLen++;
  const start = Date.now();

  // get training data
  const {
    data,
    done = false,
    error = false
  } = await (async () => {
    try {
      // Claim a record that still has retries left and whose lock is at least
      // 3 minutes old; the same call refreshes the lock and consumes one retry.
      const data = await MongoDatasetTraining.findOneAndUpdate(
        {
          mode: TrainingModeEnum.chunk,
          retryCount: { $gt: 0 },
          lockTime: { $lte: addMinutes(new Date(), -3) }
        },
        {
          lockTime: new Date(),
          $inc: { retryCount: -1 }
        }
      );
      // task preemption
      if (!data) {
        return {
          done: true
        };
      }
      return {
        data
      };
    } catch (error) {
      addLog.error(`Get Training Data error`, error);
      return {
        error: true
      };
    }
  })();

  // A failed lookup has to be handled before the "no data" case: `data` is
  // undefined on failure too, so testing `!data` first would report the queue
  // as done and this retry would never run.
  if (error) {
    addLog.error(`[Vector Queue] Error`, { error });
    // Back off before retrying, like the processing-error path below.
    return reduceQueueAndReturn(1000);
  }

  // Nothing left to claim: release the slot and stop this worker chain.
  if (done || !data) {
    if (reduceQueue()) {
      addLog.info(`[Vector Queue] Done`);
    }
    return;
  }

  // auth balance
  if (!(await checkTeamAiPointsAndLock(data.teamId))) {
    return reduceQueueAndReturn();
  }

  addLog.info(`[Vector Queue] Start`);

  try {
    const { tokens } = await (async () => {
      if (data.dataId) {
        return rebuildData({ trainingData: data });
      } else {
        return insertData({ trainingData: data });
      }
    })();

    // push usage
    pushGenerateVectorUsage({
      teamId: data.teamId,
      tmbId: data.tmbId,
      inputTokens: tokens,
      model: data.model,
      billId: data.billId
    });

    addLog.info(`[Vector Queue] Finish`, {
      time: Date.now() - start
    });

    return reduceQueueAndReturn();
  } catch (err: any) {
    addLog.error(`[Vector Queue] Error`, err);

    // Record the failure on the training record. A failing write must not
    // prevent the queue slot from being released, so it is guarded separately.
    try {
      await MongoDatasetTraining.updateOne(
        {
          teamId: data.teamId,
          datasetId: data.datasetId,
          _id: data._id
        },
        {
          errorMsg: getErrText(err, 'unknown error')
        }
      );
    } catch (updateErr) {
      addLog.error(`[Vector Queue] Save error message failed`, updateErr);
    }

    return reduceQueueAndReturn(1000);
  }
}
/**
 * Rebuild the vectors of the dataset data referenced by `trainingData.dataId`.
 *
 * Steps, in order:
 *  - load the dataset data (drop the training record and reject when missing);
 *  - clear the `rebuilding` flag on one more dataset data of the same team and
 *    dataset and enqueue a training record for it;
 *  - insert a fresh vector for every index and store the new vector ids;
 *  - in one session: save the dataset data, delete the training record and
 *    delete the previous vectors.
 *
 * @param trainingData the claimed training record
 * @returns `{ tokens }`, the summed `tokens` of all vector inserts
 */
const rebuildData = async ({
  trainingData
}: {
  trainingData: Document<unknown, {}, DatasetTrainingSchemaType> &
    Omit<
      DatasetTrainingSchemaType &
        Required<{
          _id: string;
        }>,
      never
    >;
}) => {
  // find data
  const mongoData = await MongoDatasetData.findById(
    trainingData.dataId,
    'indexes teamId datasetId collectionId'
  );
  if (!mongoData) {
    // The target no longer exists: remove the training record and fail the run.
    await trainingData.deleteOne();
    return Promise.reject('Not data');
  }

  const { teamId, datasetId } = mongoData;

  // Keep the current vector ids; they are overwritten on the indexes below.
  const staleVectorIds = mongoData.indexes.map((item) => item.dataId);

  // Find next rebuilding data to insert training queue
  await mongoSessionRun(async (session) => {
    const nextRebuilding = await MongoDatasetData.findOneAndUpdate(
      { rebuilding: true, teamId, datasetId },
      { $unset: { rebuilding: null }, updateTime: new Date() },
      { session }
    ).select({ _id: 1, collectionId: 1 });

    if (!nextRebuilding) return;

    await MongoDatasetTraining.create(
      [
        {
          teamId,
          tmbId: trainingData.tmbId,
          datasetId,
          collectionId: nextRebuilding.collectionId,
          billId: trainingData.billId,
          mode: TrainingModeEnum.chunk,
          model: trainingData.model,
          dataId: nextRebuilding._id,
          retryCount: 50
        }
      ],
      { session, ordered: true }
    );
  });

  // update vector, update dataset_data rebuilding status, delete data from training
  // 1. Insert a new vector for every index and point the index at it
  const insertResults = await Promise.all(
    mongoData.indexes.map(async (item, position) => {
      const inserted = await insertDatasetDataVector({
        query: item.text,
        model: getEmbeddingModel(trainingData.model),
        teamId,
        datasetId,
        collectionId: mongoData.collectionId
      });
      mongoData.indexes[position].dataId = inserted.insertId;
      return inserted;
    })
  );

  const sessionResult = await mongoSessionRun(async (session) => {
    // 2. Ensure that the training data is deleted after the Mongo update is successful
    await mongoData.save({ session });
    // 3. Delete the training data
    await trainingData.deleteOne({ session });
    // 4. Delete old vector
    await deleteDatasetDataVector({
      teamId,
      idList: staleVectorIds
    });

    // Sum the token usage reported by each vector insert.
    let tokens = 0;
    for (const inserted of insertResults) {
      tokens += inserted.tokens;
    }
    return { tokens };
  });

  return { tokens: sessionResult.tokens };
};
/**
 * Move one training record into the dataset: insert its content through
 * `insertData2Dataset` and delete the training record, both using the same
 * session.
 *
 * @param trainingData the claimed training record
 * @returns `{ tokens }` as reported by `insertData2Dataset`
 */
const insertData = async ({
  trainingData
}: {
  trainingData: Document<unknown, {}, DatasetTrainingSchemaType> &
    Omit<
      DatasetTrainingSchemaType &
        Required<{
          _id: string;
        }>,
      never
    >;
}) => {
  const sessionResult = await mongoSessionRun(async (session) => {
    // insert new data to dataset
    const inserted = await insertData2Dataset({
      teamId: trainingData.teamId,
      tmbId: trainingData.tmbId,
      datasetId: trainingData.datasetId,
      collectionId: trainingData.collectionId,
      q: trainingData.q,
      a: trainingData.a,
      chunkIndex: trainingData.chunkIndex,
      indexes: trainingData.indexes,
      embeddingModel: trainingData.model,
      session
    });

    // delete data from training
    await trainingData.deleteOne({ session });

    return { tokens: inserted.tokens };
  });

  return { tokens: sessionResult.tokens };
};