Add image index and pdf parse (#3956)

* feat: think tag parse

* feat: parse think tag test

* feat: pdf parse ux

* feat: doc2x parse

* perf: rewrite training mode setting

* feat: image parse queue

* perf: image index

* feat: image parse process

* feat: add init sh

* fix: ts
This commit is contained in:
Archer
2025-03-03 23:08:29 +08:00
committed by archer
parent 08b6f594df
commit adf5377ebe
106 changed files with 2337 additions and 1454 deletions

View File

@@ -8,12 +8,41 @@ import { insertDatasetDataVector } from '@fastgpt/service/common/vectorStore/con
import { getDefaultIndex } from '@fastgpt/global/core/dataset/utils';
import { jiebaSplit } from '@fastgpt/service/common/string/jieba';
import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/controller';
import { DatasetDataItemType } from '@fastgpt/global/core/dataset/type';
import { DatasetDataIndexItemType, DatasetDataItemType } from '@fastgpt/global/core/dataset/type';
import { getEmbeddingModel } from '@fastgpt/service/core/ai/model';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
import { ClientSession } from '@fastgpt/service/common/mongo';
import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema';
import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/constants';
const formatIndexes = ({
indexes,
q,
a = ''
}: {
indexes?: (Omit<DatasetDataIndexItemType, 'dataId'> & { dataId?: string })[];
q: string;
a?: string;
}) => {
indexes = indexes || [];
const defaultIndex = getDefaultIndex({ q, a });
// 1. Reset default index
indexes = indexes.filter((item) => item.type !== DatasetDataIndexTypeEnum.default);
// 2. Add default index
indexes.unshift(...defaultIndex);
// 3. Filter same text
indexes = indexes.filter(
(item, index, self) =>
!!item.text.trim() && index === self.findIndex((t) => t.text === item.text)
);
return indexes.map((index) => ({
type: index.type,
text: index.text,
dataId: index.dataId
}));
};
/* insert data.
* 1. create data id
* 2. insert pg
@@ -41,42 +70,28 @@ export async function insertData2Dataset({
return Promise.reject("teamId and tmbId can't be the same");
}
const qaStr = getDefaultIndex({ q, a }).text;
// 1. Get vector indexes and insert
// Empty indexes check, if empty, create default index
indexes =
Array.isArray(indexes) && indexes.length > 0
? indexes.map((index) => ({
text: index.text,
dataId: undefined,
defaultIndex: index.text.trim() === qaStr
}))
: [getDefaultIndex({ q, a })];
if (!indexes.find((index) => index.defaultIndex)) {
indexes.unshift(getDefaultIndex({ q, a }));
} else if (q && a && !indexes.find((index) => index.text === q)) {
// push a q index
indexes.push({
defaultIndex: false,
text: q
});
}
indexes = indexes.slice(0, 6);
const newIndexes = formatIndexes({ indexes, q, a });
// insert to vector store
const result = await Promise.all(
indexes.map((item) =>
insertDatasetDataVector({
newIndexes.map(async (item) => {
const result = await insertDatasetDataVector({
query: item.text,
model: getEmbeddingModel(model),
teamId,
datasetId,
collectionId
})
)
});
return {
tokens: result.tokens,
index: {
...item,
dataId: result.insertId
}
};
})
);
// 2. Create mongo data
@@ -89,13 +104,8 @@ export async function insertData2Dataset({
collectionId,
q,
a,
// FullText tmp
// fullTextToken: jiebaSplit({ text: qaStr }),
chunkIndex,
indexes: indexes?.map((item, i) => ({
...item,
dataId: result[i].insertId
}))
indexes: result.map((item) => item.index)
}
],
{ session, ordered: true }
@@ -109,7 +119,7 @@ export async function insertData2Dataset({
datasetId,
collectionId,
dataId: _id,
fullTextToken: jiebaSplit({ text: qaStr })
fullTextToken: jiebaSplit({ text: `${q}\n${a}`.trim() })
}
],
{ session, ordered: true }
@@ -122,7 +132,7 @@ export async function insertData2Dataset({
}
/**
* update data
* Update data(indexes overwrite)
* 1. compare indexes
* 2. insert new pg data
* session run:
@@ -139,30 +149,19 @@ export async function updateData2Dataset({
if (!Array.isArray(indexes)) {
return Promise.reject('indexes is required');
}
const qaStr = getDefaultIndex({ q, a }).text;
// patch index and update pg
// 1. Get mongo data
const mongoData = await MongoDatasetData.findById(dataId);
if (!mongoData) return Promise.reject('core.dataset.error.Data not found');
// remove defaultIndex
let formatIndexes = indexes.map((index) => ({
...index,
text: index.text.trim(),
defaultIndex: index.text.trim() === qaStr
}));
if (!formatIndexes.find((index) => index.defaultIndex)) {
const defaultIndex = mongoData.indexes.find((index) => index.defaultIndex);
formatIndexes.unshift(defaultIndex ? defaultIndex : getDefaultIndex({ q, a }));
}
formatIndexes = formatIndexes.slice(0, 6);
// 2. Compute indexes
const formatIndexesResult = formatIndexes({ indexes, q, a });
// patch indexes, create, update, delete
// 3. Patch indexes, create, update, delete
const patchResult: PatchIndexesProps[] = [];
// find database indexes in new Indexes, if have not, delete it
for (const item of mongoData.indexes) {
const index = formatIndexes.find((index) => index.dataId === item.dataId);
const index = formatIndexesResult.find((index) => index.dataId === item.dataId);
if (!index) {
patchResult.push({
type: 'delete',
@@ -170,53 +169,48 @@ export async function updateData2Dataset({
});
}
}
for (const item of formatIndexes) {
const index = mongoData.indexes.find((index) => index.dataId === item.dataId);
// in database, update
if (index) {
// default index update
if (index.defaultIndex && index.text !== qaStr) {
patchResult.push({
type: 'update',
index: {
//@ts-ignore
...index.toObject(),
text: qaStr
}
});
continue;
}
// custom index update
if (index.text !== item.text) {
patchResult.push({
type: 'update',
index: item
});
continue;
}
patchResult.push({
type: 'unChange',
index: item
});
} else {
// not in database, create
for (const item of formatIndexesResult) {
if (!item.dataId) {
patchResult.push({
type: 'create',
index: item
});
} else {
const index = mongoData.indexes.find((index) => index.dataId === item.dataId);
if (!index) continue;
// Not change
if (index.text === item.text) {
patchResult.push({
type: 'unChange',
index: {
...item,
dataId: index.dataId
}
});
} else {
// index Update
patchResult.push({
type: 'update',
index: {
...item,
dataId: index.dataId
}
});
}
}
}
// update mongo updateTime
// 4. Update mongo updateTime(便于脏数据检查器识别)
mongoData.updateTime = new Date();
await mongoData.save();
// insert vector
const clonePatchResult2Insert: PatchIndexesProps[] = JSON.parse(JSON.stringify(patchResult));
// 5. Insert vector
const insertResult = await Promise.all(
clonePatchResult2Insert.map(async (item) => {
// insert new vector and update dateId
if (item.type === 'create' || item.type === 'update') {
patchResult
.filter((item) => item.type === 'create' || item.type === 'update')
.map(async (item) => {
// insert new vector and update dateId
const result = await insertDatasetDataVector({
query: item.index.text,
model: getEmbeddingModel(model),
@@ -225,26 +219,22 @@ export async function updateData2Dataset({
collectionId: mongoData.collectionId
});
item.index.dataId = result.insertId;
return result;
}
return {
tokens: 0
};
})
return {
tokens: result.tokens
};
})
);
const tokens = insertResult.reduce((acc, cur) => acc + cur.tokens, 0);
const newIndexes = patchResult
.filter((item) => item.type !== 'delete')
.map((item) => item.index) as DatasetDataIndexItemType[];
console.log(newIndexes, '---');
// console.log(clonePatchResult2Insert);
await mongoSessionRun(async (session) => {
// update mongo
const newIndexes = clonePatchResult2Insert
.filter((item) => item.type !== 'delete')
.map((item) => item.index);
// update mongo other data
// Update MongoData
mongoData.q = q || mongoData.q;
mongoData.a = a ?? mongoData.a;
// FullText tmp
// mongoData.fullTextToken = jiebaSplit({ text: `${mongoData.q}\n${mongoData.a}`.trim() });
// @ts-ignore
mongoData.indexes = newIndexes;
await mongoData.save({ session });
@@ -255,15 +245,15 @@ export async function updateData2Dataset({
{ session }
);
// delete vector
// Delete vector
const deleteIdList = patchResult
.filter((item) => item.type === 'delete' || item.type === 'update')
.map((item) => item.index.dataId)
.filter(Boolean);
.filter(Boolean) as string[];
if (deleteIdList.length > 0) {
await deleteDatasetDataVector({
teamId: mongoData.teamId,
idList: deleteIdList as string[]
idList: deleteIdList
});
}
});

View File

@@ -142,7 +142,7 @@ ${replaceVariable(Prompt_AgentQA.fixedText, { text })}`;
teamId: data.teamId,
tmbId: data.tmbId,
collectionId: data.collectionId,
trainingMode: TrainingModeEnum.chunk,
mode: TrainingModeEnum.chunk,
data: qaArr.map((item) => ({
...item,
chunkIndex: data.chunkIndex
@@ -179,9 +179,7 @@ ${replaceVariable(Prompt_AgentQA.fixedText, { text })}`;
}
}
/**
* 检查文本是否按格式返回
*/
// Format qa answer
function formatSplitText(text: string, rawText: string) {
text = text.replace(/\\n/g, '\n'); // 将换行符替换为空格
const regex = /Q\d+:(\s*)(.*)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q\d|$)/g; // 匹配Q和A的正则表达式
@@ -194,13 +192,7 @@ function formatSplitText(text: string, rawText: string) {
if (q) {
result.push({
q,
a,
indexes: [
{
defaultIndex: true,
text: `${q}\n${a.trim().replace(/\n\s*/g, '\n')}`
}
]
a
});
}
}
@@ -211,13 +203,7 @@ function formatSplitText(text: string, rawText: string) {
chunks.forEach((chunk) => {
result.push({
q: chunk,
a: '',
indexes: [
{
defaultIndex: true,
text: chunk
}
]
a: ''
});
});
}

View File

@@ -20,6 +20,16 @@ const reduceQueue = () => {
return global.vectorQueueLen === 0;
};
const reduceQueueAndReturn = (delay = 0) => {
reduceQueue();
if (delay) {
setTimeout(() => {
generateVector();
}, delay);
} else {
generateVector();
}
};
/* 索引生成队列。每导入一次,就是一个单独的线程 */
export async function generateVector(): Promise<any> {
@@ -45,20 +55,7 @@ export async function generateVector(): Promise<any> {
lockTime: new Date(),
$inc: { retryCount: -1 }
}
).select({
_id: 1,
teamId: 1,
tmbId: 1,
datasetId: 1,
collectionId: 1,
q: 1,
a: 1,
chunkIndex: 1,
dataId: 1,
indexes: 1,
model: 1,
billId: 1
});
);
// task preemption
if (!data) {
@@ -85,14 +82,12 @@ export async function generateVector(): Promise<any> {
}
if (error) {
addLog.error(`[Vector Queue] Error`, { error });
reduceQueue();
return generateVector();
return reduceQueueAndReturn();
}
// auth balance
if (!(await checkTeamAiPointsAndLock(data.teamId))) {
reduceQueue();
return generateVector();
return reduceQueueAndReturn();
}
addLog.info(`[Vector Queue] Start`);
@@ -119,15 +114,10 @@ export async function generateVector(): Promise<any> {
time: Date.now() - start
});
reduceQueue();
generateVector();
return reduceQueueAndReturn();
} catch (err: any) {
addLog.error(`[Vector Queue] Error`, err);
reduceQueue();
setTimeout(() => {
generateVector();
}, 1000);
return reduceQueueAndReturn(1000);
}
}

View File

@@ -127,12 +127,12 @@ export const pushGenerateVectorUsage = ({
createUsage({
teamId,
tmbId,
appName: i18nT('common:support.wallet.moduleName.index'),
appName: i18nT('account_usage:embedding_index'),
totalPoints,
source,
list: [
{
moduleName: i18nT('common:support.wallet.moduleName.index'),
moduleName: i18nT('account_usage:embedding_index'),
amount: totalVector,
model: vectorModelName,
inputTokens
@@ -203,7 +203,7 @@ export const pushQuestionGuideUsage = ({
});
};
export function pushAudioSpeechUsage({
export const pushAudioSpeechUsage = ({
appName = i18nT('common:support.wallet.usage.Audio Speech'),
model,
charsLength,
@@ -217,7 +217,7 @@ export function pushAudioSpeechUsage({
teamId: string;
tmbId: string;
source: UsageSourceEnum;
}) {
}) => {
const { totalPoints, modelName } = formatModelChars2Points({
model,
inputTokens: charsLength,
@@ -239,9 +239,9 @@ export function pushAudioSpeechUsage({
}
]
});
}
};
export function pushWhisperUsage({
export const pushWhisperUsage = ({
teamId,
tmbId,
duration
@@ -249,7 +249,7 @@ export function pushWhisperUsage({
teamId: string;
tmbId: string;
duration: number;
}) {
}) => {
const whisperModel = getDefaultTTSModel();
if (!whisperModel) return;
@@ -278,4 +278,4 @@ export function pushWhisperUsage({
}
]
});
}
};