feat: API3 supports event stream

ikechan8370 2023-03-30 11:39:30 +08:00
parent cc785261eb
commit a4d8488a55
2 changed files with 112 additions and 30 deletions

@@ -1,7 +1,16 @@
import { v4 as uuidv4 } from 'uuid'
import {Config, defaultChatGPTAPI, officialChatGPTAPI} from '../utils/config.js'
import _ from 'lodash'
import { Config, officialChatGPTAPI } from './config.js'
import fetch from 'node-fetch'
import delay from 'delay'
import _ from 'lodash'
// import { createParser } from 'eventsource-parser'
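// eventsource-parser is imported dynamically so that API3 still works (via the buffered fallback below) when this optional dependency is missing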
let createParser
try {
createParser = (await import('eventsource-parser')).createParser
} catch (e) {
console.warn('eventsource-parser is not installed, please run "pnpm i" in the plugin directory')
}
let proxy
if (Config.proxy) {
try {
@@ -87,6 +96,7 @@ export class OfficialChatGPTClient {
}
const res = await this._fetch(url, option)
if (res.status === 403) {
await delay(500)
return await this.sendMessage(prompt, opts)
}
if (res.status !== 200) {
@@ -97,35 +107,106 @@ export class OfficialChatGPTClient {
throw new Error(body)
}
}
// todo accept as stream
const decoder = new TextDecoder('utf-8')
const bodyBytes = await res.arrayBuffer()
const bodyText = decoder.decode(bodyBytes)
const events = bodyText.split('\n\n').filter(item => !_.isEmpty(item))
let fullResponse
for (let i = 0; i < events.length; i++) {
let event = events[i]
event = _.trimStart(event, 'data: ')
try {
let tmp = JSON.parse(event)
if (tmp.message) {
fullResponse = tmp
if (createParser) {
let conversationResponse
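// parse the response as a server-sent event stream and resolve once the '[DONE]' sentinel arrives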
const responseP = new Promise(
// eslint-disable-next-line no-async-promise-executor
async (resolve, reject) => {
let response
function onMessage (data) {
if (data === '[DONE]') {
return resolve({
error: null,
response,
conversationId,
messageId,
conversationResponse
})
}
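// ignore chunks that are not valid JSON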
try {
const _checkJson = JSON.parse(data)
} catch (error) {
// console.log('warning: parse error.')
return
}
try {
const convoResponseEvent = JSON.parse(data)
conversationResponse = convoResponseEvent
if (convoResponseEvent.conversation_id) {
conversationId = convoResponseEvent.conversation_id
}
if (convoResponseEvent.message?.id) {
messageId = convoResponseEvent.message.id
}
const partialResponse =
convoResponseEvent.message?.content?.parts?.[0]
if (partialResponse) {
if (Config.debug) {
logger.info(JSON.stringify(convoResponseEvent))
}
response = partialResponse
}
} catch (err) {
console.warn('fetchSSE onMessage unexpected error', err)
reject(err)
}
}
const parser = createParser((event) => {
if (event.type === 'event') {
onMessage(event.data)
}
})
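// res.body is a Node.js Readable stream; feed each decoded chunk into the SSE parser as it arrives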
res.body.on('readable', async () => {
logger.mark('Successfully connected to chat.openai.com, preparing to read the data stream')
let chunk
while ((chunk = res.body.read()) !== null) {
let str = chunk.toString()
parser.feed(str)
}
})
}
} catch (err) {
console.log(event)
)
let response = await responseP
return {
text: response.response,
conversationId: response.conversationId,
id: response.messageId,
parentMessageId
}
} else {
logger.warn('eventsource-parser is not installed. Installing it is strongly recommended to improve API3 response performance: run "pnpm i" or "pnpm add -w eventsource-parser" in the plugin directory')
const decoder = new TextDecoder('utf-8')
const bodyBytes = await res.arrayBuffer()
const bodyText = decoder.decode(bodyBytes)
const events = bodyText.split('\n\n').filter(item => !_.isEmpty(item))
let fullResponse
for (let i = 0; i < events.length; i++) {
let event = events[i]
event = _.trimStart(event, 'data: ')
try {
let tmp = JSON.parse(event)
if (tmp.message) {
fullResponse = tmp
}
} catch (err) {
// console.log(event)
}
}
if (Config.debug) {
logger.mark(JSON.stringify(fullResponse))
}
if (!fullResponse?.message) {
throw new Error(bodyText || 'unknown error, please check log')
}
return {
text: fullResponse.message.content.parts[0],
conversationId: fullResponse.conversation_id,
id: fullResponse.message.id,
parentMessageId
}
}
if (Config.debug) {
logger.mark(fullResponse)
}
if (!fullResponse?.message) {
throw new Error(bodyText || 'unknown error, please check log')
}
return {
text: fullResponse.message.content.parts[0],
conversationId: fullResponse.conversation_id,
id: fullResponse.message.id,
parentMessageId
}
}
}
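
For reference, below is a minimal, self-contained sketch (not part of the commit) of the pattern the added code follows: feed a node-fetch response body into eventsource-parser chunk by chunk and resolve once the '[DONE]' sentinel arrives. The readSSE helper and its url/options parameters are illustrative names only, and the sketch assumes node-fetch v2 (where res.body is a Node.js Readable stream) and the eventsource-parser v1 createParser/feed API used in the diff.

import fetch from 'node-fetch'
import { createParser } from 'eventsource-parser'

// Illustrative helper: stream an SSE endpoint and resolve with the last JSON
// payload received before the '[DONE]' sentinel.
async function readSSE (url, options = {}) {
  const res = await fetch(url, options)
  return new Promise((resolve, reject) => {
    let lastMessage
    const parser = createParser((event) => {
      if (event.type !== 'event') return
      if (event.data === '[DONE]') {
        resolve(lastMessage)
        return
      }
      try {
        // each SSE event carries a JSON snapshot; keep only the latest one
        lastMessage = JSON.parse(event.data)
      } catch (err) {
        // ignore chunks that are not valid JSON
      }
    })
    res.body.on('data', chunk => parser.feed(chunk.toString()))
    res.body.on('error', reject)
    res.body.on('end', () => resolve(lastMessage))
  })
}

Feeding decoded chunks into the parser as they arrive is what lets the client surface partial message.content.parts instead of buffering the whole response body, which is the behaviour this commit enables for API3.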