【问题标题】:SQLAlchemy - mapping one class to two tablesSQLAlchemy - 将一个类映射到两个表
【发布时间】:2025-12-04 20:55:02
【问题描述】:

我有两个数据库队列实现(它们使用不同的表)并希望它们使用同一类的对象。所以,它们看起来真的很相似:

class AbstractDBQueue(object):
    def __init__(self, tablename):
        self.tablename = tablename
        self.metadata = MetaData()
        self.engine = create_engine('mysql+mysqldb://%s:%s@%s:%d/%s' % (
            settings.DATABASE.get('USER'),
            settings.DATABASE.get('PASSWORD'),
            settings.DATABASE.get('HOST') or '127.0.0.1',
            settings.DATABASE.get('PORT') or 3306,
            settings.DATABASE.get('NAME')
        ), encoding='cp1251', echo=True, pool_recycle=7200)
        self.metadata.bind = self.engine
        self.session = sessionmaker(bind=self.engine)()

    def setup_table(self, table, entity_name):
        self.table = table
        newcls = type(entity_name, (SMSMessage, ), {})
        mapper(newcls, table)
        return newcls

    def put(self, message=None, many_messages=[]):
        if message:
            self.session.add(message)
        else:
            for m in many_messages:
                self.session.add(m)
        self.session.commit()

    def get(self, limit=None):
        if limit:
            q = self.session.query(self.SMSClass).limit(limit)
        else:
            q = self.session.query(self.SMSClass)
        smslist = []
        for sms in q:
            smslist.append(sms)
        self.session.expunge_all()
        return smslist

class DBQueue(AbstractDBQueue):
    """
    MySQL database driver with queue interface
    """
    def __init__(self):
        self.tablename = settings.DATABASE.get('QUEUE_TABLE')
        super(DBQueue, self).__init__(self.tablename)
        self.logger = logging.getLogger('DBQueue')
        self.SMSClass = self.setup_table(Table(self.tablename, self.metadata, autoload=True), "SMSQueue")

class DBWorkerQueue(AbstractDBQueue):
    """
    MySQL database driver with queue interface for separate workers queue
    """

    def __init__(self):
        self.tablename = settings.DATABASE.get('WORKER_TABLE')
        super(DBWorkerQueue, self).__init__(self.tablename)
        self.logger = logging.getLogger('DBQueue')
        self.SMSClass = self.setup_table(Table(self.tablename, self.metadata, autoload=True), "SMSWorkerQueue")

    def _install(self):
        self.metadata.create_all(self.engine)

SMSMessage 是我要使用的类的名称。 ma​​p_class_to_table() 函数是我在 SQLAlchemy 文档中发现的一个 hack:http://www.sqlalchemy.org/trac/wiki/UsageRecipes/EntityName

但这似乎没有帮助 - 当第一个队列实例将 SMSMessage 映射到它的表时,我传递给第二个队列的 put() 的所有对象都是隐式的投射到第一个队列的映射类,第二个数据库在 session.commit() 之后仍然是空的。

我需要同时使用两个队列,甚至可能使用线程(我认为,池连接会很有用),但我无法完成这项工作。请问您能帮忙吗?

【问题讨论】:

  • 嗯,如果先用第二个队列,是不是倒序了?
  • 我刚刚使用继承重写了代码,现在它只是引发了一个错误“类'sms.message.SMSMessage'未映射”。我应该将输入对象转换为映射类吗?看起来很奇怪。

标签: python mysql sqlalchemy


【解决方案1】:

我认为您的问题与 tablename 变量有关。这是一个class variable,它在您创建类时被定义,并且不会改变。因此,当您的两个实例使用self.tablename 访问它时,这将是相同的。要解决此问题,请将其移动到 init 函数中,并将其设为 self.tablename。每次创建新对象时都会对其进行初始化。

【讨论】:

  • 我已经更新了主题中的代码 - 重写为对两个队列类使用继承,同时将表名作为构造函数参数传递。但它也不起作用。