Savarese Software Research Corporation
EventQueue.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2009 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     https://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00022 #ifndef __SSRC_WSPR_WS_EVENT_QUEUE_H
00023 #define __SSRC_WSPR_WS_EVENT_QUEUE_H
00024 
00025 #include <boost/ptr_container/ptr_unordered_map.hpp>
00026 
00027 #include <ssrc/wispers/ws/service.h>
00028 #include <ssrc/wispers/session/protocol.h>
00029 #include <ssrc/wispers/group_session/protocol.h>
00030 
00031 #ifdef WSPR_DEBUG
00032 #include <ssrc/wispers/utility/PropertiesToString.h>
00033 #endif
00034 
00035 __BEGIN_NS_SSRC_WSPR_WS
00036 
00037 using NS_SSRC_WSPR_UTILITY::property_vector;
00038 using NS_SSRC_WSPR_UTILITY::StringTo;
00039 using NS_SSRC_WSPR_SESSION::SessionProtocol;
00040 using NS_SSRC_WSPR_GROUP_SESSION::GroupSessionProtocol;
00041 using NS_SSRC_WSPR_GROUP_SESSION::uid_container;
00042 using NS_SSRC_WSPR_SESSION::SessionProtocol;
00043 
// Construction-time settings for EventQueue.
struct EventQueueInitializer {
  // Fetch timeout, expressed in seconds.  EventQueue's constructor
  // passes this value as the first argument of its TimeValue
  // _fetch_timeout member.
  unsigned int fetch_timeout;

  // Default configuration: a 20 second fetch timeout.
  EventQueueInitializer() : fetch_timeout(20) { }
};
00052 
00053 class EventQueue : public WebService {
00054   typedef WebService super;
00055   friend class NS_SSRC_WISP_SERVICE::ServiceProtocolProcessor<packing_traits>;
00056 
00057   WISP_IMPORT(SessionProtocol, MessageLoginSession);
00058 
00059   struct EventEntry {
00060     //uid_type recipient;
00061     unsigned int channel;
00062     timeout_ptr timeout;
00063     properties_ptr events;
00064     property_vector & evec;
00065 
00066     explicit EventEntry(const unsigned int channel = 1) :
00067       channel(channel), timeout(), events(new Properties),
00068       evec(events->create_property_vector("wspr", "event"))
00069     { }
00070 
00071     property_vector::size_type size() const {
00072       return evec.size();
00073     }
00074 
00075     // Destructively merge.
00076     void add(const Properties && event) {
00077       evec.push_back(new Properties(event));
00078     }
00079 
00080     // Non-destructively copy.
00081     void add(const Properties & event) {
00082       evec.push_back(new Properties(event));
00083     }
00084 
00085     void remove(unsigned int confirm) {
00086       const property_vector::size_type sz = size();
00087 
00088       if(sz > 0 && confirm > 0) {
00089         if(confirm >= sz) {
00090           evec.clear();
00091         } else {
00092           evec.erase(evec.begin(), evec.begin() + confirm);
00093         }
00094       }
00095 
00096 #ifdef WSPR_DEBUG
00097       std::cerr << /*recipient <<*/ " Size: " << size() << std::endl;
00098 #endif
00099     }
00100   };
00101 
00102   typedef boost::ptr_unordered_map<uid_type, EventEntry> event_map;
00103 
00104   TimeValue _fetch_timeout;
00105   event_map _events;
00106   StringTo _number_cast;
00107   
00108 protected:
00109   WISP_IMPORT(SessionProtocol, MessageExpireSession);
00110   WISP_IMPORT(GroupSessionProtocol, MessageExpireGroupSession);
00111 
00112   void reply(const string & sender,
00113              const MessageResponse & response,
00114              const NS_SSRC_WISP_PROTOCOL::wisp_call_token token)
00115   {
00116     try {
00117 #ifdef WSPR_DEBUG
00118       std::cerr << "MessageResponse:\n"
00119                 << NS_SSRC_WSPR_UTILITY::to_string(*response.template_data)
00120                 << std::endl;
00121 #endif
00122       _caller.reply<CallResponse>(sender, token, response);
00123     } catch(std::ios_base::failure &) {
00124       event_map::iterator && it = _events.find(response.session->uid);
00125 
00126 #ifdef WSPR_DEBUG
00127       std::cerr << "Caught std::ios_base::failure" << std::endl;
00128 #endif
00129 
00130       if(it != _events.end()) {
00131         it->second->remove(it->second->size());
00132       }
00133 
00134       // TODO: prepare an error response.
00135       _caller.reply<CallResponse>(sender, token,
00136                                   MessageResponse(response.session, 0));
00137     }
00138   }
00139 
00140   void complete_fetch(const string & sender,
00141                       MessageResponse & response,
00142                       const NS_SSRC_WISP_PROTOCOL::wisp_call_token token,
00143                       const EventInfo &)
00144   {
00145 #ifdef WSPR_DEBUG
00146     std::cerr << "COMPLETE FETCH " << response.session->uid << std::endl;
00147 #endif
00148     event_map::iterator && it = _events.find(response.session->uid);
00149 
00150     if(it != _events.end()) {
00151       response.template_data = it->second->events;
00152 
00153       if(it->second->timeout)
00154         it->second->timeout.reset();
00155     }
00156 
00157     reply(sender, response, token);
00158   }
00159 
00160   void fetch(const WebServiceCall & call, const MessageInfo & msginfo) {
00161     string channel_str;
00162 
00163     MessageResponse response(call.session, 0);
00164 
00165 #ifdef WSPR_DEBUG
00166     std::cerr << "START FETCH ";
00167     if(call.session)
00168       std::cerr << call.session->uid;
00169     std::cerr << std::endl;
00170 #endif
00171 
00172     if(call.session) {
00173       unsigned int channel = 0;
00174 
00175       if(call.get_parameter("channel", channel_str)) {
00176         try {
00177           channel = _number_cast.cast<int>(channel_str);
00178         } catch(const std::bad_cast & blc) {
00179           goto missing_channel;
00180         }
00181 
00182         event_map::iterator && it = _events.find(call.session->uid);
00183 
00184         if(it != _events.end()) {
00185           EventEntry & entry = *it->second;
00186           unsigned int confirm(0);
00187           timeout_ptr timeout(entry.timeout);
00188 
00189           try {
00190             call.get_parameter("confirm", confirm);
00191           } catch(const std::bad_cast &) {
00192             confirm = 0;
00193           }
00194 
00195           if(confirm > 0) {
00196             entry.remove(confirm);
00197           }
00198 
00199           if(entry.size() > 0) {
00200             if(channel != entry.channel) {
00201               response.template_data.reset(new Properties);
00202               Properties & event = web_event_direct(*response.template_data,
00203                                                      "wspr.Event",
00204                                                      "ChangeChannel");
00205               event.set(entry.channel, "channel");
00206               event.set(1, "_no_confirmation");
00207             } else
00208               response.template_data = entry.events;
00209           }
00210 
00211           if(timeout) {
00212 #ifdef WSPR_DEBUG
00213             std::cerr << "FETCH TIMEOUT " << call.session->uid << std::endl;
00214 #endif
00215             timeout->execute();
00216             //entry.timeout.reset();
00217           }
00218         } else {
00219           // Even though we create a queue on login, keep this code
00220           // here because it's where we will add support for multiple
00221           // channels (TODO).
00222           std::pair<event_map::iterator,bool> && result =
00223             _events.insert(call.session->uid, new EventEntry);
00224           if(result.second)
00225             it = result.first;
00226           else
00227             response.set_error("could not create queue");
00228         }
00229 
00230         if(it != _events.end() && it->second->size() == 0) {
00231           EventEntry & entry = *it->second;
00232 #ifdef WSPR_DEBUG
00233           std::cerr << "SCHEDULING FETCH COMPLETION "
00234                     << call.session->uid << std::endl;
00235 #endif
00236           if(channel != entry.channel) {
00237             response.template_data.reset(new Properties);
00238             Properties & event = web_event_direct(*response.template_data,
00239                                                    "wspr.Event",
00240                                                    "ChangeChannel");
00241             if(channel == 0) {
00242               do {
00243                 entry.channel++;
00244               } while(entry.channel == 0);
00245             }
00246 
00247             event.set(entry.channel, "channel");
00248             event.set(1, "_no_confirmation");
00249           } else {
00250             entry.timeout =
00251               schedule_timeout(boost::bind(&EventQueue::complete_fetch, this,
00252                                            msginfo.sender(), response,
00253                                            msginfo.token(), _1),
00254                                _fetch_timeout, EventLoop::Once);
00255             return;
00256           }
00257         }
00258       } else {
00259       missing_channel:
00260         response.set_error("missing channel");
00261       }
00262     }
00263 
00264 #ifdef WSPR_DEBUG
00265     std::cerr << "END FETCH " << call.session->uid << std::endl;
00266 #endif
00267 
00268     reply(msginfo.sender(), response, msginfo.token());
00269   }
00270 
00271   void process_request(const MessageDeliverEvent & msg, const MessageInfo &) {
00272     unsigned int size = msg.keys.size();
00273     event_map::iterator && end = _events.end();
00274 
00275     if(!msg.template_data) {
00276 #ifdef WSPR_DEBUG
00277       std::cerr << "RECEIVED NULL MessageDeliverEvent::template_data!!\n";
00278 #endif
00279       return;
00280     }
00281 
00282     while(size-- > 0) {
00283       event_map::iterator && it = _events.find(msg.keys[size]);
00284 
00285       if(it != end) {
00286         EventEntry & entry = *it->second;
00287 
00288 #ifdef WSPR_DEBUG
00289         std::cerr << "DELIVER_EVENT TO: " << msg.keys[size] << std::endl
00290                   << "EVENT: "
00291                   << NS_SSRC_WSPR_UTILITY::to_string(*msg.template_data)
00292                   << std::endl;
00293 #endif
00294         entry.add(std::move(*msg.template_data));
00295 
00296         if(entry.timeout) {
00297 #ifdef WSPR_DEBUG
00298           std::cerr << "DELIVER_EVENT TIMEOUT " << msg.keys[size] << std::endl;
00299 #endif
00300           entry.timeout->execute();
00301         }
00302       }
00303     }
00304   }
00305 
00306   void process_request(const MessageDeliverEvents & msg, const MessageInfo &) {
00307     unsigned int size = msg.keys.size();
00308     event_map::iterator && end = _events.end();
00309 
00310     if(msg.events.empty()) {
00311 #ifdef WSPR_DEBUG
00312       std::cerr << "RECEIVED empty MessageDeliverEvents::events!!\n";
00313 #endif
00314       return;
00315     }
00316 
00317     while(size-- > 0) {
00318       event_map::iterator && it = _events.find(msg.keys[size]);
00319 
00320       if(it != end) {
00321         EventEntry & entry = *it->second;
00322 #ifdef WSPR_DEBUG
00323         std::cerr << "DELIVER_EVENTS TO: " << msg.keys[size] << std::endl;
00324 #endif
00325 
00326         for(MessageDeliverEvents::event_container::const_iterator && event =
00327               msg.events.begin(), && event_end = msg.events.end();
00328             event != event_end; ++event)
00329         {
00330 #ifdef WSPR_DEBUG
00331           std::cerr << "EVENT: "
00332                     << NS_SSRC_WSPR_UTILITY::to_string(*event)
00333                     << std::endl;
00334 #endif
00335           entry.add(std::move(*event));
00336         }
00337 
00338         if(entry.timeout) {
00339 #ifdef WSPR_DEBUG
00340           std::cerr << "DELIVER_EVENTS TIMEOUT " << msg.keys[size] << std::endl;
00341 #endif
00342           entry.timeout->execute();
00343         }
00344       }
00345     }
00346   }
00347 
00348   void process_request(const MessageExpireSession & msg, const MessageInfo &) {
00349     if(msg.sessions.size() > 0) {
00350       MessageExpireSession::session_container::const_iterator && it =
00351         msg.sessions.begin();
00352       MessageExpireSession::session_container::const_iterator && end =
00353         msg.sessions.end();
00354       properties_unique_ptr && eventp =
00355         web_event("wspr.Event", "ExpireSession");
00356       Properties & event = *eventp;
00357 
00358       while(it != end) {
00359         event_map::iterator && queue = _events.find(*it);
00360 
00361         if(queue != _events.end()) {
00362           EventEntry & entry = *queue->second;
00363 
00364           if(entry.timeout) {
00365             entry.add(event);
00366             entry.timeout->execute();
00367           }
00368 
00369           _events.erase(queue);
00370 
00371 #ifdef WSPR_DEBUG
00372           std::cerr <<  "EXPIRED QUEUE " << *it << std::endl;
00373 #endif
00374         }
00375 
00376         ++it;
00377       }
00378     }
00379   }
00380 
00381   void process_request(const MessageExpireGroupSession & msg,
00382                        const MessageInfo &)
00383   {
00384     properties_unique_ptr && eventp =
00385       web_event("wspr.GroupSession", "ExpireGroupSession");
00386     Properties & event = *eventp;
00387     event_map::iterator && events_end = _events.end();
00388 
00389     for(GroupSessionProtocol::expiration_notification_container::const_iterator
00390           && it = msg.notifications.begin(), && end = msg.notifications.end();
00391         it != end; ++it)
00392     {
00393       event.set(it->gsid, "gsid");
00394 
00395       for(uid_container::const_iterator && uid = it->uids.begin(),
00396             uid_end = it->uids.end();
00397           uid != uid_end; ++uid)
00398       {
00399         event_map::iterator && queue = _events.find(*uid);
00400 
00401         if(queue != events_end)
00402           queue->second->add(event);
00403       }
00404     }
00405   }
00406 
00407   void process_request(const MessageLoginSession & msg, const MessageInfo &) {
00408     uid_type uid(msg.session.uid);
00409     event_map::iterator && it = _events.find(uid);
00410     if(it == _events.end()) {
00411 #if WSPR_DEBUG
00412       std::cerr << "Created queue on login: " << uid << std::endl;
00413 #endif
00414       _events.insert(uid, new EventEntry);
00415     }
00416   }
00417 
00418   virtual void transition(State state) {
00419     switch(state) {
00420     case Starting:
00421       _caller.join(SessionProtocol::event_group_expire());
00422       _caller.join(GroupSessionProtocol::event_group_expire());
00423       _caller.join(SessionProtocol::event_group_login());
00424       _caller.join(WebServiceProtocol::event_queue_ws_group());
00425       state = Started;
00426       break;
00427     case Stopping:
00428       _caller.leave(WebServiceProtocol::event_queue_ws_group());
00429       _caller.leave(SessionProtocol::event_group_login());
00430       _caller.leave(GroupSessionProtocol::event_group_expire());
00431       _caller.leave(SessionProtocol::event_group_expire());
00432       state = Stopped;
00433       // break;
00434     case Stopped:
00435       for(event_map::iterator && it = _events.begin(), && end = _events.end();
00436           it != end; ++it)
00437         {
00438           if(it->second->timeout) {
00439             it->second->timeout->execute();
00440             //it->second->timeout.reset();
00441           }
00442         }
00443       _events.clear();
00444       break;
00445     default:
00446       break;
00447     }
00448 
00449     super::transition(state);
00450   }
00451 
00452 public:
00453 
00454   explicit EventQueue(super::caller_type & caller,
00455                       const EventQueueInitializer & initializer) :
00456     super(caller), _fetch_timeout(initializer.fetch_timeout, 0), _events(),
00457     _number_cast()
00458   {
00459     super::add_service_type(WebServiceProtocol::event_queue_ws_type());
00460 
00461     WISP_SERVICE_REQUEST(MessageDeliverEvent);
00462     WISP_SERVICE_REQUEST(MessageDeliverEvents);
00463     WISP_SERVICE_REQUEST(MessageExpireSession);
00464     WISP_SERVICE_REQUEST(MessageExpireGroupSession);
00465     WISP_SERVICE_REQUEST(MessageLoginSession);
00466 
00467     WS_TWO_WAY_CALL("fetch", &EventQueue::fetch);
00468   }
00469 
00470   virtual ~EventQueue() { }
00471 };
00472 
00473 __END_NS_SSRC_WSPR_WS
00474 
00475 #endif

Savarese Software Research Corporation
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.