sunnycase 8 лет назад
Родитель
Сommit
bba8e5c99f

+ 27 - 16
src/MineCase.Gateway/Network/ClientSession.cs

@@ -1,4 +1,5 @@
 using Microsoft.Extensions.ObjectPool;
+using MineCase.Buffers;
 using MineCase.Protocol;
 using MineCase.Server.Network;
 using Orleans;
@@ -23,12 +24,14 @@ namespace MineCase.Gateway.Network
         private readonly OutcomingPacketObserver _outcomingPacketObserver;
         private readonly ActionBlock<UncompressedPacket> _outcomingPacketDispatcher;
         private readonly ObjectPool<UncompressedPacket> _uncompressedPacketObjectPool;
+        private readonly IBufferPool<byte> _bufferPool;
 
-        public ClientSession(TcpClient tcpClient, IGrainFactory grainFactory, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
+        public ClientSession(TcpClient tcpClient, IGrainFactory grainFactory, IBufferPool<byte> bufferPool, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
         {
             _sessionId = Guid.NewGuid();
             _tcpClient = tcpClient;
             _grainFactory = grainFactory;
+            _bufferPool = bufferPool;
             _uncompressedPacketObjectPool = uncompressedPacketObjectPool;
             _outcomingPacketObserver = new OutcomingPacketObserver(this);
             _outcomingPacketDispatcher = new ActionBlock<UncompressedPacket>(SendOutcomingPacket);
@@ -57,35 +60,43 @@ namespace MineCase.Gateway.Network
 
         private void OnClosed()
         {
-            _outcomingPacketDispatcher.Complete();
-            _tcpClient.Client.Shutdown(SocketShutdown.Send);
+            _outcomingPacketDispatcher.Post(null);
         }
 
         private async Task DispatchIncomingPacket()
         {
-            var packet = _uncompressedPacketObjectPool.Get();
-            try
+            using (var bufferScope = _bufferPool.CreateScope())
             {
-                if (_useCompression)
+                var packet = _uncompressedPacketObjectPool.Get();
+                try
                 {
-                    var compressedPacket = await CompressedPacket.DeserializeAsync(_remoteStream, null);
-                    packet = PacketCompress.Decompress(ref compressedPacket);
+                    if (_useCompression)
+                    {
+                        var compressedPacket = await CompressedPacket.DeserializeAsync(_remoteStream, null);
+                        packet = PacketCompress.Decompress(ref compressedPacket);
+                    }
+                    else
+                    {
+                        packet = await UncompressedPacket.DeserializeAsync(_remoteStream, bufferScope, packet);
+                    }
+                    await DispatchIncomingPacket(packet);
                 }
-                else
+                finally
                 {
-                    packet = await UncompressedPacket.DeserializeAsync(_remoteStream, packet);
+                    _uncompressedPacketObjectPool.Return(packet);
                 }
-                await DispatchIncomingPacket(packet);
-            }
-            finally
-            {
-                _uncompressedPacketObjectPool.Return(packet);
             }
         }
 
         private async Task SendOutcomingPacket(UncompressedPacket packet)
         {
-            if (_useCompression)
+            // Close
+            if(packet == null)
+            {
+                _tcpClient.Client.Shutdown(SocketShutdown.Send);
+                _outcomingPacketDispatcher.Complete();
+            }
+            else if (_useCompression)
             {
                 var newPacket = PacketCompress.Compress(ref packet);
                 await newPacket.SerializeAsync(_remoteStream);

+ 5 - 2
src/MineCase.Gateway/Network/ConnectionRouter.cs

@@ -1,5 +1,6 @@
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.ObjectPool;
+using MineCase.Buffers;
 using MineCase.Protocol;
 using MineCase.Server.Settings;
 using Orleans;
@@ -18,12 +19,14 @@ namespace MineCase.Gateway.Network
         private readonly TcpListener _listener;
         private readonly IGrainFactory _grainFactory;
         private readonly ILogger _logger;
+        private readonly IBufferPool<byte> _bufferPool;
         private readonly ObjectPool<UncompressedPacket> _uncompressedPacketObjectPool;
 
-        public ConnectionRouter(IGrainFactory grainFactory, ILoggerFactory loggerFactory, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
+        public ConnectionRouter(IGrainFactory grainFactory, ILoggerFactory loggerFactory, IBufferPool<byte> bufferPool, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
         {
             _grainFactory = grainFactory;
             _logger = loggerFactory.CreateLogger<ConnectionRouter>();
+            _bufferPool = bufferPool;
             _uncompressedPacketObjectPool = uncompressedPacketObjectPool;
             _listener = new TcpListener(new IPEndPoint(IPAddress.Any, 25565));
         }
@@ -44,7 +47,7 @@ namespace MineCase.Gateway.Network
             try
             {
                 _logger.LogInformation($"Incoming connection from {tcpClient.Client.RemoteEndPoint}.");
-                using (var session = new ClientSession(tcpClient, _grainFactory, _uncompressedPacketObjectPool))
+                using (var session = new ClientSession(tcpClient, _grainFactory, _bufferPool, _uncompressedPacketObjectPool))
                 {
                     await session.Startup(cancellationToken);
                 }

+ 3 - 0
src/MineCase.Gateway/Program.cs

@@ -8,6 +8,8 @@ using Microsoft.Extensions.Logging;
 using MineCase.Gateway.Network;
 using Microsoft.Extensions.ObjectPool;
 using MineCase.Protocol;
+using MineCase.Buffers;
+using System.Buffers;
 
 namespace MineCase.Gateway
 {
@@ -55,6 +57,7 @@ namespace MineCase.Gateway
                 var provider = s.GetRequiredService<ObjectPoolProvider>();
                 return provider.Create<UncompressedPacket>();
             });
+            services.AddSingleton<IBufferPool<byte>>(s => new BufferPool<byte>(ArrayPool<byte>.Shared));
         }
 
         private static IConfiguration LoadConfiguration()

+ 68 - 0
src/MineCase.Protocol/Buffers/IBufferPool.cs

@@ -0,0 +1,68 @@
+using System;
+using System.Buffers;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+
+namespace MineCase.Buffers
+{
+    public interface IBufferPool<T>
+    {
+        IBufferPoolScope<T> CreateScope();
+    }
+
+    public interface IBufferPoolScope<T> : IDisposable
+    {
+        ArraySegment<T> Rent(int length);
+    }
+
+    public class BufferPool<T> : IBufferPool<T>
+    {
+        private readonly ArrayPool<T> _arrayPool;
+        private readonly ConcurrentBag<BufferPoolScope> _scopes = new ConcurrentBag<BufferPoolScope>();
+
+        public BufferPool(ArrayPool<T> arrayPool)
+        {
+            _arrayPool = arrayPool;
+        }
+
+        public IBufferPoolScope<T> CreateScope()
+        {
+            if (!_scopes.TryTake(out var scope))
+                scope = new BufferPoolScope(this, _arrayPool);
+            return scope;
+        }
+
+        private void Return(BufferPoolScope scope)
+        {
+            _scopes.Add(scope);
+        }
+
+        private class BufferPoolScope : IBufferPoolScope<T>
+        {
+            private readonly BufferPool<T> _bufferPool;
+            private readonly ArrayPool<T> _arrayPool;
+            private readonly ConcurrentBag<T[]> _rents = new ConcurrentBag<T[]>();
+
+            public BufferPoolScope(BufferPool<T> bufferPool, ArrayPool<T> arrayPool)
+            {
+                _bufferPool = bufferPool;
+                _arrayPool = arrayPool;
+            }
+
+            public void Dispose()
+            {
+                while (_rents.TryTake(out var rent))
+                    _arrayPool.Return(rent);
+                _bufferPool.Return(this);
+            }
+
+            public ArraySegment<T> Rent(int length)
+            {
+                var rent = _arrayPool.Rent(length);
+                _rents.Add(rent);
+                return new ArraySegment<T>(rent, 0, length);
+            }
+        }
+    }
+}

+ 7 - 6
src/MineCase.Protocol/Protocol/Packet.cs

@@ -4,6 +4,7 @@ using System.IO;
 using System.Runtime.ExceptionServices;
 using System.Text;
 using System.Threading.Tasks;
+using MineCase.Buffers;
 using MineCase.Serialization;
 using Orleans.Concurrency;
 
@@ -19,11 +20,11 @@ namespace MineCase.Protocol
         public uint PacketId;
 
         [SerializeAs(DataType.ByteArray)]
-        public byte[] Data;
+        public ArraySegment<byte> Data;
 
         public async Task SerializeAsync(Stream stream)
         {
-            Length = (uint)Data.Length + PacketId.SizeOfVarInt();
+            Length = (uint)Data.Count + PacketId.SizeOfVarInt();
 
             using (var bw = new BinaryWriter(stream, Encoding.UTF8, true))
             {
@@ -32,10 +33,10 @@ namespace MineCase.Protocol
                 bw.Flush();
             }
 
-            await stream.WriteAsync(Data, 0, Data.Length);
+            await stream.WriteAsync(Data.Array, Data.Offset, Data.Count);
         }
 
-        public static async Task<UncompressedPacket> DeserializeAsync(Stream stream, UncompressedPacket packet = null)
+        public static async Task<UncompressedPacket> DeserializeAsync(Stream stream, IBufferPoolScope<byte> bufferPool, UncompressedPacket packet = null)
         {
             packet = packet ?? new UncompressedPacket();
             int packetIdLen;
@@ -45,8 +46,8 @@ namespace MineCase.Protocol
                 packet.PacketId = br.ReadAsVarInt(out packetIdLen);
             }
 
-            packet.Data = new byte[packet.Length - packetIdLen];
-            await stream.ReadExactAsync(packet.Data, 0, packet.Data.Length);
+            packet.Data = bufferPool.Rent((int)(packet.Length - packetIdLen));
+            await stream.ReadExactAsync(packet.Data.Array, packet.Data.Offset, packet.Data.Count);
             return packet;
         }
     }

+ 6 - 2
src/MineCase.Protocol/Protocol/PacketCompress.cs

@@ -11,6 +11,8 @@ namespace MineCase.Protocol
     {
         public static UncompressedPacket Decompress(ref CompressedPacket packet)
         {
+            throw new NotImplementedException();
+            /*
             var targetPacket = default(UncompressedPacket);
             using (var br = new BinaryReader(new DeflateStream(new MemoryStream(packet.CompressedData), CompressionMode.Decompress)))
             {
@@ -19,11 +21,13 @@ namespace MineCase.Protocol
             }
 
             targetPacket.Length = packet.DataLength;
-            return targetPacket;
+            return targetPacket;*/
         }
 
         public static CompressedPacket Compress(ref UncompressedPacket packet)
         {
+            throw new NotImplementedException();
+            /*
             var targetPacket = default(CompressedPacket);
             using (var stream = new MemoryStream())
             using (var bw = new BinaryWriter(new DeflateStream(stream, CompressionMode.Compress)))
@@ -36,7 +40,7 @@ namespace MineCase.Protocol
                 targetPacket.CompressedData = stream.ToArray();
             }
 
-            return targetPacket;
+            return targetPacket;*/
         }
     }
 }

+ 2 - 2
src/MineCase.Server.Grains/Game/ChunkSenderGrain.cs

@@ -19,7 +19,7 @@ namespace MineCase.Server.Game
             return base.OnActivateAsync();
         }
 
-        public Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUser> users)
+        public Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUserChunkLoader> loaders)
         {
             var stream = GetStreamProvider(StreamProviders.JobsProvider).GetStream<SendChunkJob>(_jobWorkerId, StreamProviders.Namespaces.ChunkSender);
             stream.OnNextAsync(new SendChunkJob
@@ -28,7 +28,7 @@ namespace MineCase.Server.Game
                 ChunkX = x,
                 ChunkZ = z,
                 Clients = clients,
-                Users = users
+                Loaders = loaders
             }).Ignore();
             return Task.CompletedTask;
         }

+ 3 - 3
src/MineCase.Server.Grains/Game/ChunkSenderJobWorker.cs

@@ -24,7 +24,7 @@ namespace MineCase.Server.Game
 
         public IReadOnlyCollection<IClientboundPacketSink> Clients { get; set; }
 
-        public IReadOnlyCollection<IUser> Users { get; set; }
+        public IReadOnlyCollection<IUserChunkLoader> Loaders { get; set; }
     }
 
     internal interface IChunkSenderJobWorker : IGrainWithGuidKey
@@ -53,8 +53,8 @@ namespace MineCase.Server.Game
 
             var generator = new ClientPlayPacketGenerator(new BroadcastPacketSink(job.Clients, _packetPackager));
             await generator.ChunkData(Dimension.Overworld, job.ChunkX, job.ChunkZ, await chunkColumn.GetState());
-            foreach (var user in job.Users)
-                user.OnChunkSent(job.ChunkX, job.ChunkZ).Ignore();
+            foreach (var loader in job.Loaders)
+                loader.OnChunkSent(job.ChunkX, job.ChunkZ).Ignore();
         }
 
         private class BroadcastPacketSink : IPacketSink

+ 1 - 0
src/MineCase.Server.Grains/MineCase.Server.Grains.csproj

@@ -10,6 +10,7 @@
   <ItemGroup>
     <PackageReference Include="Autofac" Version="4.6.1" />
     <PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
+    <PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="1.2.2" />
     <PackageReference Include="Microsoft.Orleans.Core" Version="2.0.0-preview2-20170724" />
     <PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004" PrivateAssets="All" />
     <PackageReference Include="Newtonsoft.Json" Version="10.0.3" />

+ 1 - 1
src/MineCase.Server.Grains/Network/ClientboundPacketSinkGrain.cs

@@ -52,7 +52,7 @@ namespace MineCase.Server.Network
             var packet = new UncompressedPacket
             {
                 PacketId = packetId,
-                Data = data.Value
+                Data = new ArraySegment<byte>(data.Value)
             };
             _subsManager.Notify(n => n.ReceivePacket(packet));
             return Task.CompletedTask;

+ 11 - 4
src/MineCase.Server.Grains/Network/PacketPackager.cs

@@ -4,19 +4,26 @@ using System.IO;
 using System.Reflection;
 using System.Text;
 using System.Threading.Tasks;
+using Microsoft.IO;
 using MineCase.Protocol;
 
 namespace MineCase.Server.Network
 {
     internal class PacketPackager : IPacketPackager
     {
+        private readonly RecyclableMemoryStreamManager _memoryStreamMgr;
+
+        public PacketPackager(RecyclableMemoryStreamManager memoryStreamMgr)
+        {
+            _memoryStreamMgr = memoryStreamMgr;
+        }
+
         public Task<(uint packetId, byte[] data)> PreparePacket(ISerializablePacket packet)
         {
-            using (var stream = new MemoryStream())
-            using (var bw = new BinaryWriter(stream))
+            using (var stream = _memoryStreamMgr.GetStream())
             {
-                packet.Serialize(bw);
-                bw.Flush();
+                using (var bw = new BinaryWriter(stream, Encoding.UTF8, true))
+                    packet.Serialize(bw);
                 return Task.FromResult((GetPacketId(packet), stream.ToArray()));
             }
         }

+ 134 - 0
src/MineCase.Server.Grains/User/UserChunkLoaderGrain.cs

@@ -0,0 +1,134 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using MineCase.Server.Game;
+using MineCase.Server.Game.Entities;
+using MineCase.Server.Network;
+using MineCase.Server.Network.Play;
+using MineCase.Server.World;
+using Orleans;
+
+namespace MineCase.Server.User
+{
+    internal class UserChunkLoaderGrain : Grain, IUserChunkLoader
+    {
+        private IUser _user;
+        private IClientboundPacketSink _sink;
+        private IWorld _world;
+        private ClientPlayPacketGenerator _generator;
+        private IPlayer _player;
+
+        private (int x, int z)? _lastStreamedChunk;
+        private HashSet<(int x, int z)> _sendingChunks;
+        private HashSet<(int x, int z)> _sentChunks;
+
+        private int _viewDistance = 10;
+
+        public override Task OnActivateAsync()
+        {
+            _user = GrainFactory.GetGrain<IUser>(this.GetPrimaryKey());
+            return Task.CompletedTask;
+        }
+
+        public Task OnChunkSent(int chunkX, int chunkZ)
+        {
+            _sendingChunks.Remove((chunkX, chunkZ));
+            _sentChunks.Add((chunkX, chunkZ));
+            return Task.CompletedTask;
+        }
+
+        public async Task OnGameTick()
+        {
+            for (int i = 0; i < 4; i++)
+            {
+                if (!await StreamNextChunk())
+                    break;
+            }
+
+            // unload per 5 ticks
+            if (await _world.GetAge() % 100 == 0)
+                await UnloadOutOfRangeChunks();
+        }
+
+        private async Task<bool> StreamNextChunk()
+        {
+            var currentChunk = await _player.GetChunkPosition();
+            if (_lastStreamedChunk.HasValue && _lastStreamedChunk.Value.Equals((currentChunk.x, currentChunk.z))) return true;
+
+            for (int d = 0; d <= _viewDistance; d++)
+            {
+                for (int x = -d; x <= d; x++)
+                {
+                    var z = d - Math.Abs(x);
+
+                    if (await StreamChunk(currentChunk.x + x, currentChunk.z + z))
+                        return false;
+                    if (await StreamChunk(currentChunk.x + x, currentChunk.z - z))
+                        return false;
+                }
+            }
+
+            _lastStreamedChunk = (currentChunk.x, currentChunk.z);
+            return true;
+        }
+
+        private async Task<bool> StreamChunk(int chunkX, int chunkZ)
+        {
+            var trunkSender = GrainFactory.GetGrain<IChunkSender>(_world.GetPrimaryKeyString());
+            if (!_sentChunks.Contains((chunkX, chunkZ)) && _sendingChunks.Add((chunkX, chunkZ)))
+            {
+                await trunkSender.PostChunk(chunkX, chunkZ, new[] { _sink }, new[] { this.AsReference<IUserChunkLoader>() });
+                return true;
+            }
+
+            return false;
+        }
+
+        private readonly List<(int x, int y)> _clonedSentChunks = new List<(int x, int y)>();
+
+        private List<(int x, int z)> CloneSentChunks()
+        {
+            _clonedSentChunks.Clear();
+            _clonedSentChunks.AddRange(_sentChunks);
+            return _clonedSentChunks;
+        }
+
+        private async Task UnloadOutOfRangeChunks()
+        {
+            var currentChunk = await _player.GetChunkPosition();
+            foreach (var chunk in CloneSentChunks())
+            {
+                var distance = Math.Abs(chunk.x - currentChunk.x) + Math.Abs(chunk.z - currentChunk.z);
+                if (distance > _viewDistance)
+                {
+                    await _generator.UnloadChunk(chunk.x, chunk.z);
+                    _sentChunks.Remove(chunk);
+                }
+            }
+        }
+
+        public Task SetClientPacketSink(IClientboundPacketSink sink)
+        {
+            _sink = sink;
+            _generator = new ClientPlayPacketGenerator(sink);
+            return Task.CompletedTask;
+        }
+
+        public Task JoinGame(IWorld world, IPlayer player)
+        {
+            _world = world;
+            _player = player;
+            _lastStreamedChunk = null;
+            _sendingChunks = new HashSet<(int x, int z)>();
+            _sentChunks = new HashSet<(int x, int z)>();
+            return Task.CompletedTask;
+        }
+
+        public Task SetViewDistance(int viewDistance)
+        {
+            _viewDistance = viewDistance;
+            return Task.CompletedTask;
+        }
+    }
+}

+ 6 - 86
src/MineCase.Server.Grains/User/UserGrain.cs

@@ -20,12 +20,12 @@ namespace MineCase.Server.User
     {
         private string _name;
         private uint _protocolVersion;
-        private int _viewDistance = 10;
         private string _worldId;
         private IWorld _world;
         private IClientboundPacketSink _sink;
         private IPacketRouter _packetRouter;
         private ClientPlayPacketGenerator _generator;
+        private IUserChunkLoader _chunkLoader;
         private IDisposable _sendKeepAliveTimer;
         private IDisposable _worldTimeSyncTimer;
         public HashSet<uint> _keepAliveWaiters;
@@ -39,10 +39,6 @@ namespace MineCase.Server.User
 
         private IPlayer _player;
 
-        private (int x, int z)? _lastStreamedChunk;
-        private HashSet<(int x, int z)> _sendingChunks;
-        private HashSet<(int x, int z)> _sentChunks;
-
         public override async Task OnActivateAsync()
         {
             if (string.IsNullOrEmpty(_worldId))
@@ -53,6 +49,7 @@ namespace MineCase.Server.User
             }
 
             _world = await GrainFactory.GetGrain<IWorldAccessor>(0).GetWorld(_worldId);
+            _chunkLoader = GrainFactory.GetGrain<IUserChunkLoader>(this.GetPrimaryKey());
         }
 
         public Task<IClientboundPacketSink> GetClientPacketSink()
@@ -72,7 +69,7 @@ namespace MineCase.Server.User
         {
             _sink = sink;
             _generator = new ClientPlayPacketGenerator(sink);
-            return Task.CompletedTask;
+            return _chunkLoader.SetClientPacketSink(sink);
         }
 
         public async Task JoinGame()
@@ -83,11 +80,9 @@ namespace MineCase.Server.User
             await _player.BindToUser(this);
             await _world.AttachEntity(_player);
 
-            _lastStreamedChunk = null;
+            await _chunkLoader.JoinGame(_world, _player);
             _state = UserState.JoinedGame;
             _keepAliveWaiters = new HashSet<uint>();
-            _sendingChunks = new HashSet<(int x, int z)>();
-            _sentChunks = new HashSet<(int x, int z)>();
             _sendKeepAliveTimer = RegisterTimer(OnSendKeepAliveRequests, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
 
             // _worldTimeSyncTimer = RegisterTimer(OnSyncWorldTime, null, TimeSpan.Zero, )
@@ -202,74 +197,7 @@ namespace MineCase.Server.User
             }
 
             if (_state >= UserState.JoinedGame && _state < UserState.Destroying)
-            {
-                for (int i = 0; i < 4; i++)
-                {
-                    if (!await StreamNextChunk())
-                        break;
-                }
-
-                // unload per 5 ticks
-                if (await _world.GetAge() % 100 == 0)
-                    await UnloadOutOfRangeChunks();
-            }
-        }
-
-        private async Task<bool> StreamNextChunk()
-        {
-            var currentChunk = await _player.GetChunkPosition();
-            if (_lastStreamedChunk.HasValue && _lastStreamedChunk.Value.Equals((currentChunk.x, currentChunk.z))) return true;
-
-            for (int d = 0; d <= _viewDistance; d++)
-            {
-                for (int x = -d; x <= d; x++)
-                {
-                    var z = d - Math.Abs(x);
-
-                    if (await StreamChunk(currentChunk.x + x, currentChunk.z + z))
-                        return false;
-                    if (await StreamChunk(currentChunk.x + x, currentChunk.z - z))
-                        return false;
-                }
-            }
-
-            _lastStreamedChunk = (currentChunk.x, currentChunk.z);
-            return true;
-        }
-
-        private async Task<bool> StreamChunk(int chunkX, int chunkZ)
-        {
-            var trunkSender = GrainFactory.GetGrain<IChunkSender>(_world.GetPrimaryKeyString());
-            if (!_sentChunks.Contains((chunkX, chunkZ)) && _sendingChunks.Add((chunkX, chunkZ)))
-            {
-                await trunkSender.PostChunk(chunkX, chunkZ, new[] { _sink }, new[] { this.AsReference<IUser>() });
-                return true;
-            }
-
-            return false;
-        }
-
-        private readonly List<(int x, int y)> _clonedSentChunks = new List<(int x, int y)>();
-
-        private List<(int x, int z)> CloneSentChunks()
-        {
-            _clonedSentChunks.Clear();
-            _clonedSentChunks.AddRange(_sentChunks);
-            return _clonedSentChunks;
-        }
-
-        private async Task UnloadOutOfRangeChunks()
-        {
-            var currentChunk = await _player.GetChunkPosition();
-            foreach (var chunk in CloneSentChunks())
-            {
-                var distance = Math.Abs(chunk.x - currentChunk.x) + Math.Abs(chunk.z - currentChunk.z);
-                if (distance > _viewDistance)
-                {
-                    await _generator.UnloadChunk(chunk.x, chunk.z);
-                    _sentChunks.Remove(chunk);
-                }
-            }
+                await _chunkLoader.OnGameTick();
         }
 
         public Task SetPacketRouter(IPacketRouter packetRouter)
@@ -280,15 +208,7 @@ namespace MineCase.Server.User
 
         public Task SetViewDistance(int viewDistance)
         {
-            _viewDistance = viewDistance;
-            return Task.CompletedTask;
-        }
-
-        public Task OnChunkSent(int chunkX, int chunkZ)
-        {
-            _sendingChunks.Remove((chunkX, chunkZ));
-            _sentChunks.Add((chunkX, chunkZ));
-            return Task.CompletedTask;
+            return _chunkLoader.SetViewDistance(viewDistance);
         }
 
         private enum UserState : uint

+ 2 - 0
src/MineCase.Server.Grains/User/UserModule.cs

@@ -10,6 +10,8 @@ namespace MineCase.Server.User
         protected override void Load(ContainerBuilder builder)
         {
             builder.RegisterType<NonAuthenticatedUserGrain>();
+            builder.RegisterType<UserGrain>();
+            builder.RegisterType<UserChunkLoaderGrain>();
         }
     }
 }

+ 1 - 1
src/MineCase.Server.Interfaces/Game/IChunkSender.cs

@@ -10,6 +10,6 @@ namespace MineCase.Server.Game
 {
     public interface IChunkSender : IGrainWithStringKey
     {
-        Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUser> users);
+        Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUserChunkLoader> loaders);
     }
 }

+ 0 - 2
src/MineCase.Server.Interfaces/User/IUser.cs

@@ -47,7 +47,5 @@ namespace MineCase.Server.User
         Task SetPacketRouter(IPacketRouter packetRouter);
 
         Task SetViewDistance(int viewDistance);
-
-        Task OnChunkSent(int chunkX, int chunkZ);
     }
 }

+ 26 - 0
src/MineCase.Server.Interfaces/User/IUserChunkLoader.cs

@@ -0,0 +1,26 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using MineCase.Server.Game.Entities;
+using MineCase.Server.Network;
+using MineCase.Server.World;
+using Orleans;
+using Orleans.Concurrency;
+
+namespace MineCase.Server.User
+{
+    public interface IUserChunkLoader : IGrainWithGuidKey
+    {
+        Task SetClientPacketSink(IClientboundPacketSink sink);
+
+        Task JoinGame(IWorld world, IPlayer player);
+
+        Task OnGameTick();
+
+        [OneWay]
+        Task OnChunkSent(int chunkX, int chunkZ);
+
+        Task SetViewDistance(int viewDistance);
+    }
+}

+ 2 - 0
src/MineCase.Server/Startup.cs

@@ -4,6 +4,7 @@ using Microsoft.Extensions.Configuration;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.DependencyInjection.Extensions;
 using Microsoft.Extensions.Logging;
+using Microsoft.IO;
 using System;
 using System.IO;
 
@@ -31,6 +32,7 @@ namespace MineCase.Server
             services.AddOptions();
             services.AddSingleton(ConfigureLogging());
             services.AddLogging();
+            services.AddSingleton<RecyclableMemoryStreamManager>();
 
             var container = new ContainerBuilder();
             container.Populate(services);