Savarese Software Research Corporation
service/service.h
Go to the documentation of this file.
1 /* Copyright 2006-2013 Savarese Software Research Corporation.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.savarese.com/software/ApacheLicense-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
21 #ifndef __SSRC_WISP_SERVICE_SERVICE_H
22 #define __SSRC_WISP_SERVICE_SERVICE_H
23 
26 
27 #include <functional>
28 
29 #ifdef WISP_DEBUG
30 #include <iostream>
31 #endif
32 
34 
35 using NS_SSRC_SPREAD::MembershipInfo;
36 using NS_SSRC_SPREAD::Message;
37 using NS_SSRC_SPREAD::GroupList;
38 using NS_SSRC_WISP_PROTOCOL::MessageInfo;
40 using NS_SSRC_WISP_PROTOCOL::CallException;
45 
46 #define WISP_SERVICE_REQUEST(MessageType) \
47  set_request_handler<MessageType>(*this)
48 #define WISP_SERVICE_RESPONSE(MessageType) \
49  set_response_handler<MessageType>(*this)
50 #define WISP_SERVICE_REQUEST_T(MessageType) \
51  this->template set_request_handler<MessageType>(*this)
52 #define WISP_SERVICE_RESPONSE_T(MessageType) \
53  this->template set_response_handler<MessageType>(*this)
54 
55 #define WISP_SERVICE_REQUEST_BUFFERED(MessageType, msg) \
56  set_request_handler<MessageType>(*this, msg)
57 #define WISP_SERVICE_RESPONSE_BUFFERED(MessageType, msg) \
58  set_response_handler<MessageType>(*this, msg)
59 #define WISP_SERVICE_REQUEST_BUFFERED_T(MessageType, msg) \
60  this->template set_request_handler<MessageType>(*this, msg)
61 #define WISP_SERVICE_RESPONSE_BUFFERED_T(MessageType, msg) \
62  this->template set_response_handler<MessageType>(*this, msg)
63 
64 
65 typedef std::function<void (const EventInfo &)> timeout_handler;
66 // We expose message_handler_type, message_handler_entry, and
67 // message_handler_map specifically to support dynamically loaded handlers.
68 typedef std::function<void (MessageInfo &)> message_handler_type;
69 
74 
76  const wisp_message_id id,
77  const message_handler_type & message_handler) :
78  protocol(protocol), id(id), handle_message(message_handler)
79  { }
80 };
81 
82 typedef boost::multi_index_container<
84  boost::multi_index::indexed_by<
85  boost::multi_index::hashed_unique<
86  boost::multi_index::composite_key<
88  boost::multi_index::member<message_handler_entry,
91  boost::multi_index::member<message_handler_entry, wisp_message_id,
93  >
95 
96 /* BEGIN EXPERIMENTAL CODE */
97 // TODO: This is all one big kluge and should be re-implemented, but it works..
98 class ServiceContext {
99 public:
100  class TimeoutHandler : public EventHandler {
101  ServiceContext & _context;
102  bool _once;
103  timeout_handler _handler;
104 
105  public:
106 
108  const timeout_handler & handler, bool once) :
109  _context(context), _once(once), _handler(handler)
110  { }
111 
112  virtual ~TimeoutHandler() = default;
113 
114  virtual void handle_timeout(const EventInfo & info) {
115  _handler(info);
116  // TODO: WARNING: This could blow up because the current object
117  // may be deleted on the call to remove_timeout. This works only
118  // because the removal happens at the end of the function and we
119  // don't access 'this' again. Seek an alternative solution.
120  if(_once)
121  _context.remove_timeout(this);
122  }
123 
124  void execute() {
125  handle_timeout(EventInfo(_context.event_loop(), EventLoop::None, false));
126  }
127 
128  bool once() {
129  return _once;
130  }
131 
132  const TimeoutHandler *address() const {
133  return this;
134  }
135  };
136 
137  typedef std::shared_ptr<TimeoutHandler> timeout_handler_ptr;
138 
139  typedef boost::multi_index_container<
141  boost::multi_index::indexed_by<
142  boost::multi_index::hashed_unique<
143  boost::multi_index::const_mem_fun<TimeoutHandler,
144  const TimeoutHandler *,
147 
148 private:
149  EventLoop *loop;
150  timeout_map _timeouts;
151 
152 public:
153 
154  explicit ServiceContext(EventLoop * loop = 0) : loop(loop) { }
155 
156  timeout_map::size_type count_timeouts() const {
157  return _timeouts.size();
158  }
159 
160  void clear_timeouts() {
161  for(timeout_map::iterator it = _timeouts.begin(), end = _timeouts.end();
162  it != end; ++it)
163  remove_timeout(it->get());
164  }
165 
166  timeout_handler_ptr add_timeout(const timeout_handler & handler_fun,
167  const TimeValue & timeout,
168  const bool once)
169  {
170  timeout_handler_ptr handler(make_smart_ptr<timeout_handler_ptr>(*this, handler_fun, once));
171  _timeouts.insert(handler);
172  loop->add_handler(*handler, EventLoop::None, timeout);
173  return handler;
174  }
175 
176  void remove_timeout(TimeoutHandler *timeout) {
177  // must remove from loop first or we lose pointer!!!
178  loop->remove_handler(*timeout);
179  _timeouts.erase(timeout);
180  }
181 
182  EventLoop & event_loop() { return *loop; }
183 };
184 
185 
187 
188 /* END EXPERIMENTAL CODE */
189 
190 template<typename PackingTraits = BinaryPackingTraits>
192 public:
194  typedef typename protocol::ContinuationCaller<packing_traits> caller_type;
195 
197 
207 
208 private:
209  State _state;
210 
211 private:
212 
213  message_handler_map _request_handlers;
214  message_handler_map _response_handlers;
215 
216 protected:
217  caller_type & _caller;
218 
219  // WARNING!! This is highly experimental and only made available so
220  // a subclass may add handlers to the event loop for I/O on other
221  // descriptors. The context is only valid in the Starting and
222  // Started states.
223  ServiceContext & context() { return _context; };
224 
225  virtual void process_membership_message(const MessageInfo & msginfo,
226  const MembershipInfo & meminfo)
227  { }
228 
229  // So you can reuse message types that are expensive to create on each call.
230  template<typename MessageType, typename Impl>
231  void request(Impl & impl, MessageType & msg, MessageInfo & msginfo)
232  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
233  CallException)
234  {
235  _caller.unpack(msg, msginfo);
236  // Cast forces msginfo to be const arg in process_request
237  impl.process_request(msg, static_cast<const MessageInfo &>(msginfo));
238  }
239 
240  template<typename MessageType, typename Impl>
241  void request(Impl & impl, MessageInfo & msginfo)
242  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
243  {
244  MessageType msg;
245  request(impl, msg, msginfo);
246  }
247 
248  template<typename MessageType, typename Impl>
249  void respond(Impl & impl, MessageType & msg, MessageInfo & msginfo)
250  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure,
251  CallException)
252  {
253  _caller.unpack(msg, msginfo);
254  // Cast forces msginfo to be const arg in process_response
255  impl.process_response(msg, static_cast<const MessageInfo &>(msginfo));
256  }
257 
258  template<typename MessageType, typename Impl>
259  void respond(Impl & impl, MessageInfo & msginfo)
260  SSRC_DECL_THROW(boost::archive::archive_exception, std::ios_base::failure)
261  {
262  MessageType msg;
263  respond(impl, msg, msginfo);
264  }
265 
266  bool set_request_handler(const message_handler_entry & handler) {
267  return _request_handlers.insert(handler).second;
268  }
269 
270  message_handler_map::size_type
272  return
273  _request_handlers.erase(_request_handlers.key_extractor()((handler)));
274  }
275 
277  _request_handlers.clear();
278  }
279 
280  template<typename MessageType, typename Impl>
281  bool set_request_handler(Impl & impl) {
282  constexpr void (ServiceProtocolProcessor::*handler)(Impl &, MessageInfo &) =
283  &ServiceProtocolProcessor::template request<MessageType, Impl>;
284  return set_request_handler(message_handler_entry(MessageType::protocol,
285  MessageType::id,
286  std::bind(handler, this, std::ref(impl), std::placeholders::_1)));
287  }
288 
289  template<typename MessageType, typename Impl>
290  bool set_request_handler(Impl & impl, MessageType & buffer) {
291  constexpr void (ServiceProtocolProcessor::*handler)
292  (Impl &, MessageType &, MessageInfo &) =
293  &ServiceProtocolProcessor::template request<MessageType, Impl>;
294  return set_request_handler(message_handler_entry(MessageType::protocol,
295  MessageType::id,
296  std::bind(handler, this, std::ref(impl), std::ref(buffer), std::placeholders::_1)));
297  }
298 
300  return _response_handlers.insert(handler).second;
301  }
302 
303  message_handler_map::size_type
305  return
306  _response_handlers.erase(_response_handlers.key_extractor()((handler)));
307  }
308 
310  _response_handlers.clear();
311  }
312 
313  template<typename MessageType, typename Impl>
314  bool set_response_handler(Impl & impl) {
315  constexpr void (ServiceProtocolProcessor::*handler)(Impl &, MessageInfo &) =
316  &ServiceProtocolProcessor::template respond<MessageType, Impl>;
317  return set_response_handler(message_handler_entry(MessageType::protocol,
318  MessageType::id,
319  std::bind(handler, this, std::ref(impl), std::placeholders::_1))); }
320 
321  template<typename MessageType, typename Impl>
322  bool set_response_handler(Impl & impl, MessageType & buffer) {
323  constexpr void (ServiceProtocolProcessor::*handler)
324  (Impl &, MessageType &, MessageInfo &) =
325  &ServiceProtocolProcessor::template respond<MessageType, Impl>;
326  return set_response_handler(message_handler_entry(MessageType::protocol,
327  MessageType::id,
328  std::bind(handler, this, std::ref(impl), std::ref(buffer), std::placeholders::_1)));
329  }
330 
331  virtual void transition(State state) {
332  _state = state;
333  }
334 
335 public:
336 
337  explicit ServiceProtocolProcessor(caller_type & caller) :
338  _state(Stopped),
339  _request_handlers(), _response_handlers(), _caller(caller)
340  { }
341 
342  virtual ~ServiceProtocolProcessor() = default;
343 
344  std::string name() const {
345  return _caller.name();
346  }
347 
348  State state() const {
349  return _state;
350  }
351 
352  void start() {
354  }
355 
356  // Services listening for membership messages should wait for self-leave
357  // before transitioning to Stopped.
358  void stop() {
360  }
361 
362  void membership(const MessageInfo & msginfo,
363  const MembershipInfo & meminfo)
364  {
365  process_membership_message(msginfo, meminfo);
366  }
367 
368  void request(MessageInfo & msginfo) {
369  message_handler_map::iterator it =
370  _request_handlers.find(std::make_tuple(msginfo.protocol(), msginfo.id()));
371  if(it != _request_handlers.end()) {
372  it->handle_message(msginfo);
373  }
374  // TODO: add an else log unhandled message
375  }
376 
380  void response(MessageInfo & msginfo) {
381  if(!_caller.resume(msginfo)) {
382  message_handler_map::iterator it =
383  _response_handlers.find(std::make_tuple(msginfo.protocol(),
384  msginfo.id()));
385  if(it != _response_handlers.end()) {
386  it->handle_message(msginfo);
387  }
388  // TODO: add an else log unhandled message
389  }
390  }
391 
392  /* BEGIN EXPERIMENTAL CODE */
393  // TODO: This is all one big kluge and should be re-implemented (which
394  // is why this is stuffed at the end of the class), but it works.
395 
397  const TimeValue & timeout,
398  const bool once = EventLoop::Persist)
399  {
400  return _context.add_timeout(handler, timeout, once);
401  }
402 
403  void cancel_timeout(const timeout_ptr & timeout) {
404  _context.remove_timeout(timeout.get());
405  }
406 
407  void clear_timeouts() {
408  _context.clear_timeouts();
409  }
410 
411  ServiceContext::timeout_map::size_type count_timeouts() const {
412  return _context.count_timeouts();
413  }
414 
415 private:
416  template<typename PP> friend class ServiceEventHandler;
417 
418  ServiceContext _context;
419 
420  void set_service_context(const ServiceContext & context) {
421  _context = context;
422  }
423 
424  /* END EXPERIMENTAL CODE */
425 };
426 
427 template<typename PP>
429 public:
430  typedef PP protocol_processor;
431  WISP_IMPORT_T(protocol_processor, caller_type);
432 
433 private:
434  caller_type & _caller;
435  protocol_processor _protocol;
436  // Scratch variables for handle_event().
437  message_info_ptr _message_info;
438  MembershipInfo _membership_info;
439  protocol::wisp_call_token _min_token, _max_token;
440  std::vector<typename caller_type::jumbo_message_key_type> _jumbo_message_keys;
441 
442 protected:
443 
444  virtual void remove_handler(EventLoop & loop) {
445  /* BEGIN EXPERIMENTAL CODE */
446  // TODO: This is a kluge and should be re-implemented, but it works.
447  _protocol.clear_timeouts();
448  /* END EXPERIMENTAL CODE */
449  loop.remove_handler(*this);
450  _caller.cancel_all();
451  }
452 
453 public:
454 
455  explicit ServiceEventHandler(caller_type & caller) :
456  _caller(caller), _protocol(caller),
457  _message_info(make_smart_ptr<message_info_ptr>(caller.message_capacity_receive())),
458  _membership_info(),
459  _min_token(0), _max_token(0),
460  _jumbo_message_keys(4)
461  { }
462 
463  template<typename Initializer>
464  explicit ServiceEventHandler(caller_type & caller,
465  const Initializer & initializer) :
466  _caller(caller), _protocol(caller, initializer),
467  _message_info(),
468  _membership_info(),
469  _min_token(0), _max_token(0),
470  _jumbo_message_keys(4)
471  { }
472 
473  virtual ~ServiceEventHandler() = default;
474 
475  typename protocol_processor::State state() const {
476  return _protocol.state();
477  }
478 
479  virtual int event_descriptor() const {
480  return _caller.mbox().descriptor();
481  }
482 
483  virtual void handle_read(const EventInfo & info) {
484  try {
485  if(state() != protocol_processor::Stopped) {
486  _caller.receive(_message_info);
487 
488  if(_message_info->message.is_regular() &&
489  _protocol.state() < protocol_processor::Stopping)
490  {
491  if(_message_info->role() == protocol::TwoWayResponse)
492  _protocol.response(*_message_info);
493  else
494  _protocol.request(*_message_info);
495  } else if(_message_info->message.is_membership()) {
496  _message_info->message.get_membership_info(_membership_info);
497  _protocol.membership(*_message_info, _membership_info);
498  }
499 
500  if(state() == protocol_processor::Stopped)
501  remove_handler(info.event_loop());
502  }
503  } catch(const boost::archive::archive_exception & ae) {
504  // TODO: log
505 #ifdef WISP_DEBUG
506  std::cerr << _protocol.name()
507  << ": Caught boost::archive::archive_exception: "
508  << ae.what() << "\nContinuing.";
509 #endif
510  } catch(const std::ios_base::failure & iof) {
511  // TODO: log. probably don't want to try again
512 #ifdef WISP_DEBUG
513  std::cerr << _protocol.name() << ": Caught std::ios_base::failure: "
514  << iof.what() << "\nContinuing.";
515 #endif
516  }
517  }
518 
519  virtual void handle_timeout(const EventInfo & info) {
520  // Cancel continuations
521  _caller.cancel_range(_min_token, _max_token);
522  _min_token = _max_token;
523  _max_token = _caller.call_token();
524 
525  // Clear out incomplete jumbo messages
526  if(!_jumbo_message_keys.empty()) {
527  if(_caller.count_jumbo_messages() > 0) {
528  _caller.erase_jumbo_messages(_jumbo_message_keys.begin(),
529  _jumbo_message_keys.end());
530  }
531  _jumbo_message_keys.clear();
532  }
533 
534  if(_caller.count_jumbo_messages() > 0) {
535  _caller.collect_jumbo_message_keys(_jumbo_message_keys);
536  }
537  }
538 
539  protocol_processor & protocol() {
540  return _protocol;
541  }
542 
543  void start(EventLoop & loop, const unsigned int call_timeout) {
544  _min_token = _max_token = _caller.call_token();
545  loop.add_handler(*this, EventLoop::Read, TimeValue(call_timeout, 0));
546  /* BEGIN EXPERIMENTAL CODE */
547  // TODO: This is a kluge and should be re-implemented, but it works.
548  _protocol.set_service_context(ServiceContext(&loop));
549  /* END EXPERIMENTAL CODE */
550  _protocol.start();
551  }
552 
553  void stop() {
554  _protocol.stop();
555  }
556 };
557 
558 
562 template<typename EH>
563 class Service {
564 public:
565  typedef EH event_handler;
566  WISP_IMPORT_T(event_handler, caller_type);
567  WISP_IMPORT_T(event_handler, protocol_processor);
568 
569 private:
570  caller_type _caller;
571  event_handler _handler;
572 
573 protected:
574 
575  protocol_processor & protocol() {
576  return _handler.protocol();
577  }
578 
579 public:
580 
581  explicit Service(const std::string & connection = "",
582  const std::string & name = "",
583  const unsigned int message_capacity =
584  Message::DefaultCapacity) :
585  _caller(connection, name, message_capacity,
586  protocol_processor::GroupMembership),
587  _handler(_caller)
588  { }
589 
590  template<typename Initializer>
591  explicit Service(const Initializer & initializer,
592  const std::string & connection = "",
593  const std::string & name = "",
594  const unsigned int message_capacity =
595  Message::DefaultCapacity) :
596  _caller(connection, name, message_capacity,
597  protocol_processor::GroupMembership),
598  _handler(_caller, initializer)
599  { }
600 
601  const std::string & name() const {
602  return _caller.name();
603  }
604 
605  typename protocol_processor::State state() {
606  return protocol().state();
607  }
608 
609  void start(EventLoop & loop, unsigned int call_timeout) {
610  _handler.start(loop, call_timeout);
611  }
612 
613  void stop() {
614  _handler.stop();
615  }
616 };
617 
619 
620 #endif

Savarese Software Research Corporation
Copyright © 2006-2012 Savarese Software Research Corporation. All rights reserved.