gRPC 참고 링크
- gRPC 서비스와 HTTP API 비교 | Microsoft Docs
LiteDB 참고 링크
- LiteDB :: A .NET embedded NoSQL database
* 개발 환경
1. .NET 5
2. gRPC Service
Server proto | Client proto |
- 서버
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using LiteDB;
using Microsoft.Extensions.Logging;
namespace GrpcServiceTest
{
public class LogingService : LogPT.LogPTBase
{
private readonly ILogger<LogingService> _logger;
private ConcurrentQueue<LogMessageRequest> logsQueue; // 로그 쌓는 큐
private Thread InsertThread; // insert 하는 쓰레드
private LiteDatabase db; // LiteDB
private LiteCollection<LogMessageRequest> LiteCollection; // LogMessageRequest 형식의 컬렉션
public LogingService(ILogger<LogingService> logger)
{
_logger = logger;
logsQueue = new ConcurrentQueue<LogMessageRequest>(); // ConcurrentQueue 생성
db = new LiteDatabase("Filename=.\\TestDB.db;connection=shared"); // connection=shared 가 빠지면 DB를 독점으로 사용해 다른 프로세스에서 접근 불가.
LiteCollection = (LiteCollection<LogMessageRequest>)db.GetCollection<LogMessageRequest>(); // LogMessageRequest 형식의 컬렉션
InsertThread = new Thread(new ThreadStart(InsertDB)); // 쓰레드 생성
InsertThread.Start(); // 쓰레드 시작
}
public override async Task<LogMessageReply> SendLog(IAsyncStreamReader<LogMessageRequest> requestStream, ServerCallContext context) // 스트림 형식 request
{
try
{
await foreach (var message in requestStream.ReadAllAsync()) // C# 8.0 이후 가능
{
logsQueue.Enqueue(message); // Queue에 Enqueue
_logger.LogInformation($"Enqueue Success {message.ProgramName}, {message.DateTime}, {message.Severitylevel}, {message.Host}, {message.LogMessage}");
}
}
catch (Exception e)
{
_logger.LogError(e.ToString());
return await Task.FromResult(new LogMessageReply
{
ResultMessage = false // 실패시 false 리턴
});
}
return await Task.FromResult(new LogMessageReply
{
ResultMessage = true // 성공시 true 리턴
});
}
private void InsertDB()
{
LogMessageRequest logMessageRequest = new LogMessageRequest(); //
while (true)
{
if (logsQueue.Count > 0) // queue에 있을 때만 Dequeue
{
if (logsQueue.TryDequeue(out logMessageRequest)) // ConcurrentQueue는 TryDequeue 사용
{
LiteCollection.Insert(logMessageRequest); // db insert
_logger.LogInformation($"Insert Success {logMessageRequest.ProgramName}, {logMessageRequest.DateTime}, {logMessageRequest.Severitylevel}, {logMessageRequest.Host}, {logMessageRequest.LogMessage}");
var RequestList = LiteCollection.FindAll().ToList(); // db Select
foreach (var user in RequestList)
{
_logger.LogInformation($"select {user.ProgramName}, {user.DateTime}, {user.Severitylevel}, {user.Host}, {user.LogMessage}");
}
}
else
{
_logger.LogWarning($"Insert Fail {logMessageRequest.ProgramName}, {logMessageRequest.DateTime}, {logMessageRequest.Severitylevel}, {logMessageRequest.Host}, {logMessageRequest.LogMessage}");
}
}
}
}
}
}
- 클라이언트
using System;
using System.Threading.Tasks;
using Grpc.Net.Client;
namespace gRPC_ClientTest
{
class Program
{
static async Task Main(string[] args)
{
// The port number(5001) must match the port of the gRPC server.
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new LogPT.LogPTClient(channel);
using var call = client.SendLog();
await call.RequestStream.WriteAsync(new LogMessageRequest { ProgramName = "Client", DateTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm.ss.zzz"), Severitylevel = "info", Host = "127.0.0.1", LogMessage = "test" });
await call.RequestStream.CompleteAsync(); // write 후 다 전송 했다고 서버에 complete 보냄.
var response = await call;
Console.WriteLine("LogPT: " + response.ResultMessage.ToString());
Console.ReadKey();
}
}
}
ConcurrentQueue 참고 MSDN : ConcurrentQueue<T> 클래스 (System.Collections.Concurrent) | Microsoft Docs
스레드로부터 안전한 FIFO(선입선출) 방식의 컬렉션을 나타냅니다.
'C#' 카테고리의 다른 글
[C#] 공공 데이터 오픈 API 사용(제대 군인 채용 정보 얻기) (0) | 2021.01.02 |
---|---|
[C#] 카카오페이지 크롤링 (0) | 2021.01.01 |
[C#] TCP Listener을 이용한 비동기 (2) | 2020.12.04 |
[C#] UDP Socket을 이용한 통신 (2) | 2020.12.04 |
[C#] TCP Socket을 이용한 통신 (0) | 2020.12.04 |