website sync feature (#4429)

* perf: introduce BullMQ for website sync (#4403)

* perf: introduce BullMQ for website sync

* feat: new redis module

* fix: remove graceful shutdown

* perf: improve UI in dataset detail

- Updated the "change" icon SVG file.
- Modified i18n strings.
- Added new i18n string "immediate_sync".
- Improved UI in dataset detail page, including button icons and
background colors.

* refactor: Add chunkSettings to DatasetSchema

* perf: website sync ux

* env template

* fix: clean up website dataset when updating chunk settings (#4420)

* perf: check setting updated

* perf: worker concurrency

* feat: init script for website sync refactor (#4425)

* website feature doc

---------

Co-authored-by: a.e. <49438478+I-Info@users.noreply.github.com>
This commit is contained in:
Archer
2025-04-02 13:51:58 +08:00
committed by archer
parent e54fe1eed6
commit d171b2d3d8
46 changed files with 1607 additions and 680 deletions

View File

@@ -1,6 +1,7 @@
import {
DatasetCollectionTypeEnum,
DatasetCollectionDataProcessModeEnum
DatasetCollectionDataProcessModeEnum,
DatasetTypeEnum
} from '@fastgpt/global/core/dataset/constants';
import type { CreateDatasetCollectionParams } from '@fastgpt/global/core/dataset/api.d';
import { MongoDatasetCollection } from './schema';
@@ -104,7 +105,8 @@ export const createCollectionAndInsertData = async ({
hashRawText: hashStr(rawText),
rawTextLength: rawText.length,
nextSyncTime: (() => {
if (!dataset.autoSync) return undefined;
// ignore auto collections sync for website datasets
if (!dataset.autoSync && dataset.type === DatasetTypeEnum.websiteDataset) return undefined;
if (
[DatasetCollectionTypeEnum.link, DatasetCollectionTypeEnum.apiFile].includes(
createCollectionParams.type

View File

@@ -1,13 +1,8 @@
import { connectionMongo, getMongoModel } from '../../../common/mongo';
const { Schema, model, models } = connectionMongo;
const { Schema } = connectionMongo;
import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type.d';
import {
DatasetCollectionTypeMap,
DatasetCollectionDataProcessModeEnum,
ChunkSettingModeEnum,
DataChunkSplitModeEnum
} from '@fastgpt/global/core/dataset/constants';
import { DatasetCollectionName } from '../schema';
import { DatasetCollectionTypeMap } from '@fastgpt/global/core/dataset/constants';
import { ChunkSettings, DatasetCollectionName } from '../schema';
import {
TeamCollectionName,
TeamMemberCollectionName
@@ -90,25 +85,7 @@ const DatasetCollectionSchema = new Schema({
customPdfParse: Boolean,
// Chunk settings
imageIndex: Boolean,
autoIndexes: Boolean,
trainingType: {
type: String,
enum: Object.values(DatasetCollectionDataProcessModeEnum)
},
chunkSettingMode: {
type: String,
enum: Object.values(ChunkSettingModeEnum)
},
chunkSplitMode: {
type: String,
enum: Object.values(DataChunkSplitModeEnum)
},
chunkSize: Number,
chunkSplitter: String,
indexSize: Number,
qaPrompt: String
...ChunkSettings
});
DatasetCollectionSchema.virtual('dataset', {

View File

@@ -9,6 +9,8 @@ import { deleteDatasetDataVector } from '../../common/vectorStore/controller';
import { MongoDatasetDataText } from './data/dataTextSchema';
import { DatasetErrEnum } from '@fastgpt/global/common/error/code/dataset';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { removeWebsiteSyncJobScheduler } from './websiteSync';
import { DatasetTypeEnum } from '@fastgpt/global/core/dataset/constants';
/* ============= dataset ========== */
/* find all datasetId by top datasetId */

View File

@@ -1,7 +1,8 @@
import { getMongoModel, Schema } from '../../common/mongo';
import {
DatasetStatusEnum,
DatasetStatusMap,
ChunkSettingModeEnum,
DataChunkSplitModeEnum,
DatasetCollectionDataProcessModeEnum,
DatasetTypeEnum,
DatasetTypeMap
} from '@fastgpt/global/core/dataset/constants';
@@ -13,6 +14,28 @@ import type { DatasetSchemaType } from '@fastgpt/global/core/dataset/type.d';
export const DatasetCollectionName = 'datasets';
export const ChunkSettings = {
imageIndex: Boolean,
autoIndexes: Boolean,
trainingType: {
type: String,
enum: Object.values(DatasetCollectionDataProcessModeEnum)
},
chunkSettingMode: {
type: String,
enum: Object.values(ChunkSettingModeEnum)
},
chunkSplitMode: {
type: String,
enum: Object.values(DataChunkSplitModeEnum)
},
chunkSize: Number,
chunkSplitter: String,
indexSize: Number,
qaPrompt: String
};
const DatasetSchema = new Schema({
parentId: {
type: Schema.Types.ObjectId,
@@ -40,11 +63,6 @@ const DatasetSchema = new Schema({
required: true,
default: DatasetTypeEnum.dataset
},
status: {
type: String,
enum: Object.keys(DatasetStatusMap),
default: DatasetStatusEnum.active
},
avatar: {
type: String,
default: '/icon/logo.svg'
@@ -84,6 +102,9 @@ const DatasetSchema = new Schema({
}
}
},
chunkSettings: {
type: ChunkSettings
},
inheritPermission: {
type: Boolean,
default: true
@@ -98,9 +119,8 @@ const DatasetSchema = new Schema({
type: Object
},
autoSync: Boolean,
// abandoned
autoSync: Boolean,
externalReadUrl: {
type: String
},

View File

@@ -0,0 +1,80 @@
import { Processor } from 'bullmq';
import { getQueue, getWorker, QueueNames } from '../../../common/bullmq';
import { DatasetStatusEnum } from '@fastgpt/global/core/dataset/constants';
// Payload carried by every website-sync job: identifies the dataset to sync.
export type WebsiteSyncJobData = {
  datasetId: string;
};
// Shared BullMQ queue for website dataset sync jobs.
// Failed jobs are retried with exponential backoff.
export const websiteSyncQueue = getQueue<WebsiteSyncJobData>(QueueNames.websiteSync, {
  defaultJobOptions: {
    attempts: 3, // retry 3 times
    backoff: {
      type: 'exponential',
      delay: 1000 // delay 1 second between retries
    }
  }
});
/**
 * Create a worker bound to the website-sync queue.
 * Processes one job at a time; failed jobs are retained for later inspection
 * (capped by age and count).
 */
export const getWebsiteSyncWorker = (processor: Processor<WebsiteSyncJobData>) => {
  // Retention policy for failed jobs: at most 1000 jobs, each kept <= 15 days.
  const failedJobRetention = {
    age: 15 * 24 * 60 * 60, // seconds — up to 15 days
    count: 1000
  };

  return getWorker<WebsiteSyncJobData>(QueueNames.websiteSync, processor, {
    removeOnFail: failedJobRetention,
    concurrency: 1 // only 1 sync job processed at a time per worker
  });
};
/**
 * Enqueue a website-sync job for a dataset.
 * Uses BullMQ deduplication keyed on the dataset id, so at most one
 * pending job exists per dataset.
 */
export const addWebsiteSyncJob = (data: WebsiteSyncJobData) => {
  const jobKey = String(data.datasetId);
  return websiteSyncQueue.add(jobKey, data, { deduplication: { id: jobKey } });
};
/**
 * Derive a dataset's sync status from the state of its deduplicated queue job.
 * No job (or no deduplication entry) means nothing is pending => active.
 */
export const getWebsiteSyncDatasetStatus = async (datasetId: string) => {
  const dedupJobId = await websiteSyncQueue.getDeduplicationJobId(datasetId);
  if (!dedupJobId) {
    return DatasetStatusEnum.active;
  }

  const job = await websiteSyncQueue.getJob(dedupJobId);
  if (!job) {
    return DatasetStatusEnum.active;
  }

  switch (await job.getState()) {
    case 'waiting':
    case 'waiting-children':
      return DatasetStatusEnum.waiting;
    case 'active':
      return DatasetStatusEnum.syncing;
    default:
      // completed / failed / delayed / unknown — treat as not currently syncing
      return DatasetStatusEnum.active;
  }
};
// Scheduler setting
const repeatDuration = 24 * 60 * 60 * 1000; // repeat every 24 hours

/**
 * Create or update the repeating sync scheduler for a dataset.
 *
 * @param data - job payload; its datasetId doubles as the scheduler id
 * @param startDate - optional epoch ms for the first run; defaults to 24h from now
 *
 * Fix: use `??` instead of `||` so an explicit (falsy) startDate of 0 is
 * respected rather than silently replaced by the default; `Date.now()` is the
 * idiomatic form of `new Date().getTime()`.
 */
export const upsertWebsiteSyncJobScheduler = (data: WebsiteSyncJobData, startDate?: number) => {
  const datasetId = String(data.datasetId);

  return websiteSyncQueue.upsertJobScheduler(
    datasetId,
    {
      every: repeatDuration,
      startDate: startDate ?? Date.now() + repeatDuration // default: first run tomorrow
    },
    {
      name: datasetId,
      data
    }
  );
};
/** Look up the repeating sync scheduler registered for a dataset, if any. */
export const getWebsiteSyncJobScheduler = (datasetId: string) => {
  const schedulerId = String(datasetId);
  return websiteSyncQueue.getJobScheduler(schedulerId);
};
/** Tear down the repeating sync scheduler for a dataset (e.g. on deletion). */
export const removeWebsiteSyncJobScheduler = (datasetId: string) => {
  const schedulerId = String(datasetId);
  return websiteSyncQueue.removeJobScheduler(schedulerId);
};