Savarese Software Research Corporation
session/service.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright 2006-2009 Savarese Software Research Corporation
00003  *
00004  * Licensed under the Apache License, Version 2.0 (the "License");
00005  * you may not use this file except in compliance with the License.
00006  * You may obtain a copy of the License at
00007  *
00008  *     https://www.savarese.com/software/ApacheLicense-2.0
00009  *
00010  * Unless required by applicable law or agreed to in writing, software
00011  * distributed under the License is distributed on an "AS IS" BASIS,
00012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  * See the License for the specific language governing permissions and
00014  * limitations under the License.
00015  */
00016 
00017 #include <ssrc/wispers/session/service.h>
00018 #include <ssrc/wispers/utility/checkpoint.h>
00019 
00020 
00021 __BEGIN_NS_SSRC_WSPR_SESSION
00022 
00028 void Session::process_request(const MessageInsert & msg, const MessageInfo &) {
00029   unsigned int i = msg.values.size();
00030 
00031   while(i-- > 0) {
00032     session_map::iterator && it = find<BySID>(msg.values[i].sid);
00033 
00034     if(it != end<BySID>()) {
00035       const_cast<SessionData &>(*it).attributes = msg.values[i].attributes;
00036       touch_session(it);
00037     }
00038   }
00039 }
00040 
00044 void Session::process_request(const MessageCreateSession & msg,
00045                               const MessageInfo & msginfo)
00046 {
00047   session_map::iterator it(end<BySID>());
00048 
00049   if(!msg.sid.empty())
00050     it = find<BySID>(msg.sid);
00051 
00052   if(it == end<BySID>())
00053     it = create_session().first;
00054 
00055   _caller.reply<CallSingleQueryResult>(Message::FIFOSelfDiscard,
00056                                        msginfo.sender(), msginfo.token(), *it);
00057 }
00058 
00062 void Session::process_request(const MessageGetSession & msg,
00063                               const MessageInfo & msginfo)
00064 {
00065   MessageSingleQueryResult result;
00066   session_map::iterator && it = find<BySID>(msg.sid);
00067 
00068   if(it != end<BySID>()) {
00069     result.found  = true;
00070     result.result = *it;
00071     if(msg.update_access_time)
00072       touch_session(it);
00073   }
00074 
00075   _caller.reply<CallSingleQueryResult>(msginfo.sender(), msginfo.token(),
00076                                        result, Message::FIFOSelfDiscard);
00077 }
00078 
00082 void Session::process_request(MessageSetAttributes & msg,
00083                               const MessageInfo &)
00084 {
00085   session_map::iterator && it = find<BySID>(msg.sid);
00086 
00087   if(it != end<BySID>()) {
00088     const_cast<SessionData &>(*it).attributes->merge(std::move(msg.attributes));
00089     touch_session(it);
00090   }
00091 }
00092 
00096 void Session::update_session(const SessionData & update) {
00097   session_map::iterator && it = find<BySID>(update.sid);
00098 
00099   if(it != end<BySID>()) {
00100     SessionData session = *it;
00101     get_index<BySID>().erase(it);
00102     session.uid = update.uid;
00103     session.attributes = update.attributes;
00104     _touch_session(session);
00105 
00106     index_by_uid & index = get_index<ByUID>();
00107     index_by_uid::iterator && uit = find<ByUID>(session.uid);
00108 
00109     if(uit != end<ByUID>()) {
00110       // We don't need to send an expiration message because the next
00111       // client action requiring a session will clear the session.
00112       // Everything else (e.g., EventQueue) indexes on uid.
00113       index.erase(uit);
00114     }
00115 
00116     insert<BySID>(session);
00117   }
00118 }
00119 
00120 void Session::process_request(const MessageLogoutSession & msg,
00121                               const MessageInfo &)
00122 {
00123   index_by_uid & index = get_index<ByUID>();
00124   index_by_uid::iterator && it = find<ByUID>(msg.session.uid);
00125 
00126   if(it != end<ByUID>()) {
00127     index.erase(it);
00128     MessageExpireSession expire;
00129     expire.sessions.push_back(msg.session.uid);
00130     _caller.send<CallExpireSession>(protocol_traits::event_group_expire(),
00131                                     expire);
00132   }
00133 }
00134 
00138 void Session::load_checkpoint(const std::string & filename)
00139   SSRC_DECL_THROW(std::runtime_error, boost::archive::archive_exception)
00140 {
00141   NS_SSRC_WSPR_UTILITY::load_checkpoint(_checkpoint_path, super::_index);
00142 
00143   for(index_by_sid::iterator it = begin<BySID>(); it != end<BySID>(); ++it)
00144     touch_session(it);
00145 }
00146 
00150 void Session::transition(State state) {
00151   switch(state) {
00152   case Starting:
00153     _caller.join(protocol_traits::service_group());
00154     _caller.join(protocol_traits::event_group_login());
00155     _caller.join(protocol_traits::event_group_logout());
00156     _poll_timeout = 
00157       schedule_timeout(boost::bind(&Session::check_for_expirations, this),
00158                        TimeValue(_poll_interval, 0));
00159     state = Started;
00160     break;
00161   case Stopping:
00162     if(_poll_timeout)
00163       cancel_timeout(_poll_timeout);
00164     _caller.leave(protocol_traits::event_group_logout());
00165     _caller.leave(protocol_traits::event_group_login());
00166     _caller.leave(protocol_traits::service_group());
00167     state = Stopped;
00168     //break;
00169   case Stopped:
00170     // TODO: remove this after we implement graceful restarts/upgrades.
00171     //expire_sessions(boost::integer_traits<idle_count_type>::const_max);
00172     NS_SSRC_WSPR_UTILITY::save_checkpoint(_checkpoint_path, super::_index);
00173     break;
00174   default:
00175     break;
00176   }
00177 
00178   super::transition(state);
00179 }
00180 
00184 Session::Session(super::caller_type & caller,
00185                  const SessionInitializer & initializer)
00186   SSRC_DECL_THROW(std::runtime_error, boost::archive::archive_exception) :
00187   super(caller), _random_id(),
00188   _expirations_per_message(initializer.expirations_per_message),
00189   _poll_interval(initializer.poll_interval),
00190   _max_idle_time(initializer.max_idle_time),
00191   _low_count(0), _touch_session(_max_idle_time / _poll_interval),
00192   _poll_timeout(), _checkpoint_path(initializer.checkpoint_path)
00193 {
00194   add_service_type(protocol_traits::service_type());
00195 
00196   WISP_SERVICE_REQUEST(MessageCreateSession);
00197   WISP_SERVICE_REQUEST(MessageSetAttributes);
00198   WISP_SERVICE_REQUEST(MessageGetSession);
00199   WISP_SERVICE_REQUEST(MessageUpdateSession);
00200   WISP_SERVICE_REQUEST(MessageLoginSession);
00201   WISP_SERVICE_REQUEST(MessageLogoutSession);
00202 
00203   load_checkpoint(_checkpoint_path);
00204 }
00205 
00206 void Session::expire_sessions(const idle_count_type low_count) {
00207   index_by_access & index = get_index<ByLastAccess>();
00208   index_by_access::iterator upper = index.upper_bound(low_count);
00209   index_by_access::iterator it = index.begin();
00210   MessageExpireSession msg;
00211   unsigned int count = 0;
00212 
00213   do {
00214     while(it != upper && count < _expirations_per_message) {
00215       msg.sessions.push_back(it->uid);
00216       it = index.erase(it);
00217       ++count;
00218     }
00219 
00220     if(msg.sessions.size() > 0) {
00221       _caller.send<CallExpireSession>(protocol_traits::event_group_expire(),
00222                                       msg);
00223       msg.sessions.clear();
00224     }
00225   } while(it != upper);
00226 }
00227 
00231 string Session::create_sid() {
00232   sid_type sid(SessionIdNumChars, ' ');
00233   session_map::iterator && end_ = end<BySID>();
00234 
00235   do {
00236     _random_id(sid);
00237     // TODO: Protect against accidental infinite loop (e.g., if we fill up)!
00238   } while(find<BySID>(sid) != end_);
00239 
00240   return sid;
00241 }
00242 
00243 
00244 
00245 __END_NS_SSRC_WSPR_SESSION

Savarese Software Research Corporation
Copyright © 2006-2011 Savarese Software Research Corporation. All rights reserved.