Branch data Line data Source code
1 : : /* Copyright 2006-2013 Savarese Software Research Corporation.
2 : : *
3 : : * Licensed under the Apache License, Version 2.0 (the "License");
4 : : * you may not use this file except in compliance with the License.
5 : : * You may obtain a copy of the License at
6 : : *
7 : : * http://www.savarese.com/software/ApacheLicense-2.0
8 : : *
9 : : * Unless required by applicable law or agreed to in writing, software
10 : : * distributed under the License is distributed on an "AS IS" BASIS,
11 : : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : * See the License for the specific language governing permissions and
13 : : * limitations under the License.
14 : : */
15 : :
16 : : /**
17 : : * @file
18 : : * This header defines the ContinuationCaller class and Wisp communication
19 : : * protocol support classes.
20 : : */
21 : :
22 : : #ifndef __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
23 : : #define __SSRC_WISP_PROTOCOL_CONTINUATION_CALLER_H
24 : :
25 : : #include <ssrc/wisp/protocol/Caller.h>
26 : :
27 : : #include <functional>
28 : :
29 : : #include <boost/multi_index/member.hpp>
30 : : #include <boost/multi_index/ordered_index.hpp>
31 : :
32 : : __BEGIN_NS_SSRC_WISP_PROTOCOL
33 : :
34 : : struct Continuation {
35 : : // We save token as an optimization to avoid invoking a virtual function
36 : : // in the hash index.
37 : : wisp_call_token token;
38 : :
39 : : protected:
40 : 2 : explicit Continuation(wisp_call_token token) : token(token) { }
41 : :
42 : : public:
43 : 2 : virtual ~Continuation() = default;
44 : :
45 : : virtual void resume(MessageInfo & msginfo) = 0;
46 : : };
47 : :
48 : :
49 : : typedef std::shared_ptr<Continuation> continuation_ptr;
50 : 2 : const continuation_ptr NullContinuation = continuation_ptr();
51 : :
52 : : template<typename CallTraits_, typename Closure>
53 : : class FutureContinuation : public Continuation {
54 : : public:
55 : : typedef CallTraits_ call_traits;
56 : : typedef typename call_traits::future_ptr future_ptr;
57 : : typedef typename call_traits::return_type return_type;
58 : : // Closure should be void (const return_type &)
59 : : typedef Closure continuation_function;
60 : :
61 : : private:
62 : : future_ptr _future;
63 : : return_type _result;
64 : : continuation_function _continue;
65 : :
66 : : public:
67 : :
68 : 2 : FutureContinuation(const future_ptr & future,
69 : : continuation_function && continue_) :
70 : : Continuation(future->token()),
71 : : _future(future), _result(),
72 : 2 : _continue(std::forward<continuation_function>(continue_))
73 : 2 : { }
74 : :
75 : 2 : virtual ~FutureContinuation() = default;
76 : :
77 : 2 : virtual void resume(MessageInfo & msginfo) {
78 : 2 : _future->unpack(_result, msginfo);
79 : 2 : _continue(_result);
80 : 2 : }
81 : : };
82 : :
83 : : // Specialization for use with std::bind
84 : : template<typename CallTraits_>
85 : : class FutureContinuation<CallTraits_,
86 : : std::function<void (const typename CallTraits_::return_type &)> >:
87 : : public Continuation
88 : : {
89 : : public:
90 : : typedef CallTraits_ call_traits;
91 : : typedef typename call_traits::future_ptr future_ptr;
92 : : typedef typename call_traits::return_type return_type;
93 : : typedef std::function<void (const return_type &)> continuation_function;
94 : :
95 : : private:
96 : : future_ptr _future;
97 : : return_type _result;
98 : : continuation_function _continue;
99 : :
100 : : public:
101 : :
102 : : FutureContinuation(const future_ptr & future,
103 : : continuation_function && continue_) :
104 : : Continuation(future->token()),
105 : : _future(future), _result(),
106 : : _continue(std::forward<continuation_function>(continue_))
107 : : { }
108 : :
109 : : virtual ~FutureContinuation() = default;
110 : :
111 : : virtual void resume(MessageInfo & msginfo) {
112 : : _future->unpack(_result, msginfo);
113 : : _continue(_result);
114 : : }
115 : : };
116 : :
117 : : template<typename PT = BinaryPackingTraits>
118 : 6 : class ContinuationCaller : public protocol::Caller<PT> {
119 : : typedef protocol::Caller<PT> super;
120 : :
121 : : enum { ByHash, ByOrder };
122 : :
123 : : typedef boost::multi_index_container<
124 : : continuation_ptr,
125 : : boost::multi_index::indexed_by<
126 : : boost::multi_index::hashed_unique<
127 : : boost::multi_index::member<Continuation, wisp_call_token,
128 : : &Continuation::token> >,
129 : : boost::multi_index::ordered_non_unique<
130 : : boost::multi_index::member<Continuation, wisp_call_token,
131 : : &Continuation::token> >
132 : : > > continuation_map;
133 : :
134 : : typedef
135 : : typename continuation_map::template nth_index<ByHash>::type index_by_hash;
136 : : typedef
137 : : typename continuation_map::template nth_index<ByOrder>::type index_by_order;
138 : :
139 : : continuation_map _continuations;
140 : :
141 : : public:
142 : :
143 : : explicit
144 : 6 : ContinuationCaller(const std::string & connection = "",
145 : : const std::string & name = "",
146 : : const unsigned int message_capacity = Message::DefaultCapacity,
147 : : const bool group_membership = GroupMembershipDisable) :
148 : : super(connection, name, message_capacity, group_membership),
149 [ + - + - ]: 6 : _continuations()
150 : 6 : { }
151 : :
152 : 2 : unsigned int continuations_map_size() const {
153 : 2 : return _continuations.size();
154 : : }
155 : :
156 : : template<typename CallTraits_, typename Closure, typename DestinationType>
157 : : continuation_ptr
158 : 2 : split_call(Closure && continue_,
159 : : const DestinationType & dest,
160 : : const typename CallTraits_::parameter_type & param,
161 : : const Message::Service service = DefaultMessageServiceType)
162 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
163 : : {
164 [ + - + - ]: 2 : return std::make_shared<FutureContinuation<CallTraits_, Closure>>(super::template call<CallTraits_>(dest, param, service), std::forward<Closure>(continue_));
165 : : }
166 : :
167 : 2 : void schedule(const continuation_ptr & continuation) {
168 : 2 : _continuations.insert(continuation);
169 : 2 : }
170 : :
171 : : template<typename CallTraits_, typename Closure, typename DestinationType>
172 : 2 : void future_call(Closure && continue_,
173 : : const DestinationType & dest,
174 : : const typename CallTraits_::parameter_type & param,
175 : : const Message::Service service = DefaultMessageServiceType)
176 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
177 : : {
178 [ + - + - ]: 2 : schedule(split_call<CallTraits_>(std::forward<Closure>(continue_),
179 : : dest, param, service));
180 : 2 : }
181 : :
182 : : // Cancels all pending requests with min_token <= token < max_token
183 : 0 : void cancel_range(wisp_call_token min_token, wisp_call_token max_token) {
184 : 0 : index_by_order & index = _continuations.template get<ByOrder>();
185 : :
186 : : start:
187 [ # # ]: 0 : if(min_token < max_token)
188 : 0 : index.erase(index.lower_bound(min_token), index.upper_bound(max_token));
189 [ # # ]: 0 : else if(min_token > max_token) {
190 : 0 : index.erase(index.lower_bound(min_token), index.end());
191 : 0 : min_token = 0;
192 : 0 : goto start;
193 : : }
194 : 0 : }
195 : :
196 : 3 : void cancel_all() {
197 : 3 : _continuations.clear();
198 : 3 : }
199 : :
200 : 2 : continuation_ptr cancel(wisp_call_token token) {
201 [ + - ]: 2 : continuation_map::iterator it = _continuations.find(token);
202 : :
203 [ + - + - ]: 2 : if(it != _continuations.end()) {
204 [ + - ]: 4 : continuation_ptr result = *it;
205 [ + - ]: 2 : _continuations.erase(it);
206 : 2 : return result;
207 : : }
208 : :
209 : 0 : return NullContinuation;
210 : : }
211 : :
212 : 2 : bool resume(MessageInfo & msginfo) {
213 [ + - ]: 4 : continuation_ptr result = cancel(msginfo.token());
214 : :
215 [ + - ]: 2 : if(result != NullContinuation) {
216 [ + - ]: 2 : result->resume(msginfo);
217 : 2 : return true;
218 : : }
219 : :
220 : 0 : return false;
221 : : }
222 : :
223 : : template<typename CallTraits_, typename Closure, typename DestinationType>
224 : : void future_call(Closure && continue_,
225 : : const DestinationType & dest,
226 : : const Message::Service service = DefaultMessageServiceType)
227 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
228 : : {
229 : : future_call<CallTraits_>(std::forward<Closure>(continue_),
230 : : dest, typename CallTraits_::parameter_type(),
231 : : service);
232 : : }
233 : :
234 : : template<typename CallTraits_, typename Closure, typename DestinationType,
235 : : typename... P>
236 : 1 : void future_call(const Message::Service service,
237 : : Closure && continue_,
238 : : const DestinationType & dest, P && ...p)
239 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
240 : : {
241 [ + - ]: 1 : future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
242 : 1 : typename CallTraits_::parameter_type(std::forward<P>(p)...),
243 : : service);
244 : 1 : }
245 : :
246 : : template<typename CallTraits_, typename Closure, typename DestinationType,
247 : : typename... P>
248 : : void future_callp(Closure && continue_,
249 : : const DestinationType & dest, P && ...p)
250 : : SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
251 : : {
252 : : future_call<CallTraits_>(std::forward<Closure>(continue_), dest,
253 : : typename CallTraits_::parameter_type(std::forward<P>(p)...));
254 : : }
255 : :
256 : : };
257 : :
258 : : __END_NS_SSRC_WISP_PROTOCOL
259 : :
260 : : #endif
|