转换器
转换器是适配不同 LLM 提供商 API 差异的核心机制。它们在不同格式之间转换请求和响应,处理认证,并管理提供商特定的功能。
理解转换器
什么是转换器?
转换器是一个插件,它可以:
- 转换请求:从统一格式转换为提供商特定格式
- 转换响应:从提供商格式转换回统一格式
- 处理认证:为提供商 API 处理认证
- 修改请求:添加或调整参数
数据流
┌─────────────────┐
│ 传入请求 │ (来自 Claude Code 的 Anthropic 格式)
└────────┬────────┘
│
▼
┌─────────────────────────────────┐
│ transformRequestOut │ ← 将传入请求解析为统一格式
└────────┬────────────────────── ──┘
│
▼
┌─────────────────────────────────┐
│ UnifiedChatRequest │
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ transformRequestIn (可选) │ ← 在发送前修改统一请求
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ 提供商 API 调用 │
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ transformResponseIn (可选) │ ← 将提供商响应转换为统一格式
└────────┬────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ transformResponseOut (可选) │ ← 将统一响应转换为 Anthropic 格式
└────────┬────────────────────────┘
│
▼
┌─────────────────┐
│ 传出响应 │ (返回给 Claude Code 的 Anthropic 格式)
└─────────────────┘
转换器接口
所有转换器都实现以下接口:
interface Transformer {
// 将统一请求转换为提供商特定格式
transformRequestIn?: (
request: UnifiedChatRequest,
provider: LLMProvider,
context: TransformerContext
) => Promise<Record<string, any>>;
// 将提供商请求转换为统一格式
transformRequestOut?: (
request: any,
context: TransformerContext
) => Promise<UnifiedChatRequest>;
// 将提供商 响应转换为统一格式
transformResponseIn?: (
response: Response,
context?: TransformerContext
) => Promise<Response>;
// 将统一响应转换为提供商格式
transformResponseOut?: (
response: Response,
context: TransformerContext
) => Promise<Response>;
// 自定义端点路径(可选)
endPoint?: string;
// 转换器名称(用于自定义转换器)
name?: string;
// 自定义认证处理器(可选)
auth?: (
request: any,
provider: LLMProvider,
context: TransformerContext
) => Promise<any>;
// Logger 实例(自动注入)
logger?: any;
}
关键类型
UnifiedChatRequest
interface UnifiedChatRequest {
messages: UnifiedMessage[];
model: string;
max_tokens?: number;
temperature?: number;
stream?: boolean;
tools?: UnifiedTool[];
tool_choice?: any;
reasoning?: {
effort?: ThinkLevel; // "none" | "low" | "medium" | "high"
max_tokens?: number;
enabled?: boolean;
};
}
UnifiedMessage
interface UnifiedMessage {
role: "user" | "assistant" | "system" | "tool";
content: string | null | MessageContent[];
tool_calls?: Array<{
id: string;
type: "function";
function: {
name: string;
arguments: string;
};
}>;
tool_call_id?: string;
thinking?: {
content: string;
signature?: string;
};
}
内置转换器
anthropic
将请 求转换为兼容 Anthropic 风格的 API:
{
"transformers": [
{
"name": "anthropic",
"providers": ["deepseek", "groq"]
}
]
}
功能:
- 在 Anthropic 消息格式和 OpenAI 格式之间转换
- 处理工具调用和工具结果
- 支持思考/推理内容块
- 管理流式响应
deepseek
专门用于 DeepSeek API 的转换器:
{
"transformers": [
{
"name": "deepseek",
"providers": ["deepseek"]
}
]
}
功能:
- DeepSeek 特定的推理格式
- 处理响应中的
reasoning_content - 支持思考预算令牌
gemini
用于 Google Gemini API 的转换器:
{
"transformers": [
{
"name": "gemini",
"providers": ["gemini"]
}
]
}
maxtoken
限制请求中的 max_tokens:
{
"transformers": [
{
"name": "maxtoken",
"options": {
"max_tokens": 8192
},
"models": ["deepseek,deepseek-chat"]
}
]
}
customparams
向请求中注入自定义参数:
{
"transformers": [
{
"name": "customparams",
"options": {
"include_reasoning": true,
"custom_header": "value"
}
}
]
}
创建自定义转换器
简单转换器:修改请求
最简单的转换器只修改发送到提供商之前的请求。
示例:为所有请求添加自定义头
// custom-header-transformer.js
module.exports = class CustomHeaderTransformer {
name = 'custom-header';
constructor(options) {
this.headerName = options?.headerName || 'X-Custom-Header';
this.headerValue = options?.headerValue || 'default-value';
}
async transformRequestIn(request, provider, context) {
// 添加自定义头(将被 auth 方法使用)
request._customHeaders = {
[this.headerName]: this.headerValue
};
return request;
}
async auth(request, provider) {
const headers = {
'authorization': `Bearer ${provider.apiKey}`,
...request._customHeaders
};
return {
body: request,
config: { headers }
};
}
};
在配置中使用:
{
"transformers": [
{
"name": "custom-header",
"path": "/path/to/custom-header-transformer.js",
"options": {
"headerName": "X-My-Header",
"headerValue": "my-value"
}
}
]
}
中级转换器:请求/响应转换
此示例展示如何在不同 API 格式之间转换。
示例:Mock API 格式转换器
// mockapi-transformer.js
module.exports = class MockAPITransformer {
name = 'mockapi';
endPoint = '/v1/chat'; // 自定义端点
// 从 MockAPI 格式转换为统一格式
async transformRequestOut(request, context) {
const messages = request.conversation.map(msg => ({
role: msg.sender,
content: msg.text
}));
return {
messages,
model: request.model_id,
max_tokens: request.max_tokens,
temperature: request.temp
};
}
// 从统一格式转换为 MockAPI 格式
async transformRequestIn(request, provider, context) {
return {
model_id: request.model,
conversation: request.messages.map(msg => ({
sender: msg.role,
text: typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content)
})),
max_tokens: request.max_tokens || 4096,
temp: request.temperature || 0.7
};
}
// 将 MockAPI 响应转换为统一格式
async transformResponseIn(response, context) {
const data = await response.json();
const unifiedResponse = {
id: data.request_id,
object: 'chat.completion',
created: data.timestamp,
model: data.model,
choices: [{
index: 0,
message: {
role: 'assistant',
content: data.reply.text
},
finish_reason: data.stop_reason
}],
usage: {
prompt_tokens: data.tokens.input,
completion_tokens: data.tokens.output,
total_tokens: data.tokens.input + data.tokens.output
}
};
return new Response(JSON.stringify(unifiedResponse), {
status: response.status,
statusText: response.statusText,
headers: { 'Content-Type': 'application/json' }
});
}
};
高级转换器:流式响应处理
此示例展示如何处理流式响应。
示例:向流式响应添加自定义元数据
// streaming-metadata-transformer.js
module.exports = class StreamingMetadataTransformer {
name = 'streaming-metadata';
constructor(options) {
this.metadata = options?.metadata || {};
this.logger = null; // 将由系统注入
}
async transformResponseOut(response, context) {
const contentType = response.headers.get('Content-Type');
// 处理流式响应
if (contentType?.includes('text/event-stream')) {
return this.transformStream(response, context);
}
// 处理非流式响应
return response;
}
async transformStream(response, context) {
const decoder = new TextDecoder();
const encoder = new TextEncoder();
const transformedStream = new ReadableStream({
start: async (controller) => {
const reader = response.body.getReader();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
controller.enqueue(encoder.encode(line + '\n'));
continue;
}
const data = line.slice(6).trim();
if (data === '[DONE]') {
controller.enqueue(encoder.encode(line + '\n'));
continue;
}
try {
const chunk = JSON.parse(data);
// 添加自定义元数据
if (chunk.choices && chunk.choices[0]) {
chunk.choices[0].metadata = this.metadata;
}
// 记录日志以便调试
this.logger?.debug({
chunk,
context: context.req.id
}, '转换流式数据块');
const modifiedLine = `data: ${JSON.stringify(chunk)}\n\n`;
controller.enqueue(encoder.encode(modifiedLine));
} catch (parseError) {
// 如果解析失败,透传原始行
controller.enqueue(encoder.encode(line + '\n'));
}
}
}
} catch (error) {
this.logger?.error({ error }, '流式转换错误');
controller.error(error);
} finally {
controller.close();
reader.releaseLock();
}
}
});
return new Response(transformedStream, {
status: response.status,
statusText: response.statusText,
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
}
};
真实示例:推理内容转换器
这是基于代码库中实际的 reasoning.transformer.ts。
// reasoning-transformer.ts
import { Transformer, TransformerOptions } from "@musistudio/llms";
export class ReasoningTransformer implements Transformer {
static TransformerName = "reasoning";
enable: boolean;
constructor(private readonly options?: TransformerOptions) {
this.enable = this.options?.enable ?? true;
}
// 转换请求以添加推理参数
async transformRequestIn(request: UnifiedChatRequest): Promise<UnifiedChatRequest> {
if (!this.enable) {
request.thinking = {
type: "disabled",
budget_tokens: -1,
};
request.enable_thinking = false;
return request;
}
if (request.reasoning) {
request.thinking = {
type: "enabled",
budget_tokens: request.reasoning.max_tokens,
};
request.enable_thinking = true;
}
return request;
}
// 转换响应以将 reasoning_content 转换为 thinking 格式
async transformResponseOut(response: Response): Promise<Response> {
if (!this.enable) return response;
const contentType = response.headers.get("Content-Type");
// 处理非流式响应
if (contentType?.includes("application/json")) {
const jsonResponse = await response.json();
if (jsonResponse.choices[0]?.message.reasoning_content) {
jsonResponse.thinking = {
content: jsonResponse.choices[0].message.reasoning_content
};
}
return new Response(JSON.stringify(jsonResponse), {
status: response.status,
statusText: response.statusText,
headers: response.headers,
});
}
// 处理流式响应
if (contentType?.includes("stream")) {
// [流式转换代码在这里]
// 参见代码库中的完整实现
}
return response;
}
}
转换器注册
方法 1:静态名称(基于类)
在 TypeScript/ES6 中创建转换器时使用:
export class MyTransformer implements Transformer {
static TransformerName = "my-transformer";
async transformRequestIn(request: UnifiedChatRequest): Promise<any> {
// 转换逻辑
return request;
}
}
方法 2:实例名称(基于实例)
用于 JavaScript 转换器:
module.exports = class MyTransformer {
constructor(options) {
this.name = 'my-transformer';
this.options = options;
}
async transformRequestIn(request, provider, context) {
// 转换逻辑
return request;
}
};