22 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
23 #define __SSRC_WISP_PROTOCOL_CALLER_H
25 #include <ssrc/spread.h>
34 #include <boost/multi_index_container.hpp>
35 #include <boost/multi_index/mem_fun.hpp>
36 #include <boost/multi_index/hashed_index.hpp>
37 #include <boost/multi_index/composite_key.hpp>
41 using NS_SSRC_SPREAD::GroupList;
42 using NS_SSRC_SPREAD::Message;
43 using NS_SSRC_SPREAD::Mailbox;
// Template header of a traits type parameterized on the caller and parameter
// types. The remaining template parameters and the struct head are not
// visible in this excerpt (presumably CallTraits, the name used by the
// WISP_*_CALL macros -- confirm).
87 template<
typename CallerType,
typename ParameterType,
// future_type names the caller's nested Future template instantiated with
// return_type.
103 typedef typename caller_type::template Future<return_type>
future_type;
// A second template header that takes only the caller and parameter types;
// its body is not visible here.
116 template<
typename CallerType,
typename ParameterType>
// WISP_ONE_WAY_CALL(caller, method) and WISP_TWO_WAY_CALL(caller, method,
// result) each begin a typedef of NS_SSRC_WISP_PROTOCOL::CallTraits built from
// the token-pasted Message##method (plus Message##result for the two-way
// form). The typedef name sits on a continuation line that is not visible in
// this excerpt.
137 #define WISP_ONE_WAY_CALL(caller, method) \
138 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
148 #define WISP_TWO_WAY_CALL(caller, method, result) \
149 typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
160 std::runtime_error(message)
// Constructor tail: copies the arguments into id, token, role and
// message_size; size defaults to 0.
224 const unsigned int size = 0) :
225 id(id), token(token), role(role), message_size(size) { }
// Sets all four header fields to zero.
231 id = 0, token = 0, role = 0, message_size = 0;
// Re-initialization tail: _size defaults to 0 and the four fields are
// overwritten from the underscore-prefixed arguments.
240 const unsigned int _size = 0)
242 id = _id, token = _token, role = _role, message_size = _size;
// True when message_size is greater than zero.
// NOTE(review): presumably the body of is_jumbo_fragment() -- the signature
// is not visible here.
250 return (message_size > 0);
// Serialization hook: passes id, token, role and message_size through the
// archive's operator&, in that order.
253 template<
class Archive>
255 ar &
id & token & role & message_size;
// Constructor: hands message_capacity (default Message::DefaultCapacity) to
// the message member and default-constructs header and groups.
272 MessageInfo(
const unsigned int message_capacity = Message::DefaultCapacity) :
273 header(), message(message_capacity), groups()
// Accessor body: forwards to message.type().
277 return message.type();
// Accessor body: forwards to message.sender().
293 return message.sender();
// Forwards id, token and role to header.init(); the enclosing signature is
// not visible in this excerpt.
300 header.
init(
id, token, role);
// Template header whose packing-traits parameter PT defaults to
// BinaryPackingTraits (presumably the head of class Caller -- confirm).
316 template<
typename PT = BinaryPackingTraits>
// Every Future<ReturnType> instantiation is declared a friend.
318 template<
typename ReturnType>
friend class Future;
// Fragmentation threshold literal: 131072 (2^17). The macro is #undef'd
// further down this header.
327 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
// send() transmits a message in one piece only while message.size() does not
// exceed this constant. Presumably initialized from the macro above -- the
// initializer is not visible in this excerpt.
328 static const unsigned int MaxUnfragmentedMessageSize =
// Double-ended queue of message_info_ptr entries.
332 typedef std::deque<message_info_ptr> request_queue;
// Hashed container with a unique index on MessageInfo::token (presumably the
// response_map typedef -- its closing line is not visible here).
336 typedef boost::multi_index_container<
338 boost::multi_index::indexed_by<
339 boost::multi_index::hashed_unique<
341 &MessageInfo::token> >
// Hashed container with a unique composite key that includes
// MessageInfo::sender and MessageInfo::token; other key members, if any, are
// not visible in this excerpt.
344 typedef boost::multi_index_container<
346 boost::multi_index::indexed_by<
347 boost::multi_index::hashed_unique<
348 boost::multi_index::composite_key<
350 boost::multi_index::const_mem_fun<MessageInfo,
const std::string &,
351 &MessageInfo::sender>,
355 &MessageInfo::token> > >
356 > > jumbo_message_map;
// Next call token to hand out; next_token() post-increments it.
359 wisp_call_token _call_token;
// Unpacks headers and message bodies from received messages.
361 unpacker_type _unpacker;
// Queued requests (filled with push_back, drained with front/pop_front).
364 request_queue _requests;
// Buffered responses, looked up by call token.
365 response_map _responses;
// Jumbo messages that are still being reassembled from fragments.
366 jumbo_message_map _jumbo_messages;
// Key extractor obtained from _jumbo_messages in the constructor.
367 jumbo_message_map::key_from_value _jumbo_key_from_value;
// The message_capacity value passed to the constructor.
369 const unsigned int _initial_message_capacity;
// Buffer that _mbox.receive() fills on the receive paths.
371 message_info_ptr _info;
// Buffer that send() packs outgoing messages into.
372 MessageInfo _send_info;
// Formats the diagnostic for a protocol/id mismatch, listing the expected and
// actual protocol and id. The enclosing signature and the throw statement are
// not visible in this excerpt.
380 std::ostringstream message;
381 message <<
"Mismatched message id or protocol.\nExpected protocol: "
382 << expected_proto <<
" Actual protocol: " << actual_proto
383 <<
"\nExpected id: " << expected_id
384 <<
" Actual id: " << actual_id << std::endl;
// Overload for a call-token mismatch: formats the expected and actual tokens.
// The rest of the body is not visible here.
388 static void throwCallException(
const wisp_call_token expected_token,
389 const wisp_call_token actual_token)
392 std::ostringstream message;
393 message <<
"Mismatched call token.\nExpected token: "
394 << expected_token <<
" Actual token: " << actual_token
// Body not visible in this excerpt.
399 void reset_receive_info() {
// Appends the current receive buffer to the request queue, then calls
// reset_receive_info().
403 void push_request() {
404 _requests.push_back(_info);
405 reset_receive_info();
// Inserts the current receive buffer into the response map, then calls
// reset_receive_info().
408 void insert_response() {
409 _responses.insert(_info);
410 reset_receive_info();
// Outcome reported by insert_jumbo_fragment().
413 enum JumboFragmentResult {
// No reassembly entry existed for the fragment's key; a new one was inserted.
414 JumboFragmentFirstFragment,
// Initial value of the result in insert_jumbo_fragment().
415 JumboFragmentNextFragment,
// Reassembly finished and the message was appended to _requests.
416 JumboFragmentCompleteRequest,
// Reassembly finished and the message was inserted into _responses.
417 JumboFragmentCompleteResponse
// Adds one received fragment to the jumbo-message reassembly state and
// reports what happened. header_size is the offset at which the fragment's
// payload starts inside msginfo->message.
// NOTE(review): several lines, including the else branches, are missing from
// this excerpt, so the exact branch structure should be confirmed.
420 JumboFragmentResult insert_jumbo_fragment(
const message_info_ptr & msginfo,
421 const unsigned int header_size)
423 JumboFragmentResult result = JumboFragmentNextFragment;
424 Message & message = msginfo->message;
// Look for an existing reassembly entry under the key extracted from msginfo.
425 jumbo_message_map::iterator it =
426 _jumbo_messages.find(_jumbo_key_from_value(msginfo));
428 if(it == _jumbo_messages.end()) {
// No entry yet: create a MessageInfo constructed with the header's
// message_size and register it.
430 message_info_ptr info(make_smart_ptr<message_info_ptr>(msginfo->header.message_size));
431 message.seek(message.size());
433 _jumbo_messages.insert(info);
434 result = JumboFragmentFirstFragment;
// Existing entry: append this fragment's payload (the bytes after the header).
436 const message_info_ptr & info = *it;
438 info->message.write(&message[header_size], message.size()-header_size);
// Once the write offset reaches the size declared in the header, the message
// is treated as complete: seek to just past the header, hand it to
// _responses or _requests (the deciding condition is not visible here), and
// drop the reassembly entry.
440 if(info->message.offset() >= info->header.message_size) {
441 info->message.seek(header_size);
443 _responses.insert(info);
444 result = JumboFragmentCompleteResponse;
446 _requests.push_back(info);
447 result = JumboFragmentCompleteRequest;
449 _jumbo_messages.erase(it);
// Obtains the response carrying the given call token, reading from the
// mailbox when it is not already buffered in _responses.
// NOTE(review): loop structure, the unpack into msg and several branches are
// missing from this excerpt; the comments below cover only the visible lines.
456 template<
typename MessageType>
457 void receive_response(MessageType & msg,
const wisp_call_token token)
458 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
// Re-entry point, jumped to after a jumbo response has been completed.
461 begin_receive_response:
462 typename response_map::iterator it(_responses.find(token));
464 if(it == _responses.end()) {
// Call reset_receive_info() when _info is not the sole owner of its object.
465 if(!_info.unique()) {
466 reset_receive_info();
469 Message & message = _info->message;
// Read the next message and record its groups.
471 _mbox.receive(message);
472 _mbox.copy_groups(_info->groups);
474 if(message.is_regular()) {
475 CallHeader & header = _info->header;
// Unpack the call header; the returned value is used as the payload offset
// when the message is a jumbo fragment.
479 const unsigned int header_size = _unpacker.unpack(header, message);
481 if(header.is_jumbo_fragment()) {
482 const JumboFragmentResult jumbo_result =
483 insert_jumbo_fragment(_info, header_size);
// A completed jumbo response was inserted into _responses; restart the lookup.
484 if(jumbo_result == JumboFragmentCompleteResponse) {
485 goto begin_receive_response;
489 if(header.token == token)
497 _info->clear_header();
// Remove the buffered response found by the lookup.
504 _responses.erase(it);
// Returns the current call token and then increments the counter.
509 wisp_call_token next_token() {
510 return _call_token++;
// Packs msg into _send_info.message and sends it to dest, splitting it into
// fragments when the packed size exceeds MaxUnfragmentedMessageSize.
// NOTE(review): one parameter line, the header setup and the catch handler
// body are missing from this excerpt.
513 template<
typename DestinationType,
typename MessageType>
514 void send(
const DestinationType & dest,
515 const MessageType & msg,
516 const wisp_call_token token,
518 const Message::Service service)
519 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
// Packing runs inside a try/catch that repeats until done_packing is set.
521 bool done_packing(
false);
// Text of the failure this retry loop is concerned with; presumably compared
// against e.what() in the handler -- handler body not visible.
531 #define __WISP_WRITE_AREA_EXHAUSTED "write area exhausted"
537 _packer.pack(msg, _send_info.
message);
539 }
catch(
const std::ios_base::failure & e) {
547 }
while(!done_packing);
549 #undef __WISP_WRITE_AREA_EXHAUSTED
// Stamp the outgoing message with the protocol as its type and with the
// requested service.
551 _send_info.
message.set_type(msg.protocol);
552 _send_info.
message.set_service(service);
// Messages at or below the threshold go out in a single send.
554 if(_send_info.
message.size() <= MaxUnfragmentedMessageSize) {
555 _mbox.send(_send_info.
message, dest);
// Larger messages are fragmented (initializers of the next two constants are
// not visible here).
557 const unsigned int message_size =
561 const unsigned int header_size =
// Each fragment repeats the header, so the payload budget per fragment is the
// threshold minus header_size.
563 const unsigned int max_fragment_size =
564 MaxUnfragmentedMessageSize - header_size;
// Walk the payload starting after the header; every fragment is sent as two
// parts: the first header_size bytes of the message, then the next payload
// slice of at most max_fragment_size bytes.
568 for(
unsigned int offset = header_size, size = 0;
569 offset < message_size; offset+=size)
571 size = std::min(max_fragment_size, message_size - offset);
572 _mbox.clear_message_parts();
573 _mbox.add_message_part(&_send_info.
message[0], header_size);
574 _mbox.add_message_part(&_send_info.
message[offset], size);
575 _mbox.send(dest, msg.protocol, service);
577 _mbox.clear_message_parts();
// Tuple of (string, call role, call token). Presumably the typedef used as
// the jumbo-message key -- the typedef name is not visible here.
585 std::tuple<std::string, wisp_call_role, wisp_call_token>
// Nested Future template: remembers the token of an outstanding call and the
// caller that issued it (class head not visible in this excerpt).
588 template<
typename ReturnType>
593 wisp_call_token _token;
// Constructor initializer list: the future starts out with _valid set to true.
597 _valid(true), _token(token), _caller(caller)
619 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
// Delegates to the caller's receive_response() for this future's token.
625 _caller.receive_response(result, _token);
// Unpacks an already-received message into result when its token matches this
// future's token; the visible throwCallException call reports the expected
// and actual tokens (presumably the else branch -- confirm).
631 void unpack(return_type & result, MessageInfo & info)
632 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
638 if(info.token() == _token)
639 _caller.
unpack(result, info);
641 throwCallException(_token, info.token());
// Constructor: connection and name default to empty strings and
// message_capacity to Message::DefaultCapacity. The group_membership
// parameter used below is declared on a line not visible in this excerpt.
648 Caller(
const std::string & connection =
"",
649 const std::string & name =
"",
650 const unsigned int message_capacity = Message::DefaultCapacity,
// The mailbox is built from the connection arguments; the call token counter
// starts at 0.
652 _mbox(connection, name, group_membership), _call_token(0),
653 _packer(), _unpacker(), _requests(), _responses(),
// Cache the jumbo map's key extractor for use in insert_jumbo_fragment().
655 _jumbo_key_from_value(_jumbo_messages.key_extractor()),
656 _initial_message_capacity(message_capacity),
658 _send_info(message_capacity)
// The size macro is not needed past this point.
661 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
// Returns the mailbox's private group name.
683 const std::string &
name()
const {
684 return _mbox.private_group();
// Accessor body: forwards to _mbox.group_membership().
688 return _mbox.group_membership();
// Bodies of join() and leave() are not visible in this excerpt.
691 void join(
const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
695 void leave(
const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
// Accessor body: the capacity passed to the constructor.
700 return _initial_message_capacity;
// Accessor body: current capacity of the receive buffer's message.
704 return _info->message.capacity();
// Accessor body: current capacity of the send buffer's message.
708 return _send_info.
message.capacity();
// Accessor body: number of queued requests.
712 return _requests.size();
// Accessor body: number of buffered responses.
716 return _responses.size();
// Accessor body: whether a response with the given token is buffered.
720 return (_responses.find(token) != _responses.end());
// Accessor body: number of jumbo messages still being reassembled.
724 return _jumbo_messages.size();
// Iterates over every jumbo message under reassembly, taking a raw pointer
// to each entry. The function name and what is done with each entry are not
// visible here; presumably keys are collected into key_container -- confirm.
727 template<
typename key_container>
729 for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
730 end = _jumbo_messages.end(); it != end; ++it)
732 const MessageInfo *info = it->get();
// Erases the reassembly entry for each key in [begin, end); keys with no
// matching entry are ignored. The function name is not visible here.
737 template<
typename iterator_type>
739 const iterator_type & end)
742 for(iterator_type it = begin; it != end; ++it) {
743 jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
744 if(msg != _jumbo_messages.end())
745 _jumbo_messages.erase(msg);
// Unpacks info.message into msg when the received id and protocol both match
// those of MessageType. The visible throwCallException call reports the
// expected and actual protocol and id (presumably the else branch -- confirm).
750 template<
typename MessageType>
751 void unpack(MessageType & msg, MessageInfo & info)
752 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
755 if(info.id() == MessageType::id &&
756 info.protocol() == MessageType::protocol)
757 _unpacker.unpack(msg, info.message);
759 throwCallException(MessageType::protocol, info.protocol(),
760 MessageType::id, info.id());
// Receive path whose signature is not visible in this excerpt. Buffered
// messages are served first: a stored response if there is one, otherwise the
// request at the front of the queue.
770 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
773 if(!_responses.empty()) {
774 response_map::iterator it = _responses.begin();
776 _responses.erase(it);
777 }
else if(!_requests.empty()) {
778 info = _requests.front();
779 _requests.pop_front();
// Read from the mailbox into the current receive buffer and record its groups
// (presumably the branch taken when nothing is buffered -- confirm).
782 Message & message = _info->message;
784 _mbox.receive(message);
785 _mbox.copy_groups(_info->groups);
787 if(message.is_regular()) {
792 const unsigned int header_size = _unpacker.unpack(header, message);
// Fragment handling: a completed jumbo message is taken straight back out of
// the container insert_jumbo_fragment() placed it in.
795 const JumboFragmentResult jumbo_result =
796 insert_jumbo_fragment(_info, header_size);
797 if(jumbo_result == JumboFragmentCompleteResponse) {
798 response_map::iterator it = _responses.begin();
800 _responses.erase(it);
802 }
else if(jumbo_result == JumboFragmentCompleteRequest) {
803 info = _requests.front();
804 _requests.pop_front();
810 _info->clear_header();
// Receive path that serves requests only (signature not visible in this
// excerpt; the name used for this block is a guess). A queued request, if
// any, is returned first.
821 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
823 if(!_requests.empty()) {
824 info = _requests.front();
825 _requests.pop_front();
// Call reset_receive_info() when _info is not the sole owner of its object.
827 if(!_info.unique()) {
828 reset_receive_info();
// Read from the mailbox into the current receive buffer and record its groups.
831 Message & message = _info->message;
833 _mbox.receive(message);
834 _mbox.copy_groups(_info->groups);
836 if(message.is_regular()) {
841 const unsigned int header_size = _unpacker.unpack(header, message);
// A jumbo request that has just been completed is taken from the front of the
// request queue.
844 const JumboFragmentResult jumbo_result =
845 insert_jumbo_fragment(_info, header_size);
846 if(jumbo_result == JumboFragmentCompleteRequest) {
847 info = _requests.front();
848 _requests.pop_front();
858 _info->clear_header();
// Traits-based one-way send; rejected at compile time unless
// Traits::call_type is OneWay. The service parameter line and the body after
// the assertion are not visible in this excerpt.
866 template<
typename Traits,
typename DestinationType>
867 void send(
const DestinationType & dest,
868 const typename Traits::parameter_type & param,
870 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
872 static_assert(Traits::call_type ==
OneWay,
"expected call_type OneWay");
// Reply to a call identified by token; Traits::call_type must also be OneWay
// here.
876 template<
typename Traits,
typename DestinationType>
877 void reply(
const DestinationType & dest,
878 const wisp_call_token token,
879 const typename Traits::parameter_type & param,
881 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
883 static_assert(Traits::call_type ==
OneWay,
"expected call_type OneWay");
// Two-way call that takes a fresh token and then calls receive_response() to
// fill *ret. The send step between those two lines is not visible here.
888 template<
typename Traits,
typename DestinationType>
889 void call(
const DestinationType & dest,
890 typename Traits::return_type *ret,
891 const typename Traits::parameter_type & param,
893 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
896 static_assert(Traits::call_type ==
TwoWay,
"expected call_type TwoWay");
897 wisp_call_token token = next_token();
899 receive_response(*ret, token);
// Two-way call that returns a Future bound to this caller and the new token
// instead of receiving the response itself.
903 template<
typename Traits,
typename DestinationType>
904 typename Future<typename Traits::return_type>::shared_ptr
905 call(
const DestinationType & dest,
906 const typename Traits::parameter_type & param,
908 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
910 static_assert(Traits::call_type ==
TwoWay,
"expected call_type TwoWay");
912 wisp_call_token token = next_token();
918 return typename future_type::shared_ptr(
new future_type(*
this, token));
// Empty tag type, one per CallType value; the _call() overloads take a
// CallTag<OneWay> or CallTag<TwoWay> argument to select an implementation.
922 template<CallType call_type>
struct CallTag { };
// One-way overload: sends the message and returns a default-constructed
// future pointer.
924 template<
typename Traits,
typename DestinationType>
925 typename Future<typename Traits::return_type>::shared_ptr
926 _call(
const CallTag<OneWay> &,
927 const DestinationType & dest,
928 const typename Traits::parameter_type & param,
930 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
932 typedef Future<typename Traits::return_type> future_type;
933 send<Traits>(dest, param, service);
934 return typename future_type::shared_ptr();
// Two-way overload: forwards to the future-returning call<Traits>().
937 template<
typename Traits,
typename DestinationType>
938 typename Future<typename Traits::return_type>::shared_ptr
939 _call(
const CallTag<TwoWay> &,
940 const DestinationType & dest,
941 const typename Traits::parameter_type & param,
943 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
945 return call<Traits>(dest, param, service);
// Three forwarding wrappers whose names and return-type lines are not visible
// in this excerpt (the name used for this block is a placeholder).
// First wrapper: passes its arguments unchanged to reply<Traits>().
950 template<
typename Traits,
typename DestinationType>
952 const DestinationType & dest,
953 const wisp_call_token token,
954 const typename Traits::parameter_type & param,
955 const Message::Service service =
957 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
959 reply<Traits>(dest, token, param, service);
// Second wrapper: passes its arguments unchanged to the call<Traits>()
// overload that fills *ret.
962 template<
typename Traits,
typename DestinationType>
964 const DestinationType & dest,
965 typename Traits::return_type *ret,
966 const typename Traits::parameter_type & param,
967 const Message::Service service =
969 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
972 call<Traits>(dest, ret, param, service);
// Third wrapper: dispatches to the _call() overload selected by
// CallTag<Traits::call_type>.
975 template<
typename Traits,
typename DestinationType>
976 typename Future<typename Traits::return_type>::shared_ptr
978 const DestinationType & dest,
979 const typename Traits::parameter_type & param,
981 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
983 return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
// send() without an explicit parameter object: a default-constructed
// Traits::parameter_type is sent.
994 template<
typename Traits,
typename DestinationType>
995 void send(
const DestinationType & dest,
997 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
999 send<Traits>(dest,
typename Traits::parameter_type(), service);
// send() that takes the service first and builds the parameter object in
// place from the perfectly forwarded pack p.
1002 template<
typename Traits,
typename DestinationType,
typename... P>
1003 void send(
const Message::Service service,
1004 const DestinationType & dest,
1006 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1008 send<Traits>(dest,
typename Traits::parameter_type(std::forward<P>(p)...),
// sendp(): builds the parameter object from p and passes no service argument
// to send<Traits>().
1012 template<
typename Traits,
typename DestinationType,
typename... P>
1013 void sendp(
const DestinationType & dest, P && ...p)
1015 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1017 send<Traits>(dest,
typename Traits::parameter_type(std::forward<P>(p)...));
// reply() without an explicit parameter object: replies with a
// default-constructed Traits::parameter_type.
1022 template<
typename Traits,
typename DestinationType>
1023 void reply(
const DestinationType & dest,
1024 const wisp_call_token token,
1026 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1028 reply<Traits>(dest, token,
typename Traits::parameter_type(), service);
// Wrapper with the same forwarding body as above; its name line is not
// visible in this excerpt.
1031 template<
typename Traits,
typename DestinationType>
1033 const DestinationType & dest,
1034 const wisp_call_token token,
1035 const Message::Service service =
1037 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1039 reply<Traits>(dest, token,
typename Traits::parameter_type(), service);
// reply() that takes the service first and builds the parameter object from
// the forwarded pack p.
1042 template<
typename Traits,
typename DestinationType,
typename... P>
1043 void reply(
const Message::Service service,
1044 const DestinationType & dest,
1045 const wisp_call_token token,
1047 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1049 reply<Traits>(dest, token,
1050 typename Traits::parameter_type(std::forward<P>(p)...),
// Variant that builds the parameter object from p and passes no service
// argument to reply<Traits>(); its name line is not visible in this excerpt.
1054 template<
typename Traits,
typename DestinationType,
typename... P>
1056 const wisp_call_token token,
1058 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1060 reply<Traits>(dest, token,
1061 typename Traits::parameter_type(std::forward<P>(p)...));
// call() into *ret without an explicit parameter object: a
// default-constructed Traits::parameter_type is sent.
1065 template<
typename Traits,
typename DestinationType>
1066 void call(
const DestinationType & dest,
1067 typename Traits::return_type *ret,
1069 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
1072 call<Traits>(dest, ret,
typename Traits::parameter_type(), service);
// Wrapper with the same forwarding body as above; its name line is not
// visible in this excerpt.
1075 template<
typename Traits,
typename DestinationType>
1077 const DestinationType & dest,
1078 typename Traits::return_type *ret,
1080 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure,
1083 call<Traits>(dest, ret,
typename Traits::parameter_type(), service);
// call() into *ret that takes the service first and builds the parameter
// object from the forwarded pack p.
1086 template<
typename Traits,
typename DestinationType,
typename... P>
1087 void call(
const Message::Service service,
1088 const DestinationType & dest,
1089 typename Traits::return_type *ret,
1091 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1093 call<Traits>(dest, ret,
1094 typename Traits::parameter_type(std::forward<P>(p)...),
// callp() into *ret: builds the parameter object from p and passes no service
// argument to call<Traits>().
1098 template<
typename Traits,
typename DestinationType,
typename... P>
1099 void callp(
const DestinationType & dest,
1100 typename Traits::return_type *ret,
1102 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1104 call<Traits>(dest, ret,
1105 typename Traits::parameter_type(std::forward<P>(p)...));
// Future-returning call without an explicit parameter object: a
// default-constructed Traits::parameter_type is sent. The name and parameter
// lines are not visible in this excerpt.
1109 template<
typename Traits,
typename DestinationType>
1110 typename Future<typename Traits::return_type>::shared_ptr
1113 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1115 return call<Traits>(dest,
typename Traits::parameter_type(), service);
// Future-returning call() that takes the service first and builds the
// parameter object from the forwarded pack p.
1118 template<
typename Traits,
typename DestinationType,
typename... P>
1119 typename Future<typename Traits::return_type>::shared_ptr
1120 call(
const Message::Service service,
const DestinationType & dest,
1122 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1124 return call<Traits>(dest,
1125 typename Traits::parameter_type(std::forward<P>(p)...),
// Future-returning callp(): builds the parameter object from p and passes no
// service argument to call<Traits>().
1129 template<
typename Traits,
typename DestinationType,
typename... P>
1130 typename Future<typename Traits::return_type>::shared_ptr
1131 callp(
const DestinationType & dest, P && ...p)
1132 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1134 return call<Traits>(dest,
1135 typename Traits::parameter_type(std::forward<P>(p)...));
// Tag-dispatched variant with a default-constructed parameter object: forwards
// to the _call() overload selected by CallTag<Traits::call_type>. The name
// line and the return keyword, if any, are not visible in this excerpt.
1140 template<
typename Traits,
typename DestinationType>
1141 typename Future<typename Traits::return_type>::shared_ptr
1143 const DestinationType & dest,
1145 SSRC_DECL_THROW(
boost::archive::archive_exception,
std::ios_base::failure)
1148 _call<Traits>(CallTag<Traits::call_type>(),
1149 dest,
typename Traits::parameter_type(), service);