feat: pg引入

This commit is contained in:
archer
2023-04-18 22:35:55 +08:00
parent a540ee944a
commit 9e951fbc15
17 changed files with 260 additions and 150 deletions

View File

@@ -6,12 +6,9 @@ import type { ChatCompletionRequestMessage } from 'openai';
import { ChatModelNameEnum } from '@/constants/model';
import { pushSplitDataBill } from '@/service/events/pushBill';
import { generateVector } from './generateVector';
import { connectRedis } from '../redis';
import { VecModelDataPrefix } from '@/constants/redis';
import { customAlphabet } from 'nanoid';
import { openaiError2 } from '../errorCode';
import { connectPg } from '@/service/pg';
import { ModelSplitDataSchema } from '@/types/mongoSchema';
const nanoid = customAlphabet('abcdefghijklmnopqrstuvwxyz1234567890', 12);
export async function generateQA(next = false): Promise<any> {
if (process.env.queueTask !== '1') {
@@ -25,7 +22,7 @@ export async function generateQA(next = false): Promise<any> {
let dataId = null;
try {
const redis = await connectRedis();
const pg = await connectPg();
// 找出一个需要生成的 dataItem
const data = await SplitData.aggregate([
{ $match: { textList: { $exists: true, $ne: [] } } },
@@ -139,23 +136,17 @@ export async function generateQA(next = false): Promise<any> {
SplitData.findByIdAndUpdate(dataItem._id, {
textList: dataItem.textList.slice(0, -5)
}), // 删掉后5个数据
...resultList.map((item) => {
// 插入 redis
return redis.sendCommand([
'HMSET',
`${VecModelDataPrefix}:${nanoid()}`,
'userId',
String(dataItem.userId),
'modelId',
String(dataItem.modelId),
'q',
item.q,
'text',
item.a,
'status',
'waiting'
]);
})
// 生成的内容插入 pg
pg.query(`INSERT INTO modelData (user_id, model_id, q, a, status) VALUES ${resultList
.map(
(item) =>
`('${String(dataItem.userId)}', '${String(dataItem.modelId)}', '${item.q.replace(
/\'/g,
'"'
)}', '${item.a.replace(/\'/g, '"')}', 'waiting')`
)
.join(',')}
`)
]);
console.log('生成QA成功time:', `${(Date.now() - startTime) / 1000}s`);

View File

@@ -1,9 +1,8 @@
import { connectRedis } from '../redis';
import { VecModelDataIdx } from '@/constants/redis';
import { vectorToBuffer } from '@/utils/tools';
import { ModelDataStatusEnum } from '@/constants/redis';
import { openaiCreateEmbedding, getOpenApiKey } from '../utils/openai';
import { openaiError2 } from '../errorCode';
import { connectPg } from '@/service/pg';
import type { PgModelDataItemType } from '@/types/pg';
export async function generateVector(next = false): Promise<any> {
if (process.env.queueTask !== '1') {
@@ -15,32 +14,27 @@ export async function generateVector(next = false): Promise<any> {
global.generatingVector = true;
let dataId = null;
try {
const redis = await connectRedis();
const pg = await connectPg();
// 从找出一个 status = waiting 的数据
const searchRes = await redis.ft.search(
VecModelDataIdx,
`@status:{${ModelDataStatusEnum.waiting}}`,
{
RETURN: ['q', 'userId'],
LIMIT: {
from: 0,
size: 1
}
}
);
const searchRes = await pg.query<PgModelDataItemType>(`SELECT id, q, user_id
FROM modelData
WHERE status='waiting'
LIMIT 1
`);
if (searchRes.total === 0) {
if (searchRes.rowCount === 0) {
console.log('没有需要生成 【向量】 的数据');
global.generatingVector = false;
return;
}
const dataItem: { id: string; q: string; userId: string } = {
id: searchRes.documents[0].id,
q: String(searchRes.documents[0]?.value?.q || ''),
userId: String(searchRes.documents[0]?.value?.userId || '')
id: searchRes.rows[0].id,
q: searchRes.rows[0].q,
userId: searchRes.rows[0].user_id
};
dataId = dataItem.id;
@@ -53,7 +47,7 @@ export async function generateVector(next = false): Promise<any> {
systemKey = res.systemKey;
} catch (error: any) {
if (error?.code === 501) {
await redis.del(dataItem.id);
await pg.query(`DELETE FROM modelData WHERE id = '${dataId}'`);
generateVector(true);
return;
}
@@ -69,15 +63,10 @@ export async function generateVector(next = false): Promise<any> {
apiKey: userApiKey || systemKey
});
// 更新 redis 向量和状态数据
await redis.sendCommand([
'HMSET',
dataItem.id,
'vector',
vectorToBuffer(vector),
'status',
ModelDataStatusEnum.ready
]);
// 更新 pg 向量和状态数据
await pg.query(
`UPDATE modelData SET vector = '[${vector}]', status = 'ready' WHERE id = ${dataId}`
);
console.log(`生成向量成功: ${dataItem.id}`);

34
src/service/pg.ts Normal file
View File

@@ -0,0 +1,34 @@
import { Pool } from 'pg';
export const connectPg = async () => {
if (global.pgClient) {
return global.pgClient;
}
global.pgClient = new Pool({
host: process.env.PG_HOST,
port: process.env.PG_PORT ? +process.env.PG_PORT : 5432,
user: process.env.PG_USER,
password: process.env.PG_PASSWORD,
database: process.env.PG_DB_NAME,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000
});
global.pgClient.on('connect', () => {
console.log('pg connected');
});
global.pgClient.on('error', (err) => {
console.log(err);
global.pgClient = null;
});
try {
await global.pgClient.connect();
return global.pgClient;
} catch (error) {
global.pgClient = null;
return Promise.reject(error);
}
};