【发布时间】:2021-11-18 18:19:16
【问题描述】:
目前正在把一个项目迁移到 .NET 5,但在 MassTransit 与 Quartz 调度器的配置上一直没能解决问题。
非定时(立即发送)的消息可以正常工作,但定时消息不行。
我可以在 Quartz 数据库中看到已计划的作业,但是作业被自动触发时会出现以下错误:
Quartz.SchedulerException: Problem instantiating class 'MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'
这个项目包含:
- 一个 Web 应用程序,负责发送普通消息和定时消息。
- 一个 Windows 服务 (BackgroundService),负责消费消息并发送消息(非定时消息)。
Web 应用程序
在 startup.cs 中我有:
// Web application (startup.cs): builds a Quartz scheduler from an explicit property set,
// registers it in DI, then configures MassTransit over RabbitMQ with a message scheduler
// that targets busSettings.QuartzQueueAddress.
NameValueCollection properties = new();
// Scheduler identity.
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
// Thread pool used by Quartz to run jobs.
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
// Persistent ADO.NET job store (JobStoreTX) using the "default" data source and QRTZ_ tables.
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
// "default" data source: local SQL Server, database "Quartz", Windows authentication.
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
// .Result blocks synchronously until the scheduler task completes.
// NOTE(review): the scheduler is not started anywhere in this snippet - confirm whether
// the web application needs its own IScheduler instance at all, since it only publishes
// schedule requests to the Quartz queue.
var scheduler = factory.GetScheduler().Result;
services.AddSingleton(scheduler);
services.AddMassTransit(x =>
{
// Registers a message scheduler addressed at the Quartz queue
// (presumably what backs the injected IMessageScheduler - confirm).
x.AddMessageScheduler(busSettings.QuartzQueueAddress);
x.UsingRabbitMq((context, cfg) =>
{
// RabbitMQ host and credentials come from busSettings.
cfg.Host(busSettings.RabbitMQAddress, host =>
{
host.Username(busSettings.RabbitMQLogin);
host.Password(busSettings.RabbitMQPassword);
});
// Bus-level scheduler configuration pointing at the same Quartz queue address.
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
// No consumers are registered in this snippet, so this presumably configures nothing here - confirm.
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
非定时消息以这种方式发送:
// Immediate (non-scheduled) send: resolve the send endpoint for the reminder request
// queue from the bus and send the message to it directly.
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
var target = await _bus.GetSendEndpoint(_busSettings.ReminderRequestQueueAddress);
await target.Send(message);
而定时消息是这样发送的:
// Scheduled send: hand the message to the injected scheduler, asking for delivery to the
// reminder request queue at reminderDate.
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
await _scheduler.ScheduleSend(_busSettings.ReminderRequestQueueAddress, reminderDate, message);
两者分别通过 DI 注入:IBusControl _bus 用于非定时消息,IMessageScheduler _scheduler 用于定时消息。
Windows 服务
在 Program.cs 中我有:
// Windows service (Program.cs): registers the reminder consumer, builds and starts a Quartz
// scheduler, then configures MassTransit over RabbitMQ with two receive endpoints: one that
// hosts the schedule/cancel consumers on the Quartz queue and one for reminder requests.
services.AddTransient<ProcessBuildReminderDeploySetConsumer>();
NameValueCollection properties = new();
// Scheduler identity (same instanceName/instanceId values as the web application snippet).
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
// Thread pool used by Quartz to run jobs.
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
// Persistent ADO.NET job store (JobStoreTX) using the "default" data source and QRTZ_ tables.
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
// "default" data source: local SQL Server, database "Quartz", Windows authentication.
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
// .Result blocks synchronously until the scheduler task completes.
var scheduler = factory.GetScheduler().Result;
// The scheduler is started here, before the bus is configured; the return value is discarded.
// NOTE(review): in Quartz.NET 3.x Start() returns a Task - confirm whether it should be awaited.
// NOTE(review): no job factory is assigned to this scheduler in the visible code. The reported
// "Cannot instantiate type which has no empty constructor" error for ScheduledMessageJob is
// consistent with Quartz's default job factory being used - confirm, and check which helper in
// the MassTransit Quartz integration (possibly UseInMemoryScheduler) wires the job factory in
// the MassTransit version in use.
scheduler.Start();
services.AddSingleton(scheduler);
services.AddMassTransit(conf =>
{
// Registers a message scheduler addressed at the Quartz queue.
conf.AddMessageScheduler(busSettings.QuartzQueueAddress);
conf.UsingRabbitMq((context, cfg) =>
{
// RabbitMQ host and credentials come from busSettings.
cfg.Host(busSettings.RabbitMQAddress, host =>
{
host.Username(busSettings.RabbitMQLogin);
host.Password(busSettings.RabbitMQPassword);
});
// First of three UseMessageScheduler calls on cfg in this snippet (see also below).
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
// Quartz queue endpoint: hosts the consumers that handle ScheduleMessage and
// CancelScheduledMessage using the scheduler instance created above.
cfg.ReceiveEndpoint(busSettings.QuartzQueueName, epc =>
{
// One 16-slot partitioner shared by both consumers.
var partitioner = cfg.CreatePartitioner(16);
// ScheduleMessage is partitioned by CorrelationId.
epc.Consumer(() =>
new ScheduleMessageConsumer(scheduler),
x => x.Message<ScheduleMessage>(
m => m.UsePartitioner(
partitioner,
p => p.Message.CorrelationId)
)
);
// CancelScheduledMessage is partitioned by TokenId.
epc.Consumer(
() => new CancelScheduledMessageConsumer(scheduler),
x => x.Message<CancelScheduledMessage>(
m => m.UsePartitioner(partitioner, p => p.Message.TokenId)
)
);
// Second UseMessageScheduler call, this time with this endpoint's input address.
cfg.UseMessageScheduler(epc.InputAddress);
// NOTE(review): this sets Durable on cfg (the bus configurator), not on epc - confirm intent.
cfg.Durable = true;
// THIS USED TO BE IN THE OLD .NET 4.6 PROJECT
// BUT THOSE CLASSES DO NOT EXIST ANYMORE
//var specification =
//new SchedulerBusFactorySpecification(scheduler, epc.InputAddress);
//cfg.AddBusFactorySpecification(specification);
});
// Reminder request endpoint: consumer resolved from the container via context.
cfg.ReceiveEndpoint(busSettings.ReminderRequestQueueName, e =>
{
// Enforce loading only 1 message at a time for now
e.PrefetchCount = 1;
// Retry policy: 5 retries at a fixed 500 ms interval.
e.UseMessageRetry(r =>
{
r.Interval(retryCount: 5, interval: TimeSpan.FromMilliseconds(500));
});
e.Consumer<ProcessBuildReminderDeploySetConsumer>(context);
});
// Third UseMessageScheduler call.
// NOTE(review): confirm whether the repeated calls are intentional.
cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
直接发送到队列的消息没有问题。至于定时消息,由于 Quartz 无法正确触发作业,消息始终没有被消费。
我确信很可能是某处配置问题导致了这个错误,但不同的配置方式以及不同版本之间差异很大,我一直没能弄清楚。
【问题讨论】:
标签: .net rabbitmq masstransit quartz.net