From a4d8488a55a784915caaf1dc5b5ad9acde0117ad Mon Sep 17 00:00:00 2001 From: ikechan8370 Date: Thu, 30 Mar 2023 11:39:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20API3=E6=94=AF=E6=8C=81event=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 3 +- utils/message.js | 139 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 112 insertions(+), 30 deletions(-) diff --git a/package.json b/package.json index 63d7ee3..a1d75c6 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-recaptcha": "^3.6.8", "puppeteer-extra-plugin-stealth": "^2.11.2", - "sharp": "^0.31.3" + "sharp": "^0.31.3", + "eventsource-parser": "^1.0.0" } } \ No newline at end of file diff --git a/utils/message.js b/utils/message.js index 911c0ca..b1ef380 100644 --- a/utils/message.js +++ b/utils/message.js @@ -1,7 +1,16 @@ import { v4 as uuidv4 } from 'uuid' -import {Config, defaultChatGPTAPI, officialChatGPTAPI} from '../utils/config.js' -import _ from 'lodash' +import { Config, officialChatGPTAPI } from './config.js' import fetch from 'node-fetch' +import delay from 'delay' +import _ from 'lodash' +// import { createParser } from 'eventsource-parser' +let createParser +try { + createParser = (await import('eventsource-parser')).createParser +} catch (e) { + console.warn('未安装eventsource-parser,请在插件目录下执行pnpm i') +} + let proxy if (Config.proxy) { try { @@ -87,6 +96,7 @@ export class OfficialChatGPTClient { } const res = await this._fetch(url, option) if (res.status === 403) { + await delay(500) return await this.sendMessage(prompt, opts) } if (res.status !== 200) { @@ -97,35 +107,106 @@ export class OfficialChatGPTClient { throw new Error(body) } } - // todo accept as stream - const decoder = new TextDecoder('utf-8') - const bodyBytes = await res.arrayBuffer() - const bodyText = decoder.decode(bodyBytes) - const events = bodyText.split('\n\n').filter(item => !_.isEmpty(item)) - let fullResponse - for (let i = 0; i < events.length; i++) { - let event = events[i] - event = _.trimStart(event, 'data: ') - try { - let tmp = JSON.parse(event) - if (tmp.message) { - fullResponse = tmp + if (createParser) { + let conversationResponse + const responseP = new Promise( + // eslint-disable-next-line no-async-promise-executor + async (resolve, reject) => { + let response + function onMessage (data) { + if (data === '[DONE]') { + return resolve({ + error: null, + response, + conversationId, + messageId, + conversationResponse + }) + } + try { + const _checkJson = JSON.parse(data) + } catch (error) { + // console.log('warning: parse error.') + return + } + try { + const convoResponseEvent = JSON.parse(data) + conversationResponse = convoResponseEvent + if (convoResponseEvent.conversation_id) { + conversationId = convoResponseEvent.conversation_id + } + + if (convoResponseEvent.message?.id) { + messageId = convoResponseEvent.message.id + } + + const partialResponse = + convoResponseEvent.message?.content?.parts?.[0] + if (partialResponse) { + if (Config.debug) { + logger.info(JSON.stringify(convoResponseEvent)) + } + response = partialResponse + } + } catch (err) { + console.warn('fetchSSE onMessage unexpected error', err) + reject(err) + } + } + + const parser = createParser((event) => { + if (event.type === 'event') { + onMessage(event.data) + } + }) + res.body.on('readable', async () => { + logger.mark('成功连接到chat.openai.com,准备读取数据流') + let chunk + while ((chunk = res.body.read()) !== null) { + let str = chunk.toString() + parser.feed(str) + } + }) } - } catch (err) { - console.log(event) + ) + let response = await responseP + return { + text: response.response, + conversationId: response.conversationId, + id: response.messageId, + parentMessageId + } + } else { + logger.warn('未安装eventsource-parser,强烈建议安装以提高API3响应性能,在插件目录下执行pnpm i或pnpm add -w eventsource-parser') + const decoder = new TextDecoder('utf-8') + const bodyBytes = await res.arrayBuffer() + const bodyText = decoder.decode(bodyBytes) + const events = bodyText.split('\n\n').filter(item => !_.isEmpty(item)) + let fullResponse + for (let i = 0; i < events.length; i++) { + let event = events[i] + event = _.trimStart(event, 'data: ') + try { + let tmp = JSON.parse(event) + if (tmp.message) { + fullResponse = tmp + } + } catch (err) { + // console.log(event) + } + } + if (Config.debug) { + logger.mark(JSON.stringify(fullResponse)) + } + if (!fullResponse?.message) { + throw new Error(bodyText || 'unkown error, please check log') + } + return { + text: fullResponse.message.content.parts[0], + conversationId: fullResponse.conversation_id, + id: fullResponse.message.id, + parentMessageId } - } - if (Config.debug) { - logger.mark(fullResponse) - } - if (!fullResponse?.message) { - throw new Error(bodyText || 'unkown error, please check log') - } - return { - text: fullResponse.message.content.parts[0], - conversationId: fullResponse.conversation_id, - id: fullResponse.message.id, - parentMessageId } } }