group_session/service.cc
Go to the documentation of this file.
00001 /* 00002 * Copyright 2006-2009 Savarese Software Research Corporation 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"); 00005 * you may not use this file except in compliance with the License. 00006 * You may obtain a copy of the License at 00007 * 00008 * https://www.savarese.com/software/ApacheLicense-2.0 00009 * 00010 * Unless required by applicable law or agreed to in writing, software 00011 * distributed under the License is distributed on an "AS IS" BASIS, 00012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00013 * See the License for the specific language governing permissions and 00014 * limitations under the License. 00015 */ 00016 00017 #include <ssrc/wispers/group_session/service.h> 00018 #include <ssrc/wispers/ws/protocol.h> 00019 00020 __BEGIN_NS_SSRC_WSPR_GROUP_SESSION 00021 00022 using NS_SSRC_WISP_PROTOCOL::GroupMembershipDisable; 00023 using NS_SSRC_WSPR_WS::WebServiceProtocol; 00024 00025 WISP_IMPORT(WebServiceProtocol, MessageDeliverEvent); 00026 WISP_IMPORT(WebServiceProtocol, MessageDeliverEvents); 00027 WISP_IMPORT(WebServiceProtocol, CallDeliverEvent); 00028 WISP_IMPORT(WebServiceProtocol, CallDeliverEvents); 00029 00030 const string GroupSessionService::EventGroupExpireGroupSession = 00031 GroupSessionProtocol::event_group_expire(); 00032 00033 /********************* 00034 * Support Functions * 00035 *********************/ 00036 00040 void GroupSessionService::transition(State state) { 00041 switch(state) { 00042 case Starting: 00043 _caller.join(SessionProtocol::event_group_expire()); 00044 _caller.join(SessionProtocol::event_group_login()); 00045 _caller.join(protocol_traits::service_group()); 00046 _caller.join(gs_protocol_group(_protocol_id)); 00047 _caller.join(gs_protocol_partition_group(_protocol_id, _partition_id)); 00048 check_for_expirations(TimeValue::now_seconds()); 00049 _gs_poll_timeout = 00050 schedule_timeout(boost::bind(&GroupSessionService::_check_for_expirations, this, _1), _gs_poll_interval); 00051 state = Started; 00052 break; 00053 case Stopping: 00054 if(_gs_poll_timeout) 00055 cancel_timeout(_gs_poll_timeout); 00056 _caller.leave(gs_protocol_partition_group(_protocol_id, _partition_id)); 00057 _caller.leave(gs_protocol_group(_protocol_id)); 00058 _caller.leave(protocol_traits::service_group()); 00059 _caller.leave(SessionProtocol::event_group_login()); 00060 _caller.leave(SessionProtocol::event_group_expire()); 00061 if(_caller.group_membership() == GroupMembershipDisable) 00062 state = Stopped; 00063 break; 00064 default: 00065 break; 00066 } 00067 00068 super::transition(state); 00069 } 00070 00071 void GroupSessionService::send_expirations(const gsid_container & gsids) { 00072 unsigned int id_count(0); 00073 GroupSessionProtocol::ExpirationNotification notification; 00074 MessageExpireGroupSession msg; 00075 unsigned int index = gsids.size(); 00076 00077 while(index-- > 0) { 00078 const unsigned int uid_count = 00079 _database->get_member_uids(notification.uids, gsids[index]); 00080 00081 if(uid_count > 0) { 00082 notification.gsid = gsids[index]; 00083 // count gsid 00084 ++id_count; 00085 00086 if(id_count + uid_count <= _ids_per_expiration_message) { 00087 msg.notifications.push_back(notification); 00088 id_count+=uid_count; 00089 notification.uids.clear(); 00090 } else { 00091 // TODO: unit test this section to verify it does what we want. 00092 GroupSessionProtocol::ExpirationNotification n; 00093 unsigned int copy_size, uid_offset = 0; 00094 n.gsid = notification.gsid; 00095 00096 do { 00097 copy_size = _ids_per_expiration_message - id_count; 00098 00099 if(copy_size > 0) { 00100 copy_size = std::min(copy_size, uid_count - uid_offset); 00101 n.uids.insert(n.uids.begin(), 00102 notification.uids.begin() + uid_offset, 00103 notification.uids.begin() + (uid_offset + copy_size)); 00104 msg.notifications.push_back(n); 00105 n.uids.clear(); 00106 uid_offset+=copy_size; 00107 id_count+=copy_size; 00108 } 00109 00110 if(id_count >= _ids_per_expiration_message) { 00111 _caller.send<CallExpireGroupSession>(EventGroupExpireGroupSession, 00112 msg); 00113 msg.notifications.clear(); 00114 id_count = (uid_offset < uid_count); 00115 } 00116 } while(uid_offset < uid_count); 00117 } 00118 } 00119 } 00120 00121 if(msg.notifications.size() > 0) 00122 _caller.send<CallExpireGroupSession>(EventGroupExpireGroupSession, msg); 00123 } 00124 00128 unsigned int GroupSessionService::check_for_expirations(const sec_type now) { 00129 unsigned int session_changes(0), reservation_changes(0); 00130 00131 try { 00132 gsid_container gsids; 00133 00134 if(_database->find_expired_gsids(gsids, now) > 0) { 00135 // We have to send the expirations first before the members get dropped. 00136 send_expirations(gsids); 00137 00138 DatabaseTransaction transaction(*_database); 00139 session_changes = _database->remove_expired_sessions(now); 00140 reservation_changes = _database->remove_expired_reservations(now); 00141 transaction.end(); 00142 } 00143 } catch(const DatabaseException & de) { 00144 // TODO: log and recover 00145 } 00146 00147 _session_count-=session_changes; 00148 _reservation_count-=reservation_changes; 00149 00150 return (session_changes + reservation_changes); 00151 } 00152 00153 unsigned int GroupSessionService::cancel_reservation(const gsid_type gsid, 00154 const uid_type requestor, 00155 const MessageInfo &) 00156 { 00157 // When we cancel a reservation, we have to remove the metadata, 00158 // but not when we activate it. When a group session is removed, 00159 // a trigger removes the metadata, but we can't do that for 00160 // reservations because activation removes the reservation. 00161 unsigned int changes = 0; 00162 00163 try { 00164 DatabaseTransaction transaction(*_database); 00165 changes = _database->remove_reservation(gsid, requestor); 00166 transaction.end(); 00167 } catch(const DatabaseException & de) { 00168 // TODO: log and recover. 00169 } 00170 00171 _reservation_count-=changes; 00172 00173 return changes; 00174 } 00175 00176 /******************** 00177 * Service Requests * 00178 ********************/ 00179 00183 void 00184 GroupSessionService::process_request(MessageCreateGroupSession & msg, 00185 const MessageInfo &) 00186 { 00187 try { 00188 unsigned int count = 0; 00189 const sec_type created(TimeValue::now_seconds()); 00190 const sec_type expires = created + msg.lifetime; 00191 GroupSession session(0, created, expires, msg.session_type, 00192 msg.session_group, msg.session_name, 00193 msg.max_observers); 00194 DatabaseTransaction transaction(*_database); 00195 00196 do { 00197 // TODO: genericize this for similar services and make the try limit 00198 // configurable. 00199 if(++count > 10) 00200 throw DatabaseException("GroupSession add_session failure"); 00201 session.gsid = generate_gsid(); 00202 } while(_database->reservation_ops.find(session.gsid).first || 00203 !_database->group_session_ops.insert(session)); 00204 00205 for(uid_container::const_iterator && it = msg.participants.begin(), 00206 && end = msg.participants.end(); it != end; ++it) 00207 { 00208 _database->participant_ops.insert_statement->execute(session.gsid, *it); 00209 } 00210 00211 transaction.end(); 00212 ++_session_count; 00213 start_group_session(session, msg.payload); 00214 } catch(const DatabaseException & de) { 00215 // TODO: log and recover. 00216 } 00217 } 00218 00222 void GroupSessionService::process_request(MessageCreateReservation & msg, 00223 const MessageInfo &) 00224 { 00225 try { 00226 unsigned int count = 0; 00227 const sec_type created(TimeValue::now_seconds()); 00228 const sec_type reservation_expires = created + msg.reservation_lifetime; 00229 Reservation reservation(0, created, reservation_expires, 00230 msg.session_type, msg.session_group, 00231 msg.session_name, msg.max_observers, 00232 msg.creator_uid, 00233 msg.gs_lifetime); 00234 DatabaseTransaction transaction(*_database); 00235 00236 do { 00237 // TODO: genericize this for similar services and make the try limit 00238 // configurable. 00239 if(++count > 100) 00240 throw DatabaseException("GroupSession add_reservation failure"); 00241 reservation.gsid = generate_gsid(); 00242 } while(_database->group_session_ops.find(reservation.gsid).first || 00243 !insert_reservation(reservation, msg.payload)); 00244 00245 _database->reservation_participant_ops.insert_statement->execute(reservation.gsid, msg.creator_uid); 00246 00247 transaction.end(); 00248 } catch(const DatabaseException & de) { 00249 // TODO: log and recover. 00250 recount_sessions(); 00251 } 00252 } 00253 00254 void 00255 GroupSessionService::process_request(const MessageFindGroupSessionsByType & msg, 00256 const MessageInfo & msginfo) 00257 { 00258 WISP_IMPORT(protocol_traits, query_list); 00259 00260 try { 00261 MessageReturnGroupSessionsByType result; 00262 00263 // TODO: Is it more efficient to create a dynamic query that does 00264 // WHERE type = ? OR type = ? OR type = ? ... 00265 // Obviously, that is more efficient for an non-embedded database. 00266 for(query_list::const_iterator it = msg.keys.begin(), end = msg.keys.end(); 00267 it != end; ++it) 00268 _database->get_sessions_by_type(result.sessions, *it); 00269 00270 _caller.reply<CallReturnGroupSessionsByType>(msginfo.sender(), 00271 msginfo.token(), result, 00272 Message::FIFOSelfDiscard); 00273 } catch(const DatabaseException & de) { 00274 // TODO: log and recover. 00275 } 00276 } 00277 00278 void GroupSessionService::process_request(const MessageFindGroupSessions & msg, 00279 const MessageInfo & msginfo) 00280 { 00281 try { 00282 MessageFindGroupSessionsResult result; 00283 result.requestor = msg.requestor; 00284 result.total_sessions = _session_count; 00285 result.limit = msg.limit; 00286 result.offset = msg.offset; 00287 result.gs_name = _gs_name; 00288 00289 _database->find_sessions(result.sessions, msg.limit, msg.offset); 00290 00291 _caller.send<CallFindGroupSessionsResult>(msginfo.sender(), result, 00292 Message::CausalSelfDiscard); 00293 } catch(const DatabaseException & de) { 00294 // TODO: log and recover. 00295 } 00296 } 00297 00298 void GroupSessionService::process_request(const MessageFindMembers & msg, 00299 const MessageInfo & msginfo) 00300 { 00301 try { 00302 MessageFindMembersResult result; 00303 00304 for(gsid_container::const_iterator it = msg.sessions.begin(), 00305 end = msg.sessions.end(); it != end; ++it) 00306 { 00307 _database->find_participants(result.participants, *it); 00308 _database->find_observers(result.observers, *it); 00309 } 00310 result.requestor = msg.requestor; 00311 _caller.send<CallFindMembersResult>(msginfo.sender(), result, 00312 Message::CausalSelfDiscard); 00313 } catch(const DatabaseException & de) { 00314 // TODO: log and recover. 00315 } 00316 } 00317 00318 void GroupSessionService::process_request(const MessageAddMembers & msg, 00319 const MessageInfo &) 00320 { 00321 if(!msg.participants.empty() || !msg.observers.empty()) { 00322 try { 00323 DatabaseTransaction transaction(*_database); 00324 if(!msg.participants.empty()) 00325 _database->participant_ops.insert(msg.participants.begin(), 00326 msg.participants.end()); 00327 if(!msg.observers.empty()) 00328 _database->observer_ops.insert(msg.observers.begin(), 00329 msg.observers.end()); 00330 transaction.end(); 00331 } catch(const DatabaseException & de) { 00332 // TODO: log and recover. 00333 } 00334 } 00335 } 00336 00337 void GroupSessionService::process_request(const MessageRemoveMembers & msg, 00338 const MessageInfo &) 00339 { 00340 if(!msg.participants.empty() || !msg.observers.empty()) { 00341 try { 00342 DatabaseTransaction transaction(*_database); 00343 if(!msg.participants.empty()) 00344 _database->participant_ops.erase_range(msg.participants.begin(), 00345 msg.participants.end()); 00346 if(!msg.observers.empty()) 00347 _database->observer_ops.erase_range(msg.observers.begin(), 00348 msg.observers.end()); 00349 transaction.end(); 00350 } catch(const DatabaseException & de) { 00351 // TODO: log and recover. 00352 } 00353 } 00354 } 00355 00356 void GroupSessionService::process_request(const MessageAddMember & msg, 00357 const MessageInfo & msginfo) 00358 { 00359 try { 00360 protocol_traits::AddMemberResultCode result_code = 00361 protocol_traits::AddMemberInternalError; 00362 GroupSessionDatabase::GroupSessionOperations::find_result_type 00363 find_result = _database->group_session_ops.find(msg.member.gsid); 00364 00365 if(find_result.first) { 00366 if(!_database->is_member(msg.member.uid, msg.member.gsid)) { 00367 bool success(false); 00368 00369 if(msg.member_type == protocol_traits::Observer) { 00370 const unsigned int max_observers = find_result.second.max_observers; 00371 00372 if(max_observers > 0 && 00373 _database->count_observers(msg.member.gsid) < max_observers) 00374 { 00375 success = _database->observer_ops.insert(msg.member); 00376 } else 00377 result_code = protocol_traits::AddMemberMaxObserversReached; 00378 } else { 00379 success = _database->participant_ops.insert(msg.member); 00380 } 00381 00382 if(success) 00383 result_code = protocol_traits::AddMemberSuccess; 00384 } else 00385 result_code = protocol_traits::AddMemberDuplicateEntry; 00386 } else 00387 result_code = protocol_traits::AddMemberNonexistentGroupSession; 00388 00389 _caller.sendp<CallAddMemberConfirm>(msginfo.sender(), 00390 result_code, 00391 msg.member_type, 00392 msg.member.uid, 00393 _gs_name, 00394 find_result.second); 00395 } catch(const DatabaseException & de) { 00396 // TODO: log and recover. 00397 //result_code = protocol_traits::AddMemberInternalError; 00398 } 00399 } 00400 00401 void GroupSessionService::process_request(const MessageRemoveMember & msg, 00402 const MessageInfo & msginfo) 00403 { 00404 try { 00405 unsigned int changes = 0; 00406 00407 switch(msg.member_type) { 00408 case GroupSessionProtocol::Participant: 00409 changes = _database->participant_ops.erase(msg.member); 00410 break; 00411 case GroupSessionProtocol::Observer: 00412 changes = _database->observer_ops.erase(msg.member); 00413 break; 00414 } 00415 00416 if(changes > 0) { 00417 GroupSessionDatabase::GroupSessionOperations::find_result_type 00418 find_result = _database->group_session_ops.find(msg.member.gsid); 00419 00420 if(find_result.first) { 00421 _caller. 00422 sendp<CallRemoveMemberConfirm>(msginfo.sender(), 00423 msg.member_type, msg.member, _gs_name, 00424 find_result.second); 00425 } 00426 } 00427 } catch(const DatabaseException & de) { 00428 // TODO: log and recover. 00429 } 00430 } 00431 00432 void GroupSessionService::process_request(const MessageFindGroupSessionsForMember & msg, 00433 const MessageInfo & msginfo) 00434 { 00435 try { 00436 MessageReturnGroupSessionsForMember result; 00437 result.uid = msg.uid; 00438 result.gs_name = _gs_name; 00439 _database->get_participant_sessions(result.participating, msg.uid); 00440 _database->get_observer_sessions(result.observing, msg.uid); 00441 _database->get_participant_reservations(result.awaiting, msg.uid); 00442 _caller.reply<CallReturnGroupSessionsForMember>(msginfo.sender(), 00443 msginfo.token(), result, 00444 Message::FIFOSelfDiscard); 00445 } catch(const DatabaseException & de) { 00446 // TODO: log and recover 00447 } 00448 } 00449 00450 void GroupSessionService::process_request(const MessageRelayEvent & msg, 00451 const MessageInfo &) 00452 { 00453 try { 00454 MessageDeliverEvent mde(0); 00455 _database->get_member_uids(mde.keys, msg.gsid); 00456 00457 if(mde.keys.size() > 0) { 00458 mde.template_data = msg.event; 00459 _caller.send<CallDeliverEvent>(msg.event_queue, mde); 00460 } 00461 } catch(const DatabaseException & de) { 00462 // TODO: log and recover 00463 } 00464 } 00465 00466 void 00467 GroupSessionService::process_request(const MessageRelayEventSelfDiscard & msg, 00468 const MessageInfo &) 00469 { 00470 try { 00471 MessageDeliverEvent mde(0); 00472 _database->get_member_uids_discard(mde.keys, msg.gsid, msg.source); 00473 00474 if(!mde.keys.empty()) { 00475 mde.template_data = msg.event; 00476 _caller.send<CallDeliverEvent>(msg.event_queue, mde); 00477 } 00478 } catch(const DatabaseException & de) { 00479 // TODO: log and recover 00480 } 00481 } 00482 00483 void GroupSessionService::process_request(MessageRelayEvents & msg, 00484 const MessageInfo &) 00485 { 00486 try { 00487 MessageDeliverEvents mde(std::move(msg.events)); 00488 _database->get_member_uids(mde.keys, msg.gsid); 00489 00490 if(mde.keys.size() > 0) { 00491 _caller.send<CallDeliverEvents>(msg.event_queue, mde); 00492 } 00493 } catch(const DatabaseException & de) { 00494 // TODO: log and recover 00495 } 00496 } 00497 00498 void 00499 GroupSessionService::process_request(MessageRelayEventsSelfDiscard & msg, 00500 const MessageInfo &) 00501 { 00502 try { 00503 MessageDeliverEvents mde(std::move(msg.events)); 00504 _database->get_member_uids_discard(mde.keys, msg.gsid, msg.source); 00505 00506 if(!mde.keys.empty()) { 00507 _caller.send<CallDeliverEvents>(msg.event_queue, mde); 00508 } 00509 } catch(const DatabaseException & de) { 00510 // TODO: log and recover 00511 } 00512 } 00513 00514 void GroupSessionService::process_request(const MessageExpireSession & msg, 00515 const MessageInfo &) 00516 { 00517 using NS_SSRC_WSPR_WS::web_event_indirect; 00518 using NS_SSRC_WSPR_UTILITY::Properties; 00519 using NS_SSRC_WSPR_UTILITY::primitive_property_vector; 00520 00521 if(!msg.sessions.empty()) { 00522 try { 00523 // TODO: optimize by reducing number of queries, messages, and 00524 // memory operations. 00525 // for example, if we add a DeliverEvents message to EventQueue 00526 // we can batch the delivery of multiple events to different 00527 // sets of uids. 00528 // Two things are wrong with this implementation: 00529 // 1. It's a brute force approach 00530 // 2. It has direct knowledge of the event queue, which we normally 00531 // place at the Web service level. 00532 // Brute force approach: 00533 // 1. For each uid, get all members of sessions where uid is 00534 // a participant or observer, using as keys for a DeliverEvent 00535 // message. If keys is non-zero in size, add a web_event called 00536 // ExpireMember (with protocol type "wspr.GroupSession" (don't 00537 // hard code that if avoidable)) containing the expired session uid. 00538 for(MessageExpireSession::session_container::const_iterator 00539 && it = msg.sessions.begin(), && end = msg.sessions.end(); 00540 it != end; ++it) 00541 { 00542 MessageDeliverEvent mde; 00543 00544 _database->find_members_for_uid(mde.keys, *it); 00545 00546 if(!mde.keys.empty()) { 00547 Properties & event = web_event_indirect(*mde.template_data, 00548 "wspr.GroupSession", 00549 "ExpireMember"); 00550 event.set(*it, "uid"); 00551 _caller.send<CallDeliverEvent>(WebServiceProtocol::event_queue_ws_group(), mde); 00552 } 00553 } 00554 { 00555 DatabaseTransaction transaction(*_database); 00556 _database->remove_observers_by_uid(msg.sessions.begin(), 00557 msg.sessions.end()); 00558 transaction.end(); 00559 } 00560 } catch(const DatabaseException & de) { 00561 // TODO: log and recover 00562 } 00563 } 00564 } 00565 00566 void GroupSessionService::process_request(const MessageLoginSession & msg, 00567 const MessageInfo &) 00568 { 00569 using NS_SSRC_WSPR_WS::web_event_indirect; 00570 using NS_SSRC_WSPR_UTILITY::Properties; 00571 00572 const uid_type uid(msg.session.uid); 00573 00574 try { 00575 // TODO: optimize 00576 // Two things are wrong with this implementation: 00577 // 1. It's a brute force approach (in the sense that all group session 00578 // services do a lookup on login) 00579 // 2. It has direct knowledge of the event queue, which we normally 00580 // place at the Web service level. 00581 MessageDeliverEvent mde; 00582 _database->find_members_for_participant(mde.keys, uid); 00583 00584 if(!mde.keys.empty()) { 00585 Properties & event = web_event_indirect(*mde.template_data, 00586 "wspr.GroupSession", 00587 "LoginParticipant"); 00588 event.set(uid, "uid"); 00589 _caller.send<CallDeliverEvent>(WebServiceProtocol::event_queue_ws_group(), mde); 00590 } 00591 } catch(const DatabaseException & de) { 00592 // TODO: log and recover and recover 00593 } 00594 } 00595 00596 /**************************** 00597 * Constructors/Destructors * 00598 ****************************/ 00599 00600 00606 GroupSessionService::GroupSessionService(super::caller_type & caller, 00607 const wisp_message_protocol protocol, 00608 GroupSessionDatabase *db, 00609 const GroupSessionInitializer & initializer) 00610 SSRC_DECL_THROW(DatabaseException, std::invalid_argument) : 00611 super(caller), 00612 _protocol_id(protocol), 00613 _partition_id(initializer.partition_id), 00614 _num_partitions(initializer.num_partitions), 00615 _ids_per_expiration_message(initializer.ids_per_expiration_message), 00616 _database(db), 00617 _gsid_min(gsid_min(_protocol_id, _partition_id, _num_partitions)), 00618 _gsid_max(gsid_max(_protocol_id, _partition_id, _num_partitions)), 00619 _random(_gsid_min, _gsid_max), 00620 _gs_poll_interval(initializer.gs_poll_interval, 0), 00621 _gs_poll_timeout(), 00622 _session_count(_database->count_sessions()), 00623 _reservation_count(_database->count_reservations()), 00624 _gs_name(initializer.gs_name) 00625 { 00626 if(_partition_id >= _num_partitions) 00627 SSRC_DECL_THROW(std::invalid_argument("invalid group session partition")); 00628 00629 add_service_type(protocol_traits::service_type()); 00630 00631 WISP_SERVICE_REQUEST(MessageCreateGroupSession); 00632 WISP_SERVICE_REQUEST(MessageEndGroupSession); 00633 WISP_SERVICE_REQUEST(MessageRelayEvent); 00634 WISP_SERVICE_REQUEST(MessageRelayEventSelfDiscard); 00635 WISP_SERVICE_REQUEST(MessageRelayEvents); 00636 WISP_SERVICE_REQUEST(MessageRelayEventsSelfDiscard); 00637 WISP_SERVICE_REQUEST(MessageFindGroupSessions); 00638 WISP_SERVICE_REQUEST(MessageFindGroupSessionsForMember); 00639 WISP_SERVICE_REQUEST(MessageFindGroupSessionsByType); 00640 WISP_SERVICE_REQUEST(MessageCreateReservation); 00641 WISP_SERVICE_REQUEST(MessageCancelReservation); 00642 WISP_SERVICE_REQUEST(MessageFindMembers); 00643 WISP_SERVICE_REQUEST(MessageAddMembers); 00644 WISP_SERVICE_REQUEST(MessageRemoveMembers); 00645 WISP_SERVICE_REQUEST(MessageAddMember); 00646 WISP_SERVICE_REQUEST(MessageRemoveMember); 00647 WISP_SERVICE_REQUEST(MessageExpireSession); 00648 WISP_SERVICE_REQUEST(MessageLoginSession); 00649 } 00650 00651 __END_NS_SSRC_WSPR_GROUP_SESSION
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.