【问题标题】:How to implement a FIFO queue that supports namespaces如何实现支持命名空间的 FIFO 队列
【发布时间】:2010-10-18 21:45:35
【问题描述】:

我正在使用以下方法来处理基于 Google App Engine db.Model (see this question) 的 FIFO 队列。

from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import run_wsgi_app

class QueueItem(db.Model):
  created = db.DateTimeProperty(required=True, auto_now_add=True)
  data = db.BlobProperty(required=True)

  @staticmethod
  def push(data):
    """Add a new queue item."""
    return QueueItem(data=data).put()

  @staticmethod
  def pop():
    """Pop the oldest item off the queue."""
    def _tx_pop(candidate_key):
      # Try and grab the candidate key for ourselves. This will fail if
      # another task beat us to it.
      task = QueueItem.get(candidate_key)
      if task:
        task.delete()
      return task
    # Grab some tasks and try getting them until we find one that hasn't been
    # taken by someone else ahead of us
    while True:
      candidate_keys = QueueItem.all(keys_only=True).order('created').fetch(10)
      if not candidate_keys:
        # No tasks in queue
        return None
      for candidate_key in candidate_keys:
        task = db.run_in_transaction(_tx_pop, candidate_key)
        if task:
          return task

此队列按预期工作(非常好)。

现在我的代码有一个方法可以访问延迟队列调用的这个 FIFO 队列:

def deferred_worker():
        data= QueueItem.pop()
        do_something_with(data)

我想增强此方法和队列数据结构,添加一个 client_ID 参数,表示需要访问自己的队列的特定客户端。 比如:

def deferred_worker(client_ID):
        data= QueueItem_of_this_client_ID.pop() # I need to implement this
        do_something_with(data)

如何将队列编码为可识别 client_ID?

约束:
- 客户端数量是动态的,不是预定义的
- 任务队列不是一个选项(1. 最多十个队列 2. 我想完全控制我的队列)

您知道如何使用新的Namespaces api 添加此行为吗(请记住,我不是从 webapp.RequestHandler 调用 db.Model)?
另一种选择:我可以在 QueueItem 中添加一个client_ID db.StringProperty,使用它有一个 pull 方法的过滤器:

QueueItem.all(keys_only=True).filter(client_ID=an_ID).order('created').fetch(10)

有更好的主意吗?

【问题讨论】:

  • 在您的应用中,客户端会话是匿名的还是经过身份验证的?
  • @Paulo 这是一个班级客户;我没有用户。
  • 我认为 namespaces api 的目标是多租户问题域,所以如果你将你的类行为映射到这个问题域,那么它是可能的。

标签: python google-app-engine queue fifo


【解决方案1】:

假设您的“客户端类”实际上是客户端调用的请求处理程序,您可以执行以下操作:

from google.appengine.api import users
from google.appengine.api.namespace_manager import set_namespace

class ClientClass(webapp.RequestHandler):
  def get(self):
    # For this example let's assume the user_id is your unique id.
    # You could just as easily use a parameter you are passed.
    user = users.get_current_user()
    if user:
       # If there is a user, use their queue.  Otherwise the global queue.
       set_namespace(user.user_id())

    item = QueueItem.pop()
    self.response.out.write(str(item))

    QueueItem.push('The next task.')

或者,您也可以设置命名空间app-wide

通过设置默认命名空间,对数据存储的所有调用都将在该命名空间“内”,除非您另有明确指定。请注意,要获取和运行任务,您必须知道命名空间。因此,您可能希望在默认命名空间中维护一个命名空间列表以进行清理。

【讨论】:

  • @Robert 我不必在 QueueItem.pop() 语句之后将_namespace(default_namespace) 设置为默认值吗?执行其他数据库操作的并发任务不可能使用错误的命名空间吗?当您调用 set_namespace(user.user_id()) 时,它是为所有应用程序全局设置还是仅为 webapp.RequestHandler 线程设置?
  • 没有。如果您按照建议设置命名空间,set_namespace 将仅适用于 那个 请求。获取命名空间的调用由 API 底层代码根据需要进行。
  • @Robert 如果您查看 Namespaces Api 页面,这里有一个示例,该示例使用 finally 来恢复已保存的命名空间。为什么?
  • 我假设您正在谈论“在数据存储区中”(code.google.com/appengine/docs/python/multitenancy/…) 示例。他们在不同的命名空间中运行请求。它们更新当前命名空间“-global-”命名空间内的计数器。他们希望确保如果更新“-global-”计数器的调用失败,他们会返回“正确”命名空间,否则任何进一步的 API 调用都将在“-global-”命名空间内。
  • 你推迟了任务,对吧? deferred 在后端使用任务队列。任务队列应该保留命名空间。
【解决方案2】:

正如我在回答您对我的原始答案的询问时所说的那样,您无需执行任何操作即可使用命名空间:构建队列的数据存储区已经支持命名空间。只需根据需要设置命名空间,如the docs 中所述。

【讨论】:

  • 对不起,我不明白。当我调用 set_namespace(..) 时,它是否为所有应用程序全局设置?此调用的范围是否会引发并发问题,为其他并发调用设置错误的命名空间?
  • 为当前请求全局设置。在任何情况下,Python 运行时都是单线程的,所以并发请求不是问题。
猜你喜欢
  • 1970-01-01
  • 2023-04-03
  • 2014-08-24
  • 2012-04-30
  • 1970-01-01
  • 2020-03-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多