feat: 增强 Chat 能力并补充单机部署方案

完善会话消息删除、Qwen 联网搜索/深度思考参数与 SSE 来源事件,同时增加请求体日志与 TS6 配置兼容调整,并新增 Ubuntu+PM2+Nginx 的部署文档与脚本以支持可回滚发布。

Made-with: Cursor
This commit is contained in:
2026-04-23 22:31:18 +08:00
parent 32303d099a
commit 132f51705e
12 changed files with 685 additions and 60 deletions

View File

@@ -8,6 +8,32 @@ import { PrismaModule } from './prisma/prisma.module';
import configuration from './config/configuration';
import { validateEnv } from './config/validation';
/**
 * Returns a deep copy of `input` that is safe to attach to a log record.
 *
 * - Primitives, `null` and `undefined` are returned unchanged.
 * - Arrays are copied element by element.
 * - For objects, every own enumerable property whose name contains `token`,
 *   `password`, `secret` or `authorization` (case-insensitive, substring match,
 *   so e.g. `accessToken` and `tokenCount` both match) has its value replaced
 *   by the string `'[REDACTED]'`; all other values are sanitized recursively.
 * - A reference back to an object that is currently being walked is replaced
 *   by the string `'[Circular]'` instead of recursing forever.
 *
 * @param input Arbitrary value, typically a parsed request body.
 * @returns The sanitized copy; the input itself is never mutated.
 */
function sanitizeBody(input: unknown): unknown {
  // Built once per call rather than once per visited key.
  const sensitiveKey = /(token|password|secret|authorization)/i;
  // Objects on the current recursion path. Only ancestors are tracked, so an
  // object that is merely referenced twice (no cycle) is still copied twice.
  const ancestors = new Set<object>();

  const walk = (value: unknown): unknown => {
    if (!value || typeof value !== 'object') return value;
    if (ancestors.has(value)) return '[Circular]';

    ancestors.add(value);
    try {
      if (Array.isArray(value)) return value.map((item) => walk(item));

      const out: Record<string, unknown> = {};
      for (const [key, child] of Object.entries(value as Record<string, unknown>)) {
        out[key] = sensitiveKey.test(key) ? '[REDACTED]' : walk(child);
      }
      return out;
    } finally {
      // Leaving this node: it is no longer an ancestor of what comes next.
      ancestors.delete(value);
    }
  };

  return walk(input);
}
/**
 * Reads the request body off a request-like value without assuming its type.
 *
 * Lookup order:
 * 1. `req.body`, when it is anything other than `undefined` (so `null` wins);
 * 2. `req.raw.body`, when `req.raw` is a non-null object;
 * 3. otherwise `undefined`.
 *
 * NOTE(review): the `raw` fallback presumably targets a wrapper that exposes
 * the underlying Node request under `raw` — confirm against the HTTP adapter.
 *
 * @param req The value handed to the logger hook; anything that is not a
 *            non-null object yields `undefined`.
 * @returns The body found, or `undefined` when none is available.
 */
function getRequestBody(req: unknown): unknown {
  if (typeof req !== 'object' || req === null) {
    return undefined;
  }

  const holder = req as Record<string, unknown>;
  const direct = holder.body;
  if (direct !== undefined) {
    return direct;
  }

  // `raw` is only consulted once the direct lookup came back empty.
  const underlying = holder.raw;
  const hasUnderlying = typeof underlying === 'object' && underlying !== null;
  return hasUnderlying ? (underlying as Record<string, unknown>).body : undefined;
}
@Module({
imports: [
ConfigModule.forRoot({
@@ -17,6 +43,18 @@ import { validateEnv } from './config/validation';
}),
LoggerModule.forRoot({
pinoHttp: {
customSuccessObject(req, res, base) {
return {
...base,
body: sanitizeBody(getRequestBody(req)),
};
},
customErrorObject(req, res, error, base) {
return {
...base,
body: sanitizeBody(getRequestBody(req)),
};
},
transport:
process.env.NODE_ENV !== 'production'
? {

View File

@@ -101,6 +101,33 @@ export class ChatSessionService {
return this.mapSession(deleted);
}
/**
 * Deletes a single message from a session and bumps the session's
 * `updated_at` column.
 *
 * Steps, in order: parse both IDs, run the ownership assertion for the
 * session, load the message, reject it when it is missing or attached to a
 * different session, delete it, then touch the session row.
 *
 * @param userId       Caller's user ID, forwarded to `assertSessionOwned`.
 * @param sessionIdStr Session ID as a string; parsed with `parseBigIntId`.
 * @param messageIdStr Message ID as a string; parsed with `parseBigIntId`.
 * @returns The deleted row in the shape produced by `mapMessage`.
 * @throws NotFoundException when no message with that ID exists in the session.
 *         NOTE(review): `parseBigIntId` and `assertSessionOwned` presumably
 *         throw on bad input / foreign sessions — confirm in their definitions.
 */
async deleteMessage(
  userId: bigint,
  sessionIdStr: string,
  messageIdStr: string,
) {
  const sessionId = parseBigIntId(sessionIdStr, 'sessionId');
  const messageId = parseBigIntId(messageIdStr, 'messageId');
  await this.assertSessionOwned(userId, sessionId);

  const existing = await this.prisma.chatMessage.findUnique({
    where: { id: messageId },
  });
  // A missing row and a row from another session are reported identically.
  const belongsToSession = existing?.sessionId === sessionId;
  if (!belongsToSession) {
    throw new NotFoundException('消息不存在');
  }

  const removed = await this.prisma.chatMessage.delete({
    where: { id: messageId },
  });

  const touchSession = Prisma.sql`UPDATE chat_sessions SET updated_at = NOW() WHERE id = ${sessionId}`;
  await this.prisma.$executeRaw(touchSession);

  return this.mapMessage(removed);
}
async updateSessionTitle(userId: bigint, sessionIdStr: string, title: string) {
const sessionId = parseBigIntId(sessionIdStr, 'sessionId');
await this.assertSessionOwned(userId, sessionId);
@@ -257,4 +284,22 @@ export class ChatSessionService {
updatedAt: row.updatedAt.toISOString(),
};
}
/**
 * Converts a message row into a plain response object.
 *
 * `id` is stringified and `createdAt` is rendered as an ISO-8601 string; the
 * remaining fields are copied through unchanged.
 *
 * @param row The message fields to expose.
 * @returns An object with `id`, `role`, `content`, `tokenCount`, `provider`
 *          and `createdAt`, in that order.
 */
private mapMessage(row: {
  id: bigint;
  role: string;
  content: string;
  tokenCount: number;
  provider: string | null;
  createdAt: Date;
}) {
  const { id, role, content, tokenCount, provider, createdAt } = row;
  return {
    id: String(id),
    role,
    content,
    tokenCount,
    provider,
    createdAt: createdAt.toISOString(),
  };
}
}

View File

@@ -26,6 +26,7 @@ import { ClientJwtAuthGuard } from '../../auth/client-jwt-auth.guard';
import { ChatSessionService } from '../application/chat-session.service';
import { PaginationQueryDto } from '../dto/chat-session-query.dto';
import {
ChatMessageRowDto,
ChatMessageListResponseDto,
ChatSessionListResponseDto,
ChatSessionRowDto,
@@ -117,6 +118,28 @@ export class ChatSessionsController {
return this.chatSessions.deleteSession(userId, sessionId);
}
/**
 * DELETE `:sessionId/messages/:messageId` — removes one message from a session.
 *
 * The user ID is taken from `req.user.userId` and converted to a bigint; both
 * path parameters are passed to the service as strings, which does the
 * parsing and lookups.
 *
 * @returns Whatever `ChatSessionService.deleteMessage` resolves to
 *          (documented to clients as `ChatMessageRowDto`).
 */
@Delete(':sessionId/messages/:messageId')
@ApiOperation({ summary: '删除会话中的指定消息' })
@ApiParam({
  name: 'sessionId',
  description: '会话 ID数字字符串',
  example: '1',
})
@ApiParam({
  name: 'messageId',
  description: '消息 ID数字字符串',
  example: '100',
})
@ApiOkResponse({ type: ChatMessageRowDto })
async removeMessage(
  @Req() req: ClientJwtRequest,
  @Param('sessionId') sessionId: string,
  @Param('messageId') messageId: string,
) {
  return this.chatSessions.deleteMessage(
    BigInt(req.user.userId),
    sessionId,
    messageId,
  );
}
@Patch(':sessionId/title')
@UsePipes(
new ValidationPipe({

View File

@@ -66,6 +66,7 @@ export class ChatController {
type: 'string',
example:
'event: meta\\ndata: {"requestId":"chatcmpl_xxx","platform":"qwen","model":"qwen-plus","sessionId":"1"}\\n\\n' +
'event: sources\\ndata: {"items":[{"title":"示例来源","url":"https://example.com"}]}\\n\\n' +
'event: delta\\ndata: {"delta":"你好"}\\n\\n' +
'event: usage\\ndata: {"promptTokens":10,"completionTokens":20,"totalTokens":30}\\n\\n' +
'event: done\\ndata: {"finishReason":"stop"}\\n\\n',
@@ -96,50 +97,70 @@ export class ChatController {
reply.raw.setHeader('X-Accel-Buffering', 'no');
reply.raw.flushHeaders?.();
const response = await this.router.routeAndStream(body);
reply.raw.write(
formatSse('meta', {
requestId: response.requestId,
platform: response.providerCode,
model: response.model,
sessionId: String(sessionId),
}),
);
let assistantText = '';
for (const chunk of response.chunks) {
assistantText += chunk.content;
reply.raw.write(formatSse('delta', { delta: chunk.content }));
await sleep(120);
}
// 在 done 之前完成用户消息与标题落库,确保前端紧接着查列表能看到最新标题。
try {
await this.chatSessions.persistUserMessageAndTitle(sessionId, body.messages);
const response = await this.router.routeAndStream(body);
reply.raw.write(
formatSse('meta', {
requestId: response.requestId,
platform: response.providerCode,
model: response.model,
sessionId: String(sessionId),
}),
);
if (response.sources?.length) {
reply.raw.write(
formatSse('sources', {
items: response.sources,
}),
);
}
let assistantText = '';
for (const chunk of response.chunks) {
assistantText += chunk.content;
reply.raw.write(formatSse('delta', { delta: chunk.content }));
await sleep(120);
}
// 在 done 之前完成用户消息与标题落库,确保前端紧接着查列表能看到最新标题。
try {
await this.chatSessions.persistUserMessageAndTitle(sessionId, body.messages);
} catch (err) {
this.logger.error(
{ err, sessionId: String(sessionId) },
'persist user message/title failed',
);
}
reply.raw.write(formatSse('usage', response.usage));
reply.raw.write(formatSse('done', { finishReason: 'stop' }));
reply.raw.end();
try {
await this.chatSessions.persistAssistantMessage(
sessionId,
assistantText,
response.providerCode,
response.usage,
);
} catch (err) {
this.logger.error(
{ err, sessionId: String(sessionId) },
'persist chat roundtrip failed',
);
}
} catch (err) {
this.logger.error(
{ err, sessionId: String(sessionId) },
'persist user message/title failed',
);
}
reply.raw.write(formatSse('usage', response.usage));
reply.raw.write(formatSse('done', { finishReason: 'stop' }));
reply.raw.end();
try {
await this.chatSessions.persistAssistantMessage(
sessionId,
assistantText,
response.providerCode,
response.usage,
);
} catch (err) {
this.logger.error(
{ err, sessionId: String(sessionId) },
'persist chat roundtrip failed',
'stream chat failed',
);
if (!reply.raw.writableEnded) {
const message = err instanceof Error ? err.message : 'stream failed';
reply.raw.write(formatSse('error', { code: 'STREAM_FAILED', message }));
reply.raw.write(formatSse('done', { finishReason: 'error' }));
reply.raw.end();
}
}
}
}

View File

@@ -1,6 +1,13 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { Type } from 'class-transformer';
import { IsArray, IsIn, IsOptional, IsString, ValidateNested } from 'class-validator';
import {
IsArray,
IsBoolean,
IsIn,
IsOptional,
IsString,
ValidateNested,
} from 'class-validator';
import { StreamChatRequest } from '@shared/ai-gateway/types/chat.types';
export class ChatMessageDto {
@@ -43,6 +50,24 @@ export class StreamChatBodyDto implements StreamChatRequest {
@IsString()
platform?: string;
/**
 * Optional switch requesting web search for this chat request.
 * May be omitted; when present it must be a boolean. Per the API description
 * below it currently only takes effect on the qwen platform.
 */
@ApiPropertyOptional({
type: Boolean,
description: '是否启用联网搜索(当前仅 qwen 平台生效)',
example: true,
})
@IsOptional()
@IsBoolean()
enableWebSearch?: boolean;
/**
 * Optional switch requesting "deep thinking" for this chat request.
 * May be omitted; when present it must be a boolean. Per the API description
 * below it currently only takes effect on the qwen platform.
 */
@ApiPropertyOptional({
type: Boolean,
description: '是否启用深度思考(当前仅 qwen 平台生效)',
example: true,
})
@IsOptional()
@IsBoolean()
enableThinking?: boolean;
@ApiProperty({
type: () => ChatMessageDto,
isArray: true,

View File

@@ -1,4 +1,4 @@
import { Injectable, InternalServerErrorException } from '@nestjs/common';
import { Injectable, InternalServerErrorException, Logger } from '@nestjs/common';
import { ProviderStreamResult, StreamChatRequest } from '../types/chat.types';
import { AiProvider } from './provider.interface';
import { request } from 'undici';
@@ -6,6 +6,7 @@ import { request } from 'undici';
@Injectable()
export class QwenProvider implements AiProvider {
readonly code = 'qwen';
private readonly logger = new Logger(QwenProvider.name);
supports(model?: string): boolean {
if (!model) return true;
@@ -23,16 +24,28 @@ export class QwenProvider implements AiProvider {
}
const model = req.model || 'qwen-plus';
const upstreamBody = {
model,
stream: false, // 先用非流式,统一在网关层拆分为 SSE
stream: false,
messages: (req.messages || []).map((m) => ({
role: m.role,
content: m.content,
})),
...(req.enableThinking ? { enable_thinking: true } : {}),
...(req.enableWebSearch
? {
// OpenAI 兼容 Chat Completions 官方参数enable_search
enable_search: true,
// 官方文档建议:如需确保触发联网,可开启强制搜索
search_options: {
forced_search: true,
},
}
: {}),
};
this.logRequestSummary(req, model, baseUrl);
const { statusCode, body } = await request(
`${baseUrl}/chat/completions`,
{
@@ -52,23 +65,24 @@ export class QwenProvider implements AiProvider {
);
}
const json = (await body.json()) as any;
const final = (await body.json()) as any;
this.logDebugSummary(final, req.enableWebSearch);
const choice = json.choices?.[0];
const finalChoice = final.choices?.[0];
const content: string =
choice?.message?.content ??
finalChoice?.message?.content ??
'[Qwen] 未返回内容,请检查请求参数或模型配置。';
const promptTokens: number = json.usage?.prompt_tokens ?? 0;
const completionTokens: number = json.usage?.completion_tokens ?? 0;
const promptTokens: number = final.usage?.prompt_tokens ?? 0;
const completionTokens: number = final.usage?.completion_tokens ?? 0;
const totalTokens: number =
json.usage?.total_tokens ??
Math.max(1, promptTokens + completionTokens);
final.usage?.total_tokens ?? Math.max(1, promptTokens + completionTokens);
const chunks = this.splitText(content, 24).map((c) => ({ content: c }));
const sources = this.extractSources(final);
return {
requestId: json.id || `qwen_${Date.now()}`,
requestId: final.id || `qwen_${Date.now()}`,
providerCode: this.code,
model,
chunks,
@@ -77,9 +91,98 @@ export class QwenProvider implements AiProvider {
completionTokens,
totalTokens,
},
...(sources.length > 0 ? { sources } : {}),
};
}
/**
 * Collects de-duplicated source links from an upstream response payload.
 *
 * Four locations are inspected, in this order: `web_search.results`,
 * `search_info.results`, `citations` and `references`. A location is used only
 * when it actually holds an array; anything else (missing, object, string, …)
 * is ignored instead of being spread, so an unexpected upstream shape cannot
 * make the whole chat request throw.
 *
 * For each entry the URL is the first non-empty string among `url` and `link`.
 * Entries without one are skipped, and only the first entry per URL is kept.
 * `title` is copied when it is a string; `snippet` falls back to `content`.
 *
 * NOTE(review): which of these fields the provider really populates is not
 * visible here — confirm against the upstream API documentation.
 *
 * @param json Parsed upstream response body (untrusted shape).
 * @returns Sources in first-seen order; empty when none were found.
 */
private extractSources(
  json: any,
): Array<{ title?: string; url: string; snippet?: string }> {
  const candidateLists: unknown[] = [
    json?.web_search?.results,
    json?.search_info?.results,
    json?.citations,
    json?.references,
  ];

  const byUrl = new Map<string, { title?: string; url: string; snippet?: string }>();
  for (const list of candidateLists) {
    if (!Array.isArray(list)) continue;

    for (const item of list) {
      // Prefer `url`, but fall back to `link` when `url` is absent or unusable.
      const url = [item?.url, item?.link].find(
        (value): value is string => typeof value === 'string' && value.length > 0,
      );
      if (url === undefined || byUrl.has(url)) continue;

      let snippet: string | undefined;
      if (typeof item?.snippet === 'string') {
        snippet = item.snippet;
      } else if (typeof item?.content === 'string') {
        snippet = item.content;
      }

      byUrl.set(url, {
        url,
        title: typeof item?.title === 'string' ? item.title : undefined,
        snippet,
      });
    }
  }
  return [...byUrl.values()];
}
/**
 * Emits a debug-level summary of an upstream response. Does nothing when
 * `NODE_ENV` is `production`.
 *
 * The summary holds the response id, finish reason, whether content and
 * reasoning text are present (with previews capped at 120 characters), the
 * number of source candidates and the raw `usage` object.
 *
 * Source candidates are counted across `web_search.results`,
 * `search_info.results`, `citations` and `references`, but only where the
 * field is an array. A malformed field therefore counts as zero instead of
 * throwing from inside a logging helper.
 *
 * @param json            Parsed upstream response body (untrusted shape).
 * @param enableWebSearch Whether the request asked for web search; logged as a boolean.
 */
private logDebugSummary(json: any, enableWebSearch?: boolean) {
  if (process.env.NODE_ENV === 'production') return;

  const choice = json?.choices?.[0];
  const message = choice?.message ?? {};
  const content = message?.content;
  const reasoning = message?.reasoning_content ?? message?.reasoning;

  const candidateLists: unknown[] = [
    json?.web_search?.results,
    json?.search_info?.results,
    json?.citations,
    json?.references,
  ];
  const sourceCount = candidateLists.reduce<number>(
    (total, list) => total + (Array.isArray(list) ? list.length : 0),
    0,
  );

  this.logger.debug(
    {
      enableWebSearch: !!enableWebSearch,
      id: json?.id,
      finishReason: choice?.finish_reason,
      hasContent: typeof content === 'string' && content.length > 0,
      contentType: typeof content,
      contentPreview:
        typeof content === 'string' ? content.slice(0, 120) : undefined,
      hasReasoning: typeof reasoning === 'string' && reasoning.length > 0,
      reasoningPreview:
        typeof reasoning === 'string' ? reasoning.slice(0, 120) : undefined,
      sourceCount,
      usage: json?.usage,
    },
    'qwen upstream response summary',
  );
}
/**
 * Emits a debug-level summary of an outgoing upstream request. Does nothing
 * when `NODE_ENV` is `production`.
 *
 * Only sizes and flags are logged, never message text: the message count, the
 * summed content length, and the content length of the last message whose
 * role is `user` (0 when there is none).
 *
 * @param req     The gateway request being forwarded.
 * @param model   Model name that will be sent upstream.
 * @param baseUrl Upstream base URL in use.
 */
private logRequestSummary(
  req: StreamChatRequest,
  model: string,
  baseUrl: string,
) {
  const inProduction = process.env.NODE_ENV === 'production';
  if (inProduction) return;

  const history = req.messages || [];

  // Keep only user turns; the final one is the most recent user message.
  const userTurns = history.filter((entry) => entry.role === 'user');
  const latestUserText = userTurns.pop()?.content;

  let totalChars = 0;
  for (const entry of history) {
    totalChars += entry.content?.length || 0;
  }

  const summary = {
    model,
    baseUrl,
    enableWebSearch: !!req.enableWebSearch,
    enableThinking: !!req.enableThinking,
    messagesCount: history.length,
    totalChars,
    lastUserMessageLength: latestUserText?.length || 0,
  };
  this.logger.debug(summary, 'qwen upstream request summary');
}
private splitText(text: string, size: number) {
const result: string[] = [];
for (let i = 0; i < text.length; i += size) {

View File

@@ -10,6 +10,10 @@ export interface StreamChatRequest {
platform?: string; // qwen | deepseek | volc | auto | demo
/** 已有会话 ID不传则本次对话新建会话并落库 */
sessionId?: string;
/** Effective for qwen only: whether to enable the web-search tool. */
enableWebSearch?: boolean;
/** Effective for qwen only: whether to enable deep thinking. */
enableThinking?: boolean;
messages: ChatMessage[];
}
@@ -29,5 +33,6 @@ export interface ProviderStreamResult {
model: string;
chunks: ProviderStreamChunk[];
usage: ProviderUsage;
/** Optional list of source links for the answer; each entry requires `url`, while `title` and `snippet` may be absent. */
sources?: Array<{ title?: string; url: string; snippet?: string }>;
}