【问题标题】:Data processing stratgy kafka or database or any other persistance数据处理策略 kafka 或数据库或任何其他持久性
【发布时间】:2020-12-17 20:58:25
【问题描述】:

我正在研究大量(异步)数据处理策略,我在这里过度简化了问题-

我得到了一个记录集 - 比如说 -

A-event1
B-event1 
B-event2  
C-event1
C-event2
C-event3
B-event3
A-event2
A-event3
D-event1
D-event2
C-event4
A-event4
A-event4
A-event6
A-eventfinal
B-eventfinal
C-event6
C-event7
C-eventFinal
D-eventFinal

此记录集的转换将是

A-event1      B-event1         C-event1        D-event1
A-event2      B-event2         C-event2        D-event2
A-event3      B-event3         C-event3        D-eventFinal
A-event4      B-eventfinal     C-event4
A-eventFinal                   C-event5
                               C-event6
                               C-event7  
                               C-eventFinal
                   

一旦我得到最终的事件数据,那么只有这个集合准备好进行进一步处理。一旦实体达到最终,它就有资格进行进一步处理。这个单独的集合现在被发送到第三方应用程序,它会得到处理并且在成功完成后,它将返回关闭事件或确认,或者可能失败,因此,这个单独的集合已准备好被清除或保留以进行进一步纠正(如果失败),请注意,确认或关闭可能几天没有收到。所以我必须将这些数据保存在某个地方(可能是数据库、Kafka 或某种等价物)

我在这里使用 A、B C 和 D 作为实体标识符,这可能是数万(如 guid)。我还需要重新处理整个记录集的能力。

我已经阐述的几个选项是

  1. 每个标识符都有一个动态的 Kafka 主题,但任何时候它都可能维护 10 万个主题,我试图避免使用 DB。
  2. 已将整个集合放入一个 Kafka 主题并创建另一个重试主题,一种应用程序 X 不断轮询重试主题。

我对这里的任何数据处理算法持开放态度,更不用说数据丢失是不可接受的。

我明白这个解释有点抽象,请告诉我,如果您需要进一步解释,任何帮助或建议将不胜感激。

我正在寻找一种架构方法。

【问题讨论】:

  • 您需要回答的第一个问题是收到最终事件后进行后期处理的时间。这将告诉您是否可以将数据保存到文件中,或者您必须与内存中的数据相同。您必须确定执行算法所需的速度与内存以及机器中的内存量与内存量。然后确定算法。
  • 是否可以假设,所有事件的结构都是相同的,它们都有一个 id,闭包返回一个 id?
  • 是的,是的。这是正确的。每个集合的 ID 都是唯一的,这就是它如何记录为应用程序的审计跟踪。
  • @jdweng 后处理不是时间敏感的,而是基于准确性的。
  • 那么内存呢?

标签: c# .net apache-kafka architecture software-design


【解决方案1】:

您的描述对细节有点轻描淡写。但是,您可以使用数据库和某种管道(选择您的毒药)轻松解决此问题

在这个非常人为的示例中,我使用了 Dataflow,您可以使用您喜欢的任何结构或框架,但是问题仍然相同。在示例中,Dataflow 有一些事情可以毫不费力地完成。

  • 可以使用 async 和 await 模式。
  • 有序(或不有序)处理事物
  • 可以使用队列处理,可以并行处理
  • 配置最大并行度
  • 可以创建永久管道
  • 可以取消令牌等等

我不得不做出很多假设,并留下了很多想象空间。

  • 您需要考虑容错性
  • 实施取消制度
  • 调整并行度和其他选项
  • 为事件实现数据库
  • 如果您的进程出现故障,则具有故障转移和重新启动机制

示例

public enum EventType
{
   Event,
   Final,
   Finished,
   Error
}

public class EventMessage
{
   public int GroupId { get; set; }
   public int EventId { get; set; }
   public string Payload { get; set; }
   public EventType EventType { get; set; }
}

public static ConcurrentDictionary<int,List<EventMessage>> _dataStore = new ConcurrentDictionary<int,List<EventMessage>>();
private static BufferBlock<EventMessage> _start;
private static ActionBlock<EventMessage> _persistBlock;
private static ActionBlock<EventMessage> _processBlock;
private static ActionBlock<EventMessage> _finalizeBlock;
private static TransformBlock<EventMessage, EventMessage> _reprocessBlock;
private static TransformBlock<EventMessage, EventMessage> _queue;
private static Random _r = new Random();


static async Task Main(string[] args)
{

   // this is just a buffer that can receive asynchronous events
   _start = new BufferBlock<EventMessage> (new DataflowBlockOptions(){EnsureOrdered = true});

   // we need an orderly queue, the bounded capacity is 1 so we can process events in order 
   // ie so you don't process the final before all events are recevied
   _queue = new TransformBlock<EventMessage, EventMessage>(message => message, new ExecutionDataflowBlockOptions(){BoundedCapacity = 1});

   // save your events to the database
   _persistBlock = new ActionBlock<EventMessage>(PersistAction, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

   // process the final event
   _processBlock = new ActionBlock<EventMessage>(ProcessAction);

   // process the event from the 3rd party service
   _finalizeBlock = new ActionBlock<EventMessage>(FinalizeAction);

   // reprocess on failure or whatever you need to do
   _reprocessBlock = new TransformBlock<EventMessage, EventMessage>(Reprocess);

   // link it all together
   _start.LinkTo(_queue);
   _queue.LinkTo(_persistBlock, (x) => x.EventType == EventType.Event);
   _queue.LinkTo(_processBlock, (x) => x.EventType == EventType.Final);
   _queue.LinkTo(_finalizeBlock, (x) => x.EventType == EventType.Finished);
   _queue.LinkTo(_reprocessBlock, (x) => x.EventType == EventType.Error);
   _reprocessBlock.LinkTo(_start);

   // create some events
   var tasks= Enumerable.Range(1, 5).Select(CreateEvents);

   await Task.WhenAll(tasks);

   Console.ReadKey();
}

private static async Task CreateEvents(int groupId)
{
   var events = Enumerable
      .Range(1, _r.Next(2, 5))
      .Select(x => new EventMessage()
      {
         GroupId = groupId,
         EventId = x,
         EventType = EventType.Event
      });
   foreach (var e in events)
   {
      await Task.Delay(_r.Next(10, 100));
      await _start.SendAsync(e);
   }

   await _start.SendAsync(new EventMessage()
   {
      GroupId = groupId,
      Payload = $"Final Event",
      EventType = EventType.Final
   });
}
private static EventMessage Reprocess(EventMessage e)
{
   // the event come back as an error, so we push it back on the the queue
   Console.WriteLine($"Reprocessing group : {e.GroupId}");
   e.EventType = EventType.Final;
   e.Payload = e.Payload + " Error";
   return e;
}

private static async Task PersistAction(EventMessage e)
{
   // this is simulating saving the event to a db
   Console.WriteLine($"Saving event : {e.GroupId}:{e.EventId}");
   await Task.Delay(_r.Next(10, 100));
   _dataStore.AddOrUpdate(e.GroupId,
      (x) => new List<EventMessage>() {e},
      (x, l) =>
      {
         l.Add(e);
         return l;
      });
}
private static async Task ProcessAction(EventMessage e)
{
   // this is simulating reading all the events for that group from the db
   // and sending to your 3rd service
   Console.WriteLine($"Sending to service : {e.GroupId}");

   await Task.Delay(_r.Next(10, 100));

   // this is simulating receiving a result from the 3rd party service 
   // just pushes the event back in to the queue, to be finialised or reprocessed
   // choose randomly if it was a success or failure
   // obviously this would be called by something else, possibly your message queue
   if (_r.Next(0, 2) == 0)
      e.EventType = EventType.Finished;
   else
      e.EventType = EventType.Error;


   Console.WriteLine($"Service returned : {e.GroupId}, {e.EventType}");

   await _start.SendAsync(e);
}
private static void FinalizeAction(EventMessage e)
{
 // pruge the records, we are all done
   _dataStore.TryRemove(e.GroupId, out var l);

   Console.WriteLine($"*** Finalize : {e.GroupId} - {string.Join(",", l.Select(x => x.EventId))}");
}

输出

Saving event : 4:1
Saving event : 1:1
Saving event : 4:2
Saving event : 1:2
Saving event : 5:1
Saving event : 5:2
Saving event : 3:1
Saving event : 2:1
Saving event : 1:3
Saving event : 5:3
Sending to service : 1
Saving event : 5:4
Service returned : 1, Error
Sending to service : 5
Saving event : 2:2
Service returned : 5, Error
Saving event : 3:2
Saving event : 4:3
Saving event : 4:4
Sending to service : 4
Saving event : 2:3
Service returned : 4, Error
Saving event : 3:3
Sending to service : 3
Saving event : 2:4
Reprocessing group : 1
Reprocessing group : 5
Reprocessing group : 4
Service returned : 3, Error
Sending to service : 2
Reprocessing group : 3
Service returned : 2, Finished
Sending to service : 1
*** Finalize : 2 - 1,2,3,4
Service returned : 1, Finished
Sending to service : 5
*** Finalize : 1 - 1,2,3
Service returned : 5, Finished
Sending to service : 4
*** Finalize : 5 - 1,2,3,4
Service returned : 4, Finished
Sending to service : 3
*** Finalize  : 4 - 1,2,3,4
Service returned : 3, Error
Reprocessing group : 3
Sending to service : 3
Service returned : 3, Finished
*** Finalize : 3 - 1,2,3

注意:这只是一个示例,它并不意味着是一个完整的解决方案或数据流的建议,甚至你应该如何解决它。这只是为了让您了解结构化管道。

【讨论】:

    猜你喜欢
    • 2017-01-09
    • 1970-01-01
    • 2012-05-03
    • 1970-01-01
    • 2017-10-19
    • 2013-02-10
    • 2021-10-23
    • 2010-09-24
    • 2021-02-26
    相关资源
    最近更新 更多