#pragma once #include "LibLsp/lsp/lsp_diagnostic.h" #include "LibLsp/JsonRpc/Cancellation.h" #include "LibLsp/JsonRpc/lsResponseMessage.h" #include "LibLsp/JsonRpc/RequestInMessage.h" #include "LibLsp/JsonRpc/NotificationInMessage.h" #include "traits.h" #include #include #include "threaded_queue.h" #include #include "MessageIssue.h" #include "LibLsp/JsonRpc/MessageJsonHandler.h" #include "Endpoint.h" #include "future.h" #include "MessageProducer.h" class MessageJsonHandler; class Endpoint; struct LspMessage; class RemoteEndPoint; namespace lsp { class ostream; class istream; //////////////////////////////////////////////////////////////////////////////// // ResponseOrError //////////////////////////////////////////////////////////////////////////////// // ResponseOrError holds either the response to a request or an error // message. template struct ResponseOrError { using Request = T; ResponseOrError(); ResponseOrError(const T& response); ResponseOrError(T&& response); ResponseOrError(const Rsp_Error& error); ResponseOrError(Rsp_Error&& error); ResponseOrError(const ResponseOrError& other); ResponseOrError(ResponseOrError&& other) noexcept; ResponseOrError& operator=(const ResponseOrError& other); ResponseOrError& operator=(ResponseOrError&& other) noexcept; bool IsError() const { return is_error; } std::string ToJson() { if (is_error) return error.ToJson(); return response.ToJson(); } T response; Rsp_Error error; // empty represents success. bool is_error; }; template ResponseOrError::ResponseOrError(): is_error(false) { } template ResponseOrError::ResponseOrError(const T& resp) : response(resp), is_error(false) {} template ResponseOrError::ResponseOrError(T&& resp) : response(std::move(resp)), is_error(false) {} template ResponseOrError::ResponseOrError(const Rsp_Error& err) : error(err), is_error(true) {} template ResponseOrError::ResponseOrError(Rsp_Error&& err) : error(std::move(err)), is_error(true) {} template ResponseOrError::ResponseOrError(const ResponseOrError& other) : response(other.response), error(other.error), is_error(other.is_error) {} template ResponseOrError::ResponseOrError(ResponseOrError&& other) noexcept : response(std::move(other.response)), error(std::move(other.error)), is_error(other.is_error) {} template ResponseOrError& ResponseOrError::operator=( const ResponseOrError& other) { response = other.response; error = other.error; is_error = other.is_error; return *this; } template ResponseOrError& ResponseOrError::operator=(ResponseOrError&& other) noexcept { response = std::move(other.response); error = std::move(other.error); is_error = other.is_error; return *this; } } class RemoteEndPoint :MessageIssueHandler { template using ParamType = lsp::traits::ParameterType; template using IsRequest = lsp::traits::EnableIfIsType; template using IsResponse = lsp::traits::EnableIfIsType; template using IsNotify = lsp::traits::EnableIfIsType; template using IsRequestHandler = lsp::traits::EnableIf>:: value>; template using IsRequestHandlerWithMonitor = lsp::traits::EnableIf>:: value>; public: RemoteEndPoint(const std::shared_ptr & json_handler, const std::shared_ptr < Endpoint >& localEndPoint, lsp::Log& _log, lsp::JSONStreamStyle style = lsp::JSONStreamStyle::Standard, uint8_t max_workers = 2); ~RemoteEndPoint() override; template , typename ResponseType = typename RequestType::Response> IsRequestHandler< F, lsp::ResponseOrError > registerHandler(F&& handler) { processRequestJsonHandler(handler); local_endpoint->registerRequestHandler(RequestType::kMethodInfo, [=](std::unique_ptr msg) { auto req = reinterpret_cast(msg.get()); lsp::ResponseOrError res(handler(*req)); if (res.is_error) { res.error.id = req->id; send(res.error); } else { res.response.id = req->id; send(res.response); } return true; }); } template , typename ResponseType = typename RequestType::Response> IsRequestHandlerWithMonitor< F, lsp::ResponseOrError > registerHandler(F&& handler) { processRequestJsonHandler(handler); local_endpoint->registerRequestHandler(RequestType::kMethodInfo, [=](std::unique_ptr msg) { auto req = static_cast(msg.get()); lsp::ResponseOrError res(handler(*req , getCancelMonitor(req->id))); if (res.is_error) { res.error.id = req->id; send(res.error); } else { res.response.id = req->id; send(res.response); } return true; }); } using RequestErrorCallback = std::function; template > void send(T& request, F&& handler, RequestErrorCallback onError) { processRequestJsonHandler(handler); auto cb = [=](std::unique_ptr msg) { if (!msg) return true; const auto result = msg.get(); if (static_cast(result)->IsErrorType()) { const auto rsp_error = static_cast(result); onError(*rsp_error); } else { handler(*static_cast(result)); } return true; }; internalSendRequest(request, cb); } template > IsNotify registerHandler(F&& handler) { { std::lock_guard lock(m_sendMutex); if (!jsonHandler->GetNotificationJsonHandler(NotifyType::kMethodInfo)) { jsonHandler->SetNotificationJsonHandler(NotifyType::kMethodInfo, [](Reader& visitor) { return NotifyType::ReflectReader(visitor); }); } } local_endpoint->registerNotifyHandler(NotifyType::kMethodInfo, [=](std::unique_ptr msg) { handler(*static_cast(msg.get())); return true; }); } template > lsp::future< lsp::ResponseOrError > send(T& request) { processResponseJsonHandler(request); using Response = typename T::Response; auto promise = std::make_shared< lsp::promise>>(); auto cb = [=](std::unique_ptr msg) { if (!msg) return true; auto result = msg.get(); if (reinterpret_cast(result)->IsErrorType()) { Rsp_Error* rsp_error = static_cast(result); Rsp_Error temp; std::swap(temp, *rsp_error); promise->set_value(std::move(lsp::ResponseOrError(std::move(temp)))); } else { Response temp; std::swap(temp, *static_cast(result)); promise->set_value(std::move(lsp::ResponseOrError(std::move(temp)))); } return true; }; internalSendRequest(request, cb); return promise->get_future(); } template > std::unique_ptr> waitResponse(T& request, const unsigned time_out = 0) { auto future_rsp = send(request); if (time_out == 0) { future_rsp.wait(); } else { auto state = future_rsp.wait_for(std::chrono::milliseconds(time_out)); if (lsp::future_status::timeout == state) { return {}; } } using Response = typename T::Response; return std::make_unique>(std::move(future_rsp.get())); } void send(NotificationInMessage& msg) { sendMsg(msg); } void send(ResponseInMessage& msg) { sendMsg(msg); } void sendNotification(NotificationInMessage& msg) { send(msg); } void sendResponse(ResponseInMessage& msg) { send(msg); } template T createRequest() { auto req = T(); req.id.set(getNextRequestId()); return req; } int getNextRequestId(); bool cancelRequest(const lsRequestId&); void startProcessingMessages(std::shared_ptr r, std::shared_ptr w); bool isWorking() const; void stop(); std::unique_ptr internalWaitResponse(RequestInMessage&, unsigned time_out = 0); bool internalSendRequest(RequestInMessage &info, GenericResponseHandler handler); void handle(std::vector&&) override; void handle(MessageIssue&&) override; private: CancelMonitor getCancelMonitor(const lsRequestId&); void sendMsg(LspMessage& msg); void mainLoop(std::unique_ptr); bool dispatch(const std::string&); template > IsRequest processRequestJsonHandler(const F& handler) { std::lock_guard lock(m_sendMutex); if (!jsonHandler->GetRequestJsonHandler(RequestType::kMethodInfo)) { jsonHandler->SetRequestJsonHandler(RequestType::kMethodInfo, [](Reader& visitor) { return RequestType::ReflectReader(visitor); }); } } template > void processResponseJsonHandler(T& request) { using Response = typename T::Response; std::lock_guard lock(m_sendMutex); if (!jsonHandler->GetResponseJsonHandler(T::kMethodInfo)) { jsonHandler->SetResponseJsonHandler(T::kMethodInfo, [](Reader& visitor) { if (visitor.HasMember("error")) return Rsp_Error::ReflectReader(visitor); return Response::ReflectReader(visitor); }); } } struct Data; Data* d_ptr; std::shared_ptr < MessageJsonHandler> jsonHandler; std::mutex m_sendMutex; std::shared_ptr < Endpoint > local_endpoint; public: std::shared_ptr < std::thread > message_producer_thread_; };