21 #ifndef __SSRC_WISP_SERVICE_SERVICE_H
22 #define __SSRC_WISP_SERVICE_SERVICE_H
35 using NS_SSRC_SPREAD::MembershipInfo;
36 using NS_SSRC_SPREAD::Message;
37 using NS_SSRC_SPREAD::GroupList;
38 using NS_SSRC_WISP_PROTOCOL::MessageInfo;
40 using NS_SSRC_WISP_PROTOCOL::CallException;
// Convenience macros for registering message handlers on the enclosing
// object. Each one expands to a set_request_handler<MessageType> or
// set_response_handler<MessageType> call with *this as the sole argument,
// so they must be used inside a non-static member function where those
// members are reachable.
//
// The _T variants spell the call as `this->template ...`, which is the form
// C++ requires when the handler-setting member is a dependent name (for
// example when the macro is used inside a class template).
46 #define WISP_SERVICE_REQUEST(MessageType) \
47 set_request_handler<MessageType>(*this)
48 #define WISP_SERVICE_RESPONSE(MessageType) \
49 set_response_handler<MessageType>(*this)
50 #define WISP_SERVICE_REQUEST_T(MessageType) \
51 this->template set_request_handler<MessageType>(*this)
52 #define WISP_SERVICE_RESPONSE_T(MessageType) \
53 this->template set_response_handler<MessageType>(*this)
// Two-argument counterparts of the handler-registration macros: in addition
// to *this, the expression `msg` is forwarded as a second argument to
// set_request_handler<MessageType> / set_response_handler<MessageType>.
// NOTE(review): msg presumably acts as a reusable MessageType object that
// incoming messages are unpacked into -- confirm against the two-argument
// set_*_handler overloads.
//
// As with the unbuffered macros, the _T variants use `this->template ...`
// for contexts where the member is a dependent name.
55 #define WISP_SERVICE_REQUEST_BUFFERED(MessageType, msg) \
56 set_request_handler<MessageType>(*this, msg)
57 #define WISP_SERVICE_RESPONSE_BUFFERED(MessageType, msg) \
58 set_response_handler<MessageType>(*this, msg)
59 #define WISP_SERVICE_REQUEST_BUFFERED_T(MessageType, msg) \
60 this->template set_request_handler<MessageType>(*this, msg)
61 #define WISP_SERVICE_RESPONSE_BUFFERED_T(MessageType, msg) \
62 this->template set_response_handler<MessageType>(*this, msg)
78 protocol(protocol), id(id), handle_message(message_handler)
82 typedef boost::multi_index_container<
84 boost::multi_index::indexed_by<
85 boost::multi_index::hashed_unique<
86 boost::multi_index::composite_key<
109 _context(context), _once(once), _handler(handler)
139 typedef boost::multi_index_container<
141 boost::multi_index::indexed_by<
142 boost::multi_index::hashed_unique<
144 const TimeoutHandler *,
157 return _timeouts.size();
161 for(timeout_map::iterator it = _timeouts.begin(), end = _timeouts.end();
170 timeout_handler_ptr handler(make_smart_ptr<timeout_handler_ptr>(*
this, handler_fun, once));
171 _timeouts.insert(handler);
179 _timeouts.erase(timeout);
190 template<
typename PackingTraits = BinaryPackingTraits>
194 typedef typename protocol::ContinuationCaller<packing_traits>
caller_type;
226 const MembershipInfo & meminfo)
// Processes a request using a caller-supplied message object.
//
// The message described by msginfo is unpacked into msg through
// _caller.unpack(), after which msg is handed to impl.process_request()
// along with msginfo viewed as a const reference.
//
// Template parameters:
//   MessageType - type of the message object that is unpacked into.
//   Impl        - type whose process_request() member is invoked.
// Parameters:
//   impl    - object that receives the unpacked request.
//   msg     - destination for the unpacked message; overwritten by unpack().
//   msginfo - describes the received message to unpack.
// Throws: declared via SSRC_DECL_THROW with
//   boost::archive::archive_exception and std::ios_base::failure, among
//   others.
230 template<
typename MessageType,
typename Impl>
231 void request(Impl & impl, MessageType & msg, MessageInfo & msginfo)
232 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
235 _caller.unpack(msg, msginfo);
// msginfo is passed on as const even though this function holds it non-const.
237 impl.process_request(msg, static_cast<const MessageInfo &>(msginfo));
240 template<
typename MessageType,
typename Impl>
241 void request(Impl & impl, MessageInfo & msginfo)
242 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
// Processes a response using a caller-supplied message object.
//
// The message described by msginfo is unpacked into msg through
// _caller.unpack(), after which msg is handed to impl.process_response()
// along with msginfo viewed as a const reference.
//
// Template parameters:
//   MessageType - type of the message object that is unpacked into.
//   Impl        - type whose process_response() member is invoked.
// Parameters:
//   impl    - object that receives the unpacked response.
//   msg     - destination for the unpacked message; overwritten by unpack().
//   msginfo - describes the received message to unpack.
// Throws: declared via SSRC_DECL_THROW with
//   boost::archive::archive_exception and std::ios_base::failure, among
//   others.
248 template<
typename MessageType,
typename Impl>
249 void respond(Impl & impl, MessageType & msg, MessageInfo & msginfo)
250 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
253 _caller.unpack(msg, msginfo);
// msginfo is passed on as const even though this function holds it non-const.
255 impl.process_response(msg, static_cast<const MessageInfo &>(msginfo));
258 template<
typename MessageType,
typename Impl>
259 void respond(Impl & impl, MessageInfo & msginfo)
260 SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
267 return _request_handlers.insert(handler).second;
270 message_handler_map::size_type
273 _request_handlers.erase(_request_handlers.key_extractor()((handler)));
277 _request_handlers.clear();
280 template<
typename MessageType,
typename Impl>
283 &ServiceProtocolProcessor::template request<MessageType, Impl>;
286 std::bind(handler,
this, std::ref(impl), std::placeholders::_1)));
289 template<
typename MessageType,
typename Impl>
292 (Impl &, MessageType &, MessageInfo &) =
293 &ServiceProtocolProcessor::template request<MessageType, Impl>;
296 std::bind(handler,
this, std::ref(impl), std::ref(buffer), std::placeholders::_1)));
300 return _response_handlers.insert(handler).second;
303 message_handler_map::size_type
306 _response_handlers.erase(_response_handlers.key_extractor()((handler)));
310 _response_handlers.clear();
313 template<
typename MessageType,
typename Impl>
316 &ServiceProtocolProcessor::template respond<MessageType, Impl>;
319 std::bind(handler,
this, std::ref(impl), std::placeholders::_1))); }
321 template<
typename MessageType,
typename Impl>
324 (Impl &, MessageType &, MessageInfo &) =
325 &ServiceProtocolProcessor::template respond<MessageType, Impl>;
328 std::bind(handler,
this, std::ref(impl), std::ref(buffer), std::placeholders::_1)));
339 _request_handlers(), _response_handlers(), _caller(caller)
345 return _caller.name();
363 const MembershipInfo & meminfo)
369 message_handler_map::iterator it =
370 _request_handlers.find(std::make_tuple(msginfo.protocol(), msginfo.id()));
371 if(it != _request_handlers.end()) {
372 it->handle_message(msginfo);
381 if(!_caller.resume(msginfo)) {
382 message_handler_map::iterator it =
383 _response_handlers.find(std::make_tuple(msginfo.protocol(),
385 if(it != _response_handlers.end()) {
386 it->handle_message(msginfo);
400 return _context.
add_timeout(handler, timeout, once);
427 template<
typename PP>
434 caller_type & _caller;
435 protocol_processor _protocol;
438 MembershipInfo _membership_info;
440 std::vector<typename caller_type::jumbo_message_key_type> _jumbo_message_keys;
447 _protocol.clear_timeouts();
450 _caller.cancel_all();
456 _caller(caller), _protocol(caller),
459 _min_token(0), _max_token(0),
460 _jumbo_message_keys(4)
463 template<
typename Initializer>
465 const Initializer & initializer) :
466 _caller(caller), _protocol(caller, initializer),
469 _min_token(0), _max_token(0),
470 _jumbo_message_keys(4)
// Returns the protocol processor's current state by forwarding to
// _protocol.state(). Const: does not modify this object.
475 typename protocol_processor::State
state()
const {
476 return _protocol.state();
480 return _caller.mbox().descriptor();
485 if(
state() != protocol_processor::Stopped) {
486 _caller.receive(_message_info);
488 if(_message_info->message.is_regular() &&
489 _protocol.state() < protocol_processor::Stopping)
492 _protocol.response(*_message_info);
494 _protocol.request(*_message_info);
495 }
else if(_message_info->message.is_membership()) {
496 _message_info->message.get_membership_info(_membership_info);
497 _protocol.membership(*_message_info, _membership_info);
500 if(
state() == protocol_processor::Stopped)
503 }
catch(
const boost::archive::archive_exception & ae) {
506 std::cerr << _protocol.name()
507 <<
": Caught boost::archive::archive_exception: "
508 << ae.what() <<
"\nContinuing.";
510 }
catch(
const std::ios_base::failure & iof) {
513 std::cerr << _protocol.name() <<
": Caught std::ios_base::failure: "
514 << iof.what() <<
"\nContinuing.";
521 _caller.cancel_range(_min_token, _max_token);
522 _min_token = _max_token;
523 _max_token = _caller.call_token();
526 if(!_jumbo_message_keys.empty()) {
527 if(_caller.count_jumbo_messages() > 0) {
528 _caller.erase_jumbo_messages(_jumbo_message_keys.begin(),
529 _jumbo_message_keys.end());
531 _jumbo_message_keys.clear();
534 if(_caller.count_jumbo_messages() > 0) {
535 _caller.collect_jumbo_message_keys(_jumbo_message_keys);
544 _min_token = _max_token = _caller.call_token();
562 template<
typename EH>
571 event_handler _handler;
576 return _handler.protocol();
// Constructs a Service, initializing _caller from the supplied arguments
// plus protocol_processor::GroupMembership.
//
// Parameters:
//   connection       - forwarded to _caller; defaults to the empty string.
//   name             - forwarded to _caller; defaults to the empty string.
//   message_capacity - forwarded to _caller; defaults to
//                      Message::DefaultCapacity.
// NOTE(review): the meaning _caller assigns to an empty connection or name
// is not established here -- confirm against the caller type's constructor.
581 explicit Service(
const std::string & connection =
"",
582 const std::string &
name =
"",
583 const unsigned int message_capacity =
584 Message::DefaultCapacity) :
585 _caller(connection,
name, message_capacity,
586 protocol_processor::GroupMembership),
// Constructs a Service with an extra initializer object.
//
// _caller is initialized from connection, name, message_capacity and
// protocol_processor::GroupMembership; _handler is then initialized from
// _caller and initializer.
//
// Template parameters:
//   Initializer - type of the object forwarded to _handler's constructor.
// Parameters:
//   initializer      - passed by const reference to _handler.
//   connection       - forwarded to _caller; defaults to the empty string.
//   name             - forwarded to _caller; defaults to the empty string.
//   message_capacity - forwarded to _caller; defaults to
//                      Message::DefaultCapacity.
590 template<
typename Initializer>
591 explicit Service(
const Initializer & initializer,
592 const std::string & connection =
"",
593 const std::string &
name =
"",
594 const unsigned int message_capacity =
595 Message::DefaultCapacity) :
596 _caller(connection,
name, message_capacity,
597 protocol_processor::GroupMembership),
598 _handler(_caller, initializer)
// Returns, by const reference, the name reported by the underlying caller
// (_caller.name()). Const: does not modify this object.
601 const std::string &
name()
const {
602 return _caller.name();
605 typename protocol_processor::State
state() {
610 _handler.start(loop, call_timeout);