【问题标题】:Keeping Data in memory将数据保存在内存中
【发布时间】:2011-10-31 05:21:40
【问题描述】:

我有一个 http 处理程序,我将每个请求存储到内存中的并发队列集合中。一段时间后,我将集合批量插入到数据库中。

这是个坏主意吗?因为量很大,这似乎是 IMO 更好的方法。

我确实看到了一些差异(命中数与数据库中存储的元素数),由于线程,当我刷新并发集合时,我锁定它并批量插入其内容,然后清空集合。然后从集合中移除锁。

有更好的做法吗?或者你做过类似的事情吗?

【问题讨论】:

  • 你为什么要这样做?
  • 因为数据访问速度很慢,如果我一次执行一个插入或事务。
  • 对于您面临的线程问题,为什么不锁定集合,将原始集合复制到不同的集合(比如说 newCollection),清除并删除原始集合的锁定并使用 newCollection用于插入数据库。使用这种方法,新请求不会被长时间阻塞。
  • 另外,为什么要批量插入?您可以在插入之前设置另一个旨在收集 3-4 个请求的线程。
  • 这就是我在批量插入时所做的。

标签: c# .net multithreading memory collections


【解决方案1】:

抱歉,我想说这是个坏主意。存在以下问题:

  • 如果应用程序池在数据写入数据库之前回收,您将丢失数据
  • 将所有数据保留在同一个集合中会导致在插入数据以及将数据写入磁盘并清除集合时需要锁定该集合。这可能会导致整个网站在批量插入期间暂停。
  • 您的代码会因为额外的步骤而变得更加复杂。修复线程问题很难

我们编写的 Web 应用程序在峰值负载时每秒向 SQL Server 数据库写入 1000 行。

尝试先编写尽可能简单的应用程序,然后对其进行性能测试。

您可以插入数据库的速度很大程度上取决于您的硬件,但您也可以在程序中做一些事情:

  • 表上只有一个索引(聚集)。密钥自动编号。
  • 确保尽快释放与数据库的连接。

【讨论】:

  • 我知道你的要点,我提到了锁定。你是如何每秒向 sql server 写入 1000 行的?你能每秒对 sql server 做 10000 行吗?
【解决方案2】:

我已经完成了与您在下面的代码中描述的几乎完全相同的事情。它的线程安全并且有一个刷新方法,您可以调用它来刷新和挂起写入。一旦达到要写入的对象的阈值数量,它就会将队列(在我的情况下为列表)发送到不同的线程进行保存。请注意,它使用 manualResetEvent 来处理最后刷新数据(您可以等待的重置事件限制为 64 个,这就是为什么如果我们有超过 64 个后台线程等待写入,则手动等待,但这应该除非您的数据库真的很慢,否则几乎不会发生)。这段代码用于处理流入其中的千万条记录(从内存中写入 20m 行大约需要 5 分钟,但是在保存服务器上作为数据库运行,所以没有网络跳跃……SQL 肯定可以处理数千使用 BulkSqlCopy 对象和 IDataReader 每秒行数),因此它应该处理您的请求负载(当然这将取决于您正在编写的内容和您的数据库,但我认为代码可以完成任务!)。

此外,为了方便批量写入,我创建了一个 IDataReader 的最小实现来流式传输我的数据。您需要这样做才能使用下面的代码。

public class DataImporter<T>
{

    public DataImporter(string tableName, string readerName)
    {
        _tableName = tableName;
        _readerName = readerName;
    }

    /// <summary>
    /// This is the size of our bulk staging list.
    /// </summary>
    /// <remarks>
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value,
    /// so records may not be going into the database in sizes of this staging value.
    /// </remarks>
    private int _bulkStagingListSize = 20000;
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();
    private string _tableName = String.Empty;
    private string _readerName = String.Empty;

    public void QueueForImport(T record)
    {
        lock (_listLock)
        {
            _items.Add(record);
            if (_items.Count > _bulkStagingListSize)
            {
                SaveItems(_items);
                _items = new List<T>();
            }
        }
    }

    /// <summary>
    /// This method should be called at the end of the queueing work to ensure to clear down our list
    /// </summary>
    public void Flush()
    {
        lock (_listLock)
        {
            SaveItems(_items);
            _items = new List<T>();
            while (_tasksWaiting.Count > 64)
            {
                Thread.Sleep(2000);
            }
            WaitHandle.WaitAll(_tasksWaiting.ToArray());
        }
    }

    private void SaveItems(List<T> items)
    {
        ManualResetEvent evt = new ManualResetEvent(false);
        _tasksWaiting.Add(evt);
        IDataReader reader = DataReaderFactory.GetReader<T>(_readerName,_items);
        Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader);
        ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo);

    }

    private void saveData(object info)
    {
        using (new ActivityTimer("Saving bulk data to " + _tableName))
        {
            Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>;
            IDataReader r = stateInfo.Item2;
            try
            {
                Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
            }
            catch (Exception ex)
            {
                //Do something
            }
            finally
            {
                _tasksWaiting.Remove(stateInfo.Item1);
                stateInfo.Item1.Set();
            }
        }
    }

    private object _listLock = new object();

    private List<T> _items = new List<T>();
}

下面提到的 DataReaderFactory 只是选择了正确的 IDataReader 实现以用于流式传输,如下所示:

internal static class DataReaderFactory
{
    internal static IDataReader GetReader<T>(string typeName, List<T> items)
    {
        IDataReader reader = null;
        switch(typeName)
        {
            case "ProductRecordDataReader":
                reader =  new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader;
                break;
            case "RetailerPriceRecordDataReader":
                reader =  new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader;
                break;
            default:
                break;
        }
        return reader;
    }
}

我在这种情况下使用的数据读取器实现(尽管此代码适用于任何数据读取器)如下:

/// <summary>
/// This class creates a data reader for ProductRecord data.  This is used to stream the records
/// to the SqlBulkCopy object.
/// </summary>
public class ProductRecordDataReader:IDataReader
{
    public ProductRecordDataReader(List<ProductRecord> products)
    {
        _products = products.ToList();
    }

    List<ProductRecord> _products;

    int currentRow;
    int rowCounter = 0;
    public int FieldCount
    {
        get
        {
            return 14;
        }
    }


    #region IDataReader Members

    public void Close()
    {
        //Do nothing.
    }

    public bool Read()
    {
        if (rowCounter < _products.Count)
        {
            currentRow = rowCounter;
            rowCounter++;
            return true;
        }
        else
        {
            return false;
        }

    }

    public int RecordsAffected
    {
        get { throw new NotImplementedException(); }
    }

    public string GetName(int i)
    {
        switch (i)
        {
            case 0:
                return "ProductSku";
            case 1:
                return "UPC";
            case 2:
                return "EAN";
            case 3:
                return "ISBN";
            case 4:
                return "ProductName";
            case 5:
                return "ShortDescription";
            case 6:
                return "LongDescription";
            case 7:
                return "DFFCategoryNumber";
            case 8:
                return "DFFManufacturerNumber";
            case 9:
                return "ManufacturerPartNumber";
            case 10:
                return "ManufacturerModelNumber";
            case 11:
                return "ProductImageUrl";
            case 12:
                return "LowestPrice";
            case 13:
                return "HighestPrice";
            default:
                return null;
        }

    }

    public int GetOrdinal(string name)
    {
        switch (name)
        {
            case "ProductSku":
                return 0;
            case "UPC":
                return 1;
            case "EAN":
                return 2;
            case "ISBN":
                return 3;
            case "ProductName":
                return 4;
            case "ShortDescription":
                return 5;
            case "LongDescription":
                return 6;
            case "DFFCategoryNumber":
                return 7;
            case "DFFManufacturerNumber":
                return 8;
            case "ManufacturerPartNumber":
                return 9;
            case "ManufacturerModelNumber":
                return 10;
            case "ProductImageUrl":
                return 11;
            case "LowestPrice":
                return 12;
            case "HighestPrice":
                return 13;
            default:
                return -1;
        }

    }

    public object GetValue(int i)
    {
        switch (i)
        {
            case 0:
                return _products[currentRow].ProductSku;
            case 1:
                return _products[currentRow].UPC;
            case 2:
                return _products[currentRow].EAN;
            case 3:
                return _products[currentRow].ISBN;
            case 4:
                return _products[currentRow].ProductName;
            case 5:
                return _products[currentRow].ShortDescription;
            case 6:
                return _products[currentRow].LongDescription;
            case 7:
                return _products[currentRow].DFFCategoryNumber;
            case 8:
                return _products[currentRow].DFFManufacturerNumber;
            case 9:
                return _products[currentRow].ManufacturerPartNumber;
            case 10:
                return _products[currentRow].ManufacturerModelNumber;
            case 11:
                return _products[currentRow].ProductImageUrl;
            case 12:
                return _products[currentRow].LowestPrice;
            case 13:
                return _products[currentRow].HighestPrice;
            default:
                return null;
        }

    }

    #endregion

    #region IDisposable Members

    public void Dispose()
    {
        //Do nothing;
    }

    #endregion

    #region IDataRecord Members

    public bool NextResult()
    {
        throw new NotImplementedException();
    }

    public int Depth
    {
        get { throw new NotImplementedException(); }
    }

    public DataTable GetSchemaTable()
    {
        throw new NotImplementedException();
    }

    public bool IsClosed
    {
        get { throw new NotImplementedException(); }
    }

    public bool GetBoolean(int i)
    {
        throw new NotImplementedException();
    }

    public byte GetByte(int i)
    {
        throw new NotImplementedException();
    }

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public char GetChar(int i)
    {
        throw new NotImplementedException();
    }

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public IDataReader GetData(int i)
    {
        throw new NotImplementedException();
    }

    public string GetDataTypeName(int i)
    {
        throw new NotImplementedException();
    }

    public DateTime GetDateTime(int i)
    {
        throw new NotImplementedException();
    }

    public decimal GetDecimal(int i)
    {
        throw new NotImplementedException();
    }

    public double GetDouble(int i)
    {
        throw new NotImplementedException();
    }

    public Type GetFieldType(int i)
    {
        throw new NotImplementedException();
    }

    public float GetFloat(int i)
    {
        throw new NotImplementedException();
    }

    public Guid GetGuid(int i)
    {
        throw new NotImplementedException();
    }

    public short GetInt16(int i)
    {
        throw new NotImplementedException();
    }

    public int GetInt32(int i)
    {
        throw new NotImplementedException();
    }

    public long GetInt64(int i)
    {
        throw new NotImplementedException();
    }

    public string GetString(int i)
    {
        throw new NotImplementedException();
    }

    public int GetValues(object[] values)
    {
        throw new NotImplementedException();
    }

    public bool IsDBNull(int i)
    {
        throw new NotImplementedException();
    }

    public object this[string name]
    {
        get { throw new NotImplementedException(); }
    }

    public object this[int i]
    {
        get { throw new NotImplementedException(); }
    }

    #endregion
}

最后批量加载数据方法如下:

    public void BulkLoadData(IDataReader reader, string tableName)
    {
        using (SqlConnection cnn = new SqlConnection(cnnString))
        {
            SqlBulkCopy copy = new SqlBulkCopy(cnn);
            copy.DestinationTableName = tableName;
            copy.BatchSize = 10000;
            cnn.Open();
            copy.WriteToServer(reader);
        }
    }

但是,说了这么多,我建议您不要在 asp.net 中使用此代码,因为有人在另一个答案中指出了原因(特别是在 IIS 中回收工作进程)。我建议您使用一个非常轻量级的队列首先将请求数据发送到另一个不会重新启动的服务(我们使用 ZeroMQ 将请求流式传输并从我正在编写的 ASP.NET 应用程序中记录数据....性能非常好)。

迈克。

【讨论】:

    【解决方案3】:

    我确实看到了一些差异 [...] 由于线程

    这里的基本内容是使用 2 个队列并循环它们。 1 个用于接收,1 个用于插入。您只需要锁定接收,几乎没有争用。

    【讨论】:

    • 我使用一个队列进行接收,而不是将其转储到可以批量插入的列表中。我使用并发队列,所以我不需要锁定。我只在将队列的内容转储到列表时锁定队列,然后批量插入和删除锁定。你怎么看?我能做得更好吗?
    • 好吧,copy-to-list 比 db insert 快,但还是 2 个队列可能会更好。这取决于。并发队列会使切换变得有点棘手。
    • ConcurrentQueue 存在问题,您是否让它变得非常大?见connect.microsoft.com/VisualStudio/feedback/details/552868/…
    【解决方案4】:

    您可以做的其他事情是将其发送到数据库中的磁盘,例如 sqlite(以避免池recicle 问题)并将其发送到您的sql server 数据库。

    我使用响应式扩展来创建插入队列并以良好的速度工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-04-13
      • 2011-01-21
      • 2021-06-24
      • 2016-11-11
      • 2016-01-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多