【问题标题】:Azure Event Grid subscription to Console ApplicationAzure 事件网格订阅控制台应用程序
【发布时间】:2021-02-22 02:36:45
【问题描述】:

我想在C#控制台应用程序中订阅Azure事件网格,实际上我正在实现eShopContainer项目中的EventBus示例,我需要订阅一个主题并监听消息,处理并打印发送的消息之前用于实现 EventBus 的另一个 C# 控制台应用程序。那么,¿如何使用 C# 控制台应用程序做到这一点?

这是我将消息存储在队列存储中的天蓝色门户:

azure portal subscriptions

这是所有消息所在的队列:

all messages

所以,我需要订阅并获取所有消息!

【问题讨论】:

标签: c# azure-eventgrid


【解决方案1】:

在 Azure 事件网格模型中使用控制台订阅者基本上有三种方法。下图展示了它们:

请注意,hybrid connectionngrok tunnel 在我的Azure Event Grid Tester 中使用。看看他们的实现。

以下代码 sn -p 是在控制台应用程序中使用 HybridConnectionListener 的示例:

using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;

            try
            {
                listener = new HybridConnectionListener(connectionString);
                listener.Connecting += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.White;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Connecting, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Online += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Online, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Offline += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Blue;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Offline, listener:{listener.Address}");
                    Console.ResetColor();
                };

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        if (!context.Request.Headers.AllKeys.Contains("Aeg-Event-Type", StringComparer.OrdinalIgnoreCase) || !string.Equals(context.Request.Headers["Aeg-Event-Type"], "Notification", StringComparison.CurrentCultureIgnoreCase))
                            throw new Exception("Received message is not for EventGrid subscriber");

                        string jsontext = null;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            var jtoken = JToken.Parse(reader.ReadToEnd());
                            if (jtoken is JArray)
                                jsontext = jtoken.SingleOrDefault<JToken>().ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JObject)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JValue)
                                throw new Exception($"The payload (JValue) is not accepted. JValue={jtoken.ToString(Newtonsoft.Json.Formatting.None)}");
                        }

                        Console.ForegroundColor = ConsoleColor.DarkYellow;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Headers: {string.Join(" | ", context.Request.Headers.AllKeys.Where(i => i.StartsWith("aeg-") || i.StartsWith("Content-Type")).Select(i => $"{i}={context.Request.Headers[i]}"))}");
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                                             
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };
                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                Console.ResetColor();
            }

            Console.ReadLine();

            if(listener != null)
                await listener.CloseAsync();
        }
    }
}

在事件处理程序目标的 AEG 订阅中使用混合连接,所有事件都将传递到控制台应用程序,如下面的屏幕 sn-p 所示:

更新:

下面的例子展示了一个订阅者的实现,它带有一个到 signalR 服务的输出绑定。在这种情况下,我们需要构建两个 HttpTrigger 函数,一个用于订阅者,另一个用于 signalR 客户端,以获取特定 userId 的 url 和访问令牌:

  1. HttpTriggerGetSignalRinfo 函数:

运行.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"

using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, SignalRConnectionInfo connectionInfo, ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    return new OkObjectResult(new 
    { 
        url = connectionInfo.Url, 
        accessToken = connectionInfo.AccessToken,
    }); 
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get"
      ]
    },
    {
      "type": "signalRConnectionInfo",
      "name": "connectionInfo",
      "hubName": "%AzureSignalRHubName%",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "userId": "{query.userid}",
      "direction": "in"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}
  1. signalR 客户端 - 控制台应用:

     using Microsoft.AspNetCore.SignalR.Client;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
     using System;
     using System.Configuration;
     using System.Net.Http;
     using System.Threading.Tasks;
    
     namespace ConsoleApp4
     {
         class Program
         {
             static async Task Main(string[] args)
             {
                 HubConnection connection = null;
                 string userId = ConfigurationManager.AppSettings.Get("userId");
                 string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
    
                 try
                 {
                     using (var client = new HttpClient())
                     {
                         var rsp = await client.GetAsync($"{signalRInfo}&userid={userId}");
                         string jsontext = await rsp.Content.ReadAsStringAsync();
                         var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });
    
                         connection = new HubConnectionBuilder()
                             .WithUrl(info.url, option =>
                             {
                             option.AccessTokenProvider = () =>
                                 {
                                     return Task.FromResult(info.accessToken);
                                 };
                             }).Build();
    
                         Console.ForegroundColor = ConsoleColor.Green;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] SignalR Client on {info.url}/users/{userId}");
                         Console.ResetColor();
                     }
    
                     connection.On<string, string>("SendMessage", (string headers, string message) =>
                     {
                         Console.ForegroundColor = ConsoleColor.DarkYellow;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] {headers}");
                         Console.ForegroundColor = ConsoleColor.Yellow;
                         Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                         Console.ResetColor();
                     });
    
                     await connection.StartAsync();              
                 }
                 catch (Exception ex)
                 {
                     Console.ForegroundColor = ConsoleColor.Red;
                     Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                     Console.ResetColor();
                 }
                 Console.ReadLine();
                 if (connection != null)
                     await connection.StopAsync();
             }       
         }
     }
    
  2. HttpTriggerSendMsgToSignalR 函数 - 订阅者

运行.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, IAsyncCollector<SignalRMessage> signalRMessages, ILogger log)
{   
    string headers = string.Join(" | ", req.Headers.Where(h => h.Key.StartsWith("aeg-") || h.Key.StartsWith("Content-Type")).Select(i => $"{i.Key}={i.Value.First()}")); 
    log.LogInformation($"Method: {req.Method} Headers: {headers}");    
          
    if (req.Method == HttpMethod.Options.ToString())
    {
        log.LogInformation("CloudEventSchema validation");               
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return (ActionResult)new OkResult();
    }
    
    var jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim(); 

    if(eventTypeHeader == "SubscriptionValidation") 
    {       
        if(jtoken is JArray)
            jtoken = jtoken.SingleOrDefault<JToken>();

        if(jtoken["eventType"].Value<string>() == "Microsoft.EventGrid.SubscriptionValidationEvent")
        {
            log.LogInformation("EventGridSchema validation");
            return (ActionResult)new OkObjectResult(new { validationResponse = ((dynamic)jtoken["data"]["validationCode"])});         
        }           
        return new BadRequestObjectResult($"Not valid event schema");
    }   
    else if(eventTypeHeader == "Notification") 
    {          
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // the message will only be sent to these user IDs or if this property not exit, the bindig path will be used it
                Target = "SendMessage",
                Arguments = new[] { headers, jtoken.ToString() }
            });        
        return (ActionResult)new OkResult();  
    }
     
    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "options",
        "post"
      ]
    },
    {
      "type": "signalR",
      "name": "signalRMessages",
      "hubName": "%AzureSignalRHubName%/users/{query.userid}",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "direction": "out"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}

注意,webhook 事件处理程序用于订阅者有两个原因,例如传递 CloudEvent 消息和通过 url 查询字符串参数配置 signalR 客户端 userId。

  1. 在控制台应用上显示 userid=abcd 的事件:

请注意,signalR 客户端实例允许为同一用户 ID 多播消息,而不是混合连接,其中消息在侦听器实例之间平衡。

【讨论】:

  • 谢谢兄弟!你救了我!
  • 我刚刚为 signalR 服务添加了一个实现。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-01-26
  • 2019-12-23
  • 2020-02-22
  • 2018-12-12
  • 2020-11-21
  • 1970-01-01
  • 2018-10-10
相关资源
最近更新 更多