找回密码
 立即注册
首页 业界区 业界 Maui 实践:用 Channel 实现数据库查询时读取速度与内存 ...

Maui 实践:用 Channel 实现数据库查询时读取速度与内存占用的平衡

夔新梅 2025-7-17 13:52:25
作者:夏群林 原创 2025.7.17
我们在进行数据库查询时,通常并不是为了取得整个表的数据,而是某些符合过滤条件的记录。比如:
  1. var unassociatedSudokus = await _dbContext.DbSudokus
  2.                 .Where(s => !relatedSudokuIds.Contains(s.ID))
  3.                 .ToListAsync();
复制代码
这里 relatedSudokuIds 元素不多,过滤条件 s => !relatedSudokuIds.Contains(s.ID) 简单,运算量不大。所以,一切正常。
但我们经常会加入别的过滤条件,比如:
  1. var unassociatedSudokus = await _dbContext.DbSudokus
  2.                 .Where(s => !relatedSudokuIds.Contains(s.ID) && string.Join(string.Empty, s.Ans) == targetAns)
  3.                 .ToListAsync();
复制代码
过滤条件 string.Join(string.Empty, s.Ans) == targetAns 还算简单,但因为在读取数据表的时候现场装配 s.Ans,速度变慢,记录量大时,延迟明显。
推荐方案,是在数据库设计层面优化,添加一个存储拼接后结果的字段并建立索引。一般来说,可以从根本上解决读取速读慢的问题。
偏偏我的应用,DbSudokus 表非常大,而需要这种查询的场景,却不多。我舍不得多加一个 Column,让 DbSudokus  数据表无谓地臃肿。
另一个方案,数据库加载阶段简单过滤,将拼接之类的复杂过滤运算放在数据库加载之后,在内存里过滤:
  1.   var unassociatedSudokus = await _dbContext.DbSudokus
  2.             .Where(s => !relatedSudokuIds.Contains(s.ID))
  3.             .ToListAsync();
  4.   unassociatedSudokus = [.. unassociatedSudokus.Where(s => string.Join(string.Empty, s.Ans) == targetAns)];
复制代码
代价是一次性加载全部数据,内存占用过多。尤其在我的应用的情形,数据表本身很大,过滤后的结果集很小,总觉得不划算。
于是想到用 Channel。
Channel 主要是通过流处理的方式来平衡性能和内存占用。原理是:

  • 先从数据库分批读取数据(避免一次性加载全部数据)
  • 通过 Channel 将数据逐个或批量传递到消费者
  • 在消费者端进行内存中的字符串拼接和比较等耗时运算
  • 只保留符合条件的结果
这样既避免了在数据库中执行复杂操作,如字符串拼接等,可能无法有效利用索引,又避免了一次性加载所有数据导致的高内存占用。数据一边读取一边处理,通过批次大小和通道容量,可以限制同时加载到内存中的数据量,而且流处理,不需要等待全部数据加载完成。预计到结果集很小时,大部分数据需要被过滤掉,使用 Channel 优势明显,在流处理读取过程中尽早过滤掉不需要的数据,自然降低了内存占用。
我的想法,把过滤条件切分两部分:
第一,简单的部分,放在数据库加载阶段,有 Channel  的生产者处理,并且可接受消费者的通知,提前结束数据库读入:
  1. // 生产者任务:支持提前终止
  2. var producerTask = Task.Run(async () =>
  3. {
  4.     try
  5.     {
  6.         var page = 0;
  7.         while (!stopProcessing) // 当消费者发现足够结果时可以提前停止
  8.         {
  9.             var batch = await _dbContext.DbSudokus
  10.                 .Where(s => !relatedSudokuIds.Contains(s.ID))
  11.                 .Skip(page * batchSize)
  12.                 .Take(batchSize)
  13.                 .ToListAsync(cancellationToken);
  14.             if (batch.Count == 0)
  15.                 break;
  16.             foreach (var sudoku in batch)
  17.             {
  18.                 // 再次检查是否需要停止,避免写入多余数据
  19.                 if (stopProcessing) break;
  20.                 await channel.Writer.WriteAsync(sudoku, cancellationToken);
  21.             }
  22.             page++;
  23.         }
  24.     }
  25.     finally
  26.     {
  27.         channel.Writer.Complete();
  28.     }
  29. });
复制代码
第二,复杂的部分,
  1. // 消费者任务:找到足够结果后可以提前停止
  2. var consumerTask = Task.Run(async () =>
  3. {
  4.     await foreach (var sudoku in channel.Reader.ReadAllAsync(cancellationToken))
  5.     {
  6.         // 检查是否已经找到足够的结果
  7.         if (maxResults.HasValue && result.Count >= maxResults.Value)
  8.         {
  9.             stopProcessing = true;
  10.             break;
  11.         }
  12.         // 内存中过滤
  13.         var ansString = string.Join(string.Empty, sudoku.Ans);
  14.         if (ansString == targetAns)
  15.         {
  16.             lock (result)
  17.             {
  18.                 result.Add(sudoku);
  19.             }
  20.         }
  21.     }
  22. });
复制代码
如果知道大概的结果数量,可以设置 maxResults 参数,得到额外的提前终止的好处。例如,如果通常只需要找到 1-2 条匹配结果,就可以将 maxResults 设为 2,系统会在找到 2 条结果后立即停止所有操作。
进一步,我们可以把上面的做法泛型化,核心是分离数据库端和内存端筛选逻辑,以兼顾性能和灵活性。具体做法,是把筛选逻辑包装成委托,作为参数传入。
最后,给出我的实现代码。这是一个 LINQ 风格 IQueryable 扩展方法,具有高度通用性,适用于任何实体类型和筛选场景,调用很方便。替换现有代码,几乎没有侵入性。我就是在自己应用的生产性代码中,原行替换的。
  1. using Microsoft.EntityFrameworkCore;
  2. using System.Linq.Expressions;
  3. namespace Zhally.Sudoku.Data;
  4. public static class QueryFilterExtensions
  5. {
  6.     /// <summary>
  7.     /// 流式筛选IQueryable数据,平衡性能和内存占用
  8.     /// </summary>
  9.     /// <typeparam name="T">实体类型</typeparam>
  10.     /// <param name="query">原始查询</param>
  11.     /// <param name="productionFilter">数据库端筛选表达式(生产阶段)</param>
  12.     /// <param name="consumptionFilter">内存端筛选委托(消费阶段)</param>
  13.     /// <param name="batchSize">批次大小</param>
  14.     /// <param name="maxResults">最大结果数量(达到后提前终止)</param>
  15.     /// <returns>筛选后的结果列表</returns>
  16.     public static async Task<List<T>> FilterWithChannelAsync<T>(
  17.         this IQueryable<T> query,
  18.         Expression<Func<T, bool>> productionFilter,
  19.         Func<T, bool> consumptionFilter,
  20.         int batchSize = 100,
  21.         int? maxResults = null)
  22.         where T : class
  23.     {
  24.         // 创建有界通道控制内存占用
  25.         var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(100)
  26.         {
  27.             FullMode = BoundedChannelFullMode.Wait,
  28.             SingleReader = true,
  29.             SingleWriter = true
  30.         });
  31.         var result = new List<T>();
  32.         var cancellationToken = CancellationToken.None;
  33.         bool stopProcessing = false;
  34.         // 消费者任务:处理并筛选数据
  35.         var consumerTask = Task.Run(async () =>
  36.         {
  37.             await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
  38.             {
  39.                 // 检查是否已达到最大结果数
  40.                 if (maxResults.HasValue && result.Count >= maxResults.Value)
  41.                 {
  42.                     stopProcessing = true;
  43.                     break;
  44.                 }
  45.                 // 应用内存筛选条件
  46.                 if (consumptionFilter(item))
  47.                 {
  48.                     lock (result)
  49.                     {
  50.                         result.Add(item);
  51.                     }
  52.                 }
  53.             }
  54.         });
  55.         // 生产者任务:从数据库分批读取数据
  56.         var producerTask = Task.Run(async () =>
  57.         {
  58.             try
  59.             {
  60.                 var page = 0;
  61.                 while (!stopProcessing)
  62.                 {
  63.                     // 应用数据库筛选并分页查询
  64.                     var batch = await query
  65.                         .Where(productionFilter)
  66.                         .Skip(page * batchSize)
  67.                         .Take(batchSize)
  68.                         .ToListAsync(cancellationToken);
  69.                     if (batch.Count == 0)
  70.                         break; // 没有更多数据
  71.                     // 将批次数据写入通道
  72.                     foreach (var item in batch)
  73.                     {
  74.                         if (stopProcessing) break;
  75.                         await channel.Writer.WriteAsync(item, cancellationToken);
  76.                     }
  77.                     page++;
  78.                 }
  79.             }
  80.             finally
  81.             {
  82.                 channel.Writer.Complete(); // 通知消费者数据已写完
  83.             }
  84.         });
  85.         // 等待所有任务完成
  86.         await Task.WhenAll(producerTask, consumerTask);
  87.         return result;
  88.     }
  89. }
复制代码
这种设计特别适合以下场景:

  • 需要在数据库端做初步筛选,再在内存中做复杂筛选
  • 预期结果集较小,但源数据集可能很大
  • 希望平衡数据库负载和内存占用
还可以根据实际需求调整批次大小和通道容量,以获得最佳性能。
使用示例:
  1. var result = await _dbContext.DbSudokus
  2.     .FilterWithChannelAsync(
  3.         // 数据库端筛选:排除关联的Sudoku
  4.         s => !relatedSudokuIds.Contains(s.ID),
  5.         // 内存端筛选:比较拼接后的答案
  6.         s => string.Join(string.Empty, s.Ans) == targetAns,
  7.         batchSize: 100,
  8.         maxResults: null
  9.     );
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册