We are developing a .Net Core service that shall be hosted in Azure Service Fabric. This SF Service needs to interact with 10,000 devices registered in Azure IoT Hub via it's AMQP 1.0 SSL TLS endpoints. Each IoT Hub devices has it's own security tokens and connection string provided by the IoT Hub service.
For our scenario we need to listen to all cloud-to-devices messages coming from the 10,000 IoT Hub device instances and "route" these to a central Service Bus topic to which the actual "gateways" in the field listen to. So basically we want to forward messages from 10,000 Service Bus Queues into one central Queue.
What is the best approach to handle these 10,000 AMQP listners from a SF Service? Is there a way we can reuse AMQP connections, sessions or links so we cache/share resources? And how can we dynamically spread the load of connection maintenance over the 5 nodes in the SF cluster?
We are evaluating these Nuget packages for the implementation:
Microsoft.Azure.ServiceBus
AMQPNetLite
Microsoft.Azure.Devices.Client
We are doing some tests using the Microsoft.Azure.Devices.Client
lib, see a simplified code sample below:
using System;
using System.Fabric;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.ServiceFabric.Services.Runtime;
namespace ID.Monitoring.MonServer.ServiceFabric.ServiceBus
{
/// <summary>
/// An instance of this class is created for each service instance by the Service Fabric runtime.
/// </summary>
internal sealed class ServiceBus : StatelessService
{
private readonly DeviceClient _deviceClient;
private ConnectionStatus _status;
public ServiceBus(StatelessServiceContext context)
: base(context)
{
_deviceClient = DeviceClient.CreateFromConnectionString("HostName=id-monitoring-dev.azure-devices.net;DeviceId=100;SharedAccessSignature=SharedAccessSignature sr=id-monitoring-dev.azure-devices.net%2Fdevices%2F100&sig={token}&se=1553265888", TransportType.Amqp_Tcp_Only);
}
/// <summary>
/// This is the main entry point for your service instance.
/// </summary>
/// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service instance.</param>
protected override async Task RunAsync(CancellationToken cancellationToken)
{
_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangeHandler);
while (true)
{
if (_status != ConnectionStatus.Connected)
{
await _deviceClient.OpenAsync();
}
var receivedMessage = await _deviceClient.ReceiveAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
if (receivedMessage != null)
{
var messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
//TODO: handle incoming message and publish to common
await _deviceClient.CompleteAsync(receivedMessage).ConfigureAwait(false);
}
}
}
private void ConnectionStatusChangeHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
_status = status;
}
}
}
Question: Does this scale well to 10,000 Service Fabric service instances? Or are there more efficient ways to have this many AMQP Service Bus Listners maintained from a Service Fabric Service environment? Is there a way we can apply AMQP connection multiplexing maybe?
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…