【问题标题】:Publish/Consume: Wait on Subscribe, filter messages and Dispose发布/消费:等待订阅、过滤消息和处置
【发布时间】:2016-12-24 17:00:56
【问题描述】:

使用 Rx.Net 3

通过使用 Quartz.Net 调度程序,我构建了一个工作流管理器,以通过嵌入式 Web 服务器链接作业(在完成的作业上使用 Quartz Joblistener)。 应用程序实例化一个 Subject 的实例(单例)。

Web 服务获取数据并启动工作流,注入唯一 ID。此唯一 ID 通过工作流传播。 Joblistener 委托检测到特定作业的结束,并在注入的 Subject 实例上调用 OnNext,其 Type 包含唯一 ID 和 DB 表 ID。

这个想法是每次调用的网络服务订阅主题并等待传入​​的消息/事件并根据唯一 ID 过滤它们。找到后处理订阅,收集生成的数据并将其返回给调用者。

如何让我的 Subscribe() 等待传入的消息、过滤它们和 Dispose(),而不会过早完成 Web 服务。

【问题讨论】:

标签: c# .net system.reactive reactivex


【解决方案1】:

您无需手动处理订阅。 任何像TakeFirst 这样的边界运算符都会向OnCompleted 发出信号,这会导致序列被释放。您还可以使用await observables 来避免编写回调。

例如, dbId = await AsyncCommunication.FirstAsync(x => x.Key == id)

【讨论】:

  • 感谢提供一些内部行为。虽然我会遇到并发问题。我需要订阅,调用作业调度程序并等待事件。当首先调用作业调度程序时,我可能会在使用 FirstAsync() 时错过该事件。
【解决方案2】:
// model
public class AsyncCommunicationObject
{
    public string Key { get; }
    public string Value { get; }

    public AsyncCommunicationObject(string key, string value)
    {
        Key = key;
        Value = value;
    }
}

// injectable singleton 
public static Subject<AsyncCommunicationObject> AsyncCommunication { get; set; } = new Subject<AsyncCommunicationObject>();

// in web service           
System.Threading.EventWaitHandle waitHandle = new System.Threading.AutoResetEvent(false);
string yourID = some ID

var subscription = _asyncCommunication (injected)
    .Where(x => x.Key == yourID)
    .Take(1)
    .Subscribe(
        x =>
        {
            dbId = x.Value;
            waitHandle.Set();
        }
    );

_schedulerCore.ExecuteJob(upload.JobId, jobDataMap);

waitHandle.WaitOne();
waitHandle.Reset();

subscription.Dispose();

// in job listener
_asyncCommunication.OnNext(new AsyncCommunicationObject(your ID, some value)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-03-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-03
    • 2016-11-25
    • 2017-07-17
    • 2022-10-08
    相关资源
    最近更新 更多