EventQueue.h
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2009 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * https://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00022 #ifndef __SSRC_WSPR_WS_EVENT_QUEUE_H 00023 #define __SSRC_WSPR_WS_EVENT_QUEUE_H 00024 00025 #include <boost/ptr_container/ptr_unordered_map.hpp> 00026 00027 #include <ssrc/wispers/ws/service.h> 00028 #include <ssrc/wispers/session/protocol.h> 00029 #include <ssrc/wispers/group_session/protocol.h> 00030 00031 #ifdef WSPR_DEBUG 00032 #include <ssrc/wispers/utility/PropertiesToString.h> 00033 #endif 00034 00035 __BEGIN_NS_SSRC_WSPR_WS 00036 00037 using NS_SSRC_WSPR_UTILITY::property_vector; 00038 using NS_SSRC_WSPR_UTILITY::StringTo; 00039 using NS_SSRC_WSPR_SESSION::SessionProtocol; 00040 using NS_SSRC_WSPR_GROUP_SESSION::GroupSessionProtocol; 00041 using NS_SSRC_WSPR_GROUP_SESSION::uid_container; 00042 using NS_SSRC_WSPR_SESSION::SessionProtocol; 00043 00044 struct EventQueueInitializer { 00045 // In seconds. 00046 unsigned int fetch_timeout; 00047 00048 EventQueueInitializer() : 00049 fetch_timeout(20) 00050 { } 00051 }; 00052 00053 class EventQueue : public WebService { 00054 typedef WebService super; 00055 friend class NS_SSRC_WISP_SERVICE::ServiceProtocolProcessor<packing_traits>; 00056 00057 WISP_IMPORT(SessionProtocol, MessageLoginSession); 00058 00059 struct EventEntry { 00060 //uid_type recipient; 00061 unsigned int channel; 00062 timeout_ptr timeout; 00063 properties_ptr events; 00064 property_vector & evec; 00065 00066 explicit EventEntry(const unsigned int channel = 1) : 00067 channel(channel), timeout(), events(new Properties), 00068 evec(events->create_property_vector("wspr", "event")) 00069 { } 00070 00071 property_vector::size_type size() const { 00072 return evec.size(); 00073 } 00074 00075 // Destructively merge. 00076 void add(const Properties && event) { 00077 evec.push_back(new Properties(event)); 00078 } 00079 00080 // Non-destructively copy. 00081 void add(const Properties & event) { 00082 evec.push_back(new Properties(event)); 00083 } 00084 00085 void remove(unsigned int confirm) { 00086 const property_vector::size_type sz = size(); 00087 00088 if(sz > 0 && confirm > 0) { 00089 if(confirm >= sz) { 00090 evec.clear(); 00091 } else { 00092 evec.erase(evec.begin(), evec.begin() + confirm); 00093 } 00094 } 00095 00096 #ifdef WSPR_DEBUG 00097 std::cerr << /*recipient <<*/ " Size: " << size() << std::endl; 00098 #endif 00099 } 00100 }; 00101 00102 typedef boost::ptr_unordered_map<uid_type, EventEntry> event_map; 00103 00104 TimeValue _fetch_timeout; 00105 event_map _events; 00106 StringTo _number_cast; 00107 00108 protected: 00109 WISP_IMPORT(SessionProtocol, MessageExpireSession); 00110 WISP_IMPORT(GroupSessionProtocol, MessageExpireGroupSession); 00111 00112 void reply(const string & sender, 00113 const MessageResponse & response, 00114 const NS_SSRC_WISP_PROTOCOL::wisp_call_token token) 00115 { 00116 try { 00117 #ifdef WSPR_DEBUG 00118 std::cerr << "MessageResponse:\n" 00119 << NS_SSRC_WSPR_UTILITY::to_string(*response.template_data) 00120 << std::endl; 00121 #endif 00122 _caller.reply<CallResponse>(sender, token, response); 00123 } catch(std::ios_base::failure &) { 00124 event_map::iterator && it = _events.find(response.session->uid); 00125 00126 #ifdef WSPR_DEBUG 00127 std::cerr << "Caught std::ios_base::failure" << std::endl; 00128 #endif 00129 00130 if(it != _events.end()) { 00131 it->second->remove(it->second->size()); 00132 } 00133 00134 // TODO: prepare an error response. 00135 _caller.reply<CallResponse>(sender, token, 00136 MessageResponse(response.session, 0)); 00137 } 00138 } 00139 00140 void complete_fetch(const string & sender, 00141 MessageResponse & response, 00142 const NS_SSRC_WISP_PROTOCOL::wisp_call_token token, 00143 const EventInfo &) 00144 { 00145 #ifdef WSPR_DEBUG 00146 std::cerr << "COMPLETE FETCH " << response.session->uid << std::endl; 00147 #endif 00148 event_map::iterator && it = _events.find(response.session->uid); 00149 00150 if(it != _events.end()) { 00151 response.template_data = it->second->events; 00152 00153 if(it->second->timeout) 00154 it->second->timeout.reset(); 00155 } 00156 00157 reply(sender, response, token); 00158 } 00159 00160 void fetch(const WebServiceCall & call, const MessageInfo & msginfo) { 00161 string channel_str; 00162 00163 MessageResponse response(call.session, 0); 00164 00165 #ifdef WSPR_DEBUG 00166 std::cerr << "START FETCH "; 00167 if(call.session) 00168 std::cerr << call.session->uid; 00169 std::cerr << std::endl; 00170 #endif 00171 00172 if(call.session) { 00173 unsigned int channel = 0; 00174 00175 if(call.get_parameter("channel", channel_str)) { 00176 try { 00177 channel = _number_cast.cast<int>(channel_str); 00178 } catch(const std::bad_cast & blc) { 00179 goto missing_channel; 00180 } 00181 00182 event_map::iterator && it = _events.find(call.session->uid); 00183 00184 if(it != _events.end()) { 00185 EventEntry & entry = *it->second; 00186 unsigned int confirm(0); 00187 timeout_ptr timeout(entry.timeout); 00188 00189 try { 00190 call.get_parameter("confirm", confirm); 00191 } catch(const std::bad_cast &) { 00192 confirm = 0; 00193 } 00194 00195 if(confirm > 0) { 00196 entry.remove(confirm); 00197 } 00198 00199 if(entry.size() > 0) { 00200 if(channel != entry.channel) { 00201 response.template_data.reset(new Properties); 00202 Properties & event = web_event_direct(*response.template_data, 00203 "wspr.Event", 00204 "ChangeChannel"); 00205 event.set(entry.channel, "channel"); 00206 event.set(1, "_no_confirmation"); 00207 } else 00208 response.template_data = entry.events; 00209 } 00210 00211 if(timeout) { 00212 #ifdef WSPR_DEBUG 00213 std::cerr << "FETCH TIMEOUT " << call.session->uid << std::endl; 00214 #endif 00215 timeout->execute(); 00216 //entry.timeout.reset(); 00217 } 00218 } else { 00219 // Even though we create a queue on login, keep this code 00220 // here because it's where we will add support for multiple 00221 // channels (TODO). 00222 std::pair<event_map::iterator,bool> && result = 00223 _events.insert(call.session->uid, new EventEntry); 00224 if(result.second) 00225 it = result.first; 00226 else 00227 response.set_error("could not create queue"); 00228 } 00229 00230 if(it != _events.end() && it->second->size() == 0) { 00231 EventEntry & entry = *it->second; 00232 #ifdef WSPR_DEBUG 00233 std::cerr << "SCHEDULING FETCH COMPLETION " 00234 << call.session->uid << std::endl; 00235 #endif 00236 if(channel != entry.channel) { 00237 response.template_data.reset(new Properties); 00238 Properties & event = web_event_direct(*response.template_data, 00239 "wspr.Event", 00240 "ChangeChannel"); 00241 if(channel == 0) { 00242 do { 00243 entry.channel++; 00244 } while(entry.channel == 0); 00245 } 00246 00247 event.set(entry.channel, "channel"); 00248 event.set(1, "_no_confirmation"); 00249 } else { 00250 entry.timeout = 00251 schedule_timeout(boost::bind(&EventQueue::complete_fetch, this, 00252 msginfo.sender(), response, 00253 msginfo.token(), _1), 00254 _fetch_timeout, EventLoop::Once); 00255 return; 00256 } 00257 } 00258 } else { 00259 missing_channel: 00260 response.set_error("missing channel"); 00261 } 00262 } 00263 00264 #ifdef WSPR_DEBUG 00265 std::cerr << "END FETCH " << call.session->uid << std::endl; 00266 #endif 00267 00268 reply(msginfo.sender(), response, msginfo.token()); 00269 } 00270 00271 void process_request(const MessageDeliverEvent & msg, const MessageInfo &) { 00272 unsigned int size = msg.keys.size(); 00273 event_map::iterator && end = _events.end(); 00274 00275 if(!msg.template_data) { 00276 #ifdef WSPR_DEBUG 00277 std::cerr << "RECEIVED NULL MessageDeliverEvent::template_data!!\n"; 00278 #endif 00279 return; 00280 } 00281 00282 while(size-- > 0) { 00283 event_map::iterator && it = _events.find(msg.keys[size]); 00284 00285 if(it != end) { 00286 EventEntry & entry = *it->second; 00287 00288 #ifdef WSPR_DEBUG 00289 std::cerr << "DELIVER_EVENT TO: " << msg.keys[size] << std::endl 00290 << "EVENT: " 00291 << NS_SSRC_WSPR_UTILITY::to_string(*msg.template_data) 00292 << std::endl; 00293 #endif 00294 entry.add(std::move(*msg.template_data)); 00295 00296 if(entry.timeout) { 00297 #ifdef WSPR_DEBUG 00298 std::cerr << "DELIVER_EVENT TIMEOUT " << msg.keys[size] << std::endl; 00299 #endif 00300 entry.timeout->execute(); 00301 } 00302 } 00303 } 00304 } 00305 00306 void process_request(const MessageDeliverEvents & msg, const MessageInfo &) { 00307 unsigned int size = msg.keys.size(); 00308 event_map::iterator && end = _events.end(); 00309 00310 if(msg.events.empty()) { 00311 #ifdef WSPR_DEBUG 00312 std::cerr << "RECEIVED empty MessageDeliverEvents::events!!\n"; 00313 #endif 00314 return; 00315 } 00316 00317 while(size-- > 0) { 00318 event_map::iterator && it = _events.find(msg.keys[size]); 00319 00320 if(it != end) { 00321 EventEntry & entry = *it->second; 00322 #ifdef WSPR_DEBUG 00323 std::cerr << "DELIVER_EVENTS TO: " << msg.keys[size] << std::endl; 00324 #endif 00325 00326 for(MessageDeliverEvents::event_container::const_iterator && event = 00327 msg.events.begin(), && event_end = msg.events.end(); 00328 event != event_end; ++event) 00329 { 00330 #ifdef WSPR_DEBUG 00331 std::cerr << "EVENT: " 00332 << NS_SSRC_WSPR_UTILITY::to_string(*event) 00333 << std::endl; 00334 #endif 00335 entry.add(std::move(*event)); 00336 } 00337 00338 if(entry.timeout) { 00339 #ifdef WSPR_DEBUG 00340 std::cerr << "DELIVER_EVENTS TIMEOUT " << msg.keys[size] << std::endl; 00341 #endif 00342 entry.timeout->execute(); 00343 } 00344 } 00345 } 00346 } 00347 00348 void process_request(const MessageExpireSession & msg, const MessageInfo &) { 00349 if(msg.sessions.size() > 0) { 00350 MessageExpireSession::session_container::const_iterator && it = 00351 msg.sessions.begin(); 00352 MessageExpireSession::session_container::const_iterator && end = 00353 msg.sessions.end(); 00354 properties_unique_ptr && eventp = 00355 web_event("wspr.Event", "ExpireSession"); 00356 Properties & event = *eventp; 00357 00358 while(it != end) { 00359 event_map::iterator && queue = _events.find(*it); 00360 00361 if(queue != _events.end()) { 00362 EventEntry & entry = *queue->second; 00363 00364 if(entry.timeout) { 00365 entry.add(event); 00366 entry.timeout->execute(); 00367 } 00368 00369 _events.erase(queue); 00370 00371 #ifdef WSPR_DEBUG 00372 std::cerr << "EXPIRED QUEUE " << *it << std::endl; 00373 #endif 00374 } 00375 00376 ++it; 00377 } 00378 } 00379 } 00380 00381 void process_request(const MessageExpireGroupSession & msg, 00382 const MessageInfo &) 00383 { 00384 properties_unique_ptr && eventp = 00385 web_event("wspr.GroupSession", "ExpireGroupSession"); 00386 Properties & event = *eventp; 00387 event_map::iterator && events_end = _events.end(); 00388 00389 for(GroupSessionProtocol::expiration_notification_container::const_iterator 00390 && it = msg.notifications.begin(), && end = msg.notifications.end(); 00391 it != end; ++it) 00392 { 00393 event.set(it->gsid, "gsid"); 00394 00395 for(uid_container::const_iterator && uid = it->uids.begin(), 00396 uid_end = it->uids.end(); 00397 uid != uid_end; ++uid) 00398 { 00399 event_map::iterator && queue = _events.find(*uid); 00400 00401 if(queue != events_end) 00402 queue->second->add(event); 00403 } 00404 } 00405 } 00406 00407 void process_request(const MessageLoginSession & msg, const MessageInfo &) { 00408 uid_type uid(msg.session.uid); 00409 event_map::iterator && it = _events.find(uid); 00410 if(it == _events.end()) { 00411 #if WSPR_DEBUG 00412 std::cerr << "Created queue on login: " << uid << std::endl; 00413 #endif 00414 _events.insert(uid, new EventEntry); 00415 } 00416 } 00417 00418 virtual void transition(State state) { 00419 switch(state) { 00420 case Starting: 00421 _caller.join(SessionProtocol::event_group_expire()); 00422 _caller.join(GroupSessionProtocol::event_group_expire()); 00423 _caller.join(SessionProtocol::event_group_login()); 00424 _caller.join(WebServiceProtocol::event_queue_ws_group()); 00425 state = Started; 00426 break; 00427 case Stopping: 00428 _caller.leave(WebServiceProtocol::event_queue_ws_group()); 00429 _caller.leave(SessionProtocol::event_group_login()); 00430 _caller.leave(GroupSessionProtocol::event_group_expire()); 00431 _caller.leave(SessionProtocol::event_group_expire()); 00432 state = Stopped; 00433 // break; 00434 case Stopped: 00435 for(event_map::iterator && it = _events.begin(), && end = _events.end(); 00436 it != end; ++it) 00437 { 00438 if(it->second->timeout) { 00439 it->second->timeout->execute(); 00440 //it->second->timeout.reset(); 00441 } 00442 } 00443 _events.clear(); 00444 break; 00445 default: 00446 break; 00447 } 00448 00449 super::transition(state); 00450 } 00451 00452 public: 00453 00454 explicit EventQueue(super::caller_type & caller, 00455 const EventQueueInitializer & initializer) : 00456 super(caller), _fetch_timeout(initializer.fetch_timeout, 0), _events(), 00457 _number_cast() 00458 { 00459 super::add_service_type(WebServiceProtocol::event_queue_ws_type()); 00460 00461 WISP_SERVICE_REQUEST(MessageDeliverEvent); 00462 WISP_SERVICE_REQUEST(MessageDeliverEvents); 00463 WISP_SERVICE_REQUEST(MessageExpireSession); 00464 WISP_SERVICE_REQUEST(MessageExpireGroupSession); 00465 WISP_SERVICE_REQUEST(MessageLoginSession); 00466 00467 WS_TWO_WAY_CALL("fetch", &EventQueue::fetch); 00468 } 00469 00470 virtual ~EventQueue() { } 00471 }; 00472 00473 __END_NS_SSRC_WSPR_WS 00474 00475 #endif
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.