MQTTnetを使って、PC上でMQTT通信を行います。BrokerはPCのローカルネットワークアドレス（以下のコード例では「192.168.10.105」）で動作させ、Publisherからtopic「tomosoft/test/topic」に、メッセージ「abcdefghijk」と送信時刻を含むJSON文字列を送信します。
MQTTのpublish/subscribeモデルを次に示します。
Brokerの作成
次の手順でBroker を作成します。
- 「ツール」メニューから「NuGetパッケージマネージャー」→「ソリューションのNuGetパッケージの管理」を選択します。
- 「MQTTnet」を選択してインストールします。
- 次のBrokerコード「MQTTBroker」を作成します。
MQTTBroker
using MQTTnet.Server;
using MQTTnet;
using System.Text;
using static System.Console;
// Build the broker options. WithDefaultEndpoint() enables the plain (unencrypted)
// TCP endpoint; the default port is 1883.
// NOTE(review): the default endpoint is presumably bound to all network interfaces,
// not only to localhost - confirm against the MQTTnet server options documentation.
var options = new MqttServerOptionsBuilder()
    .WithDefaultEndpoint();

// Create the MQTT server (broker) from the built options.
var server = new MqttFactory().CreateMqttServer(options.Build());

// Log every message that a client publishes to this broker.
server.InterceptingPublishAsync += Server_InterceptingPublishAsync;

WriteLine("Start MQTTnet Server ...");
await server.StartAsync();

WriteLine("Press any key to stop Server ...");
ReadKey();

// Actually stop the broker, as the prompt above promises, instead of relying on
// process exit to tear down the listener and the client sessions.
await server.StopAsync();
// Handler for the server's InterceptingPublishAsync event: prints one log line
// (timestamp, client id, topic, payload, QoS, retain flag) per published message.
Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
    var published = arg.ApplicationMessage;
    var rawBody = published?.Payload;

    // Decode the payload as UTF-8 text; a missing message or payload is logged as empty.
    var body = rawBody == null ? null : Encoding.UTF8.GetString(rawBody);

    WriteLine(
        " TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
        DateTime.Now,
        arg.ClientId,
        published?.Topic,
        body,
        published?.QualityOfServiceLevel,
        published?.Retain);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
Publisherの作成
次の手順でPublisherを作成します。
- Brokerと同様に、NuGetパッケージマネージャーで「MQTTnet.Extensions.ManagedClient」を選択してインストールします。
- 次のPublisherコード「MQTTPublisher」を作成します。
MQTTPublisher
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using System.Text.Json;
using static System.Console;
// Publisher: connects a managed MQTT client to the broker and sends one JSON
// message to the topic each time the user presses Enter.
var managedClient = new MqttFactory().CreateManagedMqttClient();
var clientId = "Publisher";
// Address of the PC running the broker - adjust to your environment.
var server = "192.168.10.105";
var topic = "tomosoft/test/topic";
var message = "abcdefghijk";

// How long to wait for the outgoing queue to drain, and how often to re-check it.
var drainTimeout = TimeSpan.FromSeconds(10);
var pollInterval = TimeSpan.FromMilliseconds(50);

var builder = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithTcpServer(server);
var options = new ManagedMqttClientOptionsBuilder()
    .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
    .WithClientOptions(builder.Build())
    .Build();

// Set up handlers
managedClient.ConnectedAsync += _mqttClient_ConnectedAsync;
managedClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
managedClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync;

WriteLine($"{clientId} MQTT Broker ({server}) ....");
await managedClient.StartAsync(options);

while (true)
{
    var json = JsonSerializer.Serialize(new { message, sent = DateTime.UtcNow });
    await managedClient.EnqueueAsync(topic, json);

    // Wait (without busy-spinning a thread) until the queue is fully processed
    // or the timeout elapses.
    var deadline = DateTime.UtcNow + drainTimeout;
    while (managedClient.PendingApplicationMessagesCount > 0 && DateTime.UtcNow < deadline)
    {
        await Task.Delay(pollInterval);
    }

    WriteLine($"Pending messages = {managedClient.PendingApplicationMessagesCount}");
    WriteLine("Send next data");

    // ReadLine returns null when standard input is closed; leave the loop
    // instead of enqueueing messages forever.
    if (ReadLine() is null)
    {
        break;
    }
}

// Stop the managed client so it disconnects from the broker before the process exits.
await managedClient.StopAsync();
// Handler for the managed client's ConnectedAsync event: prints a status line.
Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string status = "Connected";
    WriteLine(status);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
// Handler for the managed client's DisconnectedAsync event: prints a status line.
Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    const string status = "Disconnected";
    WriteLine(status);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
// Handler for the managed client's ConnectingFailedAsync event: tells the user
// to check the network or the broker.
Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg)
{
    const string hint = "Connection failed check network or broker!";
    WriteLine(hint);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
Subscriber の作成
次の手順でSubscriber を作成します。
Publisherと同様に「MQTTnet.Extensions.ManagedClient」を選択してインストールし、次のSubscriberコード「MQTTSubscriber」を作成します。
MQTTSubscriber
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using System.Text.Json;
using static System.Console;
using System.Text;
using MQTTnet.Packets;
// Subscriber: connects a managed MQTT client to the broker and prints every
// message received on topics matching "tomosoft/test/#".
var managedClient = new MqttFactory().CreateManagedMqttClient();
var clientId = "Subscriber";
// Address of the PC running the broker - adjust to your environment.
var server = "192.168.10.105";

var builder = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithTcpServer(server);
var options = new ManagedMqttClientOptionsBuilder()
    .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
    .WithClientOptions(builder.Build())
    .Build();

// Set up handlers
managedClient.ConnectedAsync += _mqttClient_ConnectedAsync;
managedClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
managedClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync;
managedClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;

// "#" is the multi-level wildcard: everything below tomosoft/test/ is received.
var topicFilters = new List<MqttTopicFilter> { new MqttTopicFilter { Topic = "tomosoft/test/#" } };
await managedClient.SubscribeAsync(topicFilters);

WriteLine($"{clientId} MQTT Broker ({server}) ....");
await managedClient.StartAsync(options);

WriteLine("Press any key to stop receive message ...");
ReadKey();

// Actually stop receiving, as the prompt above promises: stop the managed client
// so it disconnects from the broker before the process exits.
await managedClient.StopAsync();
// Handler for the managed client's ConnectedAsync event: prints a status line.
Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string status = "Connected";
    WriteLine(status);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
// Handler for the managed client's DisconnectedAsync event: prints a status line.
Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    const string status = "Disconnected";
    WriteLine(status);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
// Handler for the managed client's ConnectingFailedAsync event: tells the user
// to check the network or the broker.
Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg)
{
    const string hint = "Connection failed check network or broker!";
    WriteLine(hint);

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}
// Handler for the managed client's ApplicationMessageReceivedAsync event:
// prints the topic and the UTF-8 decoded payload of each received message.
Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    var received = arg?.ApplicationMessage;
    var receivedTopic = received?.Topic;

    // A missing message or payload is decoded as an empty string.
    var payloadBytes = received?.Payload ?? Array.Empty<byte>();
    var payloadText = Encoding.UTF8.GetString(payloadBytes);

    WriteLine($"Received: Topic:{receivedTopic}, Payload:{payloadText}");

    // Nothing asynchronous to wait for.
    return Task.CompletedTask;
}






