【发布时间】:2020-11-29 22:11:11
【问题描述】:
我即将使用sqlalchemy 创建查询select join,例如:
SELECT position.*, device.name as devicename
FROM `position` JOIN `device`
ON position.id_device = device.id
ORDER BY position.id_device
所以在 python 文件中,我创建如下查询:
result = sql.query(Position, Device).\
join(Device, Position.id_device == Device.id).\
order_by(Position.id_device).first()
我得到了结果:
[
{
"id": 1,
"size": "600x300",
"price": 150,
"id_category": 0,
"id_device": 1,
"impression": 9999,
"status": 1,
"name": "Home Top Banner",
"id_website": 1
},
{
"deleted_at": null,
"status": 1,
"name": "Desktop",
"id": 1
}
]
我想要的结果作为上面的mysql 查询是:
[
{
"id": 1,
"size": "600x300",
"price": 150,
"id_category": 0,
"id_device": 1,
"impression": 9999,
"status": 1,
"name": "Home Top Banner",
"id_website": 1,
"devicename": "Desktop",
}
]
主要代码流程:
[...]
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
[...]
from sqlalchemy.orm import Session
def db_session() -> Generator:
"""
Get database connection with DI (Dependencies Injection)
"""
try:
dbsession = SessionLocal()
yield dbsession
finally:
dbsession.close()
@router.get('/{ad_type}')
def ad_unit(
ad_type: str,
sql: Session = Depends(db_session)
):
"""Display ads unit of the available product"""
result = get_position(sql, ad_type=ad_type)
return result
[...]
def get_position(sql: Session, ad_type: str):
result = sql.query(Position, Device).\
join(Device, Position.id_device == Device.id).\
order_by(Position.id_device).first()
[...]
模型类:
from sqlalchemy import Column
from sqlalchemy.dialects import mysql
from app.settings.mysql_settings import Base
class Position(Base):
"""A class used to represent position table (ads price list & dimensions)"""
__tablename__ = 'position'
id = Column(mysql.INTEGER(display_width=11, unsigned=True), primary_key=True,
index=True, autoincrement=True)
id_device = Column(mysql.INTEGER(display_width=11), nullable=True)
id_website = Column(mysql.INTEGER(display_width=11), nullable=True)
id_category = Column(mysql.INTEGER(display_width=11), nullable=True)
name = Column(mysql.VARCHAR(length=50, collation="utf8_unicode_ci"), nullable=True)
price = Column(mysql.INTEGER(display_width=11), nullable=True)
status = Column(mysql.TINYINT(display_width=1), nullable=True, default=1)
size = Column(mysql.TEXT(collation="utf8_unicode_ci"), nullable=True)
impression = Column(mysql.DOUBLE(), nullable=False, default=0)
class Device(Base):
"""A class used to represent a list of device type for ads unit e.g: Mobile, Desktop, etc."""
__tablename__ = 'device'
id = Column(mysql.INTEGER(display_width=11), primary_key=True,
index=True, autoincrement=True)
name = Column(mysql.VARCHAR(length=50, collation="latin1_swedish_ci"), nullable=True)
status = Column(mysql.TINYINT(display_width=1), nullable=True, default=1)
deleted_at = Column(mysql.TIMESTAMP(), nullable=True)
我尝试了很多方法来获得我想要的结果,通过合并/联合结果和其他方法,但它不起作用。我认为有一种特定的方法可以通过sqlalchemy 获得我想要的结果,但我在文档中找不到它,因为参考资料太多了。
任何帮助将不胜感激,谢谢。
【问题讨论】:
-
一种 – hacky – 取决于您如何序列化为 JSON(未包含在您的问题中)的方式是
query(Position.__table__, Device.name.label('devicename'))。 -
我得到了一个数组
[ 1, 1, 1, 0, "Home Top Banner", 150, 1, "600x300", 9999, "Desktop" ],而不是一个对象数组 -
那是您的 JSON 编码器将结果对象视为一个元组(在某种程度上它是)。如果您添加一个将row proxy 转换为dict 列表的步骤,它将如您所愿。比如
result = [dict(result)]。 -
嗯,也许你是对的,我稍后会弄清楚
-
@metaphor 如果我查看the examples here,您似乎不应该将
Device包含两次。试试这个:sql.query(Position).join(Device, Position.id_device == Device.id).order_by(Position.id_device).first()
标签: python mysql sqlalchemy fastapi