我已经实现过与您描述的几乎完全相同的东西,代码在下面。它是线程安全的,并且有一个 Flush 方法,您可以调用它来刷新挂起的写入。一旦达到待写入对象的阈值数量,它就会将队列(在我的情况下为列表)交给另一个线程进行保存。请注意,它使用 ManualResetEvent 来处理最后剩余数据的刷新(WaitHandle.WaitAll 一次最多只能等待 64 个等待句柄,这就是为什么当有超过 64 个后台线程在等待写入时需要先手动等待——不过除非您的数据库真的很慢,否则这种情况几乎不会发生)。这段代码曾用于处理流入其中的数千万条记录(从内存写入 2000 万行大约需要 5 分钟,不过它是和数据库运行在同一台服务器上,所以没有网络跳跃……使用 SqlBulkCopy 对象和 IDataReader,SQL Server 肯定可以处理每秒数千行),因此它应该能承受您的请求负载(当然这取决于您写入的内容和您的数据库,但我认为这段代码可以完成任务!)。
此外,为了方便批量写入,我创建了一个 IDataReader 的最小实现来流式传输我的数据。您需要这样做才能使用下面的代码。
/// <summary>
/// Buffers records in memory and flushes them to a staging table in bulk on
/// background (thread-pool) threads once a staging threshold is reached.
/// </summary>
public class DataImporter<T>
{
    public DataImporter(string tableName, string readerName)
    {
        _tableName = tableName;
        _readerName = readerName;
    }

    /// <summary>
    /// This is the size of our bulk staging list.
    /// </summary>
    /// <remarks>
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value,
    /// so records may not be going into the database in sizes of this staging value.
    /// </remarks>
    private int _bulkStagingListSize = 20000;

    // Guards _tasksWaiting. This is a separate lock from _listLock so that the
    // background save threads (which remove their event in a finally block) can
    // never deadlock against Flush(), which blocks while holding no lock.
    private readonly object _tasksLock = new object();
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();

    private string _tableName = String.Empty;
    private string _readerName = String.Empty;

    /// <summary>
    /// Adds a record to the staging list; when the list reaches the staging size
    /// the whole batch is handed to a background thread for bulk saving.
    /// </summary>
    public void QueueForImport(T record)
    {
        List<T> batch = null;
        lock (_listLock)
        {
            _items.Add(record);
            // ">=" so hand-off really happens at _bulkStagingListSize items
            // (the original ">" waited for one extra record).
            if (_items.Count >= _bulkStagingListSize)
            {
                batch = _items;
                _items = new List<T>();
            }
        }
        // Queue the save outside the lock so other producers are not blocked
        // while the reader/work item is being set up.
        if (batch != null)
        {
            SaveItems(batch);
        }
    }

    /// <summary>
    /// This method should be called at the end of the queueing work to clear down
    /// our list and block until every outstanding background save has completed.
    /// </summary>
    public void Flush()
    {
        List<T> batch;
        lock (_listLock)
        {
            batch = _items;
            _items = new List<T>();
        }
        if (batch.Count > 0)
        {
            SaveItems(batch);
        }

        // Snapshot the outstanding events; background threads may still be
        // removing themselves from the live list as they finish.
        ManualResetEvent[] pending;
        lock (_tasksLock)
        {
            pending = _tasksWaiting.ToArray();
        }

        // WaitHandle.WaitAll accepts at most 64 handles per call, so wait in
        // chunks of 64 instead of sleep-polling as the original code did.
        for (int i = 0; i < pending.Length; i += 64)
        {
            int count = Math.Min(64, pending.Length - i);
            ManualResetEvent[] chunk = new ManualResetEvent[count];
            Array.Copy(pending, i, chunk, 0, count);
            WaitHandle.WaitAll(chunk);
        }
    }

    /// <summary>
    /// Registers a completion event for the batch and queues the bulk save
    /// onto the thread pool.
    /// </summary>
    private void SaveItems(List<T> items)
    {
        ManualResetEvent evt = new ManualResetEvent(false);
        lock (_tasksLock)
        {
            _tasksWaiting.Add(evt);
        }
        // Bug fix: stream the "items" parameter, not the live _items field,
        // which may already be collecting records for the next batch.
        IDataReader reader = DataReaderFactory.GetReader<T>(_readerName, items);
        Tuple<ManualResetEvent, IDataReader> stateInfo =
            new Tuple<ManualResetEvent, IDataReader>(evt, reader);
        ThreadPool.QueueUserWorkItem(new WaitCallback(SaveData), stateInfo);
    }

    /// <summary>
    /// Thread-pool callback: streams one batch into the staging table and then
    /// signals (and unregisters) its completion event.
    /// </summary>
    private void SaveData(object info)
    {
        using (new ActivityTimer("Saving bulk data to " + _tableName))
        {
            Tuple<ManualResetEvent, IDataReader> stateInfo = (Tuple<ManualResetEvent, IDataReader>)info;
            IDataReader r = stateInfo.Item2;
            try
            {
                Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
            }
            catch (Exception)
            {
                // TODO: log and surface this failure. Silently swallowing it
                // (as the original did) means lost batches go unnoticed.
            }
            finally
            {
                lock (_tasksLock)
                {
                    _tasksWaiting.Remove(stateInfo.Item1);
                }
                stateInfo.Item1.Set();
            }
        }
    }

    private object _listLock = new object();
    private List<T> _items = new List<T>();
}
下面提到的 DataReaderFactory 只是选择了正确的 IDataReader 实现以用于流式传输,如下所示:
/// <summary>
/// Selects the correct IDataReader implementation for streaming a staged list
/// of records into the SqlBulkCopy object.
/// </summary>
internal static class DataReaderFactory
{
    /// <summary>
    /// Returns the reader registered under <paramref name="typeName"/>, wrapping
    /// <paramref name="items"/>.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// Thrown for an unknown reader name. The original code returned null here,
    /// which only surfaced later as a NullReferenceException on a background
    /// bulk-copy thread - failing fast makes the misconfiguration obvious.
    /// </exception>
    internal static IDataReader GetReader<T>(string typeName, List<T> items)
    {
        switch (typeName)
        {
            case "ProductRecordDataReader":
                // NOTE(review): "as" yields null if T is not the matching record
                // type; the reader constructors are assumed to reject null lists.
                return new ProductRecordDataReader(items as List<ProductRecord>);
            case "RetailerPriceRecordDataReader":
                return new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>);
            default:
                throw new NotSupportedException(
                    "No IDataReader implementation is registered for '" + typeName + "'.");
        }
    }
}
我在这种情况下使用的数据读取器实现(尽管此代码适用于任何数据读取器)如下:
/// <summary>
/// This class creates a data reader for ProductRecord data. This is used to stream the records
/// to the SqlBulkCopy object.
/// </summary>
/// <remarks>
/// Only the members a bulk-copy consumer actually needs (FieldCount, Read, GetValue,
/// GetValues, GetName, GetOrdinal, IsClosed, Depth, Close/Dispose) are implemented;
/// the remaining typed accessors still throw NotImplementedException.
/// </remarks>
public class ProductRecordDataReader : IDataReader
{
    // Column names in ordinal order - the single source of truth that
    // FieldCount, GetName and GetOrdinal are all derived from (the original
    // maintained three parallel 14-case switch statements that could drift).
    private static readonly string[] FieldNames =
    {
        "ProductSku",
        "UPC",
        "EAN",
        "ISBN",
        "ProductName",
        "ShortDescription",
        "LongDescription",
        "DFFCategoryNumber",
        "DFFManufacturerNumber",
        "ManufacturerPartNumber",
        "ManufacturerModelNumber",
        "ProductImageUrl",
        "LowestPrice",
        "HighestPrice",
    };

    private readonly List<ProductRecord> _products;
    private int _currentRow = -1; // -1 until the first successful Read()

    public ProductRecordDataReader(List<ProductRecord> products)
    {
        if (products == null)
        {
            throw new ArgumentNullException("products");
        }
        // Copy so later mutation of the caller's list cannot corrupt an
        // in-flight bulk copy running on another thread.
        _products = products.ToList();
    }

    public int FieldCount
    {
        get { return FieldNames.Length; }
    }

    #region IDataReader Members

    public void Close()
    {
        // Nothing to release - the reader only wraps an in-memory list.
    }

    public bool Read()
    {
        if (_currentRow + 1 < _products.Count)
        {
            _currentRow++;
            return true;
        }
        return false;
    }

    public int RecordsAffected
    {
        // -1 is the convention for readers over SELECT-style (non-DML) results.
        get { return -1; }
    }

    public string GetName(int i)
    {
        // Preserves the original contract: null for an out-of-range ordinal.
        return (i >= 0 && i < FieldNames.Length) ? FieldNames[i] : null;
    }

    public int GetOrdinal(string name)
    {
        // Preserves the original contract: -1 for an unknown column name.
        return Array.IndexOf(FieldNames, name);
    }

    public object GetValue(int i)
    {
        ProductRecord p = _products[_currentRow];
        switch (i)
        {
            case 0: return p.ProductSku;
            case 1: return p.UPC;
            case 2: return p.EAN;
            case 3: return p.ISBN;
            case 4: return p.ProductName;
            case 5: return p.ShortDescription;
            case 6: return p.LongDescription;
            case 7: return p.DFFCategoryNumber;
            case 8: return p.DFFManufacturerNumber;
            case 9: return p.ManufacturerPartNumber;
            case 10: return p.ManufacturerModelNumber;
            case 11: return p.ProductImageUrl;
            case 12: return p.LowestPrice;
            case 13: return p.HighestPrice;
            default: return null;
        }
    }

    #endregion

    #region IDisposable Members

    public void Dispose()
    {
        Close();
    }

    #endregion

    #region IDataRecord Members

    public bool NextResult()
    {
        // Exactly one result set; no further sets to advance to.
        return false;
    }

    public int Depth
    {
        get { return 0; } // flat, non-nested result set
    }

    public DataTable GetSchemaTable()
    {
        throw new NotImplementedException();
    }

    public bool IsClosed
    {
        // Always readable - some IDataReader consumers check this before Read().
        get { return false; }
    }

    public bool GetBoolean(int i)
    {
        throw new NotImplementedException();
    }

    public byte GetByte(int i)
    {
        throw new NotImplementedException();
    }

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public char GetChar(int i)
    {
        throw new NotImplementedException();
    }

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public IDataReader GetData(int i)
    {
        throw new NotImplementedException();
    }

    public string GetDataTypeName(int i)
    {
        throw new NotImplementedException();
    }

    public DateTime GetDateTime(int i)
    {
        throw new NotImplementedException();
    }

    public decimal GetDecimal(int i)
    {
        throw new NotImplementedException();
    }

    public double GetDouble(int i)
    {
        throw new NotImplementedException();
    }

    public Type GetFieldType(int i)
    {
        throw new NotImplementedException();
    }

    public float GetFloat(int i)
    {
        throw new NotImplementedException();
    }

    public Guid GetGuid(int i)
    {
        throw new NotImplementedException();
    }

    public short GetInt16(int i)
    {
        throw new NotImplementedException();
    }

    public int GetInt32(int i)
    {
        throw new NotImplementedException();
    }

    public long GetInt64(int i)
    {
        throw new NotImplementedException();
    }

    public string GetString(int i)
    {
        throw new NotImplementedException();
    }

    public int GetValues(object[] values)
    {
        if (values == null)
        {
            throw new ArgumentNullException("values");
        }
        int count = Math.Min(values.Length, FieldNames.Length);
        for (int i = 0; i < count; i++)
        {
            values[i] = GetValue(i);
        }
        return count;
    }

    public bool IsDBNull(int i)
    {
        return GetValue(i) == null;
    }

    public object this[string name]
    {
        get { return GetValue(GetOrdinal(name)); }
    }

    public object this[int i]
    {
        get { return GetValue(i); }
    }

    #endregion
}
最后批量加载数据方法如下:
/// <summary>
/// Streams all rows from <paramref name="reader"/> into <paramref name="tableName"/>
/// using SqlBulkCopy. Columns are mapped by ordinal position.
/// </summary>
/// <param name="reader">The data reader supplying the rows; not disposed here - the caller owns it.</param>
/// <param name="tableName">The destination table name.</param>
public void BulkLoadData(IDataReader reader, string tableName)
{
    using (SqlConnection cnn = new SqlConnection(cnnString))
    // SqlBulkCopy implements IDisposable; the original leaked it.
    using (SqlBulkCopy copy = new SqlBulkCopy(cnn))
    {
        copy.DestinationTableName = tableName;
        copy.BatchSize = 10000;
        cnn.Open();
        copy.WriteToServer(reader);
    }
}
但是,说了这么多,我还是建议您不要在 ASP.NET 中直接使用此代码,原因正如另一个答案所指出的(特别是 IIS 会回收工作进程,导致后台线程里尚未写入的数据丢失)。我建议您先通过一个非常轻量级的队列把请求数据发送到另一个不会被回收重启的服务(我们使用 ZeroMQ 从我正在编写的 ASP.NET 应用程序中把请求和日志数据流式传出……性能非常好)。
迈克。