【问题标题】:Filter by id in graphene-sqlalchemy query在 graphene-sqlalchemy 查询中按 id 过滤
【发布时间】:2023-03-30 15:17:01
【问题描述】:

如何设置 graphene-sqlalchemy 以通过 id 过滤对象?

我想运行查询:

{
  marker(markerId: 1) {
    markerId
    title
  }
}

我希望得到一个 MarkerId 为 1 的 Marker 对象,但我在“Query”类型的字段“marker”上收到错误“Unknown argument“markerId”。”

我有两个文件:

schema.py

import graphene
from graphene_sqlalchemy import SQLAlchemyObjectType
from model import db_session, Marker as MarkerModel

class Marker(SQLAlchemyObjectType):
    class Meta:
        model = MarkerModel

class Query(graphene.ObjectType):
    marker = graphene.Field(Marker)
    markers = graphene.List(Marker)

    def resolve_markers(self, args, context, info):
        return db_session.query(MarkerModel).all()

    def resolve_marker(self, args, context, info):
        return db_session.query(MarkerModel).first()

schema = graphene.Schema(query=Query)

模型.py

import sqlalchemy as db
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from instance.config import settings

engine = db.create_engine(settings["SQLALCHEMY_DATABASE_URI"])
sm = sessionmaker(bind=engine)
db_session = scoped_session(sm)

Base = declarative_base()

Base.query = db_session.query_property()


class Marker(Base):
    __tablename__ = "marker"
    marker_id = db.Column(db.Integer, primary_key=True)
    latitude = db.Column(db.Float)
    longitude = db.Column(db.Float)
    title = db.Column(db.String(100))
    blurb = db.Column(db.String(65535))

    def __repr__(self):
        return "<Marker %d: %s>".format([self.marker_id, self.title])

感谢您的帮助!

【问题讨论】:

    标签: sqlalchemy graphql graphene-python


    【解决方案1】:

    可以在https://github.com/somada141/demo-graphql-sqlalchemy-falcon 下找到完整的工作演示。

    考虑以下 SQLAlchemy ORM 类:

    class Author(Base, OrmBaseMixin):
        __tablename__ = "authors"
    
        author_id = sqlalchemy.Column(
            sqlalchemy.types.Integer(),
            primary_key=True,
        )
    
        name_first = sqlalchemy.Column(
            sqlalchemy.types.Unicode(length=80),
            nullable=False,
        )
    
        name_last = sqlalchemy.Column(
            sqlalchemy.types.Unicode(length=80),
            nullable=False,
        )
    

    像这样简单地包裹在SQLAlchemyObjectType 中:

    class TypeAuthor(SQLAlchemyObjectType):
        class Meta:
            model = Author
    

    并通过以下方式暴露:

    author = graphene.Field(
        TypeAuthor,
        author_id=graphene.Argument(type=graphene.Int, required=False),
        name_first=graphene.Argument(type=graphene.String, required=False),
        name_last=graphene.Argument(type=graphene.String, required=False),
    )
    
    @staticmethod
    def resolve_author(
        args,
        info,
        author_id: Union[int, None] = None,
        name_first: Union[str, None] = None,
        name_last: Union[str, None] = None,
    ):
        query = TypeAuthor.get_query(info=info)
    
        if author_id:
            query = query.filter(Author.author_id == author_id)
    
        if name_first:
            query = query.filter(Author.name_first == name_first)
    
        if name_last:
            query = query.filter(Author.name_last == name_last)
    
        author = query.first()
    
        return author
    

    一个 GraphQL 查询,例如:

    query GetAuthor{
      author(authorId: 1) {
        nameFirst
      }
    }
    

    将产生响应:

    {
      "data": {
        "author": {
          "nameFirst": "Robert"
        }
      }
    }
    

    如您所见,您可以在 graphene.Field 实例化期间通过 graphene.Argument 类传递命名参数,这些参数也必须在解析器方法中命名。然而,将两者结合起来,您可以进行各种过滤。

    【讨论】:

      【解决方案2】:

      定义查询时需要指定marker_id作为查询参数。

      ...
      class Query(graphene.ObjectType):
          marker = graphene.Field(Marker, marker_id=graphene.String())
          markers = graphene.List(Marker)
      
          def resolve_markers(self, args, context, info):
              return db_session.query(MarkerModel).all()
      
          def resolve_marker(self, args, context, info, marker_id):
              return db_session.query(MarkerModel).filter(MarkerModel.id == marker_id).first()
      ...
      

      【讨论】:

        【解决方案3】:

        我没有使用过 sql-alchemy,但我想您必须将 Node 接口添加到您的模型中,例如:

        class Marker(SQLAlchemyObjectType):
            class Meta:
                model = MarkerModel
                interfaces = (Node,)
        

        【讨论】:

          【解决方案4】:

          用于过滤石墨烯的通用解决方案。在网上阅读了大量的 cmets 之后,我做了这个 -

          考虑这是你的对象,它引用了一个名为 Post 的 db.Model

          class PostObject(SQLAlchemyObjectType):
              class Meta:
                  model = Post
                  interfaces = (graphene.relay.Node, )
          

          然后进行查询:

          class Query(graphene.ObjectType):
              node = graphene.relay.Node.Field()
              all_posts = FilteredConnectionField(PostObject)
          

          把它写成一个单独的文件中的类

          import graphene
          import sqlalchemy
          from graphene_sqlalchemy import SQLAlchemyConnectionField
          
          class FilteredConnectionField(SQLAlchemyConnectionField):
          
              def __init__(self, type, *args, **kwargs):
                  fields = {}
                  columns = input_type._meta.model.__table__.c
                  for col in columns:
                      fields[col.name] = self.sql_graphene_type_mapper(col.type)
                  kwargs.update(fields)
                  super().__init__(type, *args, **kwargs)
          
              @classmethod
              def get_query(cls, model, info, sort=None, **args):
                  query = super().get_query(model, info, sort=sort, **args)
                  omitted = ('first', 'last', 'hasPreviousPage', 'hasNextPage', 'startCursor', 'endCursor')
                  for name, val in args.items():
                      if name in omitted: continue
                      col = getattr(model, name, None)
                      if col:
                          query = query.filter(col == val)
                  return query
          
              def sql_graphene_type_mapper(self, col_type):
                  if isinstance(col_type, sqlalchemy.Integer): return graphene.Int()
                  elif isinstance(col_type, sqlalchemy.Boolean): return graphene.Boolean()
                  elif isinstance(col_type, sqlalchemy.DateTime): return graphene.types.DateTime()
                  elif isinstance(col_type, (sqlalchemy.FLOAT, sqlalchemy.BIGINT, sqlalchemy.NUMERIC )): return graphene.Float()
                  else:
                      return graphene.String()
          

          我希望这个课程可以帮助其他人。 更多实例类型转换映射可以在线搜索和浏览grapheneconverter.py文件。

          【讨论】:

            【解决方案5】:

            如果您想要按 ID 显示对象,您可以使用石墨烯继电器。在这种情况下,ID 将是中继节点 ID,您可以根据自己的 ID 需要进行更改。

            你可以在下面的链接中得到一个很好的例子:

            https://github.com/alexisrolland/flask-graphene-sqlalchemy

            【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2023-03-11
            • 2018-08-09
            • 2019-06-10
            • 2019-10-27
            • 2015-01-15
            • 2011-11-27
            相关资源
            最近更新 更多