前言
最近有群友向我请教如何在 CEF 中实现 Server-Sent Events(SSE) 类型的自定义资源控制器,目前主流的 AI 平台都采用了相似的技术对生成的内容进行流式返回,而该群友的需求是在 CEF 框架中对某个 URL 进行拦截并实时通过后端的 ASP.NET Core 流式返回 AICG 的内容。
之前我一直致力于 WinForm 与 Web 页面的结合,对 SSE 的知识了解甚少,通过 MDN 对《使用服务器发送事件》内容的学习,了解了其基本原理,其本质就是使用长连接保证了浏览器与服务器之间的流持续存在不断开,服务器根据后端的需求不定时向浏览器端发送事件(event)或者数据(message)等固定格式的 utf8 字符串,并以“\n\n”作为一条数据的终止信息。与 WebSokect 不同的是,这种通信方式是单向的,只能由服务器周期性向客户端发送数据,客户端使用 EventSource 实例来接收这些数据。有关于 Server-Sent Events 的相关内容请参考 MDN 的描述,在此不做过多介绍,本文的焦点将重点放在如何使用 CEF 的 CefRequestHandler 接口中的 GetResourceRequestHandler 方法通过返回自定义的 CefResourceRequestHandler 实例来进行请求拦截并实现一个简单的 SSE 长链接数据流。
根据该群友的需求需要基于 CefSharp 框架进行实现,因为平时本人比较熟悉的框架是 CefGlue 对 CefSharp 并不太熟悉,因此本示例如有纰漏还请各位读者之争。
实现
首先,用 Visual Studio 2022 创建一个“Windows 窗体应用程序”类型的项目,并使用 NuGet 包管理其安装 “CefShapr.WinForms” 包,版本根据实际需求进行选择。
然后,在默认的窗体 Form1.cs 的构造函数中添加 CefSharp 的浏览器控件,并把该浏览器控件实例的 RequestHandler 换成我们自己的请求控制器。- public partial class Form1 : Form
- {
- private ChromiumWebBrowser browser;
- public Form1()
- {
- InitializeComponent();
- browser = new ChromiumWebBrowser();
- browser.Dock = DockStyle.Fill;
- this.Controls.Add(browser);
- browser.Load("https://test.local/sse");
- browser.RequestHandler = new MyRequestHandler();
- }
- }
复制代码 在自定义的请求控制器 MyRequestHandler 里,我们只需要重载 GetResourceRequestHandler 方法,通过该重载方法提供自定义的 ResourceRequestHandler。- class MyRequestHandler : RequestHandler
- {
- protected override IResourceRequestHandler GetResourceRequestHandler(IWebBrowser chromiumWebBrowser, IBrowser browser, IFrame frame, IRequest request, bool isNavigation, bool isDownload, string requestInitiator, ref bool disableDefaultHandling)
- {
- return new MyResourceRequestHandler();
- }
- }
复制代码 在自定义的资源请求控制器 MyResourceRequestHandler 中,重载 GetResourceHandler 方法,在该方法中对指定 URL 进行拦截并将该请求映射到我们自己的 ResourceHandler 实例上,这样就能在该实例中实现我们的目标需求。- class MyResourceRequestHandler : ResourceRequestHandler
- {
- protected override IResourceHandler GetResourceHandler(IWebBrowser chromiumWebBrowser, IBrowser browser, IFrame frame, IRequest request)
- {
- if(request.Url == "https://test.local/sse")
- {
- return new MyResourceHandler();
- }
- return base.GetResourceHandler(chromiumWebBrowser, browser, frame, request);
- }
- }
复制代码 下面展示的代码将是本文的重点 MyResourceHandler 类,该类实现了 CefSharp 的 IResourceHandler 接口以此来实现一个完成的资源控制器,我们在该类的 Read 方法里用对客户端数据流进行阻塞实现长连接,并在需要的时候向该流中写入数据以实现 Server-Sent Events 的效果。
Sever-Sent Events 数据结构体
使用 ServerSentEvent 类来存储需要向客户端发送的 SSE 数据。- public sealed class ServerSentEvent
- {
- public string Id { get; set; }
- public string Event { get; set; }
- public string Data { get; set; }
- public uint? Retry { get; set; }
- public override string ToString()
- {
- var sb = new StringBuilder();
- if (Retry.HasValue)
- {
- sb.Append($"retry: {Retry.Value}\n");
- }
- if (!string.IsNullOrWhiteSpace(Event))
- {
- sb.Append($"event: {Event}\n");
- }
- else
- {
- sb.Append("event: message\n");
- }
- if (!string.IsNullOrWhiteSpace(Id))
- {
- sb.Append($"id: {Id}\n");
- }
- var lines = Data?.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries) ?? [""];
- foreach (var line in lines)
- {
- sb.Append($"data: {line}\n");
- }
- sb.Append("\n");
- return sb.ToString();
- }
- }
复制代码 IResourceHandler 的实现
在这里如果继承 CefSharp 封装好的 ResourceHandler 类将不能很好的控制数据流,因此我们采用实现接口的方式使用更底层的代码实现对数据流的手动控制。MyResourceHandler 实现了 IResourceHandler 接口,用于自定义 CEF(Chromium Embedded Framework)资源处理,模拟 SSE(Server-Sent Events)流。- class MyResourceHandler : IResourceHandler
- {
- // 用于异步等待新事件到来,配合 async/await 实现数据推送。
- TaskCompletionSource<bool> _taskCompletionSource = null;
- // 线程安全的事件队列,存储待发送的 ServerSentEvent。
- ConcurrentBag<ServerSentEvent> _events = new ConcurrentBag<ServerSentEvent>();
- // 标记连接是否活跃。
- bool _isConnected = true;
- // 连接断开,通知等待中的任务终止。
- public void Cancel()
- {
- _isConnected = false;
- _taskCompletionSource?.SetResult(false);
- _taskCompletionSource = null;
- }
- public void Dispose()
- {
- }
- public void GetResponseHeaders(IResponse response, out long responseLength, out string redirectUrl)
- {
- response.SetHeaderByName("X-Accel-Buffering", "no", true);
- response.SetHeaderByName("Content-Type", "text/event-stream,utf-8", true);
- response.SetHeaderByName("Cache-Control", "no-cache", true);
- response.SetHeaderByName("Access-Control-Allow-Origin", "*", true);
- response.MimeType = "text/event-stream";
- redirectUrl = string.Empty;
- responseLength = 0;
- }
- public bool Open(IRequest request, out bool handleRequest, ICallback callback)
- {
- _taskCompletionSource = new TaskCompletionSource<bool>();
- handleRequest = true;
- _isConnected = true;
- SimulateDataPushing();
- return true;
- }
- private async void SimulateDataPushing()
- {
- while (_isConnected)
- {
- await Task.Delay(1000); // Simulate a delay for pushing data
- var sse = new ServerSentEvent
- {
- Id = Guid.NewGuid().ToString(),
- Event = "message",
- Data = $"Current time: {DateTime.Now:HH:mm:ss}",
-
- };
- _events.Add(sse);
- _taskCompletionSource?.TrySetResult(true);
- }
- }
- public bool Read(Stream dataOut, out int bytesRead, IResourceReadCallback callback)
- {
- HandleSSEAsync(dataOut, callback);
- bytesRead = 0;
- return true;
- }
- private async void HandleSSEAsync(Stream dataOut, IResourceReadCallback callback)
- {
- if (!_isConnected || _taskCompletionSource is null)
- {
- callback.Continue(0);
- return;
- }
- await _taskCompletionSource.Task;
- if (!_isConnected)
- {
- callback.Continue(0);
- return;
- }
- _taskCompletionSource = new TaskCompletionSource<bool>();
- if (_events.TryTake(out var msg))
- {
- var data = msg.ToString();
- var bytes = Encoding.UTF8.GetBytes(data);
- dataOut.Write(bytes, 0, bytes.Length);
- dataOut.Flush();
- callback.Continue(bytes.Length);
- }
- }
- #region NotImplementedException
- public bool ProcessRequest(IRequest request, ICallback callback)
- {
- throw new NotImplementedException();
- }
- public bool ReadResponse(Stream dataOut, out int bytesRead, ICallback callback)
- {
- throw new NotImplementedException();
- }
- public bool Skip(long bytesToSkip, out long bytesSkipped, IResourceSkipCallback callback)
- {
- bytesSkipped = bytesToSkip;
- return true;
- }
- #endregion
- }
复制代码 上面代码主要进行了以下操作:
- GetResponseHeaders 方法为浏览器请求设置了 HTTP 响应头,声明 SSE 流(text/event-stream),禁止缓存,并允许跨域访问。
- Open 方法将在客户端连接的时候被触发,在示例中,按照 CEF 的官方文档,让参数 handleRequest 为 true,并返回 true 表示立即处理该请求。其中的异步方法 SimulateDataPushing 实现的是数据推送的模拟,这个模拟每秒模拟生成一个新的服务器事件,加入队列,并唤醒等待的读取操作。
- Read 方法将在数据流被读取时被触发,这也是实现长连接的关键。这里按照 CEF 的官方文档,让参数 bytesRead 为 0,并返回 true 表示异步处理 Read 请求,这里使用异步方法 HandleSSEAsync 来处理队列 _events 里的数据,当有新事件到来后,将事件内容写入输出流,通知 CEF 继续读取,读取完成后新建 TaskCompletionSource 继续阻塞改方法。
以上,就完成了 SSE 类型资源控制器的实现。启动项目,直接访问该拦截地址 https://test.local/sse,实现了长连接并按预期收到了周期性的服务器发送事件。
结论
对以上代码的扩展,接入了 AICG 后浏览器作为 SSE 客户端正常收到了我们自定义的 SSE 资源控制器发送过来的数据。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |