【问题标题】:How send a message from eventhub to another eventhub?如何将消息从 eventthub 发送到另一个 eventthub?
【发布时间】:2016-10-17 18:01:23
【问题描述】:

我想发送 eventthub 客户,然后将其下载到天气等示例数据并发送另一个 eventthub。我的代码无法正常工作。没有错误但数据没有发送到数据库。

public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
                try
            {
                foreach (EventData message in messages)
                {
                        string data = Encoding.UTF8.GetString(message.GetBytes());
                        NewClient Client = JsonConvert.DeserializeObject<NewClient>(data);
                    if (Client.City != null && Client.Street != null )
                    {

                        GoogleGeoApi GeoClient = new GoogleGeoApi();
                        GeoClient.SetCoordinates(Client.City, Client.Street);
                        WeatherApi WeatherApiobject = new WeatherApi();
                        WeatherApiobject.GetJson(GeoClient.convertlat, GeoClient.convertlng);
                        string weatherdata = WeatherApiobject.sendEvent;
                        SenderEvent NewSenderEvent = new SenderEvent();
                        NewSenderEvent.DataSender(weatherdata, ConstFile.WeatherEventHubName);
                        //StartH(ConstFile.WeatherEventHubName).Wait();
                    }

                    Interlocked.Increment(ref this.totalMessages);
                    this.LastMessageOffset = message.Offset;
                }

                if (this.IsClosed)
                {
                    this.IsReceivedMessageAfterClose = true;
                }

                if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1))
                {
                    lock (this)
                    {
                        this.checkpointStopWatch.Reset();
                        return context.CheckpointAsync();
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("{0} > Event Hub Exception: {1}", DateTime.Now, ex.Message);
            }

            return Task.FromResult<object>(null);
        }

我将添加我的接收器 eventthub 如下所示: https://code.msdn.microsoft.com/Service-Bus-Event-Hub-45f43fc3/view/SourceCode#content

【问题讨论】:

  • 这个类SenderEvent是什么,那个类负责将数据发送到另一个EventHub吗?如果是这样,请发布该代码。
  • 粘贴下面的代码。两种消息(客户端和天气数据)的 SenderEvent 相同
  • 应用程序是否无一例外地到达粘贴代码的eventhubclient.Send(data1); 行?什么进程正在监听第二个事件中心?
  • 希望您能很好理解。我没有任何例外。在下面的评论中,我添加了代码 eventthub 监听。

标签: c# azure azure-eventhub


【解决方案1】:
 public class SenderEvent
    {
        public void DataSender(string data, string eventhubname)
        {
            var eventhubclient = EventHubClient.CreateFromConnectionString(ConstFile.eventHubConnectionString, eventhubname);       
            EventData data1 = new EventData(Encoding.UTF8.GetBytes(data));
            eventhubclient.Send(data1);
        }

【讨论】:

    【解决方案2】:

    您可以使用流分析非常轻松地做到这一点!您的第一个事件中心是流分析中的 input。然后,您可以针对流编写 query(从 [Input1] 中选择 * ...这将为您提供一切)。您可以output 将流返回到另一个事件中心。

    【讨论】:

      【解决方案3】:
      private static async Task StartHost(string eventHubName)
              {
                  string eventProcessorHostName = "1";
                  string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", ConstFile.storageAccountName, ConstFile.storageAccountKey);
                  host = new EventProcessorHost(
                      eventProcessorHostName,
                      eventHubName,
                      ConstFile.ConsumerGroup,
                      ConstFile.eventHubConnectionString,
                      storageConnectionString, eventHubName.ToLowerInvariant());
      
                  factory = new DemoEventProcessorFactory(eventProcessorHostName);
      
                  try
                  {
                      var options = new EventProcessorOptions();
                      options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
                      await host.RegisterEventProcessorFactoryAsync(factory);
                  }
                  catch (Exception exception)
                  {
                      Console.ForegroundColor = ConsoleColor.Red;
                      Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message);
                      Console.ResetColor();
                  }
              }
          }
      }
      

      【讨论】:

        【解决方案4】:

        这取决于:

        1. 如果您的事件中心是低延迟 - 您可以使用事件中心触发器(天蓝色 每当事件中心收到新的消息时将运行的函数 事件)。在您的 azure func 中,您的使用代码类似于 the example
        2. 如果您有很多活动 - 您可以尝试 azure stream 分析过滤掉您的天气数据并将其发送到不同的 活动中心。您可以像这样构建管道:

          EventHub1 -> AzureStreamAnalytics -> EventHub2

        【讨论】:

          猜你喜欢
          • 2021-08-26
          • 2020-12-14
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多