【问题标题】:Getting SQLite3 to work with multiple threads让 SQLite3 与多个线程一起工作
【发布时间】:2022-02-01 06:07:27
【问题描述】:

我正在用 Python 制作一个网络爬虫,它收集重定向/链接,将它们添加到数据库中,如果链接不存在,则将它们作为新行输入。我想使用多线程但遇到了麻烦,因为我必须实时检查是否存在具有给定 URL 的条目。

我最初使用sqlite3,但意识到我不能在不同的线程上同时使用它。我真的不想使用 MySQL(或类似的东西),因为它需要更多的磁盘空间并作为单独的服务器运行。有没有办法让sqlite3 使用多个线程?

【问题讨论】:

    标签: python multithreading sqlite multiprocessing


    【解决方案1】:

    Python sqlite3 模块的threadsafety 级别为 1,这意味着虽然不能在线程之间共享数据库连接,但多个线程可以使用 模块 em> 同时。因此,您可以让每个线程创建自己的数据库连接。

    这种方法的问题是 SQLite 的写入并发性很差,因此让多个线程同时执行大量 INSERTs 会给您带来可怕的“数据库已锁定”错误。您可以通过使用PRAGMA JOURNAL_MODE = 'WAL' 来改进一些事情,但仅此而已。

    如果性能是一个问题并且切换到客户端-服务器数据库不是一个选项,那么您可能需要做的是保留 URL 的内存缓存,并安排您的程序,以便您拥有一个线程将此缓存与 SQLite 数据库同步。

    【讨论】:

    • 但是 mysql 呢?它有这个安全功能吗?
    • 我只想推荐@dan04 写的内容并创建一个类来处理该连接。这样,所有线程都可以根据需要实例化该类,从而可以自动设置数据库和连接。
    • 您也可以增加timeout 连接的值,这将延长等待数据库解锁的时间。这会减慢并发写入的速度,但它们最终会全部完成。
    【解决方案2】:

    一种解决方案可能是获取锁以直接从您的程序访问数据库。这样多个线程或进程在执行请求之前会等待其他进程插入链接。

    【讨论】:

    • 不幸的是“你不能吃蛋糕也不能吃”。恐怕您想要一个合适的数据库,或者避免运行数据库。我也不确定你想要实现什么。您如何确定当您发出 SELECT 请求、检查 url 是否存在并最终在 INSERTing 之前执行其他操作时,其他进程不会潜入 INSERT?
    【解决方案3】:

    这是让 sqlite 在多个线程中工作的方法。

    将 BlockingCollection 与 ThreadPool.QueueUserWorkItem 一起使用。 任何数据库查询都按 FIFO(先进先出)顺序排队和执行。 现在,在从任何线程执行任何 SQL 事务时,数据库永远不会被锁定。 这是 C# 中的一个示例。

    public class DatabaseQueueBus
    {
        private BlockingCollection<TransportBean> _dbQueueBus = new BlockingCollection<TransportBean>(new ConcurrentQueue<TransportBean>());
        private CancellationTokenSource __dbQueueBusCancelToken;
    
        public CancellationTokenSource _dbQueueBusCancelToken { get => __dbQueueBusCancelToken; set => __dbQueueBusCancelToken = value; }
    
        public DatabaseQueueBus()
        {
            _dbQueueBusCancelToken = new CancellationTokenSource();
            DatabaseQueue();
    
        }
    
        public void AddJob(TransportBean dto)
        {
            _dbQueueBus.Add(dto);
        }
    
        private void DatabaseQueue()
        {
    
            ThreadPool.QueueUserWorkItem((param) =>
            {
                try
                {
                    do
                    {
                        string job = "";
                        TransportBean dto = _dbQueueBus.Take(_dbQueueBusCancelToken.Token);
                        try
                        {
                            job = (string)dto.DictionaryTransBean["job"];
                            switch (job)
                            {
                                case "SaveClasse":
                                    //Save to table here
                                    break;
                                case "SaveRegistrant":
                                    //Save Registrant here
                                    break;
                              
                            }
                        }
                        catch (Exception ex)
                        {//TODO: Handle this exception or not
    
                        }
    
                    } while (_dbQueueBusCancelToken.Token.IsCancellationRequested != true);
                }
                catch (OperationCanceledException)
                {
    
                }
                catch (Exception ex)
                {
    
                }
            });
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-05-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多