@@ -52,6 +52,29 @@ export class VectorManager {
)
}
// 强制垃圾回收的辅助方法
private forceGarbageCollection() {
try {
if ( typeof global !== 'undefined' && global . gc ) {
global . gc ( )
} else if ( typeof window !== 'undefined' && ( window as any ) . gc ) {
( window as any ) . gc ( )
}
} catch ( e ) {
// 忽略垃圾回收错误
}
}
// 检查并清理内存的辅助方法
private async memoryCleanup ( batchCount : number ) {
// 每10批次强制垃圾回收
if ( batchCount % 10 === 0 ) {
this . forceGarbageCollection ( )
// 短暂延迟让内存清理完成
await new Promise ( resolve = > setTimeout ( resolve , 100 ) )
}
}
async updateVaultIndex (
embeddingModel : EmbeddingModel ,
options : {
@@ -100,29 +123,43 @@ export class VectorManager {
} ,
)
const skippedFiles : string [ ] = [ ]
const contentChunks : InsertVector [ ] = (
await Promise . all (
filesToIndex . map ( async ( file ) = > {
const fileContent = await this . app . vault . cachedRead ( file )
cons t fileDocum ents = await textSplitter . createDocuments ( [
fileContent ,
] )
return fileDocuments . map ( ( chunk ) : InsertVector = > {
return {
path : file.path ,
mtime : file.stat.mtime ,
content : chunk.pageContent ,
embedding : [ ] ,
metadata : {
startLine : Number ( chunk . metadata . loc . lines . from ) ,
endLine : Number ( chunk . metadata . loc . lines . to ) ,
} ,
}
} )
try {
le t fileCont ent = await this . app . vault . cachedRead ( file )
// 清理null字节, 防止PostgreSQL UTF8编码错误
fileContent = fileContent . replace ( /\0/g , '' )
const fileDocuments = await textSplitter . createDocuments ( [
fileContent ,
] )
return fileDocuments . map ( ( chunk ) : InsertVector = > {
return {
path : file.path ,
mtime : file.stat.mtime ,
content : chunk.pageContent.replace ( /\0/g , '' ) , // 再次清理,确保安全
embedding : [ ] ,
metadata : {
startLine : Number ( chunk . metadata . loc . lines . from ) ,
endLine : Number ( chunk . metadata . loc . lines . to ) ,
} ,
}
} )
} catch ( error ) {
console . warn ( ` 跳过文件 ${ file . path } : ` , error . message )
skippedFiles . push ( file . path )
return [ ]
}
} ) ,
)
) . flat ( )
if ( skippedFiles . length > 0 ) {
console . warn ( ` 跳过了 ${ skippedFiles . length } 个有问题的文件: ` , skippedFiles )
new Notice ( ` 跳过了 ${ skippedFiles . length } 个有问题的文件 ` )
}
updateProgress ? . ( {
completedChunks : 0 ,
totalChunks : contentChunks.length ,
@@ -130,18 +167,22 @@ export class VectorManager {
} )
const embeddingProgress = { completed : 0 }
const embeddingChunks : InsertVector [ ] = [ ]
const insertBatchSize = 64 // 数据库插入批量大小
// 减少批量大小以降低内存压力
const insertBatchSize = 1 6 // 从64降低到16
let batchCount = 0
try {
if ( embeddingModel . supportsBatch ) {
// 支持批量处理的提供商:使用批量 处理逻辑
const embeddingBatchSize = 64 // API批量处理大小
// 支持批量处理的提供商:使用流式 处理逻辑
const embeddingBatchSize = 1 6 // 从64降低到16
for ( let i = 0 ; i < contentChunks . length ; i += embeddingBatchSize ) {
batchCount ++
const batchChunks = contentChunks . slice ( i , Math . min ( i + embeddingBatchSize , contentChunks . length ) )
const batchTexts = batchChunks . map ( chunk = > chunk . content )
const embeddedBatch : InsertVector [ ] = [ ]
await backOff (
async ( ) = > {
const batchEmbeddings = await embeddingModel . getBatchEmbeddings ( batchTexts )
@@ -155,80 +196,99 @@ export class VectorManager {
embedding : batchEmbeddings [ j ] ,
metadata : batchChunks [ j ] . metadata ,
}
embeddingChunks . push ( embeddedChunk )
embeddedBatch . push ( embeddedChunk )
}
embeddingProgress . completed += batchChunks . length
updateProgress ? . ( {
completedChunks : embeddingProgress.completed ,
totalChunks : contentChunks.length ,
totalFiles : filesToIndex.length ,
} )
} ,
{
numOfAttempts : 5 ,
startingDelay : 10 00,
numOfAttempts : 3 , // 减少重试次数
startingDelay : 5 00, // 减少延迟
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
// 立即插入当前批次,避免内存累积
if ( embeddedBatch . length > 0 ) {
await this . repository . insertVectors ( embeddedBatch , embeddingModel )
// 清理批次数据
embeddedBatch . length = 0
}
embeddingProgress . completed += batchChunks . length
updateProgress ? . ( {
completedChunks : embeddingProgress.completed ,
totalChunks : contentChunks.length ,
totalFiles : filesToIndex.length ,
} )
// 定期内存清理
await this . memoryCleanup ( batchCount )
}
} else {
// 不支持批量处理的提供商:使用原来的逐个 处理逻辑
const limit = pLimit ( 5 0)
// 不支持批量处理的提供商:使用流式 处理逻辑
const limit = pLimit ( 1 0) // 从50降低到10, 减少并发压力
const abortController = new AbortController ( )
const tasks = contentChunks . map ( ( chunk ) = >
limit ( async ( ) = > {
if ( abortController . signal . aborted ) {
throw new Error ( 'Operation was aborted' )
}
try {
await backOff (
async ( ) = > {
const embedding = await embeddingModel . getEmbedding ( chunk . content )
const embeddedChunk = {
path : chunk.path ,
mtime : chunk.mtime ,
content : chunk.content ,
embedding ,
metadata : chunk.metadata ,
}
embeddingChunks . push ( embeddedChunk )
embeddingProgress . completed ++
updateProgress ? . ( {
completedChunks : embeddingProgress.completed ,
totalChunks : contentChunks.length ,
totalFiles : filesToIndex.length ,
} )
} ,
{
numOfAttempts : 5 ,
startingDelay : 1000 ,
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
} catch ( error ) {
abortController . abort ( )
throw error
}
} ) ,
)
await Promise . all ( tasks )
}
// all embedding generated, batch insert
if ( embeddingChunks . length > 0 ) {
// batch insert all vectors
let inserted = 0
while ( inserted < embedding Chunks. length ) {
const chunksToInsert = embeddingChunks . slice (
inserted ,
Math . min ( inserted + insertBatchSize , embeddingChunks . length )
// 流式处理:分批处理并立即插入
for ( let i = 0 ; i < contentChunks . length ; i += insertBatchSize ) {
if ( abortController . signal . aborted ) {
throw new Error ( 'Operation was aborted' )
}
batchCount ++
const batchChunks = contentChunks . slice ( i , Math . min ( i + insertBatchSize , content Chunks . length ) )
const embeddedBatch : InsertVector [ ] = [ ]
const tasks = batchChunks . map ( ( chunk ) = >
limit ( async ( ) = > {
if ( abortController . signal . aborted ) {
throw new Error ( 'Operation was aborted' )
}
try {
await backOff (
async ( ) = > {
const embedding = await embeddingModel . getEmbedding ( chunk . content )
const embeddedChunk = {
path : chunk.path ,
mtime : chunk.mtime ,
content : chunk.content ,
embedding ,
metadata : chunk.metadata ,
}
embeddedBatch . push ( embeddedChunk )
} ,
{
numOfAttempts : 3 , // 减少重试次数
startingDelay : 500 , // 减少延迟
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
} catch ( error ) {
abortController . abort ( )
throw error
}
} ) ,
)
await this . repository . insertVectors ( chunksToInsert , embeddingModel )
inserted += chunksToInsert . length
await Promise . all ( tasks )
// 立即插入当前批次
if ( embeddedBatch . length > 0 ) {
await this . repository . insertVectors ( embeddedBatch , embeddingModel )
// 清理批次数据
embeddedBatch . length = 0
}
embeddingProgress . completed += batchChunks . length
updateProgress ? . ( {
completedChunks : embeddingProgress.completed ,
totalChunks : contentChunks.length ,
totalFiles : filesToIndex.length ,
} )
// 定期内存清理
await this . memoryCleanup ( batchCount )
}
}
} catch ( error ) {
@@ -244,6 +304,9 @@ export class VectorManager {
console . error ( 'Error embedding chunks:' , error )
throw error
}
} finally {
// 最终清理
this . forceGarbageCollection ( )
}
}
@@ -252,125 +315,160 @@ export class VectorManager {
chunkSize : number ,
file : TFile
) {
// Delete existing vectors for the files
await this . repository . deleteVectorsForSingleFile (
file . path ,
embeddingModel ,
)
// Embed the files
const textSplitter = RecursiveCharacterTextSplitter . fromLanguage (
'markdown' ,
{
chunkSize ,
} ,
)
const fileContent = await this . app . vault . cachedRead ( file )
const fileDocuments = await textSplitter . createDocuments ( [
fileContent ,
] )
const contentChunks : InsertVector [ ] = fileDocuments . map ( ( chunk ) : InsertVector = > {
return {
path : file.path ,
mtime : file.stat.mtime ,
content : chunk.pageContent ,
embedding : [ ] ,
metadata : {
startLine : Number ( chunk . metadata . loc . lines . from ) ,
endLine : Number ( chunk . metadata . loc . lines . to ) ,
} ,
}
} )
const embeddingChunks : InsertVector [ ] = [ ]
const insertBatchSize = 64 // 数据库插入批量大小
try {
if ( embeddingModel . supportsBatch ) {
// 支持批量处理的提供商:使用批量处理逻辑
const embeddingBatchSize = 64 // API批量处理大小
for ( let i = 0 ; i < contentChunks . length ; i += embeddingBatchSize ) {
console . log ( ` Embedding batch ${ i / embeddingBatchSize + 1 } of ${ Math . ceil ( contentChunks . length / embeddingBatchSize ) } ` )
const batchChunks = contentChunks . slice ( i , Math . min ( i + embeddingBatchSize , contentChunks . length ) )
const batchTexts = batchChunks . map ( chunk = > chunk . content )
await backOff (
async ( ) = > {
const batchEmbeddings = await embeddingModel . getBatchEmbeddings ( batchTexts )
// 合并embedding结果到chunk数据
for ( let j = 0 ; j < batchChunks . length ; j ++ ) {
const embeddedChunk : InsertVector = {
path : batchChunks [ j ] . path ,
mtime : batchChunks [ j ] . mtime ,
content : batchChunks [ j ] . content ,
embedding : batchEmbeddings [ j ] ,
metadata : batchChunks [ j ] . metadata ,
}
embeddingChunks . push ( embeddedChunk )
}
} ,
{
numOfAttempts : 5 ,
startingDelay : 1000 ,
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
// Delete existing vectors for the files
await this . repository . deleteVectorsForSingleFile (
file . path ,
embeddingModel ,
)
// Embed the files
const textSplitter = RecursiveCharacterTextSplitter . fromLanguage (
'markdown' ,
{
chunkSize ,
} ,
)
let fileContent = await this . app . vault . cachedRead ( file )
// 清理null字节, 防止PostgreSQL UTF8编码错误
fileContent = fileContent . replace ( /\0/g , '' )
const fileDocuments = await textSplitter . createDocuments ( [
fileContent ,
] )
const contentChunks : InsertVector [ ] = fileDocuments . map ( ( chunk ) : InsertVector = > {
return {
path : file.path ,
mtime : file.stat.mtime ,
content : chunk.pageContent.replace ( /\0/g , '' ) , // 再次清理,确保安全
embedding : [ ] ,
metadata : {
startL ine : Number ( chunk . metadata . loc . lines . from ) ,
endLine : Number ( chunk . metadata . loc . lines . to ) ,
} ,
}
} else {
// 不支持批量处理的提供商:使用原来的逐个处理逻辑
const limit = pLimit ( 50 )
const abortController = new AbortController ( )
const tasks = contentChunks . map ( ( chunk ) = >
limit ( async ( ) = > {
} )
// 减少批量大小以降低内存压力
const insertBatchSize = 16 // 从64降低到16
let batchCount = 0
try {
if ( embeddingModel . supportsBatch ) {
// 支持批量处理的提供商:使用流式处理逻辑
const embeddingBatchSize = 16 // 从64降低到16
for ( let i = 0 ; i < contentChunks . length ; i += embeddingBatchSize ) {
batchCount ++
console . log ( ` Embedding batch ${ batchCount } of ${ Math . ceil ( contentChunks . length / embeddingBatchSize ) } ` )
const batchChunks = contentChunks . slice ( i , Math . min ( i + embeddingBatchSize , contentChunks . length ) )
const batchTexts = batchChunks . map ( chunk = > chunk . content )
const embeddedBatch : InsertVector [ ] = [ ]
await backOff (
async ( ) = > {
const batchEmbeddings = await embeddingModel . getBatchEmbeddings ( batchTexts )
// 合并embedding结果到chunk数据
for ( let j = 0 ; j < batchChunks . length ; j ++ ) {
const embeddedChunk : InsertVector = {
path : batchChunks [ j ] . path ,
mtime : batchChunks [ j ] . mtime ,
content : batchChunks [ j ] . content ,
embedding : batchEmbeddings [ j ] ,
metadata : batchChunks [ j ] . metadata ,
}
embeddedBatch . push ( embeddedChunk )
}
} ,
{
numOfAttempts : 3 , // 减少重试次数
startingDelay : 500 , // 减少延迟
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
// 立即插入当前批次
if ( embeddedBatch . length > 0 ) {
await this . repository . insertVectors ( embeddedBatch , embeddingModel )
// 清理批次数据
embeddedBatch . length = 0
}
// 定期内存清理
await this . memoryCleanup ( batchCount )
}
} else {
// 不支持批量处理的提供商:使用流式处理逻辑
const limit = pLimit ( 10 ) // 从50降低到10
const abortController = new AbortController ( )
// 流式处理:分批处理并立即插入
for ( let i = 0 ; i < contentChunks . length ; i += insertBatchSize ) {
if ( abortController . signal . aborted ) {
throw new Error ( 'Operation was aborted' )
}
try {
await backOff (
async ( ) = > {
const embedding = await embeddingModel . getEmbedding ( chunk . content )
const embeddedChunk = {
path : chunk.path ,
mtime : chunk.mtime ,
content : chunk.content ,
embedding ,
metadata : chunk.metadata ,
}
embeddingChunks . push ( embeddedChunk )
} ,
{
numOfAttempts : 5 ,
startingDelay : 1000 ,
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
} catch ( error ) {
abortController . abort ( )
throw error
batchCount ++
const batchChunks = contentChunks . slice ( i , Math . min ( i + insertBatchSize , contentChunks . length ) )
const embeddedBatch : InsertVector [ ] = [ ]
const tasks = batchChunks . map ( ( chunk ) = >
limit ( async ( ) = > {
if ( abortController . signal . aborted ) {
throw new Error ( 'Operation was aborted' )
}
try {
await backOff (
async ( ) = > {
const embedding = await embeddingModel . getEmbedding ( chunk . content )
const embeddedChunk = {
path : chunk.path ,
mtime : chunk.mtime ,
content : chunk.content ,
embedding ,
metadata : chunk.metadata ,
}
embeddedBatch . push ( embeddedChunk )
} ,
{
numOfAttempts : 3 , // 减少重试次数
startingDelay : 500 , // 减少延迟
timeMultiple : 1.5 ,
jitter : 'full' ,
} ,
)
} catch ( error ) {
abortController . abort ( )
throw error
}
} ) ,
)
await Promise . all ( tasks )
// 立即插入当前批次
if ( embeddedBatch . length > 0 ) {
await this . repository . insertVectors ( embeddedBatch , embeddingModel )
// 清理批次数据
embeddedBatch . length = 0
}
} ) ,
)
await Promise . all ( tasks )
}
// all embedding generated, batch insert
if ( embeddingChunks . length > 0 ) {
let inserted = 0
while ( inserted < embeddingChunks . length ) {
const chunksToInsert = embeddingChunks . slice ( inserted , Math . min ( inserted + insertBatchSize , embeddingChunks . length ) )
await this . repository . insertVectors ( chunksToInsert , embeddingModel )
inserted += chunksToInsert . length
// 定期内存清理
await this . m emoryCleanup ( batchCount )
}
}
} catch ( error ) {
console . error ( 'Error embedding chunks:' , error )
} finally {
// 最终清理
this . forceGarbageCollection ( )
}
} catch ( error ) {
console . error ( 'Error embedding chunks:' , error )
console . warn ( ` 跳过文件 ${ file . path } : ` , error . message )
new Notice ( ` 跳过文件 ${ file . name } : ${ error . message } ` )
}
}
@@ -424,25 +522,32 @@ export class VectorManager {
// Check for updated or new files
filesToIndex = await Promise . all (
filesToIndex . map ( async ( file ) = > {
const fileChunks = await this . repository . getVectorsByFilePath (
file . p ath,
embeddingModel ,
)
if ( fileChunks . length === 0 ) {
// File is not indexed, so we need to index it
const fileContent = await this . app . vault . cachedRead ( file )
if ( fileContent . length === 0 ) {
// Ignore empty files
return null
try {
const fileChunks = await this . repository . getVectorsByFileP ath(
file . path ,
embeddingModel ,
)
if ( fileChunks . length === 0 ) {
// File is not indexed, so we need to index it
let fileContent = await this . app . vault . cachedRead ( file )
// 清理null字节, 防止PostgreSQL UTF8编码错误
fileContent = fileContent . replace ( /\0/g , '' )
if ( fileContent . length === 0 ) {
// Ignore empty files
return null
}
return file
}
return fil e
const outOfDate = file . stat . mtime > fileChunks [ 0 ] . mtim e
if ( outOfDate ) {
// File has changed, so we need to re-index it
return file
}
return null
} catch ( error ) {
console . warn ( ` 跳过文件 ${ file . path } : ` , error . message )
return null
}
const outOfDate = file . stat . mtime > fileChunks [ 0 ] . mtime
if ( outOfDate ) {
// File has changed, so we need to re-index it
return file
}
return null
} ) ,
) . then ( ( files ) = > files . filter ( Boolean ) )