一,、分布式消息總線在很多MIS項目之中都有這樣的需求,,需要一個及時、高效的的通知機(jī)制,,即比如當(dāng)使用者A完成了任務(wù)X,,就需要立即告知使用者B任務(wù)X已經(jīng)完成,在通常的 情況下,,開發(fā)人中都是在使用者B所使用的程序之中寫數(shù)據(jù)庫輪循代碼,,這樣就會產(chǎn)品一個很嚴(yán)重的兩個問題,第一個問題是延遲,,輪循機(jī)制要定時執(zhí)行,,必須會引 起延遲,第二個問題是數(shù)據(jù)庫壓力過大,,當(dāng)進(jìn)行高頻度的輪循會生產(chǎn)大量的數(shù)據(jù)庫查詢,,并且如果有大量的使用者進(jìn)行輪循,那數(shù)據(jù)庫的壓力就更大了,。 那么在這個時間,,就需要一套能支持發(fā)布-訂閱模式的分布式消息總線,那這個問題就可以很好的解決了,,比如采用一些成熟的消息總線進(jìn)行實現(xiàn),,比如MSMQ或 者采用比如開源的NServiceBus的發(fā)布訂閱機(jī)制就可以實現(xiàn)處理這種需求,其系統(tǒng)結(jié)構(gòu)就會變成如下所示: 本分布式消息總線,,目前廣泛的被應(yīng)用于分布式緩存的更新通知,,當(dāng)在N百臺客戶短在使用緩存的過程之中,,某個操作修改了緩存的數(shù)據(jù),必須會導(dǎo)致其他終端緩存 的失效,,那么使用基于Socket的分布式消息總線之后,,我們可以做了修改了即可實時通知,做到緩存數(shù)據(jù)保持最新,,再比如醫(yī)療應(yīng)用之中的危機(jī)機(jī)管理,,當(dāng)發(fā) 現(xiàn)檢驗、檢查危急值之后,,需要及時通知病人做出相應(yīng)的措施,,啟動聲光報警系統(tǒng)等,再比如應(yīng)用于異構(gòu)系統(tǒng)整合,,當(dāng)檢驗系統(tǒng)做出檢驗報告,,通過消息總線進(jìn)行發(fā) 布,HIS系統(tǒng)則即時會收到檢驗報告數(shù)據(jù)而實現(xiàn)系統(tǒng)的整合,。 二,、基于Socket的實現(xiàn)目前能夠?qū)崿F(xiàn)發(fā)布訂閱模式的開源產(chǎn)品非常之多,為什么還要制造輪子呢,,其主要原因有以下幾點 1)像NServiceBus這種東西基于MSMQ,,在大量的發(fā)布者-訂閱者的情況下性能不佳。 2)此類東西太過于龐大,、不易使用和配置,。 3)學(xué)習(xí)成本過高。 那為什么要使用Socket技術(shù)進(jìn)行實現(xiàn)呢,,其主要原因是有以下幾點: 1)使用高效的Socket通信技術(shù),,高效、支持更多的客戶端,。 2)使用簡單,,不需要定義太多額外的東西,只需要定義主題和消息即可使用,。 目前本發(fā)布訂閱框架是基于AgileEAS.NET SOA中間件平臺Socket框架實現(xiàn)的,有關(guān)于些Socket框架的技術(shù)細(xì)節(jié)請參考AgileEAS.NET SOA 中間件平臺.Net Socket通信框架-介紹,、AgileEAS.NET SOA 中間件平臺.Net Socket通信框架-簡單例子-實現(xiàn)簡單的服務(wù)端客戶端消息應(yīng)答,、AgileEAS.NET SOA 中間件平臺.Net Socket通信框架-完整應(yīng)用例子-在線聊天室系統(tǒng)-下載配置、AgileEAS.NET SOA 中間件平臺.Net Socket通信框架-完整應(yīng)用例子-在線聊天室系統(tǒng)-代碼解析文章進(jìn)行了解和學(xué)習(xí),。 目前本發(fā)布訂閱框架并直接集成于AgileEAS.NET SOA Socket通信框架之中并且隨其一并發(fā)布,,下面簡單介紹一下其API: 在本框架之中定義了一個消息總線接口IMessageBus: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5: using System.Collections; 6:
7: namespace EAS.Messages 8: {
9: /// <summary> 10: /// 消息總線接口定義。 11: /// </summary> 12: public interface IMessageBus : IDisposable 13: {
14: /// <summary> 15: /// 注冊發(fā)布者,。 16: /// </summary> 17: /// <param name="publisher">發(fā)布者,。</param> 18: void AddPublisher(string publisher); 19:
20: /// <summary> 21: /// 注冊發(fā)布者,。 22: /// </summary> 23: /// <param name="publisher">發(fā)布者。</param> 24: /// <param name="topic">主題,。</param> 25: void AddPublisher(string publisher, string topic); 26:
27: /// <summary> 28: /// 發(fā)布一條消息到總線,。 29: /// </summary> 30: /// <param name="topic">主題。</param> 31: /// <param name="message">發(fā)布的消息,。</param> 32: void Publish(string topic, object message); 33:
34: /// <summary> 35: /// 訂閱消息,。 36: /// </summary> 37: /// <param name="subscriber">訂閱者。</param> 38: /// <param name="topic">主題,。</param> 39: /// <param name="notifyHandler">訂閱通知,。</param> 40: void Subscribe(object subscriber, string topic, MessageNotifyHandler notifyHandler); 41:
42: /// <summary> 43: /// 訂閱消息。 44: /// </summary> 45: /// <param name="subscriber">訂閱者,。</param> 46: /// <param name="friendName">訂閱者名稱,,用于處理離線訂閱。</param> 47: /// <param name="topic">主題,。</param> 48: /// <param name="notifyHandler">訂閱通知,。</param> 49: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler); 50:
51: /// <summary> 52: /// 訂閱消息。 53: /// </summary> 54: /// <param name="subscriber">訂閱者,。</param> 55: /// <param name="friendName">訂閱者名稱,,用于處理離線訂閱。</param> 56: /// <param name="topic">主題,。</param> 57: /// <param name="notifyHandler">訂閱通知,。</param> 58: /// <param name="changedHandler">發(fā)布者狀態(tài)變化委托。</param> 59: void Subscribe(object subscriber, string friendName, string topic, MessageNotifyHandler notifyHandler,PublisherSstatusChangedHandler changedHandler); 60:
61: /// <summary> 62: /// 退訂消息,。 63: /// </summary> 64: /// <param name="subscriber">訂閱者,。</param> 65: void Unsubscribe(object subscriber); 66:
67: /// <summary> 68: /// 退訂消息。 69: /// </summary> 70: /// <param name="subscriber">訂閱者,。</param> 71: /// <param name="topic">主題,。</param> 72: void Unsubscribe(object subscriber, string topic); 73:
74: /// <summary> 75: /// 退訂消息。 76: /// </summary> 77: /// <param name="subscriber">訂閱者,。</param> 78: /// <param name="friendName">訂閱者名稱,,用于處理離線訂閱。</param> 79: /// <param name="topic">主題,。</param> 80: void Unsubscribe(object subscriber, string friendName, string topic); 81: }
82: }
IMessageBus就基于Socket發(fā)布訂閱消息總線的靈魂接口,,也是基唯一的發(fā)布者調(diào)用者功能入口,也就是說不管你是發(fā)布者還是訂閱者都需要調(diào)用 這個接口,,如果你是發(fā)布者請調(diào)用IMessageBus接口的Publish方法向消息總線發(fā)布消息,,如果是你訂閱者請通過IMessageBus的訂閱 方法進(jìn)行訂閱,當(dāng)你訂閱了某個主題之后,,有發(fā)布者發(fā)布該主題的消息,,你即可以收到消息并調(diào)用訂閱回調(diào)函數(shù)進(jìn)行處理,。 三、實現(xiàn)一個簡單的例子現(xiàn)在我們開始一個簡單的應(yīng)用消息總線的例子,,本例子代碼解決方案由下圖4個項目組成: 其中:Demo.Messages項目定義發(fā)布者,、訂閱者所使用的消息對象和消息主題。 Demo.Publisher項目為發(fā)布者代碼,。 Demo.Subscriber項目為訂閱者代碼,。 Demo.Server項目為服務(wù)端代碼。 在Demo.Messages項目之中,,我們定義了一個消息Message: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5: using System.Xml.Serialization; 6:
7: namespace Demo.Messages 8: {
9: [Serializable]
10: public class Message 11: {
12: [XmlElement]
13: public Guid ID 14: {
15: get;
16: set;
17: }
18: }
19: }
消息Message很簡單,,只有一個屬性ID,同時 還需要定義一個消息主題: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5:
6: namespace Demo.Messages 7: {
8: public class Topics 9: {
10: public static readonly string DEMO_TOPIC = "演示消息"; 11: }
12: }
我們定義了一個消息主題為“演示消息”,。 在Demo.Publisher項目之中,,沒有太多額外的代碼,只有在Program.cs寫了以下簡單的調(diào)用代碼: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5: using EAS.Messages; 6:
7: namespace Demo.Publisher 8: {
9: class Program 10: {
11: static void Main(string[] args) 12: {
13: var container = EAS.Objects.ContainerBuilder.BuilderDefault();
14: var bus = container.GetComponentInstance("MessageBus") as IMessageBus; 15: System.Console.WriteLine("Publisher"); 16:
17: while (System.Console.ReadLine()!="exit") 18: {
19: var m = new Messages.Message { ID = Guid.NewGuid() }; 20: bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);
21: System.Console.WriteLine(string.Format("Publish:{0}", m.ID)); 22: }
23: }
24: }
25: }
從IOC容器獲取一個消息總線IMessageBus對象,,并調(diào)用Publish函數(shù)發(fā)布消息”bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);“,。 當(dāng)然了,使用了IOC容器,,就離來開配置文件了,,其App.config文件內(nèi)容如下: 1: <?xml version="1.0" encoding="utf-8"?> 2: <configuration>
3: <configSections>
4: <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" /> 5: </configSections>
6: <eas>
7: <objects>
8: <!--消息總線-->
9: <object name="MessageBus" assembly="EAS.MicroKernel" type="EAS.Sockets.Bus.SocketBus" LifestyleType="Singleton"> 10: <property name="Url" type="string" value="socket.tcp://127.0.0.1:6606/"/> 11: </object> 12: </objects>
13: </eas>
14: </configuration>
在Demo.Subscriber項目之中,使用與Demo.Publisher一模一樣的配置文件,,其Program.cs代碼如下: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5: using EAS.Messages; 6:
7: namespace Demo.Subscriber 8: {
9: class Program 10: {
11: static void Main(string[] args) 12: {
13: var container = EAS.Objects.ContainerBuilder.BuilderDefault();
14: var bus = container.GetComponentInstance("MessageBus") as IMessageBus; 15: System.Console.WriteLine("Subscriber"); 16:
17: bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify); 18: System.Console.ReadLine();
19: }
20:
21: static void MessageNotify(object m) 22: {
23: Demo.Messages.Message message = m as Demo.Messages.Message; 24: System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID)); 25: }
26: }
27: }
其中代碼bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);:完成把消息訂閱到MessageNotify通知函數(shù)之中,。 在Demo.Server項目之中,啟動服務(wù)器并且開始接收訂閱和發(fā)布: 1: using System; 2: using System.Collections.Generic; 3: using System.Linq; 4: using System.Text; 5: using EAS.Sockets; 6:
7: namespace Demo.Server 8: {
9: class Program 10: {
11: static void Main(string[] args) 12: {
13: SocketServer socketServer = new SocketServer(128); 14: socketServer.Port = 6606;
15: socketServer.Logger = new EAS.Loggers.ConsoleLogger(); 16: socketServer.Initialize();
17: System.Console.WriteLine("Server Starting..."); 18: socketServer.StartServer();
19: System.Console.WriteLine("Server Startup!"); 20: System.Console.ReadLine();
21: }
22: }
23: }
到此為止,,所有代碼均已完成,,是不是很簡單,接下來,,我們跑起來驗證一下效果,。 四、驗證效果我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,,再啟動兩個Demo.Subscriber.exe,再啟動一個Demo.Publisher.exe,,在Demo.Publisher.exe控制臺按回車鍵: OK,搞定,。 五,、源代碼下載本程序的源代碼已上傳到服務(wù)器,請通過http://42.121.30.77/downloads/eas/Demo.Pub_Sub.rar進(jìn)行下載,,如果在開發(fā)過程之中想要了解更多有關(guān)Socket通信框架以及更多AgileEAS.NET SOA中間件平臺的技術(shù)資源,請通過AgileEAS.NET SOA 網(wǎng)站:http://www.的最新下載欄目進(jìn)行下載,。 六,、問題反饋麻煩大家在通過視頻進(jìn)行學(xué)習(xí)的時候能及時把問題反饋給樓主,,或者有什么需要改進(jìn)的一些建議都請向樓主直接反饋,以下是聯(lián)系方式: AgileEAS.NET SOA 網(wǎng)站:http://www. 官方博客:http://eastjade.cnblogs.com 樓主QQ:47920381,AgileEAS.NET QQ群:113723486(AgileEAS SOA 平臺)/上限1000人 199463175(AgileEAS SOA 交流)/上限1000人 120661978(AgileEAS.NET 平臺交流)/上限1000人 郵件:[email protected],[email protected], 電話:18629261335。 |
|