session/service.cc
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2009 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * https://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00017 #include <ssrc/wispers/session/service.h> 00018 #include <ssrc/wispers/utility/checkpoint.h> 00019 00020 00021 __BEGIN_NS_SSRC_WSPR_SESSION 00022 00028 void Session::process_request(const MessageInsert & msg, const MessageInfo &) { 00029 unsigned int i = msg.values.size(); 00030 00031 while(i-- > 0) { 00032 session_map::iterator && it = find<BySID>(msg.values[i].sid); 00033 00034 if(it != end<BySID>()) { 00035 const_cast<SessionData &>(*it).attributes = msg.values[i].attributes; 00036 touch_session(it); 00037 } 00038 } 00039 } 00040 00044 void Session::process_request(const MessageCreateSession & msg, 00045 const MessageInfo & msginfo) 00046 { 00047 session_map::iterator it(end<BySID>()); 00048 00049 if(!msg.sid.empty()) 00050 it = find<BySID>(msg.sid); 00051 00052 if(it == end<BySID>()) 00053 it = create_session().first; 00054 00055 _caller.reply<CallSingleQueryResult>(Message::FIFOSelfDiscard, 00056 msginfo.sender(), msginfo.token(), *it); 00057 } 00058 00062 void Session::process_request(const MessageGetSession & msg, 00063 const MessageInfo & msginfo) 00064 { 00065 MessageSingleQueryResult result; 00066 session_map::iterator && it = find<BySID>(msg.sid); 00067 00068 if(it != end<BySID>()) { 00069 result.found = true; 00070 result.result = *it; 00071 if(msg.update_access_time) 00072 touch_session(it); 00073 } 00074 00075 _caller.reply<CallSingleQueryResult>(msginfo.sender(), msginfo.token(), 00076 result, Message::FIFOSelfDiscard); 00077 } 00078 00082 void Session::process_request(MessageSetAttributes & msg, 00083 const MessageInfo &) 00084 { 00085 session_map::iterator && it = find<BySID>(msg.sid); 00086 00087 if(it != end<BySID>()) { 00088 const_cast<SessionData &>(*it).attributes->merge(std::move(msg.attributes)); 00089 touch_session(it); 00090 } 00091 } 00092 00096 void Session::update_session(const SessionData & update) { 00097 session_map::iterator && it = find<BySID>(update.sid); 00098 00099 if(it != end<BySID>()) { 00100 SessionData session = *it; 00101 get_index<BySID>().erase(it); 00102 session.uid = update.uid; 00103 session.attributes = update.attributes; 00104 _touch_session(session); 00105 00106 index_by_uid & index = get_index<ByUID>(); 00107 index_by_uid::iterator && uit = find<ByUID>(session.uid); 00108 00109 if(uit != end<ByUID>()) { 00110 // We don't need to send an expiration message because the next 00111 // client action requiring a session will clear the session. 00112 // Everything else (e.g., EventQueue) indexes on uid. 00113 index.erase(uit); 00114 } 00115 00116 insert<BySID>(session); 00117 } 00118 } 00119 00120 void Session::process_request(const MessageLogoutSession & msg, 00121 const MessageInfo &) 00122 { 00123 index_by_uid & index = get_index<ByUID>(); 00124 index_by_uid::iterator && it = find<ByUID>(msg.session.uid); 00125 00126 if(it != end<ByUID>()) { 00127 index.erase(it); 00128 MessageExpireSession expire; 00129 expire.sessions.push_back(msg.session.uid); 00130 _caller.send<CallExpireSession>(protocol_traits::event_group_expire(), 00131 expire); 00132 } 00133 } 00134 00138 void Session::load_checkpoint(const std::string & filename) 00139 SSRC_DECL_THROW(std::runtime_error, boost::archive::archive_exception) 00140 { 00141 NS_SSRC_WSPR_UTILITY::load_checkpoint(_checkpoint_path, super::_index); 00142 00143 for(index_by_sid::iterator it = begin<BySID>(); it != end<BySID>(); ++it) 00144 touch_session(it); 00145 } 00146 00150 void Session::transition(State state) { 00151 switch(state) { 00152 case Starting: 00153 _caller.join(protocol_traits::service_group()); 00154 _caller.join(protocol_traits::event_group_login()); 00155 _caller.join(protocol_traits::event_group_logout()); 00156 _poll_timeout = 00157 schedule_timeout(boost::bind(&Session::check_for_expirations, this), 00158 TimeValue(_poll_interval, 0)); 00159 state = Started; 00160 break; 00161 case Stopping: 00162 if(_poll_timeout) 00163 cancel_timeout(_poll_timeout); 00164 _caller.leave(protocol_traits::event_group_logout()); 00165 _caller.leave(protocol_traits::event_group_login()); 00166 _caller.leave(protocol_traits::service_group()); 00167 state = Stopped; 00168 //break; 00169 case Stopped: 00170 // TODO: remove this after we implement graceful restarts/upgrades. 00171 //expire_sessions(boost::integer_traits<idle_count_type>::const_max); 00172 NS_SSRC_WSPR_UTILITY::save_checkpoint(_checkpoint_path, super::_index); 00173 break; 00174 default: 00175 break; 00176 } 00177 00178 super::transition(state); 00179 } 00180 00184 Session::Session(super::caller_type & caller, 00185 const SessionInitializer & initializer) 00186 SSRC_DECL_THROW(std::runtime_error, boost::archive::archive_exception) : 00187 super(caller), _random_id(), 00188 _expirations_per_message(initializer.expirations_per_message), 00189 _poll_interval(initializer.poll_interval), 00190 _max_idle_time(initializer.max_idle_time), 00191 _low_count(0), _touch_session(_max_idle_time / _poll_interval), 00192 _poll_timeout(), _checkpoint_path(initializer.checkpoint_path) 00193 { 00194 add_service_type(protocol_traits::service_type()); 00195 00196 WISP_SERVICE_REQUEST(MessageCreateSession); 00197 WISP_SERVICE_REQUEST(MessageSetAttributes); 00198 WISP_SERVICE_REQUEST(MessageGetSession); 00199 WISP_SERVICE_REQUEST(MessageUpdateSession); 00200 WISP_SERVICE_REQUEST(MessageLoginSession); 00201 WISP_SERVICE_REQUEST(MessageLogoutSession); 00202 00203 load_checkpoint(_checkpoint_path); 00204 } 00205 00206 void Session::expire_sessions(const idle_count_type low_count) { 00207 index_by_access & index = get_index<ByLastAccess>(); 00208 index_by_access::iterator upper = index.upper_bound(low_count); 00209 index_by_access::iterator it = index.begin(); 00210 MessageExpireSession msg; 00211 unsigned int count = 0; 00212 00213 do { 00214 while(it != upper && count < _expirations_per_message) { 00215 msg.sessions.push_back(it->uid); 00216 it = index.erase(it); 00217 ++count; 00218 } 00219 00220 if(msg.sessions.size() > 0) { 00221 _caller.send<CallExpireSession>(protocol_traits::event_group_expire(), 00222 msg); 00223 msg.sessions.clear(); 00224 } 00225 } while(it != upper); 00226 } 00227 00231 string Session::create_sid() { 00232 sid_type sid(SessionIdNumChars, ' '); 00233 session_map::iterator && end_ = end<BySID>(); 00234 00235 do { 00236 _random_id(sid); 00237 // TODO: Protect against accidental infinite loop (e.g., if we fill up)! 00238 } while(find<BySID>(sid) != end_); 00239 00240 return sid; 00241 } 00242 00243 00244 00245 __END_NS_SSRC_WSPR_SESSION
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.