Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
BidirMMapPipe.cxx
Go to the documentation of this file.
1 /** @file BidirMMapPipe.cxx
2  *
3  * implementation of BidirMMapPipe, a class which forks off a child process
4  * and serves as communications channel between parent and child
5  *
6  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
7  * @date 2013-07-07
8  */
9 #ifndef _WIN32
10 #include <map>
11 #include <cerrno>
12 #include <limits>
13 #include <string>
14 #include <cstdlib>
15 #include <cstring>
16 #include <iostream>
17 #include <algorithm>
18 #include <exception>
19 
20 #include <poll.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #include <sys/wait.h>
30 #include <sys/socket.h>
31 
32 #include "BidirMMapPipe.h"
33 
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
36 
37 BEGIN_NAMESPACE_ROOFIT
38 
39 /// namespace for implementation details of BidirMMapPipe
40 namespace BidirMMapPipe_impl {
/** @brief exception to throw if low-level OS calls go wrong
 *
 * The message reads "<msg>: <description of err>" (without the ": " prefix
 * if msg is empty or null). It is truncated to fit a fixed buffer of s_sz
 * bytes and is always zero-terminated.
 *
 * @author Manuel Schiller <manuel.schiller@nikhef.nl>
 * @date 2013-07-07
 */
class BidirMMapPipeException : public std::exception
{
private:
    enum {
        s_sz = 256 ///< length of buffer
    };
    char m_buf[s_sz]; ///< buffer containing the error message

    /// for the POSIX version of strerror_r
    static int dostrerror_r(int err, char* buf, std::size_t sz,
            int (*f)(int, char*, std::size_t))
    { return f(err, buf, sz); }
    /// for the GNU version of strerror_r
    static int dostrerror_r(int, char*, std::size_t,
            char* (*f)(int, char*, std::size_t));
public:
    /// constructor taking error code, hint on operation (msg, may be null)
    BidirMMapPipeException(const char* msg, int err);
    /// return a description of what went wrong
    const char* what() const noexcept override { return m_buf; }
};

BidirMMapPipeException::BidirMMapPipeException(const char* msg, int err)
{
    // a null hint is treated like an empty one (strlen(nullptr) is UB)
    std::size_t msgsz = msg ? std::strlen(msg) : 0;
    if (msgsz) {
        msgsz = std::min(msgsz, std::size_t(s_sz));
        std::copy(msg, msg + msgsz, m_buf);
        if (msgsz < s_sz) { m_buf[msgsz] = ':'; ++msgsz; }
        if (msgsz < s_sz) { m_buf[msgsz] = ' '; ++msgsz; }
    }
    if (msgsz < s_sz) {
        // pre-terminate: a POSIX strerror_r that fails (EINVAL/ERANGE) is not
        // guaranteed to write anything, and the tail must not be garbage
        m_buf[msgsz] = 0;
        // UGLY: GNU and POSIX cannot agree on prototype and behaviour, so
        // have to sort it out with overloads
        dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
    }
    m_buf[s_sz - 1] = 0; // enforce zero-termination
}

/// GNU strerror_r may return a pointer to a static string instead of
/// filling buf; copy it into buf (truncating) and report ERANGE if it
/// did not fit.
int BidirMMapPipeException::dostrerror_r(int err, char* buf,
        std::size_t sz, char* (*f)(int, char*, std::size_t))
{
    buf[0] = 0;
    char *tmp = f(err, buf, sz);
    if (tmp && tmp != buf) {
        std::strncpy(buf, tmp, sz);
        buf[sz - 1] = 0;
        if (std::strlen(tmp) > sz - 1) return ERANGE;
    }
    return 0;
}
97 
98  /** @brief class representing the header structure in an mmapped page
99  *
100  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
101  * @date 2013-07-07
102  *
103  * contains a field to put pages into a linked list, a field for the size
104  * of the data being transmitted, and a field for the position until which
105  * the data has been read
106  */
107  class Page
108  {
109  private:
110  // use as small a data type as possible to maximise payload area
111  // of pages
112  short m_next; ///< next page in list (in pagesizes)
113  unsigned short m_size; ///< size of payload (in bytes)
114  unsigned short m_pos; ///< index of next byte in payload area
115  /// copy construction forbidden
116  Page(const Page&) {}
117  /// assigment forbidden
118  Page& operator=(const Page&) = delete;
119  public:
120  /// constructor
121  Page() : m_next(0), m_size(0), m_pos(0)
122  {
123  // check that short is big enough - must be done at runtime
124  // because the page size is not known until runtime
125  assert(std::numeric_limits<unsigned short>::max() >=
126  PageChunk::pagesize());
127  }
128  /// set pointer to next page
129  void setNext(const Page* p);
130  /// return pointer to next page
131  Page* next() const;
132  /// return reference to size field
133  unsigned short& size() { return m_size; }
134  /// return size (of payload data)
135  unsigned size() const { return m_size; }
136  /// return reference to position field
137  unsigned short& pos() { return m_pos; }
138  /// return position
139  unsigned pos() const { return m_pos; }
140  /// return pointer to first byte in payload data area of page
141  inline unsigned char* begin() const
142  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
143  + sizeof(Page); }
144  /// return pointer to first byte in payload data area of page
145  inline unsigned char* end() const
146  { return reinterpret_cast<unsigned char*>(const_cast<Page*>(this))
147  + PageChunk::pagesize(); }
148  /// return the capacity of the page
149  static unsigned capacity()
150  { return PageChunk::pagesize() - sizeof(Page); }
151  /// true if page empty
152  bool empty() const { return !m_size; }
153  /// true if page partially filled
154  bool filled() const { return !empty(); }
155  /// free space left (to be written to)
156  unsigned free() const { return capacity() - m_size; }
157  /// bytes remaining to be read
158  unsigned remaining() const { return m_size - m_pos; }
159  /// true if page completely full
160  bool full() const { return !free(); }
161  };
162 
163  void Page::setNext(const Page* p)
164  {
165  if (!p) {
166  m_next = 0;
167  } else {
168  const char* p1 = reinterpret_cast<char*>(this);
169  const char* p2 = reinterpret_cast<const char*>(p);
170  std::ptrdiff_t tmp = p2 - p1;
171  // difference must be divisible by page size
172  assert(!(tmp % PageChunk::pagesize()));
173  tmp /= static_cast<std::ptrdiff_t>(PageChunk::pagesize());
174  m_next = tmp;
175  // no truncation when saving in a short
176  assert(m_next == tmp);
177  // final check: next() must return p
178  assert(next() == p);
179  }
180  }
181 
182  Page* Page::next() const
183  {
184  if (!m_next) return 0;
185  char* ptmp = reinterpret_cast<char*>(const_cast<Page*>(this));
186  ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
187  return reinterpret_cast<Page*>(ptmp);
188  }
189 
190  /** @brief class representing a page pool
191  *
192  * @author Manuel Schiller <manuel.schiller@nikhef.nl>
193  * @date 2013-07-24
194  *
195  * pool of mmapped pages (on systems which support it, on all others, the
196  * functionality is emulated with dynamically allocated memory)
197  *
198  * in most operating systems there is a limit to how many mappings any one
199  * process is allowed to request; for this reason, we mmap a relatively
200  * large amount up front, and then carve off little pieces as we need them
201  *
202  * Moreover, some systems have too large a physical page size in their MMU
203  * for the code to handle (we want offsets and lengths to fit into 16
204  * bits), so we carve such big physical pages into smaller logical Pages
205  * if needed. The largest logical page size is currently 16 KiB.
206  */
207  class PagePool {
208  private:
209  /// convenience typedef
210  typedef BidirMMapPipeException Exception;
211 
212  enum {
213  minsz = 7, ///< minimum chunk size (just below 1 << minsz bytes)
214  maxsz = 20, ///< maximum chunk size (just below 1 << maxsz bytes)
215  szincr = 1 ///< size class increment (sz = 1 << (minsz + k * szincr))
216  };
217  /// a chunk of memory in the pool
218  typedef BidirMMapPipe_impl::PageChunk Chunk;
219  /// list of chunks
220  typedef std::list<Chunk*> ChunkList;
221 
222  friend class BidirMMapPipe_impl::PageChunk;
223  public:
224  /// convenience typedef
225  typedef PageChunk::MMapVariety MMapVariety;
226  /// constructor
227  PagePool(unsigned nPagesPerGroup);
228  /// destructor
229  ~PagePool();
230  /// pop a free element out of the pool
231  Pages pop();
232 
233  /// return (logical) page size of the system
234  static unsigned pagesize() { return PageChunk::pagesize(); }
235  /// return variety of mmap supported on the system
236  static MMapVariety mmapVariety()
237  { return PageChunk::mmapVariety(); }
238 
239  /// return number of pages per group (ie. as returned by pop())
240  unsigned nPagesPerGroup() const { return m_nPgPerGrp; }
241 
242  /// zap the pool (unmap all but Pages p)
243  void zap(Pages& p);
244 
245  private:
246  /// list of chunks used by the pool
247  ChunkList m_chunks;
248  /// list of chunks used by the pool which are not full
249  ChunkList m_freelist;
250  /// chunk size map (histogram of chunk sizes)
251  unsigned m_szmap[(maxsz - minsz) / szincr];
252  /// current chunk size
253  int m_cursz;
254  /// page group size
255  unsigned m_nPgPerGrp;
256 
257  /// adjust _cursz to current largest block
258  void updateCurSz(int sz, int incr);
259  /// find size of next chunk to allocate (in a hopefully smart way)
260  int nextChunkSz() const;
261  /// release a chunk
262  void putOnFreeList(Chunk* chunk);
263  /// release a chunk
264  void release(Chunk* chunk);
265  };
266 
267  Pages::Pages(PageChunk* parent, Page* pages, unsigned npg) :
268  m_pimpl(new impl)
269  {
270  assert(npg < 256);
271  m_pimpl->m_parent = parent;
272  m_pimpl->m_pages = pages;
273  m_pimpl->m_refcnt = 1;
274  m_pimpl->m_npages = npg;
275  /// initialise pages
276  for (unsigned i = 0; i < m_pimpl->m_npages; ++i) new(page(i)) Page();
277  }
278 
279  unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
280  unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
281  PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
282 
283  Pages::~Pages()
284  {
285  if (m_pimpl && !--(m_pimpl->m_refcnt)) {
286  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
287  delete m_pimpl;
288  }
289  }
290 
291  Pages::Pages(const Pages& other) :
292  m_pimpl(other.m_pimpl)
293  { ++(m_pimpl->m_refcnt); }
294 
295  Pages& Pages::operator=(const Pages& other)
296  {
297  if (&other == this) return *this;
298  if (--(m_pimpl->m_refcnt)) {
299  if (m_pimpl->m_parent) m_pimpl->m_parent->push(*this);
300  delete m_pimpl;
301  }
302  m_pimpl = other.m_pimpl;
303  ++(m_pimpl->m_refcnt);
304  return *this;
305  }
306 
307  unsigned Pages::pagesize() { return PageChunk::pagesize(); }
308 
309  Page* Pages::page(unsigned pgno) const
310  {
311  assert(pgno < m_pimpl->m_npages);
312  unsigned char* pptr =
313  reinterpret_cast<unsigned char*>(m_pimpl->m_pages);
314  pptr += pgno * pagesize();
315  return reinterpret_cast<Page*>(pptr);
316  }
317 
318  unsigned Pages::pageno(Page* p) const
319  {
320  const unsigned char* pptr =
321  reinterpret_cast<const unsigned char*>(p);
322  const unsigned char* bptr =
323  reinterpret_cast<const unsigned char*>(m_pimpl->m_pages);
324  assert(0 == ((pptr - bptr) % pagesize()));
325  const unsigned nr = (pptr - bptr) / pagesize();
326  assert(nr < m_pimpl->m_npages);
327  return nr;
328  }
329 
330  unsigned PageChunk::getPageSize()
331  {
332  // find out page size of system
333  long pgsz = sysconf(_SC_PAGESIZE);
334  if (-1 == pgsz) throw Exception("sysconf", errno);
335  if (pgsz > 512 && pgsz > long(sizeof(Page)))
336  return pgsz;
337 
338  // in case of failure or implausible value, use a safe default: 4k
339  // page size, and do not try to mmap
340  s_mmapworks = Copy;
341  return 1 << 12;
342  }
343 
344  PageChunk::PageChunk(PagePool* parent,
345  unsigned length, unsigned nPgPerGroup) :
346  m_begin(dommap(length)),
347  m_end(reinterpret_cast<void*>(
348  reinterpret_cast<unsigned char*>(m_begin) + length)),
349  m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
350  {
351  // ok, push groups of pages onto freelist here
352  unsigned char* p = reinterpret_cast<unsigned char*>(m_begin);
353  unsigned char* pend = reinterpret_cast<unsigned char*>(m_end);
354  while (p < pend) {
355  m_freelist.push_back(reinterpret_cast<void*>(p));
356  p += nPgPerGroup * PagePool::pagesize();
357  }
358  }
359 
360  PageChunk::~PageChunk()
361  {
362  if (m_parent) assert(empty());
363  if (m_begin) domunmap(m_begin, len());
364  }
365 
366  bool PageChunk::contains(const Pages& p) const
367  { return p.m_pimpl->m_parent == this; }
368 
369  Pages PageChunk::pop()
370  {
371  assert(!m_freelist.empty());
372  void* p = m_freelist.front();
373  m_freelist.pop_front();
374  ++m_nUsedGrp;
375  return Pages(this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
376  }
377 
378  void PageChunk::push(const Pages& p)
379  {
380  assert(contains(p));
381  bool wasempty = m_freelist.empty();
382  m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
383  --m_nUsedGrp;
384  if (m_parent) {
385  // notify parent if we need to be put on the free list again
386  if (wasempty) m_parent->putOnFreeList(this);
387  // notify parent if we're empty
388  if (empty()) return m_parent->release(this);
389  }
390  }
391 
392  void* PageChunk::dommap(unsigned len)
393  {
394  assert(len && 0 == (len % s_physpgsz));
395  // ok, the idea here is to try the different methods of mmapping, and
396  // choose the first one that works. we have four flavours:
397  // 1 - anonymous mmap (best)
398  // 2 - mmap of /dev/zero (about as good as anonymous mmap, but a tiny
399  // bit more tedious to set up, since you need to open/close a
400  // device file)
401  // 3 - mmap of a temporary file (very tedious to set up - need to
402  // create a temporary file, delete it, make the underlying storage
403  // large enough, then mmap the fd and close it)
404  // 4 - if all those fail, we malloc the buffers, and copy the data
405  // through the OS (then we're no better than normal pipes)
406  static bool msgprinted = false;
407  if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
408 #if defined(MAP_ANONYMOUS)
409 #undef MYANONFLAG
410 #define MYANONFLAG MAP_ANONYMOUS
411 #elif defined(MAP_ANON)
412 #undef MYANONFLAG
413 #define MYANONFLAG MAP_ANON
414 #else
415 #undef MYANONFLAG
416 #endif
417 #ifdef MYANONFLAG
418  void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419  MYANONFLAG | MAP_SHARED, -1, 0);
420  if (MAP_FAILED == retVal) {
421  if (Anonymous == s_mmapworks) throw Exception("mmap", errno);
422  } else {
423  assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
424  s_mmapworks = Anonymous;
425  if (BidirMMapPipe::debugflag() && !msgprinted) {
426  std::cerr << " INFO: In " << __func__ << " (" <<
427  __FILE__ << ", line " << __LINE__ <<
428  "): anonymous mmapping works, excellent!" <<
429  std::endl;
430  msgprinted = true;
431  }
432  return retVal;
433  }
434 #endif
435 #undef MYANONFLAG
436  }
437  if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
438  // ok, no anonymous mappings supported directly, so try to map
439  // /dev/zero which has much the same effect on many systems
440  int fd = ::open("/dev/zero", O_RDWR);
441  if (-1 == fd)
442  throw Exception("open /dev/zero", errno);
443  void* retVal = ::mmap(0, len,
444  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445  if (MAP_FAILED == retVal) {
446  int errsv = errno;
447  ::close(fd);
448  if (DevZero == s_mmapworks) throw Exception("mmap", errsv);
449  } else {
450  assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
451  s_mmapworks = DevZero;
452  }
453  if (-1 == ::close(fd))
454  throw Exception("close /dev/zero", errno);
455  if (BidirMMapPipe::debugflag() && !msgprinted) {
456  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
457  ", line " << __LINE__ << "): mmapping /dev/zero works, "
458  "very good!" << std::endl;
459  msgprinted = true;
460  }
461  return retVal;
462  }
463  if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
464  char name[] = "/tmp/BidirMMapPipe-XXXXXX";
465  int fd;
466  // open temp file
467  if (-1 == (fd = ::mkstemp(name))) throw Exception("mkstemp", errno);
468  // remove it, but keep fd open
469  if (-1 == ::unlink(name)) {
470  int errsv = errno;
471  ::close(fd);
472  throw Exception("unlink", errsv);
473  }
474  // make it the right size: lseek
475  if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
476  int errsv = errno;
477  ::close(fd);
478  throw Exception("lseek", errsv);
479  }
480  // make it the right size: write a byte
481  if (1 != ::write(fd, name, 1)) {
482  int errsv = errno;
483  ::close(fd);
484  throw Exception("write", errsv);
485  }
486  // do mmap
487  void* retVal = ::mmap(0, len,
488  PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489  if (MAP_FAILED == retVal) {
490  int errsv = errno;
491  ::close(fd);
492  if (FileBacked == s_mmapworks) throw Exception("mmap", errsv);
493  } else {
494  assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
495  s_mmapworks = FileBacked;
496  }
497  if (-1 == ::close(fd)) {
498  int errsv = errno;
499  ::munmap(retVal, len);
500  throw Exception("close", errsv);
501  }
502  if (BidirMMapPipe::debugflag() && !msgprinted) {
503  std::cerr << " INFO: In " << __func__ << " (" << __FILE__ <<
504  ", line " << __LINE__ << "): mmapping temporary files "
505  "works, good!" << std::endl;
506  msgprinted = true;
507  }
508  return retVal;
509  }
510  if (Copy == s_mmapworks || Unknown == s_mmapworks) {
511  // fallback solution: mmap does not work on this OS (or does not
512  // work for what we want to use it), so use a normal buffer of
513  // memory instead, and collect data in that buffer - this needs an
514  // additional write/read to/from the pipe(s), but there you go...
515  if (BidirMMapPipe::debugflag() && !msgprinted) {
516  std::cerr << "WARNING: In " << __func__ << " (" << __FILE__ <<
517  ", line " << __LINE__ << "): anonymous mmapping of "
518  "shared buffers failed, falling back to read/write on "
519  " pipes!" << std::endl;
520  msgprinted = true;
521  }
522  s_mmapworks = Copy;
523  void* retVal = std::malloc(len);
524  if (!retVal) throw Exception("malloc", errno);
525  return retVal;
526  }
527  // should never get here
528  assert(false);
529  return 0;
530  }
531 
532  void PageChunk::domunmap(void* addr, unsigned len)
533  {
534  assert(len && 0 == (len % s_physpgsz));
535  if (addr) {
536  assert(Unknown != s_mmapworks);
537  if (Copy != s_mmapworks) {
538  if (-1 == ::munmap(addr, len))
539  throw Exception("munmap", errno);
540  } else {
541  std::free(addr);
542  }
543  }
544  }
545 
546  void PageChunk::zap(Pages& p)
547  {
548  // try to mprotect the other bits of the pool with no access...
549  // we'd really like a version of mremap here that can unmap all the
550  // other pages in the chunk, but that does not exist, so we protect
551  // the other pages in this chunk such that they may neither be read,
552  // written nor executed, only the pages we're interested in for
553  // communications stay readable and writable
554  //
555  // if an OS does not support changing the protection of a part of an
556  // mmapped area, the mprotect calls below should just fail and not
557  // change any protection, so we're a little less safe against
558  // corruption, but everything should still work
559  if (Copy != s_mmapworks) {
560  unsigned char* p0 = reinterpret_cast<unsigned char*>(m_begin);
561  unsigned char* p1 = reinterpret_cast<unsigned char*>(p[0u]);
562  unsigned char* p2 = p1 + p.npages() * s_physpgsz;
563  unsigned char* p3 = reinterpret_cast<unsigned char*>(m_end);
564  if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565  if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
566  }
567  m_parent = 0;
568  m_freelist.clear();
569  m_nUsedGrp = 1;
570  p.m_pimpl->m_parent = 0;
571  m_begin = m_end = 0;
572  // commit suicide
573  delete this;
574  }
575 
576  PagePool::PagePool(unsigned nPgPerGroup) :
577  m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
578  {
579  // if logical and physical page size differ, we may have to adjust
580  // m_nPgPerGrp to make things fit
581  if (PageChunk::pagesize() != PageChunk::physPgSz()) {
582  const unsigned mult =
583  PageChunk::physPgSz() / PageChunk::pagesize();
584  const unsigned desired = nPgPerGroup * PageChunk::pagesize();
585  // round up to to next physical page boundary
586  const unsigned actual = mult *
587  (desired / mult + bool(desired % mult));
588  const unsigned newPgPerGrp = actual / PageChunk::pagesize();
589  if (BidirMMapPipe::debugflag()) {
590  std::cerr << " INFO: In " << __func__ << " (" <<
591  __FILE__ << ", line " << __LINE__ <<
592  "): physical page size " << PageChunk::physPgSz() <<
593  ", subdividing into logical pages of size " <<
594  PageChunk::pagesize() << ", adjusting nPgPerGroup " <<
595  m_nPgPerGrp << " -> " << newPgPerGrp <<
596  std::endl;
597  }
598  assert(newPgPerGrp >= m_nPgPerGrp);
599  m_nPgPerGrp = newPgPerGrp;
600  }
601  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
602  }
603 
604  PagePool::~PagePool()
605  {
606  m_freelist.clear();
607  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
608  delete *it;
609  m_chunks.clear();
610  }
611 
612  void PagePool::zap(Pages& p)
613  {
614  // unmap all pages but those pointed to by p
615  m_freelist.clear();
616  for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617  if ((*it)->contains(p)) {
618  (*it)->zap(p);
619  } else {
620  delete *it;
621  }
622  }
623  m_chunks.clear();
624  std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
625  m_cursz = minsz;
626  }
627 
628  Pages PagePool::pop()
629  {
630  if (m_freelist.empty()) {
631  // allocate and register new chunk and put it on the freelist
632  const int sz = nextChunkSz();
633  Chunk *c = new Chunk(this,
634  sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
635  m_chunks.push_front(c);
636  m_freelist.push_back(c);
637  updateCurSz(sz, +1);
638  }
639  // get free element from first chunk on _freelist
640  Chunk* c = m_freelist.front();
641  Pages p(c->pop());
642  // full chunks are removed from _freelist
643  if (c->full()) m_freelist.pop_front();
644  return p;
645  }
646 
647  void PagePool::release(PageChunk* chunk)
648  {
649  assert(chunk->empty());
650  // find chunk on freelist and remove
651  ChunkList::iterator it = std::find(
652  m_freelist.begin(), m_freelist.end(), chunk);
653  if (m_freelist.end() == it)
654  throw Exception("PagePool::release(PageChunk*)", EINVAL);
655  m_freelist.erase(it);
656  // find chunk in m_chunks and remove
657  it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658  if (m_chunks.end() == it)
659  throw Exception("PagePool::release(PageChunk*)", EINVAL);
660  m_chunks.erase(it);
661  const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
662  delete chunk;
663  updateCurSz(sz, -1);
664  }
665 
666  void PagePool::putOnFreeList(PageChunk* chunk)
667  {
668  assert(!chunk->full());
669  m_freelist.push_back(chunk);
670  }
671 
672  void PagePool::updateCurSz(int sz, int incr)
673  {
674  m_szmap[(sz - minsz) / szincr] += incr;
675  m_cursz = minsz;
676  for (int i = (maxsz - minsz) / szincr; i--; ) {
677  if (m_szmap[i]) {
678  m_cursz += i * szincr;
679  break;
680  }
681  }
682  }
683 
684  int PagePool::nextChunkSz() const
685  {
686  // no chunks with space available, figure out chunk size
687  int sz = m_cursz;
688  if (m_chunks.empty()) {
689  // if we start allocating chunks, we start from minsz
690  sz = minsz;
691  } else {
692  if (minsz >= sz) {
693  // minimal sized chunks are always grown
694  sz = minsz + szincr;
695  } else {
696  if (1 != m_chunks.size()) {
697  // if we have more than one completely filled chunk, grow
698  sz += szincr;
699  } else {
700  // just one chunk left, try shrinking chunk size
701  sz -= szincr;
702  }
703  }
704  }
705  // clamp size to allowed range
706  if (sz > maxsz) sz = maxsz;
707  if (sz < minsz) sz = minsz;
708  return sz;
709  }
710 }
711 
712 // static BidirMMapPipe members
713 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
714 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
715 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
716 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
717 int BidirMMapPipe::s_debugflag = 0;
718 
719 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
720 {
721  if (!s_pagepool)
722  s_pagepool = new BidirMMapPipe_impl::PagePool(TotPages);
723  return *s_pagepool;
724 }
725 
726 void BidirMMapPipe::teardownall(void)
727 {
728  pthread_mutex_lock(&s_openpipesmutex);
729  while (!s_openpipes.empty()) {
730  BidirMMapPipe *p = s_openpipes.front();
731  pthread_mutex_unlock(&s_openpipesmutex);
732  if (p->m_childPid) kill(p->m_childPid, SIGTERM);
733  p->doClose(true, true);
734  pthread_mutex_lock(&s_openpipesmutex);
735  }
736  pthread_mutex_unlock(&s_openpipesmutex);
737 }
738 
739 BidirMMapPipe::BidirMMapPipe(const BidirMMapPipe&) :
740  m_pages(pagepool().pop())
741 {
742  // free pages again
743  { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
744  if (!s_pagepoolrefcnt) {
745  delete s_pagepool;
746  s_pagepool = 0;
747  }
748 }
749 
750 BidirMMapPipe::BidirMMapPipe(bool useExceptions, bool useSocketpair) :
751  m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
752  m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
753  m_parentPid(::getpid())
754 
755 {
756  ++s_pagepoolrefcnt;
757  assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
758  int fds[4] = { -1, -1, -1, -1 };
759  int myerrno;
760  static bool firstcall = true;
761  if (useExceptions) m_flags |= exceptionsbit;
762 
763  try {
764  if (firstcall) {
765  firstcall = false;
766  // register a cleanup handler to make sure all BidirMMapPipes are torn
767  // down, and child processes are sent a SIGTERM
768  if (0 != atexit(BidirMMapPipe::teardownall))
769  throw Exception("atexit", errno);
770  }
771 
772  // build free lists
773  for (unsigned i = 1; i < TotPages; ++i)
774  m_pages[i - 1]->setNext(m_pages[i]);
775  m_pages[PagesPerEnd - 1]->setNext(0);
776  if (!useSocketpair) {
777  // create pipes
778  if (0 != ::pipe(&fds[0])) throw Exception("pipe", errno);
779  if (0 != ::pipe(&fds[2])) throw Exception("pipe", errno);
780  } else {
781  if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
782  throw Exception("socketpair", errno);
783  }
784  // fork the child
785  pthread_mutex_lock(&s_openpipesmutex);
786  char c;
787  switch ((m_childPid = ::fork())) {
788  case -1: // error in fork()
789  myerrno = errno;
790  pthread_mutex_unlock(&s_openpipesmutex);
791  m_childPid = 0;
792  throw Exception("fork", myerrno);
793  case 0: // child
794  // put the ends in the right place
795  if (-1 != fds[2]) {
796  // pair of pipes
797  if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
798  myerrno = errno;
799  pthread_mutex_unlock(&s_openpipesmutex);
800  throw Exception("close", myerrno);
801  }
802  fds[0] = fds[3] = -1;
803  m_outpipe = fds[1];
804  m_inpipe = fds[2];
805  } else {
806  // socket pair
807  if (-1 == ::close(fds[0])) {
808  myerrno = errno;
809  pthread_mutex_unlock(&s_openpipesmutex);
810  throw Exception("close", myerrno);
811  }
812  fds[0] = -1;
813  m_inpipe = m_outpipe = fds[1];
814  }
815  // close other pipes our parent may have open - we have no business
816  // reading from/writing to those...
817  for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
818  s_openpipes.end() != it; ) {
819  BidirMMapPipe* p = *it;
820  it = s_openpipes.erase(it);
821  p->doClose(true, true);
822  }
823  pagepool().zap(m_pages);
824  s_pagepoolrefcnt = 0;
825  delete s_pagepool;
826  s_pagepool = 0;
827  s_openpipes.push_front(this);
828  pthread_mutex_unlock(&s_openpipesmutex);
829  // ok, put our pages on freelist
830  m_freelist = m_pages[PagesPerEnd];
831  // handshare with other end (to make sure it's alive)...
832  c = 'C'; // ...hild
833  if (1 != xferraw(m_outpipe, &c, 1, ::write))
834  throw Exception("handshake: xferraw write", EPIPE);
835  if (1 != xferraw(m_inpipe, &c, 1, ::read))
836  throw Exception("handshake: xferraw read", EPIPE);
837  if ('P' != c) throw Exception("handshake", EPIPE);
838  break;
839  default: // parent
840  // put the ends in the right place
841  if (-1 != fds[2]) {
842  // pair of pipes
843  if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
844  myerrno = errno;
845  pthread_mutex_unlock(&s_openpipesmutex);
846  throw Exception("close", myerrno);
847  }
848  fds[1] = fds[2] = -1;
849  m_outpipe = fds[3];
850  m_inpipe = fds[0];
851  } else {
852  // socketpair
853  if (-1 == ::close(fds[1])) {
854  myerrno = errno;
855  pthread_mutex_unlock(&s_openpipesmutex);
856  throw Exception("close", myerrno);
857  }
858  fds[1] = -1;
859  m_inpipe = m_outpipe = fds[0];
860  }
861  // put on list of open pipes (so we can kill child processes
862  // if things go wrong)
863  s_openpipes.push_front(this);
864  pthread_mutex_unlock(&s_openpipesmutex);
865  // ok, put our pages on freelist
866  m_freelist = m_pages[0u];
867  // handshare with other end (to make sure it's alive)...
868  c = 'P'; // ...arent
869  if (1 != xferraw(m_outpipe, &c, 1, ::write))
870  throw Exception("handshake: xferraw write", EPIPE);
871  if (1 != xferraw(m_inpipe, &c, 1, ::read))
872  throw Exception("handshake: xferraw read", EPIPE);
873  if ('C' != c) throw Exception("handshake", EPIPE);
874  break;
875  }
876  // mark file descriptors for close on exec (we do not want to leak the
877  // connection to anything we happen to exec)
878  int fdflags = 0;
879  if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
880  throw Exception("fcntl", errno);
881  fdflags |= FD_CLOEXEC;
882  if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
883  throw Exception("fcntl", errno);
884  if (m_inpipe != m_outpipe) {
885  if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
886  throw Exception("fcntl", errno);
887  fdflags |= FD_CLOEXEC;
888  if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
889  throw Exception("fcntl", errno);
890  }
891  // ok, finally, clear the failbit
892  m_flags &= ~failbit;
893  // all done
894  } catch (BidirMMapPipe::Exception&) {
895  if (0 != m_childPid) kill(m_childPid, SIGTERM);
896  for (int i = 0; i < 4; ++i)
897  if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
898  {
899  // free resources associated with mmapped pages
900  BidirMMapPipe_impl::Pages p; p.swap(m_pages);
901  }
902  if (!--s_pagepoolrefcnt) {
903  delete s_pagepool;
904  s_pagepool = 0;
905  }
906  throw;
907  }
908 }
909 
910 int BidirMMapPipe::close()
911 {
912  assert(!(m_flags & failbit));
913  return doClose(false);
914 }
915 
916 int BidirMMapPipe::doClose(bool force, bool holdlock)
917 {
918  if (m_flags & failbit) return 0;
919  // flush data to be written
920  if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
921  // shut down the write direction (no more writes from our side)
922  if (m_inpipe == m_outpipe) {
923  if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
924  throw Exception("shutdown", errno);
925  m_outpipe = -1;
926  } else {
927  if (-1 != m_outpipe && -1 == ::close(m_outpipe))
928  if (!force) throw Exception("close", errno);
929  m_outpipe = -1;
930  }
931  // shut down the write direction (no more writes from our side)
932  // drain anything the other end might still want to send
933  if (!force && -1 != m_inpipe) {
934  // **************** THIS IS EXTREMELY UGLY: ****************
935  // POLLHUP is not set reliably on pipe/socket shutdown on all
936  // platforms, unfortunately, so we poll for readability here until
937  // the other end closes, too
938  //
939  // the read loop below ensures that the other end sees the POLLIN that
940  // is set on shutdown instead, and goes ahead to close its end
941  //
942  // if we don't do this, and close straight away, the other end
943  // will catch a SIGPIPE or similar, and we don't want that
944  int err;
945  struct pollfd fds;
946  fds.fd = m_inpipe;
947  fds.events = POLLIN;
948  fds.revents = 0;
949  do {
950  while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
951  if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) break;
952  if (fds.revents & POLLIN) {
953  char c;
954  if (1 > ::read(m_inpipe, &c, 1)) break;
955  }
956  }
957  } while (0 > err && EINTR == errno);
958  // ignore all other poll errors
959  }
960  // close read end
961  if (-1 != m_inpipe && -1 == ::close(m_inpipe))
962  if (!force) throw Exception("close", errno);
963  m_inpipe = -1;
964  // unmap memory
965  try {
966  { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
967  if (!--s_pagepoolrefcnt) {
968  delete s_pagepool;
969  s_pagepool = 0;
970  }
971  } catch (std::exception&) {
972  if (!force) throw;
973  }
974  m_busylist = m_freelist = m_dirtylist = 0;
975  // wait for child process
976  int retVal = 0;
977  if (isParent()) {
978  int tmp;
979  do {
980  tmp = waitpid(m_childPid, &retVal, 0);
981  } while (-1 == tmp && EINTR == errno);
982  if (-1 == tmp)
983  if (!force) throw Exception("waitpid", errno);
984  m_childPid = 0;
985  }
986  // remove from list of open pipes
987  if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
988  std::list<BidirMMapPipe*>::iterator it = std::find(
989  s_openpipes.begin(), s_openpipes.end(), this);
990  if (s_openpipes.end() != it) s_openpipes.erase(it);
991  if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
992  m_flags |= failbit;
993  return retVal;
994 }
995 
996 BidirMMapPipe::~BidirMMapPipe()
997 { doClose(false); }
998 
999 BidirMMapPipe::size_type BidirMMapPipe::xferraw(
1000  int fd, void* addr, size_type len,
1001  ssize_t (*xferfn)(int, void*, std::size_t))
1002 {
1003  size_type xferred = 0;
1004  unsigned char* buf = reinterpret_cast<unsigned char*>(addr);
1005  while (len) {
1006  ssize_t tmp = xferfn(fd, buf, len);
1007  if (tmp > 0) {
1008  xferred += tmp;
1009  len -= tmp;
1010  buf += tmp;
1011  continue;
1012  } else if (0 == tmp) {
1013  // check for end-of-file on pipe
1014  break;
1015  } else if (-1 == tmp) {
1016  // ok some error occurred, so figure out if we want to retry of throw
1017  switch (errno) {
1018  default:
1019  // if anything was transferred, return number of bytes
1020  // transferred so far, we can start throwing on the next
1021  // transfer...
1022  if (xferred) return xferred;
1023  // else throw
1024  throw Exception("xferraw", errno);
1025  case EAGAIN: // fallthrough intended
1026 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1027  case EWOULDBLOCK: // fallthrough intended
1028 #endif
1029  std::cerr << " ERROR: In " << __func__ << " (" <<
1030  __FILE__ << ", line " << __LINE__ <<
1031  "): expect transfer to block!" << std::endl;
1032  case EINTR:
1033  break;
1034  }
1035  continue;
1036  } else {
1037  throw Exception("xferraw: unexpected return value from read/write",
1038  errno);
1039  }
1040  }
1041  return xferred;
1042 }
1043 
1044 void BidirMMapPipe::sendpages(Page* plist)
1045 {
1046  if (plist) {
1047  unsigned char pg = m_pages[plist];
1048  if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1049  if (BidirMMapPipe_impl::PageChunk::Copy ==
1050  BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1051  // ok, have to copy pages through pipe
1052  for (Page* p = plist; p; p = p->next()) {
1053  if (sizeof(Page) + p->size() !=
1054  xferraw(m_outpipe, p, sizeof(Page) + p->size(),
1055  ::write)) {
1056  throw Exception("sendpages: short write", EPIPE);
1057  }
1058  }
1059  }
1060  } else {
1061  throw Exception("sendpages: short write", EPIPE);
1062  }
1063  } else { assert(plist); }
1064 }
1065 
1066 unsigned BidirMMapPipe::recvpages()
1067 {
1068  unsigned char pg;
1069  unsigned retVal = 0;
1070  Page *plisthead = 0, *plisttail = 0;
1071  if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1072  plisthead = plisttail = m_pages[pg];
1073  // ok, have number of pages
1074  if (BidirMMapPipe_impl::PageChunk::Copy ==
1075  BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1076  // ok, need to copy pages through pipe
1077  for (; plisttail; ++retVal) {
1078  Page* p = plisttail;
1079  if (sizeof(Page) == xferraw(m_inpipe, p, sizeof(Page),
1080  ::read)) {
1081  plisttail = p->next();
1082  if (!p->size()) continue;
1083  // break in case of read error
1084  if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1085  ::read)) break;
1086  }
1087  }
1088  } else {
1089  retVal = lenPageList(plisthead);
1090  }
1091  }
1092  // put list of pages we just received into correct lists (busy/free)
1093  if (plisthead) feedPageLists(plisthead);
1094  // ok, retVal contains the number of pages read, so put them on the
1095  // correct lists
1096  return retVal;
1097 }
1098 
1099 unsigned BidirMMapPipe::recvpages_nonblock()
1100 {
1101  struct pollfd fds;
1102  fds.fd = m_inpipe;
1103  fds.events = POLLIN;
1104  fds.revents = 0;
1105  unsigned retVal = 0;
1106  do {
1107  int rc = ::poll(&fds, 1, 0);
1108  if (0 > rc) {
1109  if (EINTR == errno) continue;
1110  break;
1111  }
1112  if (1 == retVal && fds.revents & POLLIN &&
1113  !(fds.revents & (POLLNVAL | POLLERR))) {
1114  // ok, we can read without blocking, so the other end has
1115  // something for us
1116  return recvpages();
1117  } else {
1118  break;
1119  }
1120  } while (true);
1121  return retVal;
1122 }
1123 
1124 unsigned BidirMMapPipe::lenPageList(const Page* p)
1125 {
1126  unsigned n = 0;
1127  for ( ; p; p = p->next()) ++n;
1128  return n;
1129 }
1130 
1131 void BidirMMapPipe::feedPageLists(Page* plist)
1132 {
1133  assert(plist);
1134  // get end of busy list
1135  Page *blend = m_busylist;
1136  while (blend && blend->next()) blend = blend->next();
1137  // ok, might have to send free pages to other end, and (if we do have to
1138  // send something to the other end) while we're at it, send any dirty
1139  // pages which are completely full, too
1140  Page *sendlisthead = 0, *sendlisttail = 0;
1141  // loop over plist
1142  while (plist) {
1143  Page* p = plist;
1144  plist = p->next();
1145  p->setNext(0);
1146  if (p->size()) {
1147  // busy page...
1148  p->pos() = 0;
1149  // put at end of busy list
1150  if (blend) blend->setNext(p);
1151  else m_busylist = p;
1152  blend = p;
1153  } else {
1154  // free page...
1155  // Very simple algorithm: once we're done with a page, we send it back
1156  // where it came from. If it's from our end, we put it on the free list, if
1157  // it's from the other end, we send it back.
1158  if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1159  (isChild() && m_pages[p] < PagesPerEnd)) {
1160  // page "belongs" to other end
1161  if (!sendlisthead) sendlisthead = p;
1162  if (sendlisttail) sendlisttail->setNext(p);
1163  sendlisttail = p;
1164  } else {
1165  // add page to freelist
1166  p->setNext(m_freelist);
1167  m_freelist = p;
1168  }
1169  }
1170  }
1171  // check if we have to send stuff to the other end
1172  if (sendlisthead) {
1173  // go through our list of dirty pages, and see what we can
1174  // send along
1175  Page* dp;
1176  while ((dp = m_dirtylist) && dp->full()) {
1177  Page* p = dp;
1178  // move head of dirty list
1179  m_dirtylist = p->next();
1180  // queue for sending
1181  p->setNext(0);
1182  sendlisttail->setNext(p);
1183  sendlisttail = p;
1184  }
1185  // poll if the other end is still alive - this needs that we first
1186  // close the write pipe of the other end when the remote end of the
1187  // connection is shutting down in doClose; we'll see that because we
1188  // get a POLLHUP on our inpipe
1189  const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1190  struct pollfd fds[2];
1191  fds[0].fd = m_outpipe;
1192  fds[0].events = fds[0].revents = 0;
1193  if (m_outpipe != m_inpipe) {
1194  fds[1].fd = m_inpipe;
1195  fds[1].events = fds[1].revents = 0;
1196  } else {
1197  fds[0].events |= POLLIN;
1198  }
1199  int retVal = 0;
1200  do {
1201  retVal = ::poll(fds, nfds, 0);
1202  if (0 > retVal && EINTR == errno)
1203  continue;
1204  break;
1205  } while (true);
1206  if (0 <= retVal) {
1207  bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208  if (m_outpipe != m_inpipe) {
1209  ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1210  } else {
1211  if (ok && fds[0].revents & POLLIN) {
1212  unsigned ret = recvpages();
1213  if (!ret) ok = false;
1214  }
1215  }
1216 
1217  if (ok) sendpages(sendlisthead);
1218  // (if the pipe is dead already, we don't care that we leak the
1219  // contents of the pages on the send list here, so that is why
1220  // there's no else clause here)
1221  } else {
1222  throw Exception("feedPageLists: poll", errno);
1223  }
1224  }
1225 }
1226 
1227 void BidirMMapPipe::markPageDirty(Page* p)
1228 {
1229  assert(p);
1230  assert(p == m_freelist);
1231  // remove from freelist
1232  m_freelist = p->next();
1233  p->setNext(0);
1234  // append to dirty list
1235  Page* dl = m_dirtylist;
1236  while (dl && dl->next()) dl = dl->next();
1237  if (dl) dl->setNext(p);
1238  else m_dirtylist = p;
1239 }
1240 
1241 BidirMMapPipe::Page* BidirMMapPipe::busypage()
1242 {
1243  // queue any pages available for reading we can without blocking
1244  recvpages_nonblock();
1245  Page* p;
1246  // if there are no busy pages, try to get them from the other end,
1247  // block if we have to...
1248  while (!(p = m_busylist)) if (!recvpages()) return 0;
1249  return p;
1250 }
1251 
1252 BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1253 {
1254  // queue any pages available for reading we can without blocking
1255  recvpages_nonblock();
1256  Page* p = m_dirtylist;
1257  // go to end of dirty list
1258  if (p) while (p->next()) p = p->next();
1259  if (!p || p->full()) {
1260  // need to append free page, so get one
1261  while (!(p = m_freelist)) if (!recvpages()) return 0;
1262  markPageDirty(p);
1263  }
1264  return p;
1265 }
1266 
1267 void BidirMMapPipe::flush()
1268 { return doFlush(true); }
1269 
1270 void BidirMMapPipe::doFlush(bool forcePartialPages)
1271 {
1272  assert(!(m_flags & failbit));
1273  // build a list of pages to flush
1274  Page *flushlisthead = 0, *flushlisttail = 0;
1275  while (m_dirtylist) {
1276  Page* p = m_dirtylist;
1277  if (!forcePartialPages && !p->full()) break;
1278  // remove dirty page from dirty list
1279  m_dirtylist = p->next();
1280  p->setNext(0);
1281  // and send it to other end
1282  if (!flushlisthead) flushlisthead = p;
1283  if (flushlisttail) flushlisttail->setNext(p);
1284  flushlisttail = p;
1285  }
1286  if (flushlisthead) sendpages(flushlisthead);
1287 }
1288 
1289 void BidirMMapPipe::purge()
1290 {
1291  assert(!(m_flags & failbit));
1292  // join busy and dirty lists
1293  {
1294  Page *l = m_busylist;
1295  while (l && l->next()) l = l->next();
1296  if (l) l->setNext(m_dirtylist);
1297  else m_busylist = m_dirtylist;
1298  }
1299  // empty busy and dirty pages
1300  for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1301  // put them on the free list
1302  if (m_busylist) feedPageLists(m_busylist);
1303  m_busylist = m_dirtylist = 0;
1304 }
1305 
1306 BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1307 {
1308  // queue all pages waiting for consumption in the pipe before we give an
1309  // answer
1310  recvpages_nonblock();
1311  size_type retVal = 0;
1312  for (Page* p = m_busylist; p; p = p->next())
1313  retVal += p->size() - p->pos();
1314  return retVal;
1315 }
1316 
1317 BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1318 {
1319  // queue all pages waiting for consumption in the pipe before we give an
1320  // answer
1321  recvpages_nonblock();
1322  // check if we could write to the pipe without blocking (we need to know
1323  // because we might need to check if flushing of dirty pages would block)
1324  bool couldwrite = false;
1325  {
1326  struct pollfd fds;
1327  fds.fd = m_outpipe;
1328  fds.events = POLLOUT;
1329  fds.revents = 0;
1330  int retVal = 0;
1331  do {
1332  retVal = ::poll(&fds, 1, 0);
1333  if (0 > retVal) {
1334  if (EINTR == errno) continue;
1335  throw Exception("bytesWritableNonBlocking: poll", errno);
1336  }
1337  if (1 == retVal && fds.revents & POLLOUT &&
1338  !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1339  couldwrite = true;
1340  break;
1341  } while (true);
1342  }
1343  // ok, start counting bytes
1344  size_type retVal = 0;
1345  unsigned npages = 0;
1346  // go through the dirty list
1347  for (Page* p = m_dirtylist; p; p = p->next()) {
1348  ++npages;
1349  // if page only partially filled
1350  if (!p->full())
1351  retVal += p->free();
1352  if (npages >= FlushThresh && !couldwrite) break;
1353  }
1354  // go through the free list
1355  for (Page* p = m_freelist; p && (!m_dirtylist ||
1356  npages < FlushThresh || couldwrite); p = p->next()) {
1357  ++npages;
1358  retVal += Page::capacity();
1359  }
1360  return retVal;
1361 }
1362 
1363 BidirMMapPipe::size_type BidirMMapPipe::read(void* addr, size_type sz)
1364 {
1365  assert(!(m_flags & failbit));
1366  size_type nread = 0;
1367  unsigned char *ap = reinterpret_cast<unsigned char*>(addr);
1368  try {
1369  while (sz) {
1370  // find next page to read from
1371  Page* p = busypage();
1372  if (!p) {
1373  m_flags |= eofbit;
1374  return nread;
1375  }
1376  unsigned char* pp = p->begin() + p->pos();
1377  size_type csz = std::min(size_type(p->remaining()), sz);
1378  std::copy(pp, pp + csz, ap);
1379  nread += csz;
1380  ap += csz;
1381  sz -= csz;
1382  p->pos() += csz;
1383  assert(p->size() >= p->pos());
1384  if (p->size() == p->pos()) {
1385  // if no unread data remains, page is free
1386  m_busylist = p->next();
1387  p->setNext(0);
1388  p->size() = 0;
1389  feedPageLists(p);
1390  }
1391  }
1392  } catch (Exception&) {
1393  m_flags |= rderrbit;
1394  if (m_flags & exceptionsbit) throw;
1395  }
1396  return nread;
1397 }
1398 
1399 BidirMMapPipe::size_type BidirMMapPipe::write(const void* addr, size_type sz)
1400 {
1401  assert(!(m_flags & failbit));
1402  size_type written = 0;
1403  const unsigned char *ap = reinterpret_cast<const unsigned char*>(addr);
1404  try {
1405  while (sz) {
1406  // find next page to write to
1407  Page* p = dirtypage();
1408  if (!p) {
1409  m_flags |= eofbit;
1410  return written;
1411  }
1412  unsigned char* pp = p->begin() + p->size();
1413  size_type csz = std::min(size_type(p->free()), sz);
1414  std::copy(ap, ap + csz, pp);
1415  written += csz;
1416  ap += csz;
1417  p->size() += csz;
1418  sz -= csz;
1419  assert(p->capacity() >= p->size());
1420  if (p->full()) {
1421  // if page is full, see if we're above the flush threshold of
1422  // 3/4 of our pages
1423  if (lenPageList(m_dirtylist) >= FlushThresh)
1424  doFlush(false);
1425  }
1426  }
1427  } catch (Exception&) {
1428  m_flags |= wrerrbit;
1429  if (m_flags & exceptionsbit) throw;
1430  }
1431  return written;
1432 }
1433 
1434 int BidirMMapPipe::poll(BidirMMapPipe::PollVector& pipes, int timeout)
1435 {
1436  // go through pipes, and change flags where we already know without really
1437  // polling - stuff where we don't need poll to wait for its timeout in the
1438  // OS...
1439  bool canskiptimeout = false;
1440  std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1441  std::vector<unsigned>::iterator mit = masks.begin();
1442  for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1443  ++it, ++mit) {
1444  PollEntry& pe = *it;
1445  pe.revents = None;
1446  // null pipe pointer or closed pipe is invalid
1447  if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1448  // check for error
1449  if (pe.pipe->bad()) pe.revents |= Error;
1450  // check for end of file
1451  if (pe.pipe->eof()) pe.revents |= EndOfFile;
1452  // check if readable
1453  if (pe.events & Readable) {
1454  *mit |= Readable;
1455  if (pe.pipe->m_busylist) pe.revents |= Readable;
1456  }
1457  // check if writable
1458  if (pe.events & Writable) {
1459  *mit |= Writable;
1460  if (pe.pipe->m_freelist) {
1461  pe.revents |= Writable;
1462  } else {
1463  Page *dl = pe.pipe->m_dirtylist;
1464  while (dl && dl->next()) dl = dl->next();
1465  if (dl && dl->pos() < Page::capacity())
1466  pe.revents |= Writable;
1467  }
1468  }
1469  if (pe.revents) canskiptimeout = true;
1470  }
1471  // set up the data structures required for the poll syscall
1472  std::vector<pollfd> fds;
1473  fds.reserve(2 * pipes.size());
1474  std::map<int, PollEntry*> fds2pipes;
1475  for (PollVector::const_iterator it = pipes.begin();
1476  pipes.end() != it; ++it) {
1477  const PollEntry& pe = *it;
1478  struct pollfd tmp;
1479  fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1480  const_cast<PollEntry*>(&pe)));
1481  tmp.events = tmp.revents = 0;
1482  // we always poll for readability; this allows us to queue pages
1483  // early
1484  tmp.events |= POLLIN;
1485  if (pe.pipe->m_outpipe != tmp.fd) {
1486  // ok, it's a pair of pipes
1487  fds.push_back(tmp);
1488  fds2pipes.insert(std::make_pair(
1489  unsigned(tmp.fd = pe.pipe->m_outpipe),
1490  const_cast<PollEntry*>(&pe)));
1491  tmp.events = 0;
1492 
1493  }
1494  if (pe.events & Writable) tmp.events |= POLLOUT;
1495  fds.push_back(tmp);
1496  }
1497  // poll
1498  int retVal = 0;
1499  do {
1500  retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1501  if (0 > retVal) {
1502  if (EINTR == errno) continue;
1503  throw Exception("poll", errno);
1504  }
1505  break;
1506  } while (true);
1507  // fds may have changed state, so update...
1508  for (std::vector<pollfd>::iterator it = fds.begin();
1509  fds.end() != it; ++it) {
1510  pollfd& fe = *it;
1511  //if (!fe.revents) continue;
1512  --retVal;
1513  PollEntry& pe = *fds2pipes[fe.fd];
1514 oncemore:
1515  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1516  pe.revents |= ReadInvalid;
1517  if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1518  pe.revents |= WriteInvalid;
1519  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1520  pe.revents |= ReadError;
1521  if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1522  pe.revents |= WriteError;
1523  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1524  pe.revents |= ReadEndOfFile;
1525  if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1526  pe.revents |= WriteEndOfFile;
1527  if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1528  !(fe.revents & (POLLNVAL | POLLERR))) {
1529  // ok, there is at least one page for us to receive from the
1530  // other end
1531  if (0 == pe.pipe->recvpages()) continue;
1532  // more pages there?
1533  do {
1534  int tmp = ::poll(&fe, 1, 0);
1535  if (tmp > 0) goto oncemore; // yippie! I don't even feel bad!
1536  if (0 > tmp) {
1537  if (EINTR == errno) continue;
1538  throw Exception("poll", errno);
1539  }
1540  break;
1541  } while (true);
1542  }
1543  if (pe.pipe->m_busylist) pe.revents |= Readable;
1544  if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1545  if (pe.pipe->m_freelist) {
1546  pe.revents |= Writable;
1547  } else {
1548  Page *dl = pe.pipe->m_dirtylist;
1549  while (dl && dl->next()) dl = dl->next();
1550  if (dl && dl->pos() < Page::capacity())
1551  pe.revents |= Writable;
1552  }
1553  }
1554  }
1555  // apply correct masks, and count pipes with pending events
1556  int npipes = 0;
1557  mit = masks.begin();
1558  for (PollVector::iterator it = pipes.begin();
1559  pipes.end() != it; ++it, ++mit)
1560  if ((it->revents &= *mit)) ++npipes;
1561  return npipes;
1562 }
1563 
1564 BidirMMapPipe& BidirMMapPipe::operator<<(const char* str)
1565 {
1566  size_t sz = std::strlen(str);
1567  *this << sz;
1568  if (sz) write(str, sz);
1569  return *this;
1570 }
1571 
1572 BidirMMapPipe& BidirMMapPipe::operator>>(char* (&str))
1573 {
1574  size_t sz = 0;
1575  *this >> sz;
1576  if (good() && !eof()) {
1577  str = reinterpret_cast<char*>(std::realloc(str, sz + 1));
1578  if (!str) throw Exception("realloc", errno);
1579  if (sz) read(str, sz);
1580  str[sz] = 0;
1581  }
1582  return *this;
1583 }
1584 
1585 BidirMMapPipe& BidirMMapPipe::operator<<(const std::string& str)
1586 {
1587  size_t sz = str.size();
1588  *this << sz;
1589  write(str.data(), sz);
1590  return *this;
1591 }
1592 
1593 BidirMMapPipe& BidirMMapPipe::operator>>(std::string& str)
1594 {
1595  str.clear();
1596  size_t sz = 0;
1597  *this >> sz;
1598  if (good() && !eof()) {
1599  str.reserve(sz);
1600  for (unsigned char c; sz--; str.push_back(c)) *this >> c;
1601  }
1602  return *this;
1603 }
1604 
1605 END_NAMESPACE_ROOFIT
1606 
1607 #ifdef TEST_BIDIRMMAPPIPE
1608 using namespace RooFit;
1609 
1610 int simplechild(BidirMMapPipe& pipe)
1611 {
1612  // child does an echo loop
1613  while (pipe.good() && !pipe.eof()) {
1614  // read a string
1615  std::string str;
1616  pipe >> str;
1617  if (!pipe) return -1;
1618  if (pipe.eof()) break;
1619  if (!str.empty()) {
1620  std::cout << "[CHILD] : read: " << str << std::endl;
1621  str = "... early in the morning?";
1622  }
1623  pipe << str << BidirMMapPipe::flush;
1624  // did our parent tell us to shut down?
1625  if (str.empty()) break;
1626  if (!pipe) return -1;
1627  if (pipe.eof()) break;
1628  std::cout << "[CHILD] : wrote: " << str << std::endl;
1629  }
1630  pipe.close();
1631  return 0;
1632 }
1633 
1634 #include <sstream>
1635 int randomchild(BidirMMapPipe& pipe)
1636 {
1637  // child sends out something at random intervals
1638  ::srand48(::getpid());
1639  {
1640  // wait for parent's go ahead signal
1641  std::string s;
1642  pipe >> s;
1643  }
1644  // no shutdown sequence needed on this side - we're producing the data,
1645  // and the parent can just read until we're done (when it'll get EOF)
1646  for (int i = 0; i < 5; ++i) {
1647  // sleep a random time between 0 and .9 seconds
1648  ::usleep(int(1e6 * ::drand48()));
1649  std::ostringstream buf;
1650  buf << "child pid " << ::getpid() << " sends message " << i;
1651  std::string str = buf.str();
1652  std::cout << "[CHILD] : " << str << std::endl;
1653  pipe << str << BidirMMapPipe::flush;
1654  if (!pipe) return -1;
1655  if (pipe.eof()) break;
1656  }
1657  // tell parent we're shutting down
1658  pipe << "" << BidirMMapPipe::flush;
1659  // wait for parent to acknowledge
1660  std::string s;
1661  pipe >> s;
1662  pipe.close();
1663  return 0;
1664 }
1665 
1666 int benchchildrtt(BidirMMapPipe& pipe)
1667 {
1668  // child does the equivalent of listening for pings and sending the
1669  // packet back
1670  char* str = 0;
1671  while (pipe && !pipe.eof()) {
1672  pipe >> str;
1673  if (!pipe) {
1674  std::free(str);
1675  pipe.close();
1676  return -1;
1677  }
1678  if (pipe.eof()) break;
1679  pipe << str << BidirMMapPipe::flush;
1680  // if we have just completed the shutdown handshake, we break here
1681  if (!std::strlen(str)) break;
1682  }
1683  std::free(str);
1684  pipe.close();
1685  return 0;
1686 }
1687 
1688 int benchchildsink(BidirMMapPipe& pipe)
1689 {
1690  // child behaves like a sink
1691  char* str = 0;
1692  while (pipe && !pipe.eof()) {
1693  pipe >> str;
1694  if (!std::strlen(str)) break;
1695  }
1696  pipe << "" << BidirMMapPipe::flush;
1697  std::free(str);
1698  pipe.close();
1699  return 0;
1700 }
1701 
1702 int benchchildsource(BidirMMapPipe& pipe)
1703 {
1704  // child behaves like a source
1705  char* str = 0;
1706  for (unsigned i = 0; i <= 24; ++i) {
1707  str = reinterpret_cast<char*>(std::realloc(str, (1 << i) + 1));
1708  std::memset(str, '4', 1 << i);
1709  str[1 << i] = 0;
1710  for (unsigned j = 0; j < 1 << 7; ++j) {
1711  pipe << str;
1712  if (!pipe || pipe.eof()) {
1713  std::free(str);
1714  pipe.close();
1715  return -1;
1716  }
1717  }
1718  // tell parent we're done with this block size
1719  pipe << "" << BidirMMapPipe::flush;
1720  }
1721  // tell parent to shut down
1722  pipe << "" << BidirMMapPipe::flush;
1723  std::free(str);
1724  pipe.close();
1725  return 0;
1726 }
1727 
1728 BidirMMapPipe* spawnChild(int (*childexec)(BidirMMapPipe&))
1729 {
1730  // create a pipe with the given child at the remote end
1731  BidirMMapPipe *p = new BidirMMapPipe();
1732  if (p->isChild()) {
1733  int retVal = childexec(*p);
1734  delete p;
1735  std::exit(retVal);
1736  }
1737  return p;
1738 }
1739 
1740 #include <sys/time.h>
1741 #include <iomanip>
1742 int main()
1743 {
1744  // simple echo loop test
1745  {
1746  std::cout << "[PARENT]: simple challenge-response test, "
1747  "one child:" << std::endl;
1748  BidirMMapPipe* pipe = spawnChild(simplechild);
1749  for (int i = 0; i < 5; ++i) {
1750  std::string str("What shall we do with a drunken sailor...");
1751  *pipe << str << BidirMMapPipe::flush;
1752  if (!*pipe) return -1;
1753  std::cout << "[PARENT]: wrote: " << str << std::endl;
1754  *pipe >> str;
1755  if (!*pipe) return -1;
1756  std::cout << "[PARENT]: read: " << str << std::endl;
1757  }
1758  // send shutdown string
1759  *pipe << "" << BidirMMapPipe::flush;
1760  // wait for shutdown handshake
1761  std::string s;
1762  *pipe >> s;
1763  int retVal = pipe->close();
1764  std::cout << "[PARENT]: exit status of child: " << retVal <<
1765  std::endl;
1766  if (retVal) return retVal;
1767  delete pipe;
1768  }
1769  // simple poll test - children send 5 results in random intervals
1770  {
1771  unsigned nch = 20;
1772  std::cout << std::endl << "[PARENT]: polling test, " << nch <<
1773  " children:" << std::endl;
1774  typedef BidirMMapPipe::PollEntry PollEntry;
1775  // poll data structure
1776  BidirMMapPipe::PollVector pipes;
1777  pipes.reserve(nch);
1778  // spawn children
1779  for (unsigned i = 0; i < nch; ++i) {
1780  std::cout << "[PARENT]: spawning child " << i << std::endl;
1781  pipes.push_back(PollEntry(spawnChild(randomchild),
1782  BidirMMapPipe::Readable));
1783  }
1784  // wake children up
1785  std::cout << "[PARENT]: waking up children" << std::endl;
1786  for (unsigned i = 0; i < nch; ++i)
1787  *pipes[i].pipe << "" << BidirMMapPipe::flush;
1788  std::cout << "[PARENT]: waiting for events on children's pipes" << std::endl;
1789  // while at least some children alive
1790  while (!pipes.empty()) {
1791  // poll, wait until status change (infinite timeout)
1792  int npipes = BidirMMapPipe::poll(pipes, -1);
1793  // scan for pipes with changed status
1794  for (std::vector<PollEntry>::iterator it = pipes.begin();
1795  npipes && pipes.end() != it; ) {
1796  if (!it->revents) {
1797  // unchanged, next one
1798  ++it;
1799  continue;
1800  }
1801  --npipes; // maybe we can stop early...
1802  // read from pipes which are readable
1803  if (it->revents & BidirMMapPipe::Readable) {
1804  std::string s;
1805  *(it->pipe) >> s;
1806  if (!s.empty()) {
1807  std::cout << "[PARENT]: Read from pipe " << it->pipe <<
1808  ": " << s << std::endl;
1809  ++it;
1810  continue;
1811  } else {
1812  // child is shutting down...
1813  *(it->pipe) << "" << BidirMMapPipe::flush;
1814  goto childcloses;
1815  }
1816  }
1817  // retire pipes with error or end-of-file condition
1818  if (it->revents & (BidirMMapPipe::Error |
1819  BidirMMapPipe::EndOfFile |
1820  BidirMMapPipe::Invalid)) {
1821  std::cerr << "[DEBUG]: Event on pipe " << it->pipe <<
1822  " revents" <<
1823  ((it->revents & BidirMMapPipe::Readable) ? " Readable" : "") <<
1824  ((it->revents & BidirMMapPipe::Writable) ? " Writable" : "") <<
1825  ((it->revents & BidirMMapPipe::ReadError) ? " ReadError" : "") <<
1826  ((it->revents & BidirMMapPipe::WriteError) ? " WriteError" : "") <<
1827  ((it->revents & BidirMMapPipe::ReadEndOfFile) ? " ReadEndOfFile" : "") <<
1828  ((it->revents & BidirMMapPipe::WriteEndOfFile) ? " WriteEndOfFile" : "") <<
1829  ((it->revents & BidirMMapPipe::ReadInvalid) ? " ReadInvalid" : "") <<
1830  ((it->revents & BidirMMapPipe::WriteInvalid) ? " WriteInvalid" : "") <<
1831  std::endl;
1832 childcloses:
1833  int retVal = it->pipe->close();
1834  std::cout << "[PARENT]: child exit status: " <<
1835  retVal << ", number of children still alive: " <<
1836  (pipes.size() - 1) << std::endl;
1837  if (retVal) return retVal;
1838  delete it->pipe;
1839  it = pipes.erase(it);
1840  continue;
1841  }
1842  }
1843  }
1844  }
1845  // little benchmark - round trip time
1846  {
1847  std::cout << std::endl << "[PARENT]: benchmark: round-trip times vs block size" << std::endl;
1848  for (unsigned i = 0; i <= 24; ++i) {
1849  char *s = new char[1 + (1 << i)];
1850  std::memset(s, 'A', 1 << i);
1851  s[1 << i] = 0;
1852  const unsigned n = 1 << 7;
1853  double avg = 0., min = 1e42, max = -1e42;
1854  BidirMMapPipe *pipe = spawnChild(benchchildrtt);
1855  for (unsigned j = n; j--; ) {
1856  struct timeval t1;
1857  ::gettimeofday(&t1, 0);
1858  *pipe << s << BidirMMapPipe::flush;
1859  if (!*pipe || pipe->eof()) break;
1860  *pipe >> s;
1861  if (!*pipe || pipe->eof()) break;
1862  struct timeval t2;
1863  ::gettimeofday(&t2, 0);
1864  t2.tv_sec -= t1.tv_sec;
1865  t2.tv_usec -= t1.tv_usec;
1866  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867  if (dt < min) min = dt;
1868  if (dt > max) max = dt;
1869  avg += dt;
1870  }
1871  // send a shutdown string
1872  *pipe << "" << BidirMMapPipe::flush;
1873  // get child's shutdown ok
1874  *pipe >> s;
1875  avg /= double(n);
1876  avg *= 1e6; min *= 1e6; max *= 1e6;
1877  int retVal = pipe->close();
1878  if (retVal) {
1879  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1880  delete[] s;
1881  return retVal;
1882  }
1883  delete pipe;
1884  // there is a factor 2 in the formula for the transfer rate below,
1885  // because we transfer data of twice the size of the block - once
1886  // to the child, and once for the return trip
1887  std::cout << "block size " << std::setw(9) << (1 << i) <<
1888  " avg " << std::setw(7) << avg << " us min " <<
1889  std::setw(7) << min << " us max " << std::setw(7) << max <<
1890  "us speed " << std::setw(9) <<
1891  2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1892  " MB/s" << std::endl;
1893  delete[] s;
1894  }
1895  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1896  }
1897  // little benchmark - child as sink
1898  {
1899  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
1900  for (unsigned i = 0; i <= 24; ++i) {
1901  char *s = new char[1 + (1 << i)];
1902  std::memset(s, 'A', 1 << i);
1903  s[1 << i] = 0;
1904  const unsigned n = 1 << 7;
1905  double avg = 0., min = 1e42, max = -1e42;
1906  BidirMMapPipe *pipe = spawnChild(benchchildsink);
1907  for (unsigned j = n; j--; ) {
1908  struct timeval t1;
1909  ::gettimeofday(&t1, 0);
1910  // streaming mode - we do not flush here
1911  *pipe << s;
1912  if (!*pipe || pipe->eof()) break;
1913  struct timeval t2;
1914  ::gettimeofday(&t2, 0);
1915  t2.tv_sec -= t1.tv_sec;
1916  t2.tv_usec -= t1.tv_usec;
1917  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1918  if (dt < min) min = dt;
1919  if (dt > max) max = dt;
1920  avg += dt;
1921  }
1922  // send a shutdown string
1923  *pipe << "" << BidirMMapPipe::flush;
1924  // get child's shutdown ok
1925  *pipe >> s;
1926  avg /= double(n);
1927  avg *= 1e6; min *= 1e6; max *= 1e6;
1928  int retVal = pipe->close();
1929  if (retVal) {
1930  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1931  return retVal;
1932  }
1933  delete pipe;
1934  std::cout << "block size " << std::setw(9) << (1 << i) <<
1935  " avg " << std::setw(7) << avg << " us min " <<
1936  std::setw(7) << min << " us max " << std::setw(7) << max <<
1937  "us speed " << std::setw(9) <<
1938  (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1939  " MB/s" << std::endl;
1940  delete[] s;
1941  }
1942  std::cout << "[PARENT]: all children had exit code 0" << std::endl;
1943  }
1944  // little benchmark - child as source
1945  {
1946  std::cout << std::endl << "[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
1947  char *s = 0;
1948  double avg = 0., min = 1e42, max = -1e42;
1949  unsigned n = 0, bsz = 0;
1950  BidirMMapPipe *pipe = spawnChild(benchchildsource);
1951  while (*pipe && !pipe->eof()) {
1952  struct timeval t1;
1953  ::gettimeofday(&t1, 0);
1954  // streaming mode - we do not flush here
1955  *pipe >> s;
1956  if (!*pipe || pipe->eof()) break;
1957  struct timeval t2;
1958  ::gettimeofday(&t2, 0);
1959  t2.tv_sec -= t1.tv_sec;
1960  t2.tv_usec -= t1.tv_usec;
1961  double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1962  if (std::strlen(s)) {
1963  ++n;
1964  if (dt < min) min = dt;
1965  if (dt > max) max = dt;
1966  avg += dt;
1967  bsz = std::strlen(s);
1968  } else {
1969  if (!n) break;
1970  // next block size
1971  avg /= double(n);
1972  avg *= 1e6; min *= 1e6; max *= 1e6;
1973 
1974  std::cout << "block size " << std::setw(9) << bsz <<
1975  " avg " << std::setw(7) << avg << " us min " <<
1976  std::setw(7) << min << " us max " << std::setw(7) <<
1977  max << "us speed " << std::setw(9) <<
1978  (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1979  " MB/s" << std::endl;
1980  n = 0;
1981  avg = 0.;
1982  min = 1e42;
1983  max = -1e42;
1984  }
1985  }
1986  int retVal = pipe->close();
1987  std::cout << "[PARENT]: child exited with code " << retVal << std::endl;
1988  if (retVal) return retVal;
1989  delete pipe;
1990  std::free(s);
1991  }
1992  return 0;
1993 }
1994 #endif // TEST_BIDIRMMAPPIPE
1995 #endif // _WIN32
1996 
1997 // vim: ft=cpp:sw=4:tw=78:et