Feat: Images dataset collection (#4941)

* New pic (#4858)

* 更新数据集相关类型,添加图像文件ID和预览URL支持;优化数据集导入功能,新增图像数据集处理组件;修复部分国际化文本;更新文件上传逻辑以支持新功能。

* 与原先代码的差别

* 新增 V4.9.10 更新说明,支持 PG 设置`systemEnv.hnswMaxScanTuples`参数,优化 LLM stream 调用超时,修复全文检索多知识库排序问题。同时更新数据集索引,移除 datasetId 字段以简化查询。

* 更换成fileId_image逻辑,并增加训练队列匹配的逻辑

* 新增图片集合判断逻辑,优化预览URL生成流程,确保仅在数据集为图片集合时生成预览URL,并添加相关日志输出以便调试。

* Refactor Docker Compose configuration to comment out exposed ports for production environments, update image versions for pgvector, fastgpt, and mcp_server, and enhance Redis service with a health check. Additionally, standardize dataset collection labels in constants and improve internationalization strings across multiple languages.

* Enhance TrainingStates component by adding internationalization support for the imageParse training mode and update defaultCounts to include imageParse mode in trainingDetail API.

* Enhance dataset import context by adding additional steps for image dataset import process and improve internationalization strings for modal buttons in the useEditTitle hook.

* Update DatasetImportContext to conditionally render MyStep component based on data source type, improving the import process for non-image datasets.

* Refactor image dataset handling by improving internationalization strings, enhancing error messages, and streamlining the preview URL generation process.

* 图片上传到新建的 dataset_collection_images 表,逻辑跟随更改

* 修改了除了controller的其他部分问题

* 把图片数据集的逻辑整合到controller里面

* 补充i18n

* 补充i18n

* resolve评论:主要是上传逻辑的更改和组件复用

* 图片名称的图标显示

* 修改编译报错的命名问题

* 删除不需要的collectionid部分

* 多余文件的处理和改动一个删除按钮

* 除了loading和统一的imageId,其他都resolve掉的

* 处理图标报错

* 复用了MyPhotoView并采用全部替换的方式将imageFileId变成imageId

* 去除不必要文件修改

* 报错和字段修改

* 增加上传成功后删除临时文件的逻辑以及回退一些修改

* 删除path字段,将图片保存到gridfs内,并修改增删等操作的代码

* 修正编译错误

---------

Co-authored-by: archer <545436317@qq.com>

* perf: image dataset

* feat: insert image

* perf: image icon

* fix: training state

---------

Co-authored-by: Zhuangzai fa <143257420+ctrlz526@users.noreply.github.com>
This commit is contained in:
Archer
2025-06-03 16:30:59 +08:00
committed by archer
parent 9fb5d05865
commit 92c38d9d2f
104 changed files with 2341 additions and 693 deletions

View File

@@ -20,6 +20,10 @@ export const getVlmModel = (model?: string) => {
?.find((item) => item.model === model || item.name === model);
};
/**
 * List every configured LLM that supports vision (VLM) input.
 * Reads the global model map populated at system init.
 */
export const getVlmModelList = () => {
  // Array.from always returns an array, so the original `?.` chain and
  // `|| []` fallback were dead code — drop them.
  return Array.from(global.llmModelMap.values()).filter((item) => item.vision);
};
export const getDefaultEmbeddingModel = () => global?.systemDefaultModel.embedding!;
export const getEmbeddingModel = (model?: string) => {
if (!model) return getDefaultEmbeddingModel();

View File

@@ -5,9 +5,10 @@ import {
} from '@fastgpt/global/core/dataset/constants';
import type { CreateDatasetCollectionParams } from '@fastgpt/global/core/dataset/api.d';
import { MongoDatasetCollection } from './schema';
import {
type DatasetCollectionSchemaType,
type DatasetSchemaType
import type {
DatasetCollectionSchemaType,
DatasetDataFieldType,
DatasetSchemaType
} from '@fastgpt/global/core/dataset/type';
import { MongoDatasetTraining } from '../training/schema';
import { MongoDatasetData } from '../data/schema';
@@ -15,7 +16,7 @@ import { delImgByRelatedId } from '../../../common/file/image/controller';
import { deleteDatasetDataVector } from '../../../common/vectorDB/controller';
import { delFileByFileIdList } from '../../../common/file/gridfs/controller';
import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
import { type ClientSession } from '../../../common/mongo';
import type { ClientSession } from '../../../common/mongo';
import { createOrGetCollectionTags } from './utils';
import { rawText2Chunks } from '../read';
import { checkDatasetLimit } from '../../../support/permission/teamLimit';
@@ -38,20 +39,25 @@ import {
getLLMMaxChunkSize
} from '@fastgpt/global/core/dataset/training/utils';
import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/constants';
import { deleteDatasetImage } from '../image/controller';
import { clearCollectionImages, removeDatasetImageExpiredTime } from '../image/utils';
export const createCollectionAndInsertData = async ({
dataset,
rawText,
relatedId,
imageIds,
createCollectionParams,
backupParse = false,
billId,
session
}: {
dataset: DatasetSchemaType;
rawText: string;
rawText?: string;
relatedId?: string;
imageIds?: string[];
createCollectionParams: CreateOneCollectionParams;
backupParse?: boolean;
billId?: string;
@@ -69,13 +75,13 @@ export const createCollectionAndInsertData = async ({
// Set default params
const trainingType =
createCollectionParams.trainingType || DatasetCollectionDataProcessModeEnum.chunk;
const chunkSize = computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
});
const chunkSplitter = computeChunkSplitter(createCollectionParams);
const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);
const trainingMode = getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
});
if (
trainingType === DatasetCollectionDataProcessModeEnum.qa ||
@@ -90,35 +96,60 @@ export const createCollectionAndInsertData = async ({
delete createCollectionParams.qaPrompt;
}
// 1. split chunks
const chunks = rawText2Chunks({
rawText,
chunkTriggerType: createCollectionParams.chunkTriggerType,
chunkTriggerMinSize: createCollectionParams.chunkTriggerMinSize,
chunkSize,
paragraphChunkDeep,
paragraphChunkMinSize: createCollectionParams.paragraphChunkMinSize,
maxSize: getLLMMaxChunkSize(getLLMModel(dataset.agentModel)),
overlapRatio: trainingType === DatasetCollectionDataProcessModeEnum.chunk ? 0.2 : 0,
customReg: chunkSplitter ? [chunkSplitter] : [],
backupParse
});
// 1. split chunks or create image chunks
const {
chunks,
chunkSize
}: {
chunks: Array<{
q?: string;
a?: string; // answer or custom content
imageId?: string;
indexes?: string[];
}>;
chunkSize?: number;
} = (() => {
if (rawText) {
const chunkSize = computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
});
// Process text chunks
const chunks = rawText2Chunks({
rawText,
chunkTriggerType: createCollectionParams.chunkTriggerType,
chunkTriggerMinSize: createCollectionParams.chunkTriggerMinSize,
chunkSize,
paragraphChunkDeep,
paragraphChunkMinSize: createCollectionParams.paragraphChunkMinSize,
maxSize: getLLMMaxChunkSize(getLLMModel(dataset.agentModel)),
overlapRatio: trainingType === DatasetCollectionDataProcessModeEnum.chunk ? 0.2 : 0,
customReg: chunkSplitter ? [chunkSplitter] : [],
backupParse
});
return { chunks, chunkSize };
}
if (imageIds) {
// Process image chunks
const chunks = imageIds.map((imageId: string) => ({
imageId,
indexes: []
}));
return { chunks };
}
throw new Error('Either rawText or imageIdList must be provided');
})();
// 2. auth limit
await checkDatasetLimit({
teamId,
insertLen: predictDataLimitLength(
getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
}),
chunks
)
insertLen: predictDataLimitLength(trainingMode, chunks)
});
const fn = async (session: ClientSession) => {
// 3. create collection
// 3. Create collection
const { _id: collectionId } = await createOneCollection({
...createCollectionParams,
trainingType,
@@ -126,8 +157,8 @@ export const createCollectionAndInsertData = async ({
chunkSize,
chunkSplitter,
hashRawText: hashStr(rawText),
rawTextLength: rawText.length,
hashRawText: rawText ? hashStr(rawText) : undefined,
rawTextLength: rawText?.length,
nextSyncTime: (() => {
// ignore auto collections sync for website datasets
if (!dataset.autoSync && dataset.type === DatasetTypeEnum.websiteDataset) return undefined;
@@ -169,11 +200,7 @@ export const createCollectionAndInsertData = async ({
vectorModel: dataset.vectorModel,
vlmModel: dataset.vlmModel,
indexSize: createCollectionParams.indexSize,
mode: getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
}),
mode: trainingMode,
prompt: createCollectionParams.qaPrompt,
billId: traingBillId,
data: chunks.map((item, index) => ({
@@ -187,7 +214,12 @@ export const createCollectionAndInsertData = async ({
session
});
// 6. remove related image ttl
// 6. Remove images ttl index
await removeDatasetImageExpiredTime({
ids: imageIds,
collectionId,
session
});
if (relatedId) {
await MongoImage.updateMany(
{
@@ -207,7 +239,7 @@ export const createCollectionAndInsertData = async ({
}
return {
collectionId,
collectionId: String(collectionId),
insertResults
};
};
@@ -288,17 +320,20 @@ export const delCollectionRelatedSource = async ({
.map((item) => item?.metadata?.relatedImgId || '')
.filter(Boolean);
// Delete files
await delFileByFileIdList({
bucketName: BucketNameEnum.dataset,
fileIdList
});
// Delete images
await delImgByRelatedId({
teamId,
relateIds: relatedImageIds,
session
});
// Delete files and images in parallel
await Promise.all([
// Delete files
delFileByFileIdList({
bucketName: BucketNameEnum.dataset,
fileIdList
}),
// Delete images
delImgByRelatedId({
teamId,
relateIds: relatedImageIds,
session
})
]);
};
/**
* delete collection and it related data
@@ -343,16 +378,16 @@ export async function delCollection({
datasetId: { $in: datasetIds },
collectionId: { $in: collectionIds }
}),
// Delete dataset_images
clearCollectionImages(collectionIds),
// Delete images if needed
...(delImg
? [
delImgByRelatedId({
teamId,
relateIds: collections
.map((item) => item?.metadata?.relatedImgId || '')
.filter(Boolean)
})
]
? collections
.map((item) => item?.metadata?.relatedImgId || '')
.filter(Boolean)
.map((imageId) => deleteDatasetImage(imageId))
: []),
// Delete files if needed
...(delFile
? [
delFileByFileIdList({

View File

@@ -1,11 +1,9 @@
import { MongoDatasetCollection } from './schema';
import { type ClientSession } from '../../../common/mongo';
import type { ClientSession } from '../../../common/mongo';
import { MongoDatasetCollectionTags } from '../tag/schema';
import { readFromSecondary } from '../../../common/mongo/utils';
import {
type CollectionWithDatasetType,
type DatasetCollectionSchemaType
} from '@fastgpt/global/core/dataset/type';
import type { CollectionWithDatasetType } from '@fastgpt/global/core/dataset/type';
import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type';
import {
DatasetCollectionDataProcessModeEnum,
DatasetCollectionSyncResultEnum,
@@ -233,18 +231,37 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
QA: 独立进程
Chunk: Image Index -> Auto index -> chunk index
*/
export const getTrainingModeByCollection = (collection: {
trainingType: DatasetCollectionSchemaType['trainingType'];
autoIndexes?: DatasetCollectionSchemaType['autoIndexes'];
imageIndex?: DatasetCollectionSchemaType['imageIndex'];
export const getTrainingModeByCollection = ({
trainingType,
autoIndexes,
imageIndex
}: {
trainingType: DatasetCollectionDataProcessModeEnum;
autoIndexes?: boolean;
imageIndex?: boolean;
}) => {
if (collection.trainingType === DatasetCollectionDataProcessModeEnum.qa) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.imageParse &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.imageParse;
}
if (trainingType === DatasetCollectionDataProcessModeEnum.qa) {
return TrainingModeEnum.qa;
}
if (collection.imageIndex && global.feConfigs?.isPlus) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.chunk &&
imageIndex &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.image;
}
if (collection.autoIndexes && global.feConfigs?.isPlus) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.chunk &&
autoIndexes &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.auto;
}
return TrainingModeEnum.chunk;

View File

@@ -9,6 +9,7 @@ import { deleteDatasetDataVector } from '../../common/vectorDB/controller';
import { MongoDatasetDataText } from './data/dataTextSchema';
import { DatasetErrEnum } from '@fastgpt/global/common/error/code/dataset';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { clearDatasetImages } from './image/utils';
/* ============= dataset ========== */
/* find all datasetId by top datasetId */
@@ -102,8 +103,10 @@ export async function delDatasetRelevantData({
}),
//delete dataset_datas
MongoDatasetData.deleteMany({ teamId, datasetId: { $in: datasetIds } }),
// Delete Image and file
// Delete collection image and file
delCollectionRelatedSource({ collections }),
// Delete dataset Image
clearDatasetImages(datasetIds),
// Delete vector data
deleteDatasetDataVector({ teamId, datasetIds })
]);

View File

@@ -0,0 +1,57 @@
import { getDatasetImagePreviewUrl } from '../image/utils';
import type { QuoteDataItemType } from '../../../../../projects/app/src/service/core/chat/constants';
import type { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type';
/**
 * Format a dataset data record's q/a pair for display.
 * When the record is backed by an image, q is wrapped as a markdown image
 * pointing at a signed preview URL (valid for 7 days).
 */
export const formatDatasetDataValue = ({
  q,
  a,
  imageId,
  teamId,
  datasetId
}: {
  q: string;
  a?: string;
  imageId?: string;
  teamId: string;
  datasetId: string;
}): {
  q: string;
  a?: string;
  imagePreivewUrl?: string;
} => {
  // Plain text record: nothing to rewrite.
  if (!imageId) {
    return { q, a };
  }

  // Signed URL, valid for 7 days.
  const signedUrl = getDatasetImagePreviewUrl({
    imageId,
    teamId,
    datasetId,
    expiredMinutes: 60 * 24 * 7
  });

  // Escape newlines so the alt text stays on a single markdown line.
  const altText = q.replaceAll('\n', '\\n');

  return {
    q: `![${altText}](${signedUrl})`,
    a,
    // NOTE(review): key is misspelled ("Preivew") but is part of the public
    // return shape — kept as-is for compatibility.
    imagePreivewUrl: signedUrl
  };
};
/** Map raw dataset data rows into the citation item shape used by chat. */
export const getFormatDatasetCiteList = (list: DatasetDataSchemaType[]) => {
  return list.map<QuoteDataItemType>((item) => {
    // Resolve q/a (and any image preview URL) for this row.
    const formatted = formatDatasetDataValue({
      teamId: item.teamId,
      datasetId: item.datasetId,
      q: item.q,
      a: item.a,
      imageId: item.imageId
    });

    return {
      _id: item._id,
      ...formatted,
      history: item.history,
      updateTime: item.updateTime,
      index: item.chunkIndex
    };
  });
};

View File

@@ -37,8 +37,7 @@ const DatasetDataSchema = new Schema({
required: true
},
a: {
type: String,
default: ''
type: String
},
history: {
type: [
@@ -74,6 +73,9 @@ const DatasetDataSchema = new Schema({
default: []
},
imageId: {
type: String
},
updateTime: {
type: Date,
default: () => new Date()

View File

@@ -0,0 +1,166 @@
import { addMinutes } from 'date-fns';
import { bucketName, MongoDatasetImageSchema } from './schema';
import { connectionMongo, Types } from '../../../common/mongo';
import fs from 'fs';
import type { FileType } from '../../../common/file/multer';
import fsp from 'fs/promises';
import { computeGridFsChunSize } from '../../../common/file/gridfs/utils';
import { setCron } from '../../../common/system/cron';
import { checkTimerLock } from '../../../common/system/timerLock/utils';
import { TimerIdEnum } from '../../../common/system/timerLock/constants';
import { addLog } from '../../../common/system/log';
const getGridBucket = () => {
return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
bucketName: bucketName
});
};
/**
 * Upload a local image file into the dataset image GridFS bucket.
 *
 * The file is written with a `metadata.expiredTime` (default: 30 minutes from
 * now), so unclaimed uploads are garbage-collected by the cleanup cron unless
 * a collection later claims them and removes the TTL marker.
 *
 * @returns the GridFS file id as `imageId`; `previewUrl` is returned empty
 *          here — callers generate signed preview URLs separately.
 */
export const createDatasetImage = async ({
  teamId,
  datasetId,
  file,
  expiredTime = addMinutes(new Date(), 30)
}: {
  teamId: string;
  datasetId: string;
  file: FileType;
  expiredTime?: Date;
}): Promise<{ imageId: string; previewUrl: string }> => {
  const path = file.path;
  const gridBucket = getGridBucket();
  // Stored on the GridFS file document; expiredTime drives the cleanup cron.
  const metadata = {
    teamId: String(teamId),
    datasetId: String(datasetId),
    expiredTime
  };

  const stats = await fsp.stat(path);
  if (!stats.isFile()) return Promise.reject(`${path} is not a file`);

  const readStream = fs.createReadStream(path, {
    highWaterMark: 256 * 1024
  });
  // Chunk size is chosen from the file size by the shared helper.
  const chunkSizeBytes = computeGridFsChunSize(stats.size);
  const stream = gridBucket.openUploadStream(file.originalname, {
    metadata,
    contentType: file.mimetype,
    chunkSizeBytes
  });

  // save to gridfs
  await new Promise((resolve, reject) => {
    readStream
      .pipe(stream as any)
      .on('finish', resolve)
      .on('error', reject);
  });

  return {
    imageId: String(stream.id),
    previewUrl: ''
  };
};
export const getDatasetImageReadData = async (imageId: string) => {
// Get file metadata to get contentType
const fileInfo = await MongoDatasetImageSchema.findOne({
_id: new Types.ObjectId(imageId)
}).lean();
if (!fileInfo) {
return Promise.reject('Image not found');
}
const gridBucket = getGridBucket();
return {
stream: gridBucket.openDownloadStream(new Types.ObjectId(imageId)),
fileInfo
};
};
/**
 * Load a dataset image and return it as a base64 data URL
 * (`data:<contentType>;base64,<payload>`).
 *
 * Rejects with 'Image not found' when the image does not exist, or with the
 * underlying stream error on read failure.
 */
export const getDatasetImageBase64 = async (imageId: string) => {
  // getDatasetImageReadData already validates existence and returns the
  // metadata document, so the extra findOne round-trip the original made
  // here is unnecessary.
  const { stream, fileInfo } = await getDatasetImageReadData(imageId);

  // Convert stream to buffer
  const chunks: Buffer[] = [];
  return new Promise<string>((resolve, reject) => {
    stream.on('data', (chunk: Buffer) => {
      chunks.push(chunk);
    });
    stream.on('end', () => {
      // Combine all chunks into a single buffer
      const buffer = Buffer.concat(chunks);
      // Convert buffer to base64 string
      const base64 = buffer.toString('base64');
      const dataUrl = `data:${fileInfo.contentType || 'image/jpeg'};base64,${base64}`;
      resolve(dataUrl);
    });
    stream.on('error', reject);
  });
};
/**
 * Delete a dataset image from GridFS.
 * A missing file is treated as already deleted (logged as a warning);
 * any other error is propagated to the caller.
 */
export const deleteDatasetImage = async (imageId: string) => {
  const gridBucket = getGridBucket();

  try {
    await gridBucket.delete(new Types.ObjectId(imageId));
  } catch (error: any) {
    const msg = error?.message;
    // Guard: msg may be undefined for non-Error rejections — calling
    // .includes on it directly (as the original did) throws a TypeError
    // inside the catch block and masks the real error.
    if (typeof msg === 'string' && msg.includes('File not found')) {
      addLog.warn('Delete dataset image error', error);
      return;
    }
    return Promise.reject(error);
  }
};
/**
 * Register a cron job (every 10 minutes) that purges dataset images whose
 * `metadata.expiredTime` has passed — i.e. uploads never claimed by a
 * collection. A distributed timer lock (9 minutes) ensures at most one
 * instance runs the sweep per interval.
 */
export const clearExpiredDatasetImageCron = async () => {
  const gridBucket = getGridBucket();

  const clearExpiredDatasetImages = async () => {
    addLog.debug('Clear expired dataset image start');

    // Only the ids are needed; deletion goes through the GridFS bucket.
    const data = await MongoDatasetImageSchema.find(
      {
        'metadata.expiredTime': { $lt: new Date() }
      },
      '_id'
    ).lean();

    for (const item of data) {
      try {
        await gridBucket.delete(item._id);
      } catch (error) {
        // Per-file failures are logged and the sweep continues.
        addLog.error('Delete expired dataset image error', error);
      }
    }
    addLog.debug('Clear expired dataset image end');
  };

  setCron('*/10 * * * *', async () => {
    if (
      await checkTimerLock({
        timerId: TimerIdEnum.clearExpiredDatasetImage,
        lockMinuted: 9
      })
    ) {
      try {
        await clearExpiredDatasetImages();
      } catch (error) {
        addLog.error('clearExpiredDatasetImageCron error', error);
      }
    }
  });
};

View File

@@ -0,0 +1,36 @@
import type { Types } from '../../../common/mongo';
import { getMongoModel, Schema } from '../../../common/mongo';
// GridFS bucket name for dataset images; the model below maps the
// `<bucket>.files` metadata collection of that bucket.
export const bucketName = 'dataset_image';

// Mirrors the GridFS `files` document layout (length/chunkSize/uploadDate/…)
// plus our custom `metadata` payload.
const MongoDatasetImage = new Schema({
  length: { type: Number, required: true },
  chunkSize: { type: Number, required: true },
  uploadDate: { type: Date, required: true },
  filename: { type: String, required: true },
  contentType: { type: String, required: true },
  metadata: {
    teamId: { type: String, required: true },
    datasetId: { type: String, required: true },
    // Unset until a collection claims the image.
    collectionId: { type: String },
    // TTL marker consumed by the cleanup cron; removed once the image is
    // bound to a collection.
    expiredTime: { type: Date, required: true }
  }
});

// Hashed indexes for equality lookup by dataset/collection; descending index
// supports the expiry sweep's range query.
MongoDatasetImage.index({ 'metadata.datasetId': 'hashed' });
MongoDatasetImage.index({ 'metadata.collectionId': 'hashed' });
MongoDatasetImage.index({ 'metadata.expiredTime': -1 });

export const MongoDatasetImageSchema = getMongoModel<{
  _id: Types.ObjectId;
  length: number;
  chunkSize: number;
  uploadDate: Date;
  filename: string;
  contentType: string;
  metadata: {
    teamId: string;
    datasetId: string;
    // NOTE(review): typed as required here but optional in the schema above —
    // confirm documents always carry collectionId before relying on it.
    collectionId: string;
    expiredTime: Date;
  };
}>(`${bucketName}.files`, MongoDatasetImage);

View File

@@ -0,0 +1,101 @@
import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode';
import { Types, type ClientSession } from '../../../common/mongo';
import { deleteDatasetImage } from './controller';
import { MongoDatasetImageSchema } from './schema';
import { addMinutes } from 'date-fns';
import jwt from 'jsonwebtoken';
/**
 * Permanently bind uploaded images to a collection: drops the TTL marker
 * (`metadata.expiredTime`) so the cleanup cron skips them, and stamps the
 * owning collection id.
 *
 * Invalid ObjectId strings are silently skipped. No-op when `ids` is empty.
 */
export const removeDatasetImageExpiredTime = async ({
  ids = [],
  collectionId,
  session
}: {
  ids?: string[];
  collectionId: string;
  session?: ClientSession;
}) => {
  if (ids.length === 0) return;

  return MongoDatasetImageSchema.updateMany(
    {
      _id: {
        // ids is string[], so each valid id converts directly — the original
        // `typeof id === 'string'` ternary was a dead branch.
        $in: ids.filter((id) => Types.ObjectId.isValid(id)).map((id) => new Types.ObjectId(id))
      }
    },
    {
      $unset: { 'metadata.expiredTime': '' },
      $set: {
        'metadata.collectionId': String(collectionId)
      }
    },
    { session }
  );
};
/**
 * Build a signed, time-limited preview URL for a dataset image.
 * The JWT embeds teamId/datasetId and expires after `expiredMinutes`.
 */
export const getDatasetImagePreviewUrl = ({
  imageId,
  teamId,
  datasetId,
  expiredMinutes
}: {
  imageId: string;
  teamId: string;
  datasetId: string;
  expiredMinutes: number;
}) => {
  // jwt `exp` is expressed in whole seconds since the epoch.
  const expiresAtSeconds = Math.floor(addMinutes(new Date(), expiredMinutes).getTime() / 1000);
  const signingKey = (process.env.FILE_TOKEN_KEY as string) ?? 'filetoken';

  const payload = {
    teamId: String(teamId),
    datasetId: String(datasetId),
    exp: expiresAtSeconds
  };
  const token = jwt.sign(payload, signingKey);

  return `/api/core/dataset/image/${imageId}?token=${token}`;
};
/**
 * Verify a preview-URL token and extract its teamId/datasetId claims.
 * Rejects with ERROR_ENUM.unAuthFile on a missing or invalid token.
 */
export const authDatasetImagePreviewUrl = (token?: string) =>
  new Promise<{
    teamId: string;
    datasetId: string;
  }>((resolve, reject) => {
    if (!token) {
      reject(ERROR_ENUM.unAuthFile);
      return;
    }

    const signingKey = (process.env.FILE_TOKEN_KEY as string) ?? 'filetoken';
    jwt.verify(token, signingKey, (err, decoded: any) => {
      // Both claims must be present for the token to be accepted.
      const isValid = !err && decoded?.teamId && decoded?.datasetId;
      if (!isValid) {
        reject(ERROR_ENUM.unAuthFile);
        return;
      }
      resolve({
        teamId: decoded.teamId,
        datasetId: decoded.datasetId
      });
    });
  });
/** Delete every dataset image belonging to any of the given datasets. */
export const clearDatasetImages = async (datasetIds: string[]) => {
  // Only ids are needed; deletion is handled per-image by deleteDatasetImage.
  const imageDocs = await MongoDatasetImageSchema.find(
    {
      'metadata.datasetId': { $in: datasetIds.map(String) }
    },
    '_id'
  ).lean();

  const deletions = imageDocs.map(({ _id }) => deleteDatasetImage(String(_id)));
  await Promise.all(deletions);
};
/** Delete every dataset image attached to any of the given collections. */
export const clearCollectionImages = async (collectionIds: string[]) => {
  // Only ids are needed; deletion is handled per-image by deleteDatasetImage.
  const imageDocs = await MongoDatasetImageSchema.find(
    {
      'metadata.collectionId': { $in: collectionIds.map(String) }
    },
    '_id'
  ).lean();

  const deletions = imageDocs.map(({ _id }) => deleteDatasetImage(String(_id)));
  await Promise.all(deletions);
};

View File

@@ -186,9 +186,11 @@ export const rawText2Chunks = ({
chunkTriggerMinSize = 1000,
backupParse,
chunkSize = 512,
imageIdList,
...splitProps
}: {
rawText: string;
imageIdList?: string[];
chunkTriggerType?: ChunkTriggerConfigTypeEnum;
chunkTriggerMinSize?: number; // maxSize from agent model, not store
@@ -199,6 +201,7 @@ export const rawText2Chunks = ({
q: string;
a: string;
indexes?: string[];
imageIdList?: string[];
}[] => {
const parseDatasetBackup2Chunks = (rawText: string) => {
const csvArr = Papa.parse(rawText).data as string[][];
@@ -209,7 +212,8 @@ export const rawText2Chunks = ({
.map((item) => ({
q: item[0] || '',
a: item[1] || '',
indexes: item.slice(2)
indexes: item.slice(2),
imageIdList
}))
.filter((item) => item.q || item.a);
@@ -231,7 +235,8 @@ export const rawText2Chunks = ({
return [
{
q: rawText,
a: ''
a: '',
imageIdList
}
];
}
@@ -240,7 +245,7 @@ export const rawText2Chunks = ({
if (chunkTriggerType !== ChunkTriggerConfigTypeEnum.forceChunk) {
const textLength = rawText.trim().length;
if (textLength < chunkTriggerMinSize) {
return [{ q: rawText, a: '' }];
return [{ q: rawText, a: '', imageIdList }];
}
}
@@ -253,6 +258,7 @@ export const rawText2Chunks = ({
return chunks.map((item) => ({
q: item,
a: '',
indexes: []
indexes: [],
imageIdList
}));
};

View File

@@ -28,6 +28,7 @@ import type { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { datasetSearchQueryExtension } from './utils';
import type { RerankModelItemType } from '@fastgpt/global/core/ai/model.d';
import { addLog } from '../../../common/system/log';
import { formatDatasetDataValue } from '../data/controller';
export type SearchDatasetDataProps = {
histories: ChatItemType[];
@@ -175,6 +176,12 @@ export async function searchDatasetData(
collectionFilterMatch
} = props;
// Constants data
const datasetDataSelectField =
'_id datasetId collectionId updateTime q a imageId chunkIndex indexes';
const datsaetCollectionSelectField =
'_id name fileId rawLink apiFileId externalFileId externalFileUrl';
/* init params */
searchMode = DatasetSearchModeMap[searchMode] ? searchMode : DatasetSearchModeEnum.embedding;
usingReRank = usingReRank && !!getDefaultRerankModel();
@@ -463,14 +470,14 @@ export async function searchDatasetData(
collectionId: { $in: collectionIdList },
'indexes.dataId': { $in: results.map((item) => item.id?.trim()) }
},
'_id datasetId collectionId updateTime q a chunkIndex indexes',
datasetDataSelectField,
{ ...readFromSecondary }
).lean(),
MongoDatasetCollection.find(
{
_id: { $in: collectionIdList }
},
'_id name fileId rawLink apiFileId externalFileId externalFileUrl',
datsaetCollectionSelectField,
{ ...readFromSecondary }
).lean()
]);
@@ -494,8 +501,13 @@ export async function searchDatasetData(
const result: SearchDataResponseItemType = {
id: String(data._id),
updateTime: data.updateTime,
q: data.q,
a: data.a,
...formatDatasetDataValue({
teamId,
datasetId: data.datasetId,
q: data.q,
a: data.a,
imageId: data.imageId
}),
chunkIndex: data.chunkIndex,
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
@@ -597,14 +609,14 @@ export async function searchDatasetData(
{
_id: { $in: searchResults.map((item) => item.dataId) }
},
'_id datasetId collectionId updateTime q a chunkIndex indexes',
datasetDataSelectField,
{ ...readFromSecondary }
).lean(),
MongoDatasetCollection.find(
{
_id: { $in: searchResults.map((item) => item.collectionId) }
},
'_id name fileId rawLink apiFileId externalFileId externalFileUrl',
datsaetCollectionSelectField,
{ ...readFromSecondary }
).lean()
]);
@@ -630,8 +642,13 @@ export async function searchDatasetData(
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
updateTime: data.updateTime,
q: data.q,
a: data.a,
...formatDatasetDataValue({
teamId,
datasetId: data.datasetId,
q: data.q,
a: data.a,
imageId: data.imageId
}),
chunkIndex: data.chunkIndex,
indexes: data.indexes,
...getCollectionSourceData(collection),

View File

@@ -12,10 +12,7 @@ import { getCollectionWithDataset } from '../controller';
import { mongoSessionRun } from '../../../common/mongo/sessionRun';
import { type PushDataToTrainingQueueProps } from '@fastgpt/global/core/dataset/training/type';
import { i18nT } from '../../../../web/i18n/utils';
import {
getLLMDefaultChunkSize,
getLLMMaxChunkSize
} from '../../../../global/core/dataset/training/utils';
import { getLLMMaxChunkSize } from '../../../../global/core/dataset/training/utils';
export const lockTrainingDataByTeamId = async (teamId: string): Promise<any> => {
try {
@@ -65,7 +62,7 @@ export async function pushDataListToTrainingQueue({
const getImageChunkMode = (data: PushDatasetDataChunkProps, mode: TrainingModeEnum) => {
if (mode !== TrainingModeEnum.image) return mode;
// 检查内容中,是否包含 ![](xxx) 的图片格式
const text = data.q + data.a || '';
const text = (data.q || '') + (data.a || '');
const regex = /!\[\]\((.*?)\)/g;
const match = text.match(regex);
if (match) {
@@ -82,9 +79,6 @@ export async function pushDataListToTrainingQueue({
if (!agentModelData) {
return Promise.reject(i18nT('common:error_llm_not_config'));
}
if (mode === TrainingModeEnum.chunk || mode === TrainingModeEnum.auto) {
prompt = undefined;
}
const { model, maxToken, weight } = await (async () => {
if (mode === TrainingModeEnum.chunk) {
@@ -101,7 +95,7 @@ export async function pushDataListToTrainingQueue({
weight: 0
};
}
if (mode === TrainingModeEnum.image) {
if (mode === TrainingModeEnum.image || mode === TrainingModeEnum.imageParse) {
const vllmModelData = getVlmModel(vlmModel);
if (!vllmModelData) {
return Promise.reject(i18nT('common:error_vlm_not_config'));
@@ -117,11 +111,9 @@ export async function pushDataListToTrainingQueue({
})();
// filter repeat or equal content
const set = new Set();
const filterResult: Record<string, PushDatasetDataChunkProps[]> = {
success: [],
overToken: [],
repeat: [],
error: []
};
@@ -140,7 +132,7 @@ export async function pushDataListToTrainingQueue({
.filter(Boolean);
// filter repeat content
if (!item.q) {
if (!item.imageId && !item.q) {
filterResult.error.push(item);
return;
}
@@ -153,32 +145,26 @@ export async function pushDataListToTrainingQueue({
return;
}
if (set.has(text)) {
filterResult.repeat.push(item);
} else {
filterResult.success.push(item);
set.add(text);
}
filterResult.success.push(item);
});
// insert data to db
const insertLen = filterResult.success.length;
const failedDocuments: PushDatasetDataChunkProps[] = [];
// 使用 insertMany 批量插入
const batchSize = 200;
const batchSize = 500;
const insertData = async (startIndex: number, session: ClientSession) => {
const list = filterResult.success.slice(startIndex, startIndex + batchSize);
if (list.length === 0) return;
try {
await MongoDatasetTraining.insertMany(
const result = await MongoDatasetTraining.insertMany(
list.map((item) => ({
teamId,
tmbId,
datasetId,
collectionId,
datasetId: datasetId,
collectionId: collectionId,
billId,
mode: getImageChunkMode(item, mode),
prompt,
@@ -189,25 +175,25 @@ export async function pushDataListToTrainingQueue({
indexSize,
weight: weight ?? 0,
indexes: item.indexes,
retryCount: 5
retryCount: 5,
...(item.imageId ? { imageId: item.imageId } : {})
})),
{
session,
ordered: true
ordered: false,
rawResult: true,
includeResultMetadata: false // 进一步减少返回数据
}
);
if (result.insertedCount !== list.length) {
return Promise.reject(`Insert data error, ${JSON.stringify(result)}`);
}
} catch (error: any) {
addLog.error(`Insert error`, error);
// 如果有错误,将失败的文档添加到失败列表中
error.writeErrors?.forEach((writeError: any) => {
failedDocuments.push(data[writeError.index]);
});
console.log('failed', failedDocuments);
return Promise.reject(error);
}
// 对于失败的文档,尝试单独插入
await MongoDatasetTraining.create(failedDocuments, { session });
return insertData(startIndex + batchSize, session);
};
@@ -222,7 +208,6 @@ export async function pushDataListToTrainingQueue({
delete filterResult.success;
return {
insertLen,
...filterResult
insertLen
};
}

View File

@@ -99,6 +99,9 @@ const TrainingDataSchema = new Schema({
],
default: []
},
imageId: {
type: String
},
errorMsg: String
});

View File

@@ -358,7 +358,7 @@ async function filterDatasetQuote({
return replaceVariable(quoteTemplate, {
id: item.id,
q: item.q,
a: item.a,
a: item.a || '',
updateTime: formatTime2YMDHM(item.updateTime),
source: item.sourceName,
sourceId: String(item.sourceId || ''),