perf: full text collection and search code;perf: rename function (#3519)

* perf: full text collection and search code

* perf: rename function

* perf: notify modal

* remove invalid code

* perf: sso login

* perf: pay process
This commit is contained in:
Archer
2025-01-03 14:33:13 +08:00
committed by archer
parent 20c6c202d2
commit c0d0c629c5
30 changed files with 423 additions and 270 deletions

View File

@@ -24,6 +24,7 @@ import { pushDataListToTrainingQueue } from '../training/controller';
import { MongoImage } from '../../../common/file/image/schema';
import { hashStr } from '@fastgpt/global/common/string/tools';
import { addDays } from 'date-fns';
import { MongoDatasetDataText } from '../data/dataTextSchema';
export const createCollectionAndInsertData = async ({
dataset,
@@ -240,12 +241,12 @@ export const delCollectionRelatedSource = async ({
.map((item) => item?.metadata?.relatedImgId || '')
.filter(Boolean);
// delete files
// Delete files
await delFileByFileIdList({
bucketName: BucketNameEnum.dataset,
fileIdList
});
// delete images
// Delete images
await delImgByRelatedId({
teamId,
relateIds: relatedImageIds,
@@ -273,7 +274,7 @@ export async function delCollection({
const datasetIds = Array.from(new Set(collections.map((item) => String(item.datasetId))));
const collectionIds = collections.map((item) => String(item._id));
// delete training data
// Delete training data
await MongoDatasetTraining.deleteMany({
teamId,
datasetIds: { $in: datasetIds },
@@ -285,11 +286,16 @@ export async function delCollection({
await delCollectionRelatedSource({ collections, session });
}
// delete dataset.datas
// Delete dataset_datas
await MongoDatasetData.deleteMany(
{ teamId, datasetIds: { $in: datasetIds }, collectionId: { $in: collectionIds } },
{ session }
);
// Delete dataset_data_texts
await MongoDatasetDataText.deleteMany(
{ teamId, datasetIds: { $in: datasetIds }, collectionId: { $in: collectionIds } },
{ session }
);
// delete collections
await MongoDatasetCollection.deleteMany(

View File

@@ -6,6 +6,7 @@ import { ClientSession } from '../../common/mongo';
import { MongoDatasetTraining } from './training/schema';
import { MongoDatasetData } from './data/schema';
import { deleteDatasetDataVector } from '../../common/vectorStore/controller';
import { MongoDatasetDataText } from './data/dataTextSchema';
/* ============= dataset ========== */
/* find all datasetId by top datasetId */
@@ -92,7 +93,7 @@ export async function delDatasetRelevantData({
{ session }
).lean();
// image and file
// Delete Image and file
await delCollectionRelatedSource({ collections, session });
// delete collections
@@ -101,9 +102,15 @@ export async function delDatasetRelevantData({
datasetId: { $in: datasetIds }
}).session(session);
// delete dataset.datas(Not need session)
// No session delete:
// Delete dataset_data_texts
await MongoDatasetDataText.deleteMany({
teamId,
datasetId: { $in: datasetIds }
});
// delete dataset_datas
await MongoDatasetData.deleteMany({ teamId, datasetId: { $in: datasetIds } });
// no session delete: delete files, vector data
// Delete vector data
await deleteDatasetDataVector({ teamId, datasetIds });
}

View File

@@ -0,0 +1,48 @@
import { connectionMongo, getMongoModel } from '../../../common/mongo';
const { Schema } = connectionMongo;
import { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type.d';
import { TeamCollectionName } from '@fastgpt/global/support/user/team/constant';
import { DatasetCollectionName } from '../schema';
import { DatasetColCollectionName } from '../collection/schema';
import { DatasetDataCollectionName } from './schema';
export const DatasetDataTextCollectionName = 'dataset_data_texts';
const DatasetDataTextSchema = new Schema({
teamId: {
type: Schema.Types.ObjectId,
ref: TeamCollectionName,
required: true
},
datasetId: {
type: Schema.Types.ObjectId,
ref: DatasetCollectionName,
required: true
},
collectionId: {
type: Schema.Types.ObjectId,
ref: DatasetColCollectionName,
required: true
},
dataId: {
type: String,
ref: DatasetDataCollectionName,
required: true
},
fullTextToken: {
type: String,
required: true
}
});
try {
DatasetDataTextSchema.index({ teamId: 1, datasetId: 1, fullTextToken: 'text' });
DatasetDataTextSchema.index({ dataId: 'hashed' });
} catch (error) {
console.log(error);
}
export const MongoDatasetDataText = getMongoModel<DatasetDataSchemaType>(
DatasetDataTextCollectionName,
DatasetDataTextSchema
);

View File

@@ -71,17 +71,8 @@ const DatasetDataSchema = new Schema({
type: Number,
default: 0
},
inited: {
type: Boolean
},
rebuilding: Boolean
});
DatasetDataSchema.virtual('collection', {
ref: DatasetColCollectionName,
localField: 'collectionId',
foreignField: '_id',
justOne: true
rebuilding: Boolean,
inited: Boolean
});
try {
@@ -100,6 +91,7 @@ try {
DatasetDataSchema.index({ updateTime: 1 });
// rebuild data
DatasetDataSchema.index({ rebuilding: 1, teamId: 1, datasetId: 1 });
DatasetDataSchema.index({ inited: 'hashed' });
} catch (error) {
console.log(error);
}

View File

@@ -8,8 +8,8 @@ import { getVectorsByText } from '../../ai/embedding';
import { getVectorModel } from '../../ai/model';
import { MongoDatasetData } from '../data/schema';
import {
DatasetCollectionSchemaType,
DatasetDataSchemaType,
DatasetDataTextSchemaType,
SearchDataResponseItemType
} from '@fastgpt/global/core/dataset/type';
import { MongoDatasetCollection } from '../collection/schema';
@@ -23,6 +23,7 @@ import { Types } from '../../../common/mongo';
import json5 from 'json5';
import { MongoDatasetCollectionTags } from '../tag/schema';
import { readFromSecondary } from '../../../common/mongo/utils';
import { MongoDatasetDataText } from '../data/dataTextSchema';
type SearchDatasetDataProps = {
teamId: string;
@@ -266,57 +267,60 @@ export async function searchDatasetData(props: SearchDatasetDataProps) {
filterCollectionIdList
});
// get q and a
const dataList = await MongoDatasetData.find(
{
teamId,
datasetId: { $in: datasetIds },
collectionId: { $in: Array.from(new Set(results.map((item) => item.collectionId))) },
'indexes.dataId': { $in: results.map((item) => item.id?.trim()) }
},
'datasetId collectionId updateTime q a chunkIndex indexes'
)
.populate<{ collection: DatasetCollectionSchemaType }>(
'collection',
'name fileId rawLink externalFileId externalFileUrl'
)
.lean();
// Get data and collections
const collectionIdList = Array.from(new Set(results.map((item) => item.collectionId)));
const [dataList, collections] = await Promise.all([
MongoDatasetData.find(
{
teamId,
datasetId: { $in: datasetIds },
collectionId: { $in: collectionIdList },
'indexes.dataId': { $in: results.map((item) => item.id?.trim()) }
},
'_id datasetId collectionId updateTime q a chunkIndex indexes',
{ ...readFromSecondary }
).lean(),
MongoDatasetCollection.find(
{
_id: { $in: collectionIdList }
},
'_id name fileId rawLink externalFileId externalFileUrl',
{ ...readFromSecondary }
).lean()
]);
// add score to data(It's already sorted. The first one is the one with the most points)
const concatResults = dataList.map((data) => {
const dataIdList = data.indexes.map((item) => item.dataId);
const formatResult = dataList
.map((data, index) => {
const collection = collections.find((col) => String(col._id) === String(data.collectionId));
if (!collection) {
console.log('Collection is not found', data);
return;
}
const maxScoreResult = results.find((item) => {
return dataIdList.includes(item.id);
});
// add score to data(It's already sorted. The first one is the one with the most points)
const dataIdList = data.indexes.map((item) => item.dataId);
const maxScoreResult = results.find((item) => {
return dataIdList.includes(item.id);
});
const score = maxScoreResult?.score || 0;
return {
...data,
score: maxScoreResult?.score || 0
};
});
const result: SearchDataResponseItemType = {
id: String(data._id),
updateTime: data.updateTime,
q: data.q,
a: data.a,
chunkIndex: data.chunkIndex,
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
...getCollectionSourceData(collection),
score: [{ type: SearchScoreTypeEnum.embedding, value: score, index }]
};
concatResults.sort((a, b) => b.score - a.score);
return result;
})
.filter(Boolean) as SearchDataResponseItemType[];
const formatResult = concatResults.map((data, index) => {
if (!data.collectionId) {
console.log('Collection is not found', data);
}
const result: SearchDataResponseItemType = {
id: String(data._id),
updateTime: data.updateTime,
q: data.q,
a: data.a,
chunkIndex: data.chunkIndex,
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
...getCollectionSourceData(data.collection),
score: [{ type: SearchScoreTypeEnum.embedding, value: data.score, index }]
};
return result;
});
formatResult.sort((a, b) => b.score[0].value - a.score[0].value);
return {
embeddingRecallResults: formatResult,
@@ -344,88 +348,114 @@ export async function searchDatasetData(props: SearchDatasetDataProps) {
};
}
let searchResults = (
const searchResults = (
await Promise.all(
datasetIds.map(async (id) => {
return MongoDatasetData.aggregate([
{
$match: {
teamId: new Types.ObjectId(teamId),
datasetId: new Types.ObjectId(id),
$text: { $search: jiebaSplit({ text: query }) },
...(filterCollectionIdList
? {
collectionId: {
$in: filterCollectionIdList.map((id) => new Types.ObjectId(id))
return MongoDatasetData.aggregate(
[
{
$match: {
teamId: new Types.ObjectId(teamId),
datasetId: new Types.ObjectId(id),
$text: { $search: jiebaSplit({ text: query }) },
...(filterCollectionIdList
? {
collectionId: {
$in: filterCollectionIdList.map((id) => new Types.ObjectId(id))
}
}
}
: {}),
...(forbidCollectionIdList && forbidCollectionIdList.length > 0
? {
collectionId: {
$nin: forbidCollectionIdList.map((id) => new Types.ObjectId(id))
: {}),
...(forbidCollectionIdList && forbidCollectionIdList.length > 0
? {
collectionId: {
$nin: forbidCollectionIdList.map((id) => new Types.ObjectId(id))
}
}
}
: {})
: {})
}
},
{
$sort: {
score: { $meta: 'textScore' }
}
},
{
$limit: limit
},
{
$project: {
_id: 1,
datasetId: 1,
collectionId: 1,
updateTime: 1,
q: 1,
a: 1,
chunkIndex: 1,
score: { $meta: 'textScore' }
}
}
},
],
{
$addFields: {
score: { $meta: 'textScore' }
}
},
{
$sort: {
score: { $meta: 'textScore' }
}
},
{
$limit: limit
},
{
$project: {
_id: 1,
datasetId: 1,
collectionId: 1,
updateTime: 1,
q: 1,
a: 1,
chunkIndex: 1,
score: 1
}
...readFromSecondary
}
]);
);
})
)
).flat() as (DatasetDataSchemaType & { score: number })[];
// resort
searchResults.sort((a, b) => b.score - a.score);
searchResults.slice(0, limit);
// Get data and collections
const collections = await MongoDatasetCollection.find(
{
_id: { $in: searchResults.map((item) => item.collectionId) }
},
'_id name fileId rawLink'
);
'_id name fileId rawLink externalFileId externalFileUrl',
{ ...readFromSecondary }
).lean();
// const [dataList, collections] = await Promise.all([
// MongoDatasetData.find(
// {
// _id: { $in: searchResults.map((item) => item.dataId) }
// },
// '_id datasetId collectionId updateTime q a chunkIndex indexes',
// { ...readFromSecondary }
// ).lean(),
// MongoDatasetCollection.find(
// {
// _id: { $in: searchResults.map((item) => item.collectionId) }
// },
// '_id name fileId rawLink externalFileId externalFileUrl',
// { ...readFromSecondary }
// ).lean()
// ]);
return {
fullTextRecallResults: searchResults.map((item, index) => {
const collection = collections.find((col) => String(col._id) === String(item.collectionId));
return {
id: String(item._id),
datasetId: String(item.datasetId),
collectionId: String(item.collectionId),
updateTime: item.updateTime,
...getCollectionSourceData(collection),
q: item.q,
a: item.a,
chunkIndex: item.chunkIndex,
indexes: item.indexes,
score: [{ type: SearchScoreTypeEnum.fullText, value: item.score, index }]
};
}),
fullTextRecallResults: searchResults
.map((data, index) => {
const collection = collections.find(
(col) => String(col._id) === String(data.collectionId)
);
if (!collection) {
console.log('Collection is not found', data);
return;
}
// const score =
// searchResults.find((item) => String(item.dataId) === String(data._id))?.score || 0;
return {
id: String(data._id),
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
updateTime: data.updateTime,
q: data.q,
a: data.a,
chunkIndex: data.chunkIndex,
indexes: data.indexes,
...getCollectionSourceData(collection),
score: [{ type: SearchScoreTypeEnum.fullText, value: data.score ?? 0, index }]
};
})
.filter(Boolean) as SearchDataResponseItemType[],
tokenLen: 0
};
};