mqttnet消息推送与接收[通俗易懂]

mqttnet消息推送与接收[通俗易懂]创建 windows 服务网上有很多 不多述 服务端做好后一定要写 bat 安装卸载文件 install bat echo 请稍等 MqttNetServi 服务安装启动中 echo off title 安装 windows 服务 MqttNetServi sc create

创建windows服务网上有很多,不多述;

服务端做好后一定要写bat安装卸载文件

install.bat

@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中…………
@echo off
@title 安装windows服务:MqttNetServiceAddUserAndPassword
@sc create MqttNetServiceAddUserAndPassword binPath=”%~dp0\MqttNetServiceAddUserAndPassword.exe”
@sc config MqttNetServiceAddUserAndPassword start= auto
@sc start MqttNetServiceAddUserAndPassword
@echo.MqttNetServiceAddUserAndPassword启动完毕
pause

//binPath=”%~dp0\MqttNetServiceAddUserAndPassword.exe” 当前路径,也可指定

delete.bat

@echo.服务MqttNetServiceAddUserAndPassword卸载中……….
@echo off
@sc stop MqttNetServiceAddUserAndPassword
@sc delete MqttNetServiceAddUserAndPassword
@echo off
@echo.MqttNetServiceAddUserAndPassword卸载完毕
@pause

服务端:

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

namespace MqttNetServiceAddUserAndPassword
{

public partial class Service1 : ServiceBase
{

private readonly static object locker = new object();
private MqttServer mqttServer = null;
private System.Timers.Timer timer = null;

private GodSharp.Sockets.SocketServer socketService = null;

//此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多2000个清零;
private List subClientIDs = new List();
public Service1()
{

InitializeComponent();
//创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
timer = new System.Timers.Timer();
timer.AutoReset = true;
timer.Enabled = true;
timer.Interval = 5000;
timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);

}

protected override void OnStart(string[] args)
{

//开启服务
//CreateMQTTServer();

Task.Run(CreateMQTTServer);

if (timer.Enabled == false)
{

timer.Enabled = true;
timer.Start();
}
//创建socket服务端
//CreateServerSocket();
// SocketServer.StartSocketService();
}

protected override void OnStop()
{

if (timer.Enabled == true)
{

timer.Enabled = false;
timer.Stop();
}
}
///


/// 开启服务
///

private async Task CreateMQTTServer()
{

if (mqttServer == null)
{

var optionsBuilder = new MqttServerOptionsBuilder();
optionsBuilder.WithConnectionValidator(c =>
{

if (c.ClientId.Length < 5 || !c.ClientId.StartsWith(“Eohi_”))
{

c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
return;
}

if (c.Username != “user” || c.Password != “123456”)
{

c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
return;
}
c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
});
//指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。
//options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”))
//指定端口
optionsBuilder.WithDefaultEndpointPort(1884);
//连接记录数,默认 一般为2000
//optionsBuilder.WithConnectionBacklog(2000);
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
string msg = null;

//将发送的消息加到日志
mqttServer.ApplicationMessageReceived += (s, e) =>
{

msg = @”发送消息的客户端id:” + e.ClientId + “\r\n”
+ “发送时间:” + DateTime.Now + “\r\n”
+ “发送消息的主题:” + e.ApplicationMessage.Topic + “\r\n”
+ “发送的消息内容:” + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + “\r\n”
+ “————————————————–\r\n”
;
WriteMsgLog(msg);
};
await mqttServer.StartAsync(optionsBuilder.Build());

}
}
#region 记录日志
///


/// 消息记录日志
///

///
private void WriteMsgLog(string msg)
{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下
string path = AppDomain.CurrentDomain.BaseDirectory + “\\Msglog.txt”;
FileInfo file = new FileInfo(path);
if (!file.Exists)
{

FileStream fs;
fs = File.Create(path);
fs.Close();
}
using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
{

using (StreamWriter sw = new StreamWriter(fs))
{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);
}
}
}
private void PubMessage(string topic, string msg)
{

if (mqttServer != null)
{

lock (locker)
{

var message = new MqttApplicationMessageBuilder();
message.WithTopic(topic);
message.WithPayload(msg);
mqttServer.PublishAsync(message.Build());
}
}
}
///


///客户端链接日志 客户端接入
///

///
private void WriteClientLinkLog(string msg)
{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下
string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientLinklog.txt”;
FileInfo file = new FileInfo(path);
if (!file.Exists)
{

FileStream fs;
fs = File.Create(path);
fs.Close();
}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
{

using (StreamWriter sw = new StreamWriter(fs))
{

sw.WriteLine(msg);
}
}
}
///


/// 通过定时器将客户端链接信息写入日志
///

///
///

private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)


{



// List

dic = new List();
if (mqttServer != null)
{

List subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();
if (subclients.Count > 0)
{

string subclientcount = @”客户端接入的总数为:” + (subclients.Count – 1).ToString() + “\r\n”
+ “——————————————————- \r\n”;
WriteClientLinkLog(subclientcount);
PubMessage(“ClientsCount”, (subclients.Count – 1).ToString());
List clientids = new List();
//连接客户端的个数
// dic.Add(SetServiceM.SetService( “ClientCount”, subclients.Count.ToString()));
// var dicclientlink = new Dictionary();

foreach (var item in subclients)
{

if (!subClientIDs.Contains(item.ClientId))
{

subClientIDs.Add(item.ClientId);
string msg = @”连接客户端ID:” + item.ClientId + “\r\n”
+ “连接时间:” + DateTime.Now + “\r\n”
+ “协议版本:” + item.ProtocolVersion + “\r\n”
+ “最后收到的非保持活包:” + item.LastNonKeepAlivePacketReceived + “\r\n”
+ “最后收到的包:” + item.LastPacketReceived + “\r\n”
+ “挂起的应用程序消息:” + item.PendingApplicationMessages + “\r\n”
+ “————————————————” + “\r\n”;
WriteClientLinkLog(msg);
PubMessage(“clientlink”, msg);
// mqttServer.PublishAsync(“clientlink”, msg);
// dicclientlink.Add(item.ClientId, msg);
}
clientids.Add(item.ClientId);
}
if (subClientIDs.Count >= 2000)
{

subClientIDs.Clear();
}
var exceptlist = subClientIDs.Except(clientids).ToList();
// var dicclientoutline = new Dictionary();
if (exceptlist.Count > 0)
{

exceptlist.ForEach(u =>
{

string msgoutline = @”客户端下线ID:” + u + “\r\n”
+ “客户端下线时间:” + DateTime.Now.ToString() + “\r\n”
+ “———————————————————— \r\n”
;
WriteClientLinkLog(msgoutline);
subClientIDs.Remove(u);
PubMessage(“clientlink”, msgoutline);
// mqttServer.PublishAsync(“clientlink”, msgoutline);
// dicclientoutline.Add(“OutLineID_” + u, msgoutline);
});
}
连接客户端的id
//dic.Add(SetServiceM.SetService(“clientlink”, JsonConvert.SerializeObject(dicclientlink)));
客户端下线的时间
//dic.Add(SetServiceM.SetService(“clientoutline”, JsonConvert.SerializeObject(dicclientoutline)));
//SocketServer.connection.Send(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dic)));
}
else
{

string subclientcount = @”暂无客户端接入!” + “\r\n”
+ “——————————————————– \r\n”;
WriteClientLinkLog(subclientcount);
}
}
}
///


/// 客户端下线时间
///

///
public void WriteClientOutLineLog(string msg)
{

string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientOutLineLog.txt”;
FileInfo file = new FileInfo(path);
if (!file.Exists)
{

FileStream fs = File.Create(path);
fs.Close();
}
using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
{

using (StreamWriter sw = new StreamWriter(fs))
{

sw.WriteLine(msg);
}
}
}
//windows服务里的服务端
private void CreateServerSocket()
{

if (socketService == null)
{

// IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(“127.0.0.1”), 9001);
socketService = new GodSharp.Sockets.SocketServer(“127.0.0.1”, 9001, ProtocolType.Tcp); //Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socketService.Start();
socketService.Listen(10);
Thread thread = new Thread(new ThreadStart(new Action(() =>
{

while (true)
{

// socketClient = socketService.Clients[0];
// string data = “sql|” ; //在这里封装数据,通常是自己定义一种数据结构,如struct data{sql;result}
// client.Send(Encoding.Default.GetBytes(msg));
}
})));
}
else
{

CreateServerSocket();
}
}

#endregion
}

}

服务端桌面显示程序:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MQTTNetFrm
{

public partial class Form1 : Form
{

private ServiceController ServiceController = null;
private MqttClientOptions options = null;
private MqttClient mqttClient = null;

public Form1()
{

InitializeComponent();


new Thread(new ThreadStart(GetServiceStatus)).Start();

Task.Run(LinkClientService).Wait();
}
///


/// 获取当前ip地址
///

///
private string GetLocalIP()
{

string ip = null;
var iplist = Dns.GetHostAddresses(Dns.GetHostName()).DefaultIfEmpty().ToList();
iplist.ForEach(u =>
{

if (u.AddressFamily == AddressFamily.InterNetwork)
ip= u.ToString();
});
return ip;
}
private async Task LinkClientService()
{

var m = “Eohi_Frm_” + Guid.NewGuid().ToString();
options = new MqttClientOptions
{

ClientId = m,
CleanSession = true,
ChannelOptions = new MqttClientTcpOptions
{

Server = GetLocalIP(),
Port = 1884,
},
Credentials = new MqttClientCredentials()
{

Username = “user”,
Password = “123456”
}

};
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient() as MqttClient;
try
{

await mqttClient.ConnectAsync(options);
but_submsg_Click();
this.Invoke(new Action(() => { lab_serverstatus.Text = “连接正常,服务运行中…………”; }));
}
catch (Exception ex)
{

}

}
private async void but_submsg_Click()
{

if (mqttClient != null)
{

await mqttClient.SubscribeAsync(new TopicFilter(“ClientsCount”, MqttQualityOfServiceLevel.AtMostOnce));
await mqttClient.SubscribeAsync(new TopicFilter(“clientlink”, MqttQualityOfServiceLevel.AtMostOnce));
await mqttClient.SubscribeAsync(new TopicFilter(“msglog”, MqttQualityOfServiceLevel.AtMostOnce));
mqttClient.ApplicationMessageReceived += (s, e) =>
{

this.Invoke(new Action(() =>
{

var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
if (msg.Length<=5)
{

lab_clientcount.Text = msg;
}
if (msg.Length>10)
{

if (msg.StartsWith(“连接”) )
rtb_clientlog.AppendText(msg);
rtb_msglog.AppendText(msg);
}

}));
};

}
}
private void GetServiceStatus()
{

ServiceController[] serviceControllers = ServiceController.GetServices();
if (serviceControllers.Length > 0)
{

serviceControllers.ToList().ForEach(u =>
{

if (u.DisplayName == “MqttNetServiceAddUserAndPassword”)
{

if (ServiceController == null)
{

ServiceController = u;
}
if (u.Status == ServiceControllerStatus.Running)
{

lab_serverstatus.Text = “服务运行中…………”;
}
else
{

lab_serverstatus.Text = “服务已停止…………”;
}
}
});
}
}
private void button2_Click(object sender, EventArgs e)
{

if (tabControl1.SelectedTab == tabPage1)
{

rtb_clientlog.Text = “”;
}
else
{

rtb_msglog.Text = “”;
}
}

private void Form1_Load(object sender, EventArgs e)
{

}
}

}

客户端:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Windows.Forms;

namespace MqttClientTest01
{

public partial class Form1 : Form
{

private MqttClient mqttClient = null;
private System.Timers.Timer timer = null;
private int CountLink = 0;
private MqttClientOptions options = null;
public Form1()
{

InitializeComponent();
创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
//timer = new System.Timers.Timer();
//timer.AutoReset = true;
//timer.Interval = 1000;
//timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);
}

private void LinkMqttNetService(object sender, ElapsedEventArgs e)
{

if (mqttClient == null)
{

// RunAsync();
CountLink++;
}
if (CountLink >= 5)
{

MessageBox.Show(“连接多次失败,请确认各参数是否正确!”);
CountLink = 0;
timer.Enabled = false;
}
}
private void but_linkserver_Click(object sender, EventArgs k)
{

LinkClientService();
//CountLink = 0;
//timer.Enabled = true;
//timer.Start();
}
///


/// 链接客户端
///

public async void LinkClientService()
{

var m = “Eohi_” + Guid.NewGuid().ToString();
options = new MqttClientOptions
{

ClientId = m,
CleanSession = true,
ChannelOptions = new MqttClientTcpOptions
{

Server = txtb_serverip.Text.Trim(),
Port = Convert.ToInt32(txtb_serverport.Text.Trim()),
},
Credentials = new MqttClientCredentials()
{

Username = tb_username.Text,
Password = tb_userpwd.Text
}

};
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient() as MqttClient;
try
{

await mqttClient.ConnectAsync(options);
this.Invoke(new Action(() =>
{

lab_linkstatus.Text = “连接成功!”;
lab_linktimer.Text = DateTime.Now.ToString();
}));
mqttClient.Disconnected += async (s, e) =>
{

if (e.ClientWasConnected==false)
{

try
{

await mqttClient.ConnectAsync(options);
this.Invoke(new Action(() =>
{

lab_linkstatus.Text = “连接成功!”;
lab_linktimer.Text = DateTime.Now.ToString();
}));
}
catch (Exception ex)
{

lab_linkstatus.Text = “连接失败!”+ex.Message;
lab_linktimer.Text = DateTime.Now.ToString();
}

}
};
}
catch (Exception ex)
{

lab_linkstatus.Text = “连接失败!请检查ip/端口” ;
lab_linktimer.Text = DateTime.Now.ToString();
}

}

private void tb_username_TextChanged(object sender, EventArgs e)
{

}

private void but_clientsend_Click(object sender, EventArgs e) { if (mqttClient != null) { var message = new MqttApplicationMessageBuilder(); message.WithTopic(txtb_msgtopic.Text.Trim()); message.WithPayload(rtb_pubmsg.Text.Trim()); message.WithExactlyOnceQoS(); message.WithRetainFlag(); mqttClient.PublishAsync(message.Build()); } } private async void but_submsg_Click(object sender, EventArgs k) { if (mqttClient != null) { await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce)); mqttClient.ApplicationMessageReceived += (s, e) => { this.Invoke(new Action(() => { rtb_submsgclient.AppendText(“ClientID=” + e.ClientId + “\n”); rtb_submsgclient.AppendText(”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}” + “\n”); rtb_submsgclient.AppendText(”+ Retain = {e.ApplicationMessage.Retain}” + “\n”);

}));

};

}
}

private void button1_Click(object sender, EventArgs e)
{

rtb_submsgclient.Text = “”;
}
}
}

编程小号
上一篇 2025-07-25 22:11
下一篇 2025-09-08 12:30

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/hz/119712.html