Savarese Software Research Corporation
Caller.h
Go to the documentation of this file.
1 /* Copyright 2006-2013 Savarese Software Research Corporation.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.savarese.com/software/ApacheLicense-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
22 #ifndef __SSRC_WISP_PROTOCOL_CALLER_H
23 #define __SSRC_WISP_PROTOCOL_CALLER_H
24 
25 #include <ssrc/spread.h>
28 
29 #include <queue>
30 #include <sstream>
31 #include <stdexcept>
32 #include <tuple>
33 
34 #include <boost/multi_index_container.hpp>
35 #include <boost/multi_index/mem_fun.hpp>
36 #include <boost/multi_index/hashed_index.hpp>
37 #include <boost/multi_index/composite_key.hpp>
38 
40 
41 using NS_SSRC_SPREAD::GroupList;
42 using NS_SSRC_SPREAD::Message;
43 using NS_SSRC_SPREAD::Mailbox;
45 
50 const Message::Service DefaultMessageServiceType = Message::SafeSelfDiscard;
51 
60 enum CallType {
63 
66 
67  /*, MultiWay */
68 };
69 
namespace detail {
  /// Empty tag type used as the default ReturnType argument of CallTraits.
  struct VoidReturnType {};
}
78 
87 template<typename CallerType, typename ParameterType,
88  typename ReturnType = detail::VoidReturnType>
89 struct CallTraits {
91  static const CallType call_type = TwoWay;
92 
94  typedef ParameterType parameter_type;
95 
97  typedef ReturnType return_type;
98 
100  typedef CallerType caller_type;
101 
103  typedef typename caller_type::template Future<return_type> future_type;
104 
106  typedef typename future_type::shared_ptr future_ptr;
107 };
108 
116 template<typename CallerType, typename ParameterType>
117 struct CallTraits<CallerType, ParameterType, detail::VoidReturnType> {
119  static const CallType call_type = OneWay;
120 
122  typedef ParameterType parameter_type;
123 
126 
128  typedef CallerType caller_type;
129 };
130 
137 #define WISP_ONE_WAY_CALL(caller, method) \
138  typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method> \
139  Call ## method
140 
148 #define WISP_TWO_WAY_CALL(caller, method, result) \
149  typedef NS_SSRC_WISP_PROTOCOL::CallTraits<caller, Message ## method, Message ## result> \
150  Call ## method
151 
/**
 * Exception type thrown by the call machinery in this header.  It is a
 * std::runtime_error that carries a descriptive message.
 */
class CallException : public std::runtime_error {
public:
  /// @param message Text subsequently reported by what().
  explicit CallException(const std::string & message)
    : std::runtime_error(message)
  {
  }
};
163 
168 typedef std::uint32_t wisp_call_token;
169 
174 typedef std::uint8_t wisp_call_role;
175 
176 
182 enum CallRole {
185 
188 
191 
192  /*, MultiWayResponse */
193 };
194 
195 
201 struct CallHeader {
211  unsigned int message_size;
212 
216  CallHeader() : id(0), token(0), role(0), message_size(0) { }
217 
222  const wisp_call_token token,
223  const CallRole role,
224  const unsigned int size = 0) :
225  id(id), token(token), role(role), message_size(size) { }
226 
230  void clear() {
231  id = 0, token = 0, role = 0, message_size = 0;
232  }
233 
237  void init(const wisp_message_id _id,
238  const wisp_call_token _token,
239  const CallRole _role,
240  const unsigned int _size = 0)
241  {
242  id = _id, token = _token, role = _role, message_size = _size;
243  }
244 
249  bool is_jumbo_fragment() const {
250  return (message_size > 0);
251  }
252 
253  template<class Archive>
254  void serialize(Archive & ar, const unsigned int) {
255  ar & id & token & role & message_size;
256  }
257 };
258 
263 struct MessageInfo {
267  Message message;
269  GroupList groups;
270 
271  explicit
272  MessageInfo(const unsigned int message_capacity = Message::DefaultCapacity) :
273  header(), message(message_capacity), groups()
274  { }
275 
277  return message.type();
278  }
279 
280  wisp_message_id id() const {
281  return header.id;
282  }
283 
285  return header.token;
286  }
287 
288  wisp_call_role role() const {
289  return header.role;
290  }
291 
292  const std::string & sender() const {
293  return message.sender();
294  }
295 
296  void init_header(const wisp_message_id id,
297  const wisp_call_token token,
298  const CallRole role)
299  {
300  header.init(id, token, role);
301  }
302 
303  void clear_header() {
304  header.clear();
305  }
306 };
307 
308 typedef std::shared_ptr<MessageInfo> message_info_ptr;
309 
310 const bool GroupMembershipEnable = true;
311 const bool GroupMembershipDisable = false;
312 
316 template<typename PT = BinaryPackingTraits>
317 class Caller {
318  template<typename ReturnType> friend class Future;
319 
320 public:
321  typedef PT packing_traits;
322  typedef typename packing_traits::packer_type packer_type;
323  typedef typename packing_traits::unpacker_type unpacker_type;
324 
325  // Don't remove this definition. It is necessary to prevent an
326  // unresolved MaxUnfragmentedMessageSize symbol in debug builds.
327 #define __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE 131072U
328  static const unsigned int MaxUnfragmentedMessageSize =
330 
331 private:
332  typedef std::deque<message_info_ptr> request_queue;
333 
334  // For now, use hashed_unique, but for multiple responses will need
335  // hashed_non_unique.
336  typedef boost::multi_index_container<
338  boost::multi_index::indexed_by<
339  boost::multi_index::hashed_unique<
340  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
341  &MessageInfo::token> >
342  > > response_map;
343 
344  typedef boost::multi_index_container<
346  boost::multi_index::indexed_by<
347  boost::multi_index::hashed_unique<
348  boost::multi_index::composite_key<
349  MessageInfo,
350  boost::multi_index::const_mem_fun<MessageInfo, const std::string &,
351  &MessageInfo::sender>,
352  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_role,
353  &MessageInfo::role>,
354  boost::multi_index::const_mem_fun<MessageInfo, wisp_call_token,
355  &MessageInfo::token> > >
356  > > jumbo_message_map;
357 
358  Mailbox _mbox;
359  wisp_call_token _call_token;
360  packer_type _packer;
361  unpacker_type _unpacker;
362 
364  request_queue _requests;
365  response_map _responses;
366  jumbo_message_map _jumbo_messages;
367  jumbo_message_map::key_from_value _jumbo_key_from_value;
368 
369  const unsigned int _initial_message_capacity;
370  // scratch variable
371  message_info_ptr _info;
372  MessageInfo _send_info;
373 
374  static void throwCallException(const wisp_message_protocol expected_proto,
375  const wisp_message_protocol actual_proto,
376  const wisp_message_id expected_id,
377  const wisp_message_id actual_id)
378  SSRC_DECL_THROW(CallException)
379  {
380  std::ostringstream message;
381  message << "Mismatched message id or protocol.\nExpected protocol: "
382  << expected_proto << " Actual protocol: " << actual_proto
383  << "\nExpected id: " << expected_id
384  << " Actual id: " << actual_id << std::endl;
385  throw CallException(message.str());
386  }
387 
388  static void throwCallException(const wisp_call_token expected_token,
389  const wisp_call_token actual_token)
390  SSRC_DECL_THROW(CallException)
391  {
392  std::ostringstream message;
393  message << "Mismatched call token.\nExpected token: "
394  << expected_token << " Actual token: " << actual_token
395  << std::endl;
396  throw CallException(message.str());
397  }
398 
399  void reset_receive_info() {
400  _info = make_smart_ptr<message_info_ptr>(std::min(_info->message.capacity(), __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE));
401  }
402 
403  void push_request() {
404  _requests.push_back(_info);
405  reset_receive_info();
406  }
407 
408  void insert_response() {
409  _responses.insert(_info);
410  reset_receive_info();
411  }
412 
413  enum JumboFragmentResult {
414  JumboFragmentFirstFragment,
415  JumboFragmentNextFragment,
416  JumboFragmentCompleteRequest,
417  JumboFragmentCompleteResponse
418  };
419 
420  JumboFragmentResult insert_jumbo_fragment(const message_info_ptr & msginfo,
421  const unsigned int header_size)
422  {
423  JumboFragmentResult result = JumboFragmentNextFragment;
424  Message & message = msginfo->message;
425  jumbo_message_map::iterator it =
426  _jumbo_messages.find(_jumbo_key_from_value(msginfo));
427 
428  if(it == _jumbo_messages.end()) {
429  // Pre-allocate memory for remaining fragments.
430  message_info_ptr info(make_smart_ptr<message_info_ptr>(msginfo->header.message_size));
431  message.seek(message.size());
432  *info = *msginfo;
433  _jumbo_messages.insert(info);
434  result = JumboFragmentFirstFragment;
435  } else {
436  const message_info_ptr & info = *it;
437 
438  info->message.write(&message[header_size], message.size()-header_size);
439 
440  if(info->message.offset() >= info->header.message_size) {
441  info->message.seek(header_size);
442  if(info->role() == TwoWayResponse) {
443  _responses.insert(info);
444  result = JumboFragmentCompleteResponse;
445  } else {
446  _requests.push_back(info);
447  result = JumboFragmentCompleteRequest;
448  }
449  _jumbo_messages.erase(it);
450  }
451  }
452  return result;
453  }
454 
455  // Receives call responses, but not requests and membership messages
456  template<typename MessageType>
457  void receive_response(MessageType & msg, const wisp_call_token token)
458  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
459  CallException)
460  {
461  begin_receive_response:
462  typename response_map::iterator it(_responses.find(token));
463 
464  if(it == _responses.end()) {
465  if(!_info.unique()) {
466  reset_receive_info();
467  }
468  do {
469  Message & message = _info->message;
470 
471  _mbox.receive(message);
472  _mbox.copy_groups(_info->groups);
473 
474  if(message.is_regular()) {
475  CallHeader & header = _info->header;
476 
477  message.rewind();
478 
479  const unsigned int header_size = _unpacker.unpack(header, message);
480 
481  if(header.is_jumbo_fragment()) {
482  const JumboFragmentResult jumbo_result =
483  insert_jumbo_fragment(_info, header_size);
484  if(jumbo_result == JumboFragmentCompleteResponse) {
485  goto begin_receive_response;
486  }
487  } else {
488  if(header.role == TwoWayResponse) {
489  if(header.token == token)
490  break;
491  else
492  insert_response();
493  } else
494  push_request();
495  }
496  } else {
497  _info->clear_header();
498  push_request();
499  }
500  } while(true);
501  unpack(msg, *_info);
502  } else {
503  unpack(msg, *(*it));
504  _responses.erase(it);
505  }
506  }
507 
508  // We assume integers wrap around to min at max.
509  wisp_call_token next_token() {
510  return _call_token++;
511  }
512 
513  template<typename DestinationType, typename MessageType>
514  void send(const DestinationType & dest,
515  const MessageType & msg,
516  const wisp_call_token token,
517  const CallRole role,
518  const Message::Service service)
519  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
520  {
521  bool done_packing(false);
522 
523  _send_info.init_header(msg.id, token, role);
524 
525  // We increase the message buffer size by a factor of two and try
526  // again when the write area is exhausted. We don't try to
527  // prevent an infinite loop because the message we are writing is
528  // finite in size; therefore the loop must terminate. See WISP-11
529  // for more information.
530 
531 #define __WISP_WRITE_AREA_EXHAUSTED "write area exhausted"
532 
533  do {
534  try {
535  _send_info.message.rewind();
536  _packer.pack(_send_info.header, _send_info.message);
537  _packer.pack(msg, _send_info.message);
538  done_packing = true;
539  } catch(const std::ios_base::failure & e) {
540  constexpr std::size_t len = sizeof(__WISP_WRITE_AREA_EXHAUSTED) - 1;
541  if(std::strncmp(e.what(), __WISP_WRITE_AREA_EXHAUSTED, len) == 0) {
542  _send_info.message.resize(2*_send_info.message.capacity());
543  } else {
544  throw;
545  }
546  }
547  } while(!done_packing);
548 
549 #undef __WISP_WRITE_AREA_EXHAUSTED
550 
551  _send_info.message.set_type(msg.protocol);
552  _send_info.message.set_service(service);
553 
554  if(_send_info.message.size() <= MaxUnfragmentedMessageSize) {
555  _mbox.send(_send_info.message, dest);
556  } else {
557  const unsigned int message_size =
558  _send_info.header.message_size = _send_info.message.size();
559  _send_info.message.rewind();
560 
561  const unsigned int header_size =
562  _packer.pack(_send_info.header, _send_info.message);
563  const unsigned int max_fragment_size =
564  MaxUnfragmentedMessageSize - header_size;
565 
566  // We never fragment a header, so there's no risk of
567  // header_size equaling message_size..
568  for(unsigned int offset = header_size, size = 0;
569  offset < message_size; offset+=size)
570  {
571  size = std::min(max_fragment_size, message_size - offset);
572  _mbox.clear_message_parts();
573  _mbox.add_message_part(&_send_info.message[0], header_size);
574  _mbox.add_message_part(&_send_info.message[offset], size);
575  _mbox.send(dest, msg.protocol, service);
576  }
577  _mbox.clear_message_parts();
578  }
579  }
580 
581 public:
582  // Using jumbo_message_map::key_type doesn't work because it's a composite
583  // key that stores a reference, so we have to use a compatible key.
584  typedef
585  std::tuple<std::string, wisp_call_role, wisp_call_token>
587 
588  template<typename ReturnType>
589  class Future {
590  friend class Caller;
591 
592  bool _valid;
593  wisp_call_token _token;
594  Caller & _caller;
595 
596  Future(Caller & caller, const wisp_call_token token) :
597  _valid(true), _token(token), _caller(caller)
598  { }
599 
600  public:
601  typedef ReturnType return_type;
602  typedef std::shared_ptr<Future> shared_ptr;
603 
604  // Note, you can't sit in a while(!future.ready()) loop because
605  // there's no receiver thread.
606  bool ready() const {
607  return _caller.returned(_token);
608  }
609 
610  bool valid() const {
611  return _valid;
612  }
613 
614  wisp_call_token token() const {
615  return _token;
616  }
617 
618  void receive(return_type & result)
619  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
621  {
622  if(!valid())
623  throw CallException("Invalid future.");
624 
625  _caller.receive_response(result, _token);
626 
627  _valid = false;
628  }
629 
630  // Exclusively for supporting continuations in Service
631  void unpack(return_type & result, MessageInfo & info)
632  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
634  {
635  if(!valid())
636  throw CallException("Invalid future.");
637 
638  if(info.token() == _token)
639  _caller.unpack(result, info);
640  else
641  throwCallException(_token, info.token());
642 
643  _valid = false;
644  }
645  };
646 
647  explicit
648  Caller(const std::string & connection = "",
649  const std::string & name = "",
650  const unsigned int message_capacity = Message::DefaultCapacity,
651  const bool group_membership = GroupMembershipDisable) :
652  _mbox(connection, name, group_membership), _call_token(0),
653  _packer(), _unpacker(), _requests(), _responses(),
654  _jumbo_messages(),
655  _jumbo_key_from_value(_jumbo_messages.key_extractor()),
656  _initial_message_capacity(message_capacity),
657  _info(make_smart_ptr<message_info_ptr>(std::min(message_capacity, __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE))),
658  _send_info(message_capacity)
659  { }
660 
661 #undef __WISP_CALLER_MAX_UNFRAGMENTED_MESSAGE_SIZE
662 
663  // Not intended to be subclassed outside of Wisp.
664  //virtual ~Caller() = default;
665 
666  // We expose this for allowing retrieval of group information.
667  const Mailbox & mbox() {
668  return _mbox;
669  }
670 
671  packer_type & packer() {
672  return _packer;
673  }
674 
675  unpacker_type & unpacker() {
676  return _unpacker;
677  }
678 
679  wisp_call_token call_token() {
680  return _call_token;
681  }
682 
683  const std::string & name() const {
684  return _mbox.private_group();
685  }
686 
687  bool group_membership() const {
688  return _mbox.group_membership();
689  }
690 
691  void join(const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
692  _mbox.join(group);
693  }
694 
695  void leave(const std::string & group) SSRC_DECL_THROW(NS_SSRC_SPREAD::Error) {
696  _mbox.leave(group);
697  }
698 
699  unsigned int message_capacity_initial() const {
700  return _initial_message_capacity;
701  }
702 
703  unsigned int message_capacity_receive() const {
704  return _info->message.capacity();
705  }
706 
707  unsigned int message_capacity_send() const {
708  return _send_info.message.capacity();
709  }
710 
711  unsigned int request_queue_size() const {
712  return _requests.size();
713  }
714 
715  unsigned int response_map_size() const {
716  return _responses.size();
717  }
718 
719  bool returned(const wisp_call_token token) const {
720  return (_responses.find(token) != _responses.end());
721  }
722 
723  unsigned int count_jumbo_messages() const {
724  return _jumbo_messages.size();
725  }
726 
727  template<typename key_container>
728  void collect_jumbo_message_keys(key_container & container) {
729  for(jumbo_message_map::const_iterator it = _jumbo_messages.begin(),
730  end = _jumbo_messages.end(); it != end; ++it)
731  {
732  const MessageInfo *info = it->get();
733  container.insert(container.end(), jumbo_message_key_type(info->sender(), info->role(), info->token()));
734  }
735  }
736 
737  template<typename iterator_type>
738  void erase_jumbo_messages(const iterator_type & begin,
739  const iterator_type & end)
740  {
741  // You can't erase a compatible key, so we have to perform a lookup first
742  for(iterator_type it = begin; it != end; ++it) {
743  jumbo_message_map::iterator msg = _jumbo_messages.find(*it);
744  if(msg != _jumbo_messages.end())
745  _jumbo_messages.erase(msg);
746  }
747  }
748 
749  // Call only after unpacking CallHeader so message offset is correct.
750  template<typename MessageType>
751  void unpack(MessageType & msg, MessageInfo & info)
752  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
754  {
755  if(info.id() == MessageType::id &&
756  info.protocol() == MessageType::protocol)
757  _unpacker.unpack(msg, info.message);
758  else
759  throwCallException(MessageType::protocol, info.protocol(),
760  MessageType::id, info.id());
761  }
762 
769  void receive(message_info_ptr & info)
770  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
771  {
772  // Drain responses first, then requests.
773  if(!_responses.empty()) {
774  response_map::iterator it = _responses.begin();
775  info = *it;
776  _responses.erase(it);
777  } else if(!_requests.empty()) {
778  info = _requests.front();
779  _requests.pop_front();
780  } else {
781  do {
782  Message & message = _info->message;
783 
784  _mbox.receive(message);
785  _mbox.copy_groups(_info->groups);
786 
787  if(message.is_regular()) {
788  CallHeader & header = _info->header;
789 
790  message.rewind();
791 
792  const unsigned int header_size = _unpacker.unpack(header, message);
793 
794  if(header.is_jumbo_fragment()) {
795  const JumboFragmentResult jumbo_result =
796  insert_jumbo_fragment(_info, header_size);
797  if(jumbo_result == JumboFragmentCompleteResponse) {
798  response_map::iterator it = _responses.begin();
799  info = *it;
800  _responses.erase(it);
801  return;
802  } else if(jumbo_result == JumboFragmentCompleteRequest) {
803  info = _requests.front();
804  _requests.pop_front();
805  return;
806  }
807  } else
808  break;
809  } else {
810  _info->clear_header();
811  break;
812  }
813  } while(true);
814  info = _info;
815  }
816  }
817 
818  // Also implement timeout.
819  // Receives requests and membership messages, but not call responses
820  void receive_request(message_info_ptr & info)
821  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
822  {
823  if(!_requests.empty()) {
824  info = _requests.front();
825  _requests.pop_front();
826  } else {
827  if(!_info.unique()) {
828  reset_receive_info();
829  }
830  do {
831  Message & message = _info->message;
832 
833  _mbox.receive(message);
834  _mbox.copy_groups(_info->groups);
835 
836  if(message.is_regular()) {
837  CallHeader & header = _info->header;
838 
839  message.rewind();
840 
841  const unsigned int header_size = _unpacker.unpack(header, message);
842 
843  if(header.is_jumbo_fragment()) {
844  const JumboFragmentResult jumbo_result =
845  insert_jumbo_fragment(_info, header_size);
846  if(jumbo_result == JumboFragmentCompleteRequest) {
847  info = _requests.front();
848  _requests.pop_front();
849  return;
850  }
851  } else {
852  if(header.role == TwoWayResponse)
853  insert_response();
854  else
855  break;
856  }
857  } else {
858  _info->clear_header();
859  break;
860  }
861  } while(true);
862  info = _info;
863  }
864  }
865 
866  template<typename Traits, typename DestinationType>
867  void send(const DestinationType & dest,
868  const typename Traits::parameter_type & param,
869  const Message::Service service = DefaultMessageServiceType)
870  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
871  {
872  static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
873  send(dest, param, next_token(), OneWayRequest, service);
874  }
875 
876  template<typename Traits, typename DestinationType>
877  void reply(const DestinationType & dest,
878  const wisp_call_token token,
879  const typename Traits::parameter_type & param,
880  const Message::Service service = DefaultMessageServiceType)
881  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
882  {
883  static_assert(Traits::call_type == OneWay, "expected call_type OneWay");
884  send(dest, param, token, TwoWayResponse, service);
885  }
886 
887  // Synchronous call.
888  template<typename Traits, typename DestinationType>
889  void call(const DestinationType & dest,
890  typename Traits::return_type *ret,
891  const typename Traits::parameter_type & param,
892  const Message::Service service = DefaultMessageServiceType)
893  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
895  {
896  static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
897  wisp_call_token token = next_token();
898  send(dest, param, token, TwoWayRequest, service);
899  receive_response(*ret, token);
900  }
901 
902  // Asynchronous call.
903  template<typename Traits, typename DestinationType>
904  typename Future<typename Traits::return_type>::shared_ptr
905  call(const DestinationType & dest,
906  const typename Traits::parameter_type & param,
907  const Message::Service service = DefaultMessageServiceType)
908  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
909  {
910  static_assert(Traits::call_type == TwoWay, "expected call_type TwoWay");
911  typedef Future<typename Traits::return_type> future_type;
912  wisp_call_token token = next_token();
913 
914  send(dest, param, token, TwoWayRequest, service);
915 
916  // Can't use make_shared here because of private constructor.
917  //return std::make_shared<future_type>(*this, token);
918  return typename future_type::shared_ptr(new future_type(*this, token));
919  }
920 
921 private:
922  template<CallType call_type> struct CallTag { };
923 
924  template<typename Traits, typename DestinationType>
925  typename Future<typename Traits::return_type>::shared_ptr
926  _call(const CallTag<OneWay> &,
927  const DestinationType & dest,
928  const typename Traits::parameter_type & param,
929  const Message::Service service = DefaultMessageServiceType)
930  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
931  {
932  typedef Future<typename Traits::return_type> future_type;
933  send<Traits>(dest, param, service);
934  return typename future_type::shared_ptr();
935  }
936 
937  template<typename Traits, typename DestinationType>
938  typename Future<typename Traits::return_type>::shared_ptr
939  _call(const CallTag<TwoWay> &,
940  const DestinationType & dest,
941  const typename Traits::parameter_type & param,
942  const Message::Service service = DefaultMessageServiceType)
943  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
944  {
945  return call<Traits>(dest, param, service);
946  }
947 
948 public:
949 
950  template<typename Traits, typename DestinationType>
951  void operator()(const Traits &,
952  const DestinationType & dest,
953  const wisp_call_token token,
954  const typename Traits::parameter_type & param,
955  const Message::Service service =
957  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
958  {
959  reply<Traits>(dest, token, param, service);
960  }
961 
962  template<typename Traits, typename DestinationType>
963  void operator()(const Traits &,
964  const DestinationType & dest,
965  typename Traits::return_type *ret,
966  const typename Traits::parameter_type & param,
967  const Message::Service service =
969  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
971  {
972  call<Traits>(dest, ret, param, service);
973  }
974 
975  template<typename Traits, typename DestinationType>
976  typename Future<typename Traits::return_type>::shared_ptr
977  operator()(const Traits &,
978  const DestinationType & dest,
979  const typename Traits::parameter_type & param,
980  const Message::Service service = DefaultMessageServiceType)
981  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
982  {
983  return _call<Traits>(CallTag<Traits::call_type>(), dest, param, service);
984  }
985 
986 
987  // Convenience functions that invoke parameter_type constructor for
988  // you. We permit up to WISP_CALLER_MAX_PARAMETERS - 1
989  // parameter_type arguments. Any more, and you should probably use
990  // a different approach.
991 
992  // Send.
993 
994  template<typename Traits, typename DestinationType>
995  void send(const DestinationType & dest,
996  const Message::Service service = DefaultMessageServiceType)
997  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
998  {
999  send<Traits>(dest, typename Traits::parameter_type(), service);
1000  }
1001 
1002  template<typename Traits, typename DestinationType, typename... P>
1003  void send(const Message::Service service,
1004  const DestinationType & dest,
1005  P && ...p)
1006  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1007  {
1008  send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...),
1009  service);
1010  }
1011 
1012  template<typename Traits, typename DestinationType, typename... P>
1013  void sendp(const DestinationType & dest, P && ...p)
1014 
1015  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1016  {
1017  send<Traits>(dest, typename Traits::parameter_type(std::forward<P>(p)...));
1018  }
1019 
1020  // Reply.
1021 
1022  template<typename Traits, typename DestinationType>
1023  void reply(const DestinationType & dest,
1024  const wisp_call_token token,
1025  const Message::Service service = DefaultMessageServiceType)
1026  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1027  {
1028  reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1029  }
1030 
1031  template<typename Traits, typename DestinationType>
1032  void operator()(const Traits &,
1033  const DestinationType & dest,
1034  const wisp_call_token token,
1035  const Message::Service service =
1037  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1038  {
1039  reply<Traits>(dest, token, typename Traits::parameter_type(), service);
1040  }
1041 
1042  template<typename Traits, typename DestinationType, typename... P>
1043  void reply(const Message::Service service,
1044  const DestinationType & dest,
1045  const wisp_call_token token,
1046  P && ...p)
1047  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1048  {
1049  reply<Traits>(dest, token,
1050  typename Traits::parameter_type(std::forward<P>(p)...),
1051  service);
1052  }
1053 
1054  template<typename Traits, typename DestinationType, typename... P>
1055  void replyp(const DestinationType & dest,
1056  const wisp_call_token token,
1057  P && ...p)
1058  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1059  {
1060  reply<Traits>(dest, token,
1061  typename Traits::parameter_type(std::forward<P>(p)...));
1062  }
1063 
1064  // Synchronous call.
1065  template<typename Traits, typename DestinationType>
1066  void call(const DestinationType & dest,
1067  typename Traits::return_type *ret,
1068  const Message::Service service = DefaultMessageServiceType)
1069  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1070  CallException)
1071  {
1072  call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1073  }
1074 
1075  template<typename Traits, typename DestinationType>
1076  void operator()(const Traits &,
1077  const DestinationType & dest,
1078  typename Traits::return_type *ret,
1079  const Message::Service service = DefaultMessageServiceType)
1080  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
1081  CallException)
1082  {
1083  call<Traits>(dest, ret, typename Traits::parameter_type(), service);
1084  }
1085 
1086  template<typename Traits, typename DestinationType, typename... P>
1087  void call(const Message::Service service,
1088  const DestinationType & dest,
1089  typename Traits::return_type *ret,
1090  P && ...p)
1091  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1092  {
1093  call<Traits>(dest, ret,
1094  typename Traits::parameter_type(std::forward<P>(p)...),
1095  service);
1096  }
1097 
1098  template<typename Traits, typename DestinationType, typename... P>
1099  void callp(const DestinationType & dest,
1100  typename Traits::return_type *ret,
1101  P && ...p)
1102  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1103  {
1104  call<Traits>(dest, ret,
1105  typename Traits::parameter_type(std::forward<P>(p)...));
1106  }
1107 
1108  // Asynchronous call.
1109  template<typename Traits, typename DestinationType>
1110  typename Future<typename Traits::return_type>::shared_ptr
1111  call(const DestinationType & dest,
1112  const Message::Service service = DefaultMessageServiceType)
1113  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1114  {
1115  return call<Traits>(dest, typename Traits::parameter_type(), service);
1116  }
1117 
1118  template<typename Traits, typename DestinationType, typename... P>
1119  typename Future<typename Traits::return_type>::shared_ptr
1120  call(const Message::Service service, const DestinationType & dest,
1121  P && ...p)
1122  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1123  {
1124  return call<Traits>(dest,
1125  typename Traits::parameter_type(std::forward<P>(p)...),
1126  service);
1127  }
1128 
1129  template<typename Traits, typename DestinationType, typename... P>
1130  typename Future<typename Traits::return_type>::shared_ptr
1131  callp(const DestinationType & dest, P && ...p)
1132  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1133  {
1134  return call<Traits>(dest,
1135  typename Traits::parameter_type(std::forward<P>(p)...));
1136  }
1137 
1138  // Dual-purpose operator
1139 
1140  template<typename Traits, typename DestinationType>
1141  typename Future<typename Traits::return_type>::shared_ptr
1142  operator()(const Traits &,
1143  const DestinationType & dest,
1144  const Message::Service service = DefaultMessageServiceType)
1145  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
1146  {
1147  return
1148  _call<Traits>(CallTag<Traits::call_type>(),
1149  dest, typename Traits::parameter_type(), service);
1150  }
1151 };
1152 
1154 
1155 #endif

Savarese Software Research Corporation
Copyright © 2006-2012 Savarese Software Research Corporation. All rights reserved.