MQTTnetを使って、PC上でMQTT通信を行います。Broker のネットワークアドレスはローカルアドレスとし、Publisherからtopic「tomosoft/test/topic」にメッセージ「abcdefghijk」を送信します。
MQTTのpublish/subscribeモデルを次に示します。
Brokerの作成
次の手順でBroker を作成します。
- 「ツール」メニューから「NuGetパッケージマネージャー」→「ソリューションのNuGetパッケージの管理」を選択します。
- 「MQTTnet」を選択してインストールします。
- 次のBrokerコード「MQTTBroker」を作成します。
MQTTBroker
using MQTTnet.Server; using MQTTnet; using System.Text; using static System.Console; // 預設 port 是 1883 var options = new MqttServerOptionsBuilder() //Set endpoint to localhost .WithDefaultEndpoint(); // Create a new mqtt server var server = new MqttFactory().CreateMqttServer(options.Build()); //Add Interceptor for logging incoming messages server.InterceptingPublishAsync += Server_InterceptingPublishAsync; WriteLine("Start MQTTnet Server ..."); // Start the server await server.StartAsync(); WriteLine("Press any key to stop Server ..."); ReadKey(); Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { // Convert Payload to string var payload = arg.ApplicationMessage?.Payload == null ? null : Encoding.UTF8.GetString(arg.ApplicationMessage?.Payload); WriteLine( " TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}", DateTime.Now, arg.ClientId, arg.ApplicationMessage?.Topic, payload, arg.ApplicationMessage?.QualityOfServiceLevel, arg.ApplicationMessage?.Retain); return Task.CompletedTask; }
Publisherの作成
次の手順でPublisherを作成します。
「MQTTnet.Extensions.ManagedClient 」を選択してインストールします。
次のPublisher コード「MQTTPublisher 」を作成します。
MQTTPublisher
using MQTTnet.Client; using MQTTnet.Extensions.ManagedClient; using MQTTnet; using System.Text.Json; using static System.Console; var _mqttClient = new MqttFactory().CreateManagedMqttClient(); var clientId = "Publisher"; var server = "192.168.10.105"; var topic = "tomosoft/test/topic"; var message = "abcdefghijk"; var builder = new MqttClientOptionsBuilder() .WithClientId(clientId) .WithTcpServer(server); var options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(60)) .WithClientOptions(builder.Build()) .Build(); // Set up handlers _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; _mqttClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync; WriteLine($"{clientId} MQTT Broker ({server}) ...."); await _mqttClient.StartAsync(options); while (true) { var json = JsonSerializer.Serialize(new { message, sent = DateTime.UtcNow }); await _mqttClient.EnqueueAsync(topic, json); // Wait until the queue is fully processed. SpinWait.SpinUntil(() => _mqttClient.PendingApplicationMessagesCount == 0, 10000); WriteLine($"Pending messages = {_mqttClient.PendingApplicationMessagesCount}"); WriteLine("Send next data"); var dummy = ReadLine(); } Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { WriteLine("Connected"); return Task.CompletedTask; }; Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { WriteLine("Disconnected"); return Task.CompletedTask; }; Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg) { WriteLine("Connection failed check network or broker!"); return Task.CompletedTask; }
Subscriber の作成
次の手順でSubscriber を作成します。
「MQTTnet.Extensions.ManagedClient 」を選択してインストールし、次のSubscriber コード「MQTTSubscriber 」を作成します。
MQTTSubscriber
using MQTTnet.Client; using MQTTnet.Extensions.ManagedClient; using MQTTnet; using System.Text.Json; using static System.Console; using System.Text; using MQTTnet.Packets; var _mqttClient = new MqttFactory().CreateManagedMqttClient(); var clientId = "Subscriber"; var server = "192.168.10.105"; var builder = new MqttClientOptionsBuilder() .WithClientId(clientId) .WithTcpServer(server); var options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(60)) .WithClientOptions(builder.Build()) .Build(); // Set up handlers _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; _mqttClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync; _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; var topicFilters = new List<MqttTopicFilter> { new MqttTopicFilter { Topic = "tomosoft/test/#" } }; await _mqttClient.SubscribeAsync(topicFilters); WriteLine($"{clientId} MQTT Broker ({server}) ...."); await _mqttClient.StartAsync(options); WriteLine("Press any key to stop receive message ..."); ReadKey(); Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { WriteLine("Connected"); return Task.CompletedTask; }; Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { WriteLine("Disconnected"); return Task.CompletedTask; }; Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg) { WriteLine("Connection failed check network or broker!"); return Task.CompletedTask; } Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { var topic = arg?.ApplicationMessage?.Topic; var payloadText = Encoding.UTF8.GetString( arg?.ApplicationMessage?.Payload ?? Array.Empty<byte>()); WriteLine($"Received: Topic:{topic}, Payload:{payloadText}"); return Task.CompletedTask; }