Skip to content

Commit 98fcebc

Browse files
committed
core rabbitmq 测试。
升级sdk到1.1.0
1 parent c174de4 commit 98fcebc

24 files changed

Lines changed: 877 additions & 27 deletions

NetCoreWebApp.sln

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "NetCoreWebApp", "src\NetCor
1414
EndProject
1515
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "NetCoreApp.Logger.File", "src\NetCoreApp.Logger.File\NetCoreApp.Logger.File.xproj", "{03616715-5482-4F5C-921B-A4DDA9793DDC}"
1616
EndProject
17+
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "MQ", "src\MQ\MQ.xproj", "{E666614B-728C-48BF-A535-4B519A9A3459}"
18+
EndProject
19+
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "MQClient", "src\MQClient\MQClient.xproj", "{C6425AF8-D448-4731-8F9E-C876BDDFBC39}"
20+
EndProject
21+
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "MSServer", "src\MSServer\MSServer.xproj", "{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D}"
22+
EndProject
1723
Global
1824
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1925
Debug|Any CPU = Debug|Any CPU
@@ -28,12 +34,27 @@ Global
2834
{03616715-5482-4F5C-921B-A4DDA9793DDC}.Debug|Any CPU.Build.0 = Debug|Any CPU
2935
{03616715-5482-4F5C-921B-A4DDA9793DDC}.Release|Any CPU.ActiveCfg = Release|Any CPU
3036
{03616715-5482-4F5C-921B-A4DDA9793DDC}.Release|Any CPU.Build.0 = Release|Any CPU
37+
{E666614B-728C-48BF-A535-4B519A9A3459}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
38+
{E666614B-728C-48BF-A535-4B519A9A3459}.Debug|Any CPU.Build.0 = Debug|Any CPU
39+
{E666614B-728C-48BF-A535-4B519A9A3459}.Release|Any CPU.ActiveCfg = Release|Any CPU
40+
{E666614B-728C-48BF-A535-4B519A9A3459}.Release|Any CPU.Build.0 = Release|Any CPU
41+
{C6425AF8-D448-4731-8F9E-C876BDDFBC39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
42+
{C6425AF8-D448-4731-8F9E-C876BDDFBC39}.Debug|Any CPU.Build.0 = Debug|Any CPU
43+
{C6425AF8-D448-4731-8F9E-C876BDDFBC39}.Release|Any CPU.ActiveCfg = Release|Any CPU
44+
{C6425AF8-D448-4731-8F9E-C876BDDFBC39}.Release|Any CPU.Build.0 = Release|Any CPU
45+
{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
46+
{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D}.Debug|Any CPU.Build.0 = Debug|Any CPU
47+
{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D}.Release|Any CPU.ActiveCfg = Release|Any CPU
48+
{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D}.Release|Any CPU.Build.0 = Release|Any CPU
3149
EndGlobalSection
3250
GlobalSection(SolutionProperties) = preSolution
3351
HideSolutionNode = FALSE
3452
EndGlobalSection
3553
GlobalSection(NestedProjects) = preSolution
3654
{61604E42-0CDB-4B8A-82E1-32A8C87DB979} = {27FD4CA6-58E6-4A77-A4A4-235584780305}
3755
{03616715-5482-4F5C-921B-A4DDA9793DDC} = {27FD4CA6-58E6-4A77-A4A4-235584780305}
56+
{E666614B-728C-48BF-A535-4B519A9A3459} = {27FD4CA6-58E6-4A77-A4A4-235584780305}
57+
{C6425AF8-D448-4731-8F9E-C876BDDFBC39} = {27FD4CA6-58E6-4A77-A4A4-235584780305}
58+
{B6730AC2-8BDA-4D01-B5CE-0B3222CD615D} = {27FD4CA6-58E6-4A77-A4A4-235584780305}
3859
EndGlobalSection
3960
EndGlobal

src/Logger/project.json

Lines changed: 0 additions & 14 deletions
This file was deleted.

src/MQ/EventBus.cs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Net.Http;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Configuration;
9+
using Microsoft.Extensions.Logging;
10+
using MQ.Internal;
11+
using Newtonsoft.Json;
12+
using RabbitMQ.Client;
13+
using RabbitMQ.Client.Events;
14+
using Microsoft.Extensions.DependencyInjection;
15+
namespace MQ
16+
{
17+
public class EventBus
18+
{
19+
static SpinLock sl = new SpinLock();
20+
static JsonSerializerSettings setting = new Newtonsoft.Json.JsonSerializerSettings();
21+
static EventBusSettings settings;
22+
static ILogger _logger;
23+
static CacheManager _CacheManager;
24+
static EventBus()
25+
{
26+
setting.DateFormatString = "yyyy/MM/dd HH:mm:ss.fff";
27+
}
28+
/// <summary>
29+
///
30+
/// </summary>
31+
/// <typeparam name="T"></typeparam>
32+
/// <param name="productId"></param>
33+
/// <param name="topic">exchange</param>
34+
/// <param name="tag">routingKey</param>
35+
/// <param name="id"></param>
36+
/// <param name="message"></param>
37+
public static ulong Publish<T>(string productId, string topic, string tag, string id, T message)
38+
{
39+
var t = typeof(T);
40+
string msgStr;
41+
if (t == typeof(string))
42+
msgStr = message.ToString();
43+
else
44+
msgStr = Newtonsoft.Json.JsonConvert.SerializeObject(message, t, setting);
45+
46+
var msg = Encoding.UTF8.GetBytes(msgStr);
47+
bool reflock = false;
48+
sl.Enter(ref reflock);
49+
try
50+
{
51+
var channel = _CacheManager.GetChannel(productId);
52+
var pubNo = channel.NextPublishSeqNo;
53+
channel.BasicPublish(topic, tag, false, null, msg);
54+
return pubNo;
55+
}
56+
catch (Exception ex)
57+
{
58+
}
59+
finally
60+
{
61+
if (reflock)
62+
sl.Exit();
63+
}
64+
return 0;
65+
}
66+
/// <summary>
67+
///
68+
/// </summary>
69+
/// <typeparam name="T"></typeparam>
70+
/// <param name="productId"></param>
71+
/// <param name="topic">集群消费:queuename;广播消费:exchange</param>
72+
/// <param name="action"></param>
73+
/// <param name="options"></param>
74+
public static async void Subscribe<T>(string productId, string topic, Func<T, bool> action, SubscribeOptions options = null)
75+
{
76+
options = options ?? SubscribeOptions.Default;
77+
var queueName = topic;
78+
var channel = _CacheManager.GetChannel(productId);
79+
80+
if (options.Model == MessageModel.Broadcasting)
81+
{
82+
var hostName = System.Net.Dns.GetHostName();
83+
var ipaddress = await System.Net.Dns.GetHostEntryAsync(hostName).ConfigureAwait(false);
84+
var ip = ipaddress.AddressList.FirstOrDefault(p => p.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)?.ToString() ?? hostName;
85+
var fanoutQueueName = ip + "." + productId + "." + topic + "." + DateTime.UtcNow.Ticks;
86+
channel.QueueDeclare(queue: fanoutQueueName);
87+
channel.QueueBind(fanoutQueueName, topic, "", null);
88+
queueName = fanoutQueueName;
89+
}
90+
91+
var consumer = new EventingBasicConsumer(channel);
92+
consumer.Shutdown += (obj, ea) =>
93+
{
94+
_logger.LogError("eventbus.consumer.shutdown:" + ea.ToJson());
95+
};
96+
channel.BasicQos(0, 1, false);
97+
var tx = typeof(T) == typeof(string);
98+
consumer.Received += (obj, ea) =>
99+
{
100+
try
101+
{
102+
var body = Encoding.UTF8.GetString(ea.Body);
103+
var result = false;
104+
if (tx)
105+
result = action((T)((object)body));
106+
else
107+
{
108+
var msg = JsonConvert.DeserializeObject<T>(body, setting);
109+
result = action(msg);
110+
}
111+
if (result)
112+
{
113+
consumer.Model.BasicAck(ea.DeliveryTag, true);
114+
}
115+
else
116+
{
117+
consumer.Model.BasicReject(ea.DeliveryTag, false);
118+
}
119+
}
120+
catch (Exception ex)
121+
{
122+
_logger.LogError(0, ex, "eventbus.subscribe.consumer");
123+
}
124+
};
125+
_logger.LogInformation("eventbus.subscribe." + queueName);
126+
channel.BasicConsume(queueName, false, consumer);
127+
}
128+
129+
public static void Exit()
130+
{
131+
((IDisposable)_CacheManager).Dispose();
132+
}
133+
134+
public static void Configure(IConfigurationRoot config, IServiceProvider internalServiceProvider = null)
135+
{
136+
var logger = internalServiceProvider?.GetService<ILogger<EventBus>>();
137+
_logger = logger;
138+
_CacheManager = new CacheManager();
139+
_CacheManager.Configure(new EventBusSettings(config, logger));
140+
}
141+
142+
}
143+
}
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,6 @@
33
using System.Linq;
44
using System.Threading.Tasks;
55

6-
namespace Logger
6+
namespace MQ
77
{
8-
public class Class1
9-
{
10-
public Class1()
11-
{
12-
}
13-
}
148
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using Autofac;
7+
using Microsoft.Practices.ServiceLocation;
8+
9+
namespace MQ.Internal
10+
{
11+
public class AutofacServiceLocator : ServiceLocatorImplBase
12+
{
13+
/// <summary>
14+
/// The <see cref="T:Autofac.IComponentContext" /> from which services
15+
/// should be located.
16+
/// </summary>
17+
private readonly IComponentContext _container;
18+
19+
/// <summary>
20+
/// Initializes a new instance of the <see cref="T:Autofac.Extras.CommonServiceLocator.AutofacServiceLocator" /> class.
21+
/// </summary>
22+
/// <param name="container">
23+
/// The <see cref="T:Autofac.IComponentContext" /> from which services
24+
/// should be located.
25+
/// </param>
26+
/// <exception cref="T:System.ArgumentNullException">
27+
/// Thrown if <paramref name="container" /> is <see langword="null" />.
28+
/// </exception>
29+
public AutofacServiceLocator(IComponentContext container)
30+
{
31+
if (container == null)
32+
{
33+
throw new ArgumentNullException("container");
34+
}
35+
this._container = container;
36+
}
37+
38+
/// <summary>
39+
/// Resolves the requested service instance.
40+
/// </summary>
41+
/// <param name="serviceType">Type of instance requested.</param>
42+
/// <param name="key">Name of registered service you want. May be <see langword="null" />.</param>
43+
/// <returns>The requested service instance.</returns>
44+
/// <exception cref="T:System.ArgumentNullException">
45+
/// Thrown if <paramref name="serviceType" /> is <see langword="null" />.
46+
/// </exception>
47+
protected override object DoGetInstance(Type serviceType, string key)
48+
{
49+
if (serviceType == null)
50+
{
51+
throw new ArgumentNullException("serviceType");
52+
}
53+
if (key == null)
54+
{
55+
return ResolutionExtensions.Resolve(this._container, serviceType);
56+
}
57+
return ResolutionExtensions.ResolveNamed(this._container, key, serviceType);
58+
}
59+
60+
/// <summary>
61+
/// Resolves all requested service instances.
62+
/// </summary>
63+
/// <param name="serviceType">Type of instance requested.</param>
64+
/// <returns>Sequence of service instance objects.</returns>
65+
/// <exception cref="T:System.ArgumentNullException">
66+
/// Thrown if <paramref name="serviceType" /> is <see langword="null" />.
67+
/// </exception>
68+
protected override IEnumerable<object> DoGetAllInstances(Type serviceType)
69+
{
70+
if (serviceType == null)
71+
{
72+
throw new ArgumentNullException("serviceType");
73+
}
74+
Type type = typeof(IEnumerable).MakeGenericType(new Type[]
75+
{
76+
serviceType
77+
});
78+
return Enumerable.Cast<object>((IEnumerable)ResolutionExtensions.Resolve(this._container, type));
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)