MQTTnetを使って、PC上でMQTT通信を行います。Brokerのネットワークアドレスはローカルアドレスとし、Publisherからtopic「tomosoft/test/topic」に、メッセージ「abcdefghijk」を含むJSONを送信します。

MQTTのpublish/subscribeモデルを次に示します。

Brokerの作成

次の手順でBroker を作成します。

  1. 「ツール」メニューから「NuGetパッケージマネージャー」→「ソリューションのNuGetパッケージの管理」を選択します。
  2. 「MQTTnet」を選択してインストールします。
  3. 次のBrokerコード「MQTTBroker」を作成します。

MQTTBroker

    using MQTTnet.Server;
    using MQTTnet;
    using System.Text;
    using static System.Console;
    
    // Build the broker options using the default endpoint (the default MQTT port is 1883).
    var options = new MqttServerOptionsBuilder()
        .WithDefaultEndpoint();
    
    // Create the broker; `using` disposes it when the program ends.
    using var server = new MqttFactory().CreateMqttServer(options.Build());
    // Log every message that a client publishes to the broker.
    server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
    
    WriteLine("Start MQTTnet Server ...");
    // Start accepting client connections.
    await server.StartAsync();
    WriteLine("Press any key to stop Server ...");
    ReadKey();
    
    // Stop the broker explicitly instead of relying on process exit to tear it down.
    await server.StopAsync();
    
    // Publish interceptor: writes one log line describing the message a client published.
    Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        var message = arg.ApplicationMessage;
    
        // Decode the payload as UTF-8 text; it stays null when there is no message or no payload.
        var rawPayload = message?.Payload;
        var payload = rawPayload == null ? null : Encoding.UTF8.GetString(rawPayload);
    
        // Gather the logged fields in the order the format string expects them.
        object[] fields =
        {
            DateTime.Now,
            arg.ClientId,
            message?.Topic,
            payload,
            message?.QualityOfServiceLevel,
            message?.Retain
        };
    
        WriteLine(
            " TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
            fields);
    
        // Nothing asynchronous happens here, so return an already-completed task.
        return Task.CompletedTask;
    }
    

Publisherの作成

次の手順でPublisherを作成します。

NuGetパッケージマネージャーで「MQTTnet.Extensions.ManagedClient」を選択してインストールします。

次のPublisherコード「MQTTPublisher」を作成します。

MQTTPublisher

using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using System.Text.Json;
using static System.Console;

// Managed client: queues outgoing messages and reconnects on its own (see the reconnect delay below).
// `using` disposes it when the program ends.
using var _mqttClient = new MqttFactory().CreateManagedMqttClient();

var clientId = "Publisher";
var server = "192.168.10.105";
var topic = "tomosoft/test/topic";
var message = "abcdefghijk";

// Upper bound for waiting on the outgoing queue, and how often to re-check it.
var drainTimeout = TimeSpan.FromSeconds(10);
var drainPollInterval = TimeSpan.FromMilliseconds(50);

var builder = new MqttClientOptionsBuilder()
                .WithClientId(clientId)
                .WithTcpServer(server);
var options = new ManagedMqttClientOptionsBuilder()
                    .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
                    .WithClientOptions(builder.Build())
                    .Build();
// Set up handlers
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
_mqttClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync;

WriteLine($"{clientId} MQTT Broker ({server}) ....");
await _mqttClient.StartAsync(options);

while (true)
{
    // The payload is a JSON object carrying the message text and the UTC send time.
    var json = JsonSerializer.Serialize(new { message, sent = DateTime.UtcNow });
    await _mqttClient.EnqueueAsync(topic, json);

    // Wait until the queue is fully processed (at most drainTimeout). Poll asynchronously
    // rather than blocking a thread in a spin loop.
    var waited = System.Diagnostics.Stopwatch.StartNew();
    while (_mqttClient.PendingApplicationMessagesCount > 0 && waited.Elapsed < drainTimeout)
    {
        await Task.Delay(drainPollInterval);
    }
    WriteLine($"Pending messages = {_mqttClient.PendingApplicationMessagesCount}");

    WriteLine("Send next data");
    // ReadLine returns null once standard input is closed; leave the loop instead of
    // publishing again and again without any user input.
    if (ReadLine() is null)
    {
        break;
    }
}

// Stop the managed client explicitly before the program ends.
await _mqttClient.StopAsync();
// Handler for the client's ConnectedAsync event: prints a status line.
Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string notice = "Connected";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}
// Handler for the client's DisconnectedAsync event: prints a status line.
Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    const string notice = "Disconnected";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}
// Handler for the client's ConnectingFailedAsync event: prints a hint for the user.
Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg)
{
    const string notice = "Connection failed check network or broker!";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}

Subscriber の作成

次の手順でSubscriber を作成します。

NuGetパッケージマネージャーで「MQTTnet.Extensions.ManagedClient」を選択してインストールし、次のSubscriberコード「MQTTSubscriber」を作成します。

MQTTSubscriber

using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using System.Text.Json;
using static System.Console;
using System.Text;
using MQTTnet.Packets;

// Managed client used to receive messages; `using` disposes it when the program ends.
using var _mqttClient = new MqttFactory().CreateManagedMqttClient();

var clientId = "Subscriber";
var server = "192.168.10.105";
var builder = new MqttClientOptionsBuilder()
                .WithClientId(clientId)
                .WithTcpServer(server);

var options = new ManagedMqttClientOptionsBuilder()
                    .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
                    .WithClientOptions(builder.Build())
                    .Build();
// Set up handlers
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;
_mqttClient.ConnectingFailedAsync += _mqttClient_ConnectingFailedAsync;
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;

// Register the subscription: the "#" wildcard covers every topic below "tomosoft/test/".
var topicFilters = new List<MqttTopicFilter> { new MqttTopicFilter { Topic = "tomosoft/test/#" } };
await _mqttClient.SubscribeAsync(topicFilters);

WriteLine($"{clientId} MQTT Broker ({server}) ....");
await _mqttClient.StartAsync(options);
WriteLine("Press any key to stop receive message ...");
ReadKey();

// Stop the managed client explicitly before the program ends.
await _mqttClient.StopAsync();

// Handler for the client's ConnectedAsync event: prints a status line.
Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string notice = "Connected";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}
// Handler for the client's DisconnectedAsync event: prints a status line.
Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    const string notice = "Disconnected";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}
// Handler for the client's ConnectingFailedAsync event: prints a hint for the user.
Task _mqttClient_ConnectingFailedAsync(ConnectingFailedEventArgs arg)
{
    const string notice = "Connection failed check network or broker!";
    WriteLine(notice);

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}

// Handler for received messages: prints the topic and the payload decoded as UTF-8 text.
Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    var received = arg?.ApplicationMessage;
    var topic = received?.Topic;

    // A missing message or payload is treated as empty, so the decoded text is "".
    var payloadBytes = received?.Payload ?? Array.Empty<byte>();
    var payloadText = Encoding.UTF8.GetString(payloadBytes);

    WriteLine($"Received: Topic:{topic}, Payload:{payloadText}");

    // Nothing asynchronous to do, so return an already-completed task.
    return Task.CompletedTask;
}

MQTT通信の実行

  1. Brokerの実行

     作成したBrokerを実行します。

  2. Publisherの実行

     作成したPublisherを実行すると、Brokerに受信したメッセージが表示されます。「Enter」キーを押すごとに、topic「tomosoft/test/topic」にメッセージ「abcdefghijk」を送信します。

  3. Subscriberの実行

     作成したSubscriberを実行すると、Publisherから送信されたメッセージが受信されます。