30 #include <sys/socket.h>
34 #define BEGIN_NAMESPACE_ROOFIT namespace RooFit {
35 #define END_NAMESPACE_ROOFIT }
37 BEGIN_NAMESPACE_ROOFIT
40 namespace BidirMMapPipe_impl {
46 class BidirMMapPipeException :
public std::exception
/// Invoke a strerror_r-style function whose return type is int.
///
/// This overload is chosen when the function pointer handed in returns an
/// int status code; the call is forwarded unchanged and the callee's
/// status is handed back to the caller.
///
/// @param err  error number to translate
/// @param buf  destination buffer for the message
/// @param sz   size of buf in bytes
/// @param f    int-returning strerror_r-like function to call
/// @return whatever f returned
static int dostrerror_r(int err, char* buf, std::size_t sz,
        int (*f)(int, char*, std::size_t))
{
    const int status = f(err, buf, sz);
    return status;
}
59 static int dostrerror_r(
int,
char*, std::size_t,
60 char* (*f)(
int,
char*, std::size_t));
63 BidirMMapPipeException(
const char* msg,
int err);
65 virtual const char* what() const noexcept {
return m_buf; }
68 BidirMMapPipeException::BidirMMapPipeException(
const char* msg,
int err)
70 std::size_t msgsz = std::strlen(msg);
72 msgsz = std::min(msgsz, std::size_t(s_sz));
73 std::copy(msg, msg + msgsz, m_buf);
74 if (msgsz < s_sz) { m_buf[msgsz] =
':'; ++msgsz; }
75 if (msgsz < s_sz) { m_buf[msgsz] =
' '; ++msgsz; }
80 dostrerror_r(err, &m_buf[msgsz], s_sz - msgsz, ::strerror_r);
85 int BidirMMapPipeException::dostrerror_r(
int err,
char* buf,
86 std::size_t sz,
char* (*f)(
int,
char*, std::size_t))
89 char *tmp = f(err, buf, sz);
90 if (tmp && tmp != buf) {
91 std::strncpy(buf, tmp, sz);
93 if (std::strlen(tmp) > sz - 1)
return ERANGE;
113 unsigned short m_size;
114 unsigned short m_pos;
118 Page& operator=(
const Page&) =
delete;
121 Page() : m_next(0), m_size(0), m_pos(0)
125 assert(std::numeric_limits<unsigned short>::max() >=
126 PageChunk::pagesize());
129 void setNext(
const Page* p);
133 unsigned short& size() {
return m_size; }
135 unsigned size()
const {
return m_size; }
137 unsigned short& pos() {
return m_pos; }
139 unsigned pos()
const {
return m_pos; }
141 inline unsigned char* begin()
const
142 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
145 inline unsigned char* end()
const
146 {
return reinterpret_cast<unsigned char*
>(
const_cast<Page*
>(
this))
147 + PageChunk::pagesize(); }
149 static unsigned capacity()
150 {
return PageChunk::pagesize() -
sizeof(Page); }
152 bool empty()
const {
return !m_size; }
154 bool filled()
const {
return !empty(); }
156 unsigned free()
const {
return capacity() - m_size; }
158 unsigned remaining()
const {
return m_size - m_pos; }
160 bool full()
const {
return !free(); }
163 void Page::setNext(
const Page* p)
168 const char* p1 =
reinterpret_cast<char*
>(
this);
169 const char* p2 =
reinterpret_cast<const char*
>(p);
170 std::ptrdiff_t tmp = p2 - p1;
172 assert(!(tmp % PageChunk::pagesize()));
173 tmp /=
static_cast<std::ptrdiff_t
>(PageChunk::pagesize());
176 assert(m_next == tmp);
182 Page* Page::next()
const
184 if (!m_next)
return 0;
185 char* ptmp =
reinterpret_cast<char*
>(
const_cast<Page*
>(
this));
186 ptmp += std::ptrdiff_t(m_next) * PageChunk::pagesize();
187 return reinterpret_cast<Page*
>(ptmp);
210 typedef BidirMMapPipeException Exception;
218 typedef BidirMMapPipe_impl::PageChunk Chunk;
220 typedef std::list<Chunk*> ChunkList;
222 friend class BidirMMapPipe_impl::PageChunk;
225 typedef PageChunk::MMapVariety MMapVariety;
227 PagePool(
unsigned nPagesPerGroup);
234 static unsigned pagesize() {
return PageChunk::pagesize(); }
236 static MMapVariety mmapVariety()
237 {
return PageChunk::mmapVariety(); }
240 unsigned nPagesPerGroup()
const {
return m_nPgPerGrp; }
249 ChunkList m_freelist;
251 unsigned m_szmap[(maxsz - minsz) / szincr];
255 unsigned m_nPgPerGrp;
258 void updateCurSz(
int sz,
int incr);
260 int nextChunkSz()
const;
262 void putOnFreeList(Chunk* chunk);
264 void release(Chunk* chunk);
267 Pages::Pages(PageChunk* parent, Page* pages,
unsigned npg) :
271 m_pimpl->m_parent = parent;
272 m_pimpl->m_pages = pages;
273 m_pimpl->m_refcnt = 1;
274 m_pimpl->m_npages = npg;
276 for (
unsigned i = 0; i < m_pimpl->m_npages; ++i)
new(page(i)) Page();
279 unsigned PageChunk::s_physpgsz = PageChunk::getPageSize();
280 unsigned PageChunk::s_pagesize = std::min(PageChunk::s_physpgsz, 16384u);
281 PageChunk::MMapVariety PageChunk::s_mmapworks = PageChunk::Unknown;
285 if (m_pimpl && !--(m_pimpl->m_refcnt)) {
286 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*
this);
291 Pages::Pages(
const Pages& other) :
292 m_pimpl(other.m_pimpl)
293 { ++(m_pimpl->m_refcnt); }
295 Pages& Pages::operator=(
const Pages& other)
297 if (&other ==
this)
return *
this;
298 if (--(m_pimpl->m_refcnt)) {
299 if (m_pimpl->m_parent) m_pimpl->m_parent->push(*
this);
302 m_pimpl = other.m_pimpl;
303 ++(m_pimpl->m_refcnt);
307 unsigned Pages::pagesize() {
return PageChunk::pagesize(); }
309 Page* Pages::page(
unsigned pgno)
const
311 assert(pgno < m_pimpl->m_npages);
312 unsigned char* pptr =
313 reinterpret_cast<unsigned char*
>(m_pimpl->m_pages);
314 pptr += pgno * pagesize();
315 return reinterpret_cast<Page*
>(pptr);
318 unsigned Pages::pageno(Page* p)
const
320 const unsigned char* pptr =
321 reinterpret_cast<const unsigned char*
>(p);
322 const unsigned char* bptr =
323 reinterpret_cast<const unsigned char*
>(m_pimpl->m_pages);
324 assert(0 == ((pptr - bptr) % pagesize()));
325 const unsigned nr = (pptr - bptr) / pagesize();
326 assert(nr < m_pimpl->m_npages);
330 unsigned PageChunk::getPageSize()
333 long pgsz = sysconf(_SC_PAGESIZE);
334 if (-1 == pgsz)
throw Exception(
"sysconf", errno);
335 if (pgsz > 512 && pgsz >
long(
sizeof(Page)))
344 PageChunk::PageChunk(PagePool* parent,
345 unsigned length,
unsigned nPgPerGroup) :
346 m_begin(dommap(length)),
347 m_end(reinterpret_cast<void*>(
348 reinterpret_cast<unsigned char*>(m_begin) + length)),
349 m_parent(parent), m_nPgPerGrp(nPgPerGroup), m_nUsedGrp(0)
352 unsigned char* p =
reinterpret_cast<unsigned char*
>(m_begin);
353 unsigned char* pend =
reinterpret_cast<unsigned char*
>(m_end);
355 m_freelist.push_back(reinterpret_cast<void*>(p));
356 p += nPgPerGroup * PagePool::pagesize();
360 PageChunk::~PageChunk()
362 if (m_parent) assert(empty());
363 if (m_begin) domunmap(m_begin, len());
366 bool PageChunk::contains(
const Pages& p)
const
367 {
return p.m_pimpl->m_parent ==
this; }
369 Pages PageChunk::pop()
371 assert(!m_freelist.empty());
372 void* p = m_freelist.front();
373 m_freelist.pop_front();
375 return Pages(
this, reinterpret_cast<Page*>(p), m_nPgPerGrp);
378 void PageChunk::push(
const Pages& p)
381 bool wasempty = m_freelist.empty();
382 m_freelist.push_front(reinterpret_cast<void*>(p[0u]));
386 if (wasempty) m_parent->putOnFreeList(
this);
388 if (empty())
return m_parent->release(
this);
392 void* PageChunk::dommap(
unsigned len)
394 assert(len && 0 == (len % s_physpgsz));
406 static bool msgprinted =
false;
407 if (Anonymous == s_mmapworks || Unknown == s_mmapworks) {
408 #if defined(MAP_ANONYMOUS)
410 #define MYANONFLAG MAP_ANONYMOUS
411 #elif defined(MAP_ANON)
413 #define MYANONFLAG MAP_ANON
418 void* retVal = ::mmap(0, len, PROT_READ | PROT_WRITE,
419 MYANONFLAG | MAP_SHARED, -1, 0);
420 if (MAP_FAILED == retVal) {
421 if (Anonymous == s_mmapworks)
throw Exception(
"mmap", errno);
423 assert(Unknown == s_mmapworks || Anonymous == s_mmapworks);
424 s_mmapworks = Anonymous;
425 if (BidirMMapPipe::debugflag() && !msgprinted) {
426 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
427 __FILE__ <<
", line " << __LINE__ <<
428 "): anonymous mmapping works, excellent!" <<
437 if (DevZero == s_mmapworks || Unknown == s_mmapworks) {
440 int fd = ::open(
"/dev/zero", O_RDWR);
442 throw Exception(
"open /dev/zero", errno);
443 void* retVal = ::mmap(0, len,
444 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
445 if (MAP_FAILED == retVal) {
448 if (DevZero == s_mmapworks)
throw Exception(
"mmap", errsv);
450 assert(Unknown == s_mmapworks || DevZero == s_mmapworks);
451 s_mmapworks = DevZero;
453 if (-1 == ::close(fd))
454 throw Exception(
"close /dev/zero", errno);
455 if (BidirMMapPipe::debugflag() && !msgprinted) {
456 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
457 ", line " << __LINE__ <<
"): mmapping /dev/zero works, "
458 "very good!" << std::endl;
463 if (FileBacked == s_mmapworks || Unknown == s_mmapworks) {
464 char name[] =
"/tmp/BidirMMapPipe-XXXXXX";
467 if (-1 == (fd = ::mkstemp(name)))
throw Exception(
"mkstemp", errno);
469 if (-1 == ::unlink(name)) {
472 throw Exception(
"unlink", errsv);
475 if (-1 == ::lseek(fd, len - 1, SEEK_SET)) {
478 throw Exception(
"lseek", errsv);
481 if (1 != ::write(fd, name, 1)) {
484 throw Exception(
"write", errsv);
487 void* retVal = ::mmap(0, len,
488 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
489 if (MAP_FAILED == retVal) {
492 if (FileBacked == s_mmapworks)
throw Exception(
"mmap", errsv);
494 assert(Unknown == s_mmapworks || FileBacked == s_mmapworks);
495 s_mmapworks = FileBacked;
497 if (-1 == ::close(fd)) {
499 ::munmap(retVal, len);
500 throw Exception(
"close", errsv);
502 if (BidirMMapPipe::debugflag() && !msgprinted) {
503 std::cerr <<
" INFO: In " << __func__ <<
" (" << __FILE__ <<
504 ", line " << __LINE__ <<
"): mmapping temporary files "
505 "works, good!" << std::endl;
510 if (Copy == s_mmapworks || Unknown == s_mmapworks) {
515 if (BidirMMapPipe::debugflag() && !msgprinted) {
516 std::cerr <<
"WARNING: In " << __func__ <<
" (" << __FILE__ <<
517 ", line " << __LINE__ <<
"): anonymous mmapping of "
518 "shared buffers failed, falling back to read/write on "
519 " pipes!" << std::endl;
523 void* retVal = std::malloc(len);
524 if (!retVal)
throw Exception(
"malloc", errno);
532 void PageChunk::domunmap(
void* addr,
unsigned len)
534 assert(len && 0 == (len % s_physpgsz));
536 assert(Unknown != s_mmapworks);
537 if (Copy != s_mmapworks) {
538 if (-1 == ::munmap(addr, len))
539 throw Exception(
"munmap", errno);
546 void PageChunk::zap(Pages& p)
559 if (Copy != s_mmapworks) {
560 unsigned char* p0 =
reinterpret_cast<unsigned char*
>(m_begin);
561 unsigned char* p1 =
reinterpret_cast<unsigned char*
>(p[0u]);
562 unsigned char* p2 = p1 + p.npages() * s_physpgsz;
563 unsigned char* p3 =
reinterpret_cast<unsigned char*
>(m_end);
564 if (p1 != p0) ::mprotect(p0, p1 - p0, PROT_NONE);
565 if (p2 != p3) ::mprotect(p2, p3 - p2, PROT_NONE);
570 p.m_pimpl->m_parent = 0;
576 PagePool::PagePool(
unsigned nPgPerGroup) :
577 m_cursz(minsz), m_nPgPerGrp(nPgPerGroup)
581 if (PageChunk::pagesize() != PageChunk::physPgSz()) {
582 const unsigned mult =
583 PageChunk::physPgSz() / PageChunk::pagesize();
584 const unsigned desired = nPgPerGroup * PageChunk::pagesize();
586 const unsigned actual = mult *
587 (desired / mult + bool(desired % mult));
588 const unsigned newPgPerGrp = actual / PageChunk::pagesize();
589 if (BidirMMapPipe::debugflag()) {
590 std::cerr <<
" INFO: In " << __func__ <<
" (" <<
591 __FILE__ <<
", line " << __LINE__ <<
592 "): physical page size " << PageChunk::physPgSz() <<
593 ", subdividing into logical pages of size " <<
594 PageChunk::pagesize() <<
", adjusting nPgPerGroup " <<
595 m_nPgPerGrp <<
" -> " << newPgPerGrp <<
598 assert(newPgPerGrp >= m_nPgPerGrp);
599 m_nPgPerGrp = newPgPerGrp;
601 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
604 PagePool::~PagePool()
607 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it)
612 void PagePool::zap(Pages& p)
616 for (ChunkList::iterator it = m_chunks.begin(); m_chunks.end() != it; ++it) {
617 if ((*it)->contains(p)) {
624 std::fill(m_szmap, m_szmap + ((maxsz - minsz) / szincr), 0);
628 Pages PagePool::pop()
630 if (m_freelist.empty()) {
632 const int sz = nextChunkSz();
633 Chunk *c =
new Chunk(
this,
634 sz * m_nPgPerGrp * pagesize(), m_nPgPerGrp);
635 m_chunks.push_front(c);
636 m_freelist.push_back(c);
640 Chunk* c = m_freelist.front();
643 if (c->full()) m_freelist.pop_front();
647 void PagePool::release(PageChunk* chunk)
649 assert(chunk->empty());
651 ChunkList::iterator it = std::find(
652 m_freelist.begin(), m_freelist.end(), chunk);
653 if (m_freelist.end() == it)
654 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
655 m_freelist.erase(it);
657 it = std::find(m_chunks.begin(), m_chunks.end(), chunk);
658 if (m_chunks.end() == it)
659 throw Exception(
"PagePool::release(PageChunk*)", EINVAL);
661 const unsigned sz = chunk->len() / (pagesize() * m_nPgPerGrp);
666 void PagePool::putOnFreeList(PageChunk* chunk)
668 assert(!chunk->full());
669 m_freelist.push_back(chunk);
672 void PagePool::updateCurSz(
int sz,
int incr)
674 m_szmap[(sz - minsz) / szincr] += incr;
676 for (
int i = (maxsz - minsz) / szincr; i--; ) {
678 m_cursz += i * szincr;
684 int PagePool::nextChunkSz()
const
688 if (m_chunks.empty()) {
696 if (1 != m_chunks.size()) {
706 if (sz > maxsz) sz = maxsz;
707 if (sz < minsz) sz = minsz;
713 pthread_mutex_t BidirMMapPipe::s_openpipesmutex = PTHREAD_MUTEX_INITIALIZER;
714 std::list<BidirMMapPipe*> BidirMMapPipe::s_openpipes;
715 BidirMMapPipe_impl::PagePool* BidirMMapPipe::s_pagepool = 0;
716 unsigned BidirMMapPipe::s_pagepoolrefcnt = 0;
717 int BidirMMapPipe::s_debugflag = 0;
719 BidirMMapPipe_impl::PagePool& BidirMMapPipe::pagepool()
722 s_pagepool =
new BidirMMapPipe_impl::PagePool(TotPages);
726 void BidirMMapPipe::teardownall(
void)
728 pthread_mutex_lock(&s_openpipesmutex);
729 while (!s_openpipes.empty()) {
730 BidirMMapPipe *p = s_openpipes.front();
731 pthread_mutex_unlock(&s_openpipesmutex);
732 if (p->m_childPid) kill(p->m_childPid, SIGTERM);
733 p->doClose(
true,
true);
734 pthread_mutex_lock(&s_openpipesmutex);
736 pthread_mutex_unlock(&s_openpipesmutex);
739 BidirMMapPipe::BidirMMapPipe(
const BidirMMapPipe&) :
740 m_pages(pagepool().pop())
743 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
744 if (!s_pagepoolrefcnt) {
750 BidirMMapPipe::BidirMMapPipe(
bool useExceptions,
bool useSocketpair) :
751 m_pages(pagepool().pop()), m_busylist(0), m_freelist(0), m_dirtylist(0),
752 m_inpipe(-1), m_outpipe(-1), m_flags(failbit), m_childPid(0),
753 m_parentPid(::getpid())
757 assert(0 < TotPages && 0 == (TotPages & 1) && TotPages <= 256);
758 int fds[4] = { -1, -1, -1, -1 };
760 static bool firstcall =
true;
761 if (useExceptions) m_flags |= exceptionsbit;
768 if (0 != atexit(BidirMMapPipe::teardownall))
769 throw Exception(
"atexit", errno);
773 for (
unsigned i = 1; i < TotPages; ++i)
774 m_pages[i - 1]->setNext(m_pages[i]);
775 m_pages[PagesPerEnd - 1]->setNext(0);
776 if (!useSocketpair) {
778 if (0 != ::pipe(&fds[0]))
throw Exception(
"pipe", errno);
779 if (0 != ::pipe(&fds[2]))
throw Exception(
"pipe", errno);
781 if (0 != ::socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]))
782 throw Exception(
"socketpair", errno);
785 pthread_mutex_lock(&s_openpipesmutex);
787 switch ((m_childPid = ::fork())) {
790 pthread_mutex_unlock(&s_openpipesmutex);
792 throw Exception(
"fork", myerrno);
797 if (-1 == ::close(fds[0]) || (-1 == ::close(fds[3]))) {
799 pthread_mutex_unlock(&s_openpipesmutex);
800 throw Exception(
"close", myerrno);
802 fds[0] = fds[3] = -1;
807 if (-1 == ::close(fds[0])) {
809 pthread_mutex_unlock(&s_openpipesmutex);
810 throw Exception(
"close", myerrno);
813 m_inpipe = m_outpipe = fds[1];
817 for (std::list<BidirMMapPipe*>::iterator it = s_openpipes.begin();
818 s_openpipes.end() != it; ) {
819 BidirMMapPipe* p = *it;
820 it = s_openpipes.erase(it);
821 p->doClose(
true,
true);
823 pagepool().zap(m_pages);
824 s_pagepoolrefcnt = 0;
827 s_openpipes.push_front(
this);
828 pthread_mutex_unlock(&s_openpipesmutex);
830 m_freelist = m_pages[PagesPerEnd];
833 if (1 != xferraw(m_outpipe, &c, 1, ::write))
834 throw Exception(
"handshake: xferraw write", EPIPE);
835 if (1 != xferraw(m_inpipe, &c, 1, ::read))
836 throw Exception(
"handshake: xferraw read", EPIPE);
837 if (
'P' != c)
throw Exception(
"handshake", EPIPE);
843 if (-1 == ::close(fds[1]) || -1 == ::close(fds[2])) {
845 pthread_mutex_unlock(&s_openpipesmutex);
846 throw Exception(
"close", myerrno);
848 fds[1] = fds[2] = -1;
853 if (-1 == ::close(fds[1])) {
855 pthread_mutex_unlock(&s_openpipesmutex);
856 throw Exception(
"close", myerrno);
859 m_inpipe = m_outpipe = fds[0];
863 s_openpipes.push_front(
this);
864 pthread_mutex_unlock(&s_openpipesmutex);
866 m_freelist = m_pages[0u];
869 if (1 != xferraw(m_outpipe, &c, 1, ::write))
870 throw Exception(
"handshake: xferraw write", EPIPE);
871 if (1 != xferraw(m_inpipe, &c, 1, ::read))
872 throw Exception(
"handshake: xferraw read", EPIPE);
873 if (
'C' != c)
throw Exception(
"handshake", EPIPE);
879 if (-1 == ::fcntl(m_outpipe, F_GETFD, &fdflags))
880 throw Exception(
"fcntl", errno);
881 fdflags |= FD_CLOEXEC;
882 if (-1 == ::fcntl(m_outpipe, F_SETFD, fdflags))
883 throw Exception(
"fcntl", errno);
884 if (m_inpipe != m_outpipe) {
885 if (-1 == ::fcntl(m_inpipe, F_GETFD, &fdflags))
886 throw Exception(
"fcntl", errno);
887 fdflags |= FD_CLOEXEC;
888 if (-1 == ::fcntl(m_inpipe, F_SETFD, fdflags))
889 throw Exception(
"fcntl", errno);
894 }
catch (BidirMMapPipe::Exception&) {
895 if (0 != m_childPid) kill(m_childPid, SIGTERM);
896 for (
int i = 0; i < 4; ++i)
897 if (-1 != fds[i] && 0 != fds[i]) ::close(fds[i]);
900 BidirMMapPipe_impl::Pages p; p.swap(m_pages);
902 if (!--s_pagepoolrefcnt) {
910 int BidirMMapPipe::close()
912 assert(!(m_flags & failbit));
913 return doClose(
false);
916 int BidirMMapPipe::doClose(
bool force,
bool holdlock)
918 if (m_flags & failbit)
return 0;
920 if (!force && -1 != m_outpipe && -1 != m_inpipe) flush();
922 if (m_inpipe == m_outpipe) {
923 if (-1 != m_outpipe && !force && -1 == ::shutdown(m_outpipe, SHUT_WR))
924 throw Exception(
"shutdown", errno);
927 if (-1 != m_outpipe && -1 == ::close(m_outpipe))
928 if (!force)
throw Exception(
"close", errno);
933 if (!force && -1 != m_inpipe) {
950 while ((err = ::poll(&fds, 1, 1 << 20)) >= 0) {
951 if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
break;
952 if (fds.revents & POLLIN) {
954 if (1 > ::read(m_inpipe, &c, 1))
break;
957 }
while (0 > err && EINTR == errno);
961 if (-1 != m_inpipe && -1 == ::close(m_inpipe))
962 if (!force)
throw Exception(
"close", errno);
966 { BidirMMapPipe_impl::Pages p; p.swap(m_pages); }
967 if (!--s_pagepoolrefcnt) {
971 }
catch (std::exception&) {
974 m_busylist = m_freelist = m_dirtylist = 0;
980 tmp = waitpid(m_childPid, &retVal, 0);
981 }
while (-1 == tmp && EINTR == errno);
983 if (!force)
throw Exception(
"waitpid", errno);
987 if (!holdlock) pthread_mutex_lock(&s_openpipesmutex);
988 std::list<BidirMMapPipe*>::iterator it = std::find(
989 s_openpipes.begin(), s_openpipes.end(),
this);
990 if (s_openpipes.end() != it) s_openpipes.erase(it);
991 if (!holdlock) pthread_mutex_unlock(&s_openpipesmutex);
996 BidirMMapPipe::~BidirMMapPipe()
999 BidirMMapPipe::size_type BidirMMapPipe::xferraw(
1000 int fd,
void* addr, size_type len,
1001 ssize_t (*xferfn)(
int,
void*, std::size_t))
1003 size_type xferred = 0;
1004 unsigned char* buf =
reinterpret_cast<unsigned char*
>(addr);
1006 ssize_t tmp = xferfn(fd, buf, len);
1012 }
else if (0 == tmp) {
1015 }
else if (-1 == tmp) {
1022 if (xferred)
return xferred;
1024 throw Exception(
"xferraw", errno);
1026 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1029 std::cerr <<
" ERROR: In " << __func__ <<
" (" <<
1030 __FILE__ <<
", line " << __LINE__ <<
1031 "): expect transfer to block!" << std::endl;
1037 throw Exception(
"xferraw: unexpected return value from read/write",
1044 void BidirMMapPipe::sendpages(Page* plist)
1047 unsigned char pg = m_pages[plist];
1048 if (1 == xferraw(m_outpipe, &pg, 1, ::write)) {
1049 if (BidirMMapPipe_impl::PageChunk::Copy ==
1050 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1052 for (Page* p = plist; p; p = p->next()) {
1053 if (
sizeof(Page) + p->size() !=
1054 xferraw(m_outpipe, p,
sizeof(Page) + p->size(),
1056 throw Exception(
"sendpages: short write", EPIPE);
1061 throw Exception(
"sendpages: short write", EPIPE);
1063 }
else { assert(plist); }
1066 unsigned BidirMMapPipe::recvpages()
1069 unsigned retVal = 0;
1070 Page *plisthead = 0, *plisttail = 0;
1071 if (1 == xferraw(m_inpipe, &pg, 1, ::read)) {
1072 plisthead = plisttail = m_pages[pg];
1074 if (BidirMMapPipe_impl::PageChunk::Copy ==
1075 BidirMMapPipe_impl::PageChunk::mmapVariety()) {
1077 for (; plisttail; ++retVal) {
1078 Page* p = plisttail;
1079 if (
sizeof(Page) == xferraw(m_inpipe, p,
sizeof(Page),
1081 plisttail = p->next();
1082 if (!p->size())
continue;
1084 if (p->size() != xferraw(m_inpipe, p->begin(), p->size(),
1089 retVal = lenPageList(plisthead);
1093 if (plisthead) feedPageLists(plisthead);
1099 unsigned BidirMMapPipe::recvpages_nonblock()
1103 fds.events = POLLIN;
1105 unsigned retVal = 0;
1107 int rc = ::poll(&fds, 1, 0);
1109 if (EINTR == errno)
continue;
1112 if (1 == retVal && fds.revents & POLLIN &&
1113 !(fds.revents & (POLLNVAL | POLLERR))) {
1124 unsigned BidirMMapPipe::lenPageList(
const Page* p)
1127 for ( ; p; p = p->next()) ++n;
1131 void BidirMMapPipe::feedPageLists(Page* plist)
1135 Page *blend = m_busylist;
1136 while (blend && blend->next()) blend = blend->next();
1140 Page *sendlisthead = 0, *sendlisttail = 0;
1150 if (blend) blend->setNext(p);
1151 else m_busylist = p;
1158 if ((isParent() && m_pages[p] >= PagesPerEnd) ||
1159 (isChild() && m_pages[p] < PagesPerEnd)) {
1161 if (!sendlisthead) sendlisthead = p;
1162 if (sendlisttail) sendlisttail->setNext(p);
1166 p->setNext(m_freelist);
1176 while ((dp = m_dirtylist) && dp->full()) {
1179 m_dirtylist = p->next();
1182 sendlisttail->setNext(p);
1189 const int nfds = (m_outpipe == m_inpipe) ? 1 : 2;
1190 struct pollfd fds[2];
1191 fds[0].fd = m_outpipe;
1192 fds[0].events = fds[0].revents = 0;
1193 if (m_outpipe != m_inpipe) {
1194 fds[1].fd = m_inpipe;
1195 fds[1].events = fds[1].revents = 0;
1197 fds[0].events |= POLLIN;
1201 retVal = ::poll(fds, nfds, 0);
1202 if (0 > retVal && EINTR == errno)
1207 bool ok = !(fds[0].revents & (POLLERR | POLLNVAL | POLLHUP));
1208 if (m_outpipe != m_inpipe) {
1209 ok = ok && !(fds[1].revents & (POLLERR | POLLNVAL | POLLHUP));
1211 if (ok && fds[0].revents & POLLIN) {
1212 unsigned ret = recvpages();
1213 if (!ret) ok =
false;
1217 if (ok) sendpages(sendlisthead);
1222 throw Exception(
"feedPageLists: poll", errno);
1227 void BidirMMapPipe::markPageDirty(Page* p)
1230 assert(p == m_freelist);
1232 m_freelist = p->next();
1235 Page* dl = m_dirtylist;
1236 while (dl && dl->next()) dl = dl->next();
1237 if (dl) dl->setNext(p);
1238 else m_dirtylist = p;
1241 BidirMMapPipe::Page* BidirMMapPipe::busypage()
1244 recvpages_nonblock();
1248 while (!(p = m_busylist))
if (!recvpages())
return 0;
1252 BidirMMapPipe::Page* BidirMMapPipe::dirtypage()
1255 recvpages_nonblock();
1256 Page* p = m_dirtylist;
1258 if (p)
while (p->next()) p = p->next();
1259 if (!p || p->full()) {
1261 while (!(p = m_freelist))
if (!recvpages())
return 0;
1267 void BidirMMapPipe::flush()
1268 {
return doFlush(
true); }
1270 void BidirMMapPipe::doFlush(
bool forcePartialPages)
1272 assert(!(m_flags & failbit));
1274 Page *flushlisthead = 0, *flushlisttail = 0;
1275 while (m_dirtylist) {
1276 Page* p = m_dirtylist;
1277 if (!forcePartialPages && !p->full())
break;
1279 m_dirtylist = p->next();
1282 if (!flushlisthead) flushlisthead = p;
1283 if (flushlisttail) flushlisttail->setNext(p);
1286 if (flushlisthead) sendpages(flushlisthead);
1289 void BidirMMapPipe::purge()
1291 assert(!(m_flags & failbit));
1294 Page *l = m_busylist;
1295 while (l && l->next()) l = l->next();
1296 if (l) l->setNext(m_dirtylist);
1297 else m_busylist = m_dirtylist;
1300 for (Page* p = m_busylist; p; p = p->next()) p->size() = 0;
1302 if (m_busylist) feedPageLists(m_busylist);
1303 m_busylist = m_dirtylist = 0;
1306 BidirMMapPipe::size_type BidirMMapPipe::bytesReadableNonBlocking()
1310 recvpages_nonblock();
1311 size_type retVal = 0;
1312 for (Page* p = m_busylist; p; p = p->next())
1313 retVal += p->size() - p->pos();
1317 BidirMMapPipe::size_type BidirMMapPipe::bytesWritableNonBlocking()
1321 recvpages_nonblock();
1324 bool couldwrite =
false;
1328 fds.events = POLLOUT;
1332 retVal = ::poll(&fds, 1, 0);
1334 if (EINTR == errno)
continue;
1335 throw Exception(
"bytesWritableNonBlocking: poll", errno);
1337 if (1 == retVal && fds.revents & POLLOUT &&
1338 !(fds.revents & (POLLNVAL | POLLERR | POLLHUP)))
1344 size_type retVal = 0;
1345 unsigned npages = 0;
1347 for (Page* p = m_dirtylist; p; p = p->next()) {
1351 retVal += p->free();
1352 if (npages >= FlushThresh && !couldwrite)
break;
1355 for (Page* p = m_freelist; p && (!m_dirtylist ||
1356 npages < FlushThresh || couldwrite); p = p->next()) {
1358 retVal += Page::capacity();
1363 BidirMMapPipe::size_type BidirMMapPipe::read(
void* addr, size_type sz)
1365 assert(!(m_flags & failbit));
1366 size_type nread = 0;
1367 unsigned char *ap =
reinterpret_cast<unsigned char*
>(addr);
1371 Page* p = busypage();
1376 unsigned char* pp = p->begin() + p->pos();
1377 size_type csz = std::min(size_type(p->remaining()), sz);
1378 std::copy(pp, pp + csz, ap);
1383 assert(p->size() >= p->pos());
1384 if (p->size() == p->pos()) {
1386 m_busylist = p->next();
1392 }
catch (Exception&) {
1393 m_flags |= rderrbit;
1394 if (m_flags & exceptionsbit)
throw;
1399 BidirMMapPipe::size_type BidirMMapPipe::write(
const void* addr, size_type sz)
1401 assert(!(m_flags & failbit));
1402 size_type written = 0;
1403 const unsigned char *ap =
reinterpret_cast<const unsigned char*
>(addr);
1407 Page* p = dirtypage();
1412 unsigned char* pp = p->begin() + p->size();
1413 size_type csz = std::min(size_type(p->free()), sz);
1414 std::copy(ap, ap + csz, pp);
1419 assert(p->capacity() >= p->size());
1423 if (lenPageList(m_dirtylist) >= FlushThresh)
1427 }
catch (Exception&) {
1428 m_flags |= wrerrbit;
1429 if (m_flags & exceptionsbit)
throw;
1434 int BidirMMapPipe::poll(BidirMMapPipe::PollVector& pipes,
int timeout)
1439 bool canskiptimeout =
false;
1440 std::vector<unsigned> masks(pipes.size(), ~(Readable | Writable));
1441 std::vector<unsigned>::iterator mit = masks.begin();
1442 for (PollVector::iterator it = pipes.begin(); pipes.end() != it;
1444 PollEntry& pe = *it;
1447 if (!pe.pipe || pe.pipe->closed()) pe.revents |= Invalid;
1449 if (pe.pipe->bad()) pe.revents |= Error;
1451 if (pe.pipe->eof()) pe.revents |= EndOfFile;
1453 if (pe.events & Readable) {
1455 if (pe.pipe->m_busylist) pe.revents |= Readable;
1458 if (pe.events & Writable) {
1460 if (pe.pipe->m_freelist) {
1461 pe.revents |= Writable;
1463 Page *dl = pe.pipe->m_dirtylist;
1464 while (dl && dl->next()) dl = dl->next();
1465 if (dl && dl->pos() < Page::capacity())
1466 pe.revents |= Writable;
1469 if (pe.revents) canskiptimeout =
true;
1472 std::vector<pollfd> fds;
1473 fds.reserve(2 * pipes.size());
1474 std::map<int, PollEntry*> fds2pipes;
1475 for (PollVector::const_iterator it = pipes.begin();
1476 pipes.end() != it; ++it) {
1477 const PollEntry& pe = *it;
1479 fds2pipes.insert(std::make_pair((tmp.fd = pe.pipe->m_inpipe),
1480 const_cast<PollEntry*>(&pe)));
1481 tmp.events = tmp.revents = 0;
1484 tmp.events |= POLLIN;
1485 if (pe.pipe->m_outpipe != tmp.fd) {
1488 fds2pipes.insert(std::make_pair(
1489 unsigned(tmp.fd = pe.pipe->m_outpipe),
1490 const_cast<PollEntry*>(&pe)));
1494 if (pe.events & Writable) tmp.events |= POLLOUT;
1500 retVal = ::poll(&fds[0], fds.size(), canskiptimeout ? 0 : timeout);
1502 if (EINTR == errno)
continue;
1503 throw Exception(
"poll", errno);
1508 for (std::vector<pollfd>::iterator it = fds.begin();
1509 fds.end() != it; ++it) {
1513 PollEntry& pe = *fds2pipes[fe.fd];
1515 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_inpipe)
1516 pe.revents |= ReadInvalid;
1517 if (fe.revents & POLLNVAL && fe.fd == pe.pipe->m_outpipe)
1518 pe.revents |= WriteInvalid;
1519 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_inpipe)
1520 pe.revents |= ReadError;
1521 if (fe.revents & POLLERR && fe.fd == pe.pipe->m_outpipe)
1522 pe.revents |= WriteError;
1523 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_inpipe)
1524 pe.revents |= ReadEndOfFile;
1525 if (fe.revents & POLLHUP && fe.fd == pe.pipe->m_outpipe)
1526 pe.revents |= WriteEndOfFile;
1527 if ((fe.revents & POLLIN) && fe.fd == pe.pipe->m_inpipe &&
1528 !(fe.revents & (POLLNVAL | POLLERR))) {
1531 if (0 == pe.pipe->recvpages())
continue;
1534 int tmp = ::poll(&fe, 1, 0);
1535 if (tmp > 0)
goto oncemore;
1537 if (EINTR == errno)
continue;
1538 throw Exception(
"poll", errno);
1543 if (pe.pipe->m_busylist) pe.revents |= Readable;
1544 if (fe.revents & POLLOUT && fe.fd == pe.pipe->m_outpipe) {
1545 if (pe.pipe->m_freelist) {
1546 pe.revents |= Writable;
1548 Page *dl = pe.pipe->m_dirtylist;
1549 while (dl && dl->next()) dl = dl->next();
1550 if (dl && dl->pos() < Page::capacity())
1551 pe.revents |= Writable;
1557 mit = masks.begin();
1558 for (PollVector::iterator it = pipes.begin();
1559 pipes.end() != it; ++it, ++mit)
1560 if ((it->revents &= *mit)) ++npipes;
1564 BidirMMapPipe& BidirMMapPipe::operator<<(
const char* str)
1566 size_t sz = std::strlen(str);
1568 if (sz) write(str, sz);
1572 BidirMMapPipe& BidirMMapPipe::operator>>(
char* (&str))
1576 if (good() && !eof()) {
1577 str =
reinterpret_cast<char*
>(std::realloc(str, sz + 1));
1578 if (!str)
throw Exception(
"realloc", errno);
1579 if (sz) read(str, sz);
1585 BidirMMapPipe& BidirMMapPipe::operator<<(
const std::string& str)
1587 size_t sz = str.size();
1589 write(str.data(), sz);
1593 BidirMMapPipe& BidirMMapPipe::operator>>(std::string& str)
1598 if (good() && !eof()) {
1600 for (
unsigned char c; sz--; str.push_back(c)) *
this >> c;
1605 END_NAMESPACE_ROOFIT
1607 #ifdef TEST_BIDIRMMAPPIPE
1608 using namespace RooFit;
1610 int simplechild(BidirMMapPipe& pipe)
1613 while (pipe.good() && !pipe.eof()) {
1617 if (!pipe)
return -1;
1618 if (pipe.eof())
break;
1620 std::cout <<
"[CHILD] : read: " << str << std::endl;
1621 str =
"... early in the morning?";
1623 pipe << str << BidirMMapPipe::flush;
1625 if (str.empty())
break;
1626 if (!pipe)
return -1;
1627 if (pipe.eof())
break;
1628 std::cout <<
"[CHILD] : wrote: " << str << std::endl;
1635 int randomchild(BidirMMapPipe& pipe)
1638 ::srand48(::getpid());
1646 for (
int i = 0; i < 5; ++i) {
1648 ::usleep(
int(1e6 * ::drand48()));
1649 std::ostringstream buf;
1650 buf <<
"child pid " << ::getpid() <<
" sends message " << i;
1651 std::string str = buf.str();
1652 std::cout <<
"[CHILD] : " << str << std::endl;
1653 pipe << str << BidirMMapPipe::flush;
1654 if (!pipe)
return -1;
1655 if (pipe.eof())
break;
1658 pipe <<
"" << BidirMMapPipe::flush;
1666 int benchchildrtt(BidirMMapPipe& pipe)
1671 while (pipe && !pipe.eof()) {
1678 if (pipe.eof())
break;
1679 pipe << str << BidirMMapPipe::flush;
1681 if (!std::strlen(str))
break;
1688 int benchchildsink(BidirMMapPipe& pipe)
1692 while (pipe && !pipe.eof()) {
1694 if (!std::strlen(str))
break;
1696 pipe <<
"" << BidirMMapPipe::flush;
1702 int benchchildsource(BidirMMapPipe& pipe)
1706 for (
unsigned i = 0; i <= 24; ++i) {
1707 str =
reinterpret_cast<char*
>(std::realloc(str, (1 << i) + 1));
1708 std::memset(str,
'4', 1 << i);
1710 for (
unsigned j = 0; j < 1 << 7; ++j) {
1712 if (!pipe || pipe.eof()) {
1719 pipe <<
"" << BidirMMapPipe::flush;
1722 pipe <<
"" << BidirMMapPipe::flush;
1728 BidirMMapPipe* spawnChild(
int (*childexec)(BidirMMapPipe&))
1731 BidirMMapPipe *p =
new BidirMMapPipe();
1733 int retVal = childexec(*p);
1740 #include <sys/time.h>
1746 std::cout <<
"[PARENT]: simple challenge-response test, "
1747 "one child:" << std::endl;
1748 BidirMMapPipe* pipe = spawnChild(simplechild);
1749 for (
int i = 0; i < 5; ++i) {
1750 std::string str(
"What shall we do with a drunken sailor...");
1751 *pipe << str << BidirMMapPipe::flush;
1752 if (!*pipe)
return -1;
1753 std::cout <<
"[PARENT]: wrote: " << str << std::endl;
1755 if (!*pipe)
return -1;
1756 std::cout <<
"[PARENT]: read: " << str << std::endl;
1759 *pipe <<
"" << BidirMMapPipe::flush;
1763 int retVal = pipe->close();
1764 std::cout <<
"[PARENT]: exit status of child: " << retVal <<
1766 if (retVal)
return retVal;
// Test 2: several children running `randomchild`, serviced through
// BidirMMapPipe::poll().
// NOTE(review): `nch` and `s` are declared on lines that are missing from this
// excerpt, as are the closing braces and a number of interior statements
// (the embedded source line numbers have gaps throughout).
1772 std::cout << std::endl <<
"[PARENT]: polling test, " << nch <<
1773 " children:" << std::endl;
1774 typedef BidirMMapPipe::PollEntry PollEntry;
1776 BidirMMapPipe::PollVector pipes;
// spawn `nch` children; each pipe is registered with the Readable flag
1779 for (
unsigned i = 0; i < nch; ++i) {
1780 std::cout <<
"[PARENT]: spawning child " << i << std::endl;
1781 pipes.push_back(PollEntry(spawnChild(randomchild),
1782 BidirMMapPipe::Readable));
1785 std::cout <<
"[PARENT]: waking up children" << std::endl;
// the "wake up" consists of writing an empty string to every child's pipe
1786 for (
unsigned i = 0; i < nch; ++i)
1787 *pipes[i].pipe <<
"" << BidirMMapPipe::flush;
1788 std::cout <<
"[PARENT]: waiting for events on children's pipes" << std::endl;
// continue until every entry has been erased from `pipes`
1790 while (!pipes.empty()) {
// NOTE(review): the second argument -1 presumably means "no timeout" -
// confirm against the declaration of BidirMMapPipe::poll
1792 int npipes = BidirMMapPipe::poll(pipes, -1);
// the for header has no increment expression: the iterator is moved inside
// the body (see the erase() below; any other advance is on lines not shown)
1794 for (std::vector<PollEntry>::iterator it = pipes.begin();
1795 npipes && pipes.end() != it; ) {
// data available on this pipe
1803 if (it->revents & BidirMMapPipe::Readable) {
1807 std::cout <<
"[PARENT]: Read from pipe " << it->pipe <<
1808 ": " << s << std::endl;
// write an empty string to this pipe and flush (the condition under which
// this happens is on lines not shown)
1813 *(it->pipe) <<
"" << BidirMMapPipe::flush;
// error, end-of-file or invalid condition on this pipe: print every flag
// that is set in `revents` to std::cerr
1818 if (it->revents & (BidirMMapPipe::Error |
1819 BidirMMapPipe::EndOfFile |
1820 BidirMMapPipe::Invalid)) {
1821 std::cerr <<
"[DEBUG]: Event on pipe " << it->pipe <<
1823 ((it->revents & BidirMMapPipe::Readable) ?
" Readable" :
"") <<
1824 ((it->revents & BidirMMapPipe::Writable) ?
" Writable" :
"") <<
1825 ((it->revents & BidirMMapPipe::ReadError) ?
" ReadError" :
"") <<
1826 ((it->revents & BidirMMapPipe::WriteError) ?
" WriteError" :
"") <<
1827 ((it->revents & BidirMMapPipe::ReadEndOfFile) ?
" ReadEndOfFile" :
"") <<
1828 ((it->revents & BidirMMapPipe::WriteEndOfFile) ?
" WriteEndOfFile" :
"") <<
1829 ((it->revents & BidirMMapPipe::ReadInvalid) ?
" ReadInvalid" :
"") <<
1830 ((it->revents & BidirMMapPipe::WriteInvalid) ?
" WriteInvalid" :
"") <<
// close this pipe and report the value returned by close(); the entry is
// still in `pipes` at this point, hence the "size() - 1" in the message
1833 int retVal = it->pipe->close();
1834 std::cout <<
"[PARENT]: child exit status: " <<
1835 retVal <<
", number of children still alive: " <<
1836 (pipes.size() - 1) << std::endl;
// a non-zero child status is returned to the caller immediately
1837 if (retVal)
return retVal;
// erase() returns the iterator to the entry that followed the erased one
1839 it = pipes.erase(it);
// Benchmark 1: round-trip time as a function of block size, with a child
// running `benchchildrtt`.
// NOTE(review): `t1`/`t2` are declared on lines that are missing from this
// excerpt, as are the read of the reply (around 1860), the accumulation into
// `avg` (around 1869-1871), the release of `s` and the closing braces.
1847 std::cout << std::endl <<
"[PARENT]: benchmark: round-trip times vs block size" << std::endl;
// block sizes 2^0 .. 2^24 bytes
1848 for (
unsigned i = 0; i <= 24; ++i) {
// buffer of 2^i + 1 bytes, of which the first 2^i are set to 'A' (the extra
// byte is presumably a terminator written on the missing line 1851)
1849 char *s =
new char[1 + (1 << i)];
1850 std::memset(s,
'A', 1 << i);
// 128 samples per block size
1852 const unsigned n = 1 << 7;
// min/max start at +/-1e42 so that the first sample replaces both
1853 double avg = 0., min = 1e42, max = -1e42;
// a fresh child per block size
1854 BidirMMapPipe *pipe = spawnChild(benchchildrtt);
// counts j down from n; the body runs n times unless a break occurs
1855 for (
unsigned j = n; j--; ) {
1857 ::gettimeofday(&t1, 0);
1858 *pipe << s << BidirMMapPipe::flush;
1859 if (!*pipe || pipe->eof())
break;
1861 if (!*pipe || pipe->eof())
break;
1863 ::gettimeofday(&t2, 0);
// elapsed time in seconds; tv_usec may become negative after the
// subtraction, which the sum with tv_sec below compensates for
1864 t2.tv_sec -= t1.tv_sec;
1865 t2.tv_usec -= t1.tv_usec;
1866 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1867 if (dt < min) min = dt;
1868 if (dt > max) max = dt;
// send an empty string and flush (`benchchildrtt` leaves its loop on a
// zero-length string)
1872 *pipe <<
"" << BidirMMapPipe::flush;
// seconds -> microseconds
1876 avg *= 1e6; min *= 1e6; max *= 1e6;
1877 int retVal = pipe->close();
1879 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
// speed = 2 * (block size in units of 2^20 bytes) / (avg converted back to
// seconds); the factor 2 presumably reflects that the block crosses the pipe
// twice per round trip - TODO confirm
1887 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1888 " avg " << std::setw(7) << avg <<
" us min " <<
1889 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1890 "us speed " << std::setw(9) <<
1891 2. * (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1892 " MB/s" << std::endl;
1895 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
// Benchmark 2: transfer rate from parent to child, with a child running
// `benchchildsink`.
// NOTE(review): `t1`/`t2` are declared on lines that are missing from this
// excerpt, as are the statement that sends `s` (around 1910-1911), the
// accumulation into `avg`, the release of `s` and the closing braces.
1899 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as sink" << std::endl;
// block sizes 2^0 .. 2^24 bytes
1900 for (
unsigned i = 0; i <= 24; ++i) {
// buffer of 2^i + 1 bytes, of which the first 2^i are set to 'A' (the extra
// byte is presumably a terminator written on the missing line 1903)
1901 char *s =
new char[1 + (1 << i)];
1902 std::memset(s,
'A', 1 << i);
// 128 samples per block size
1904 const unsigned n = 1 << 7;
// min/max start at +/-1e42 so that the first sample replaces both
1905 double avg = 0., min = 1e42, max = -1e42;
// a fresh child per block size
1906 BidirMMapPipe *pipe = spawnChild(benchchildsink);
// counts j down from n; the body runs n times unless a break occurs
1907 for (
unsigned j = n; j--; ) {
1909 ::gettimeofday(&t1, 0);
1912 if (!*pipe || pipe->eof())
break;
1914 ::gettimeofday(&t2, 0);
// elapsed time in seconds; tv_usec may become negative after the
// subtraction, which the sum with tv_sec below compensates for
1915 t2.tv_sec -= t1.tv_sec;
1916 t2.tv_usec -= t1.tv_usec;
1917 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
1918 if (dt < min) min = dt;
1919 if (dt > max) max = dt;
// send an empty string and flush (`benchchildsink` leaves its loop on a
// zero-length string)
1923 *pipe <<
"" << BidirMMapPipe::flush;
// seconds -> microseconds
1927 avg *= 1e6; min *= 1e6; max *= 1e6;
1928 int retVal = pipe->close();
1930 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
// speed = (block size in units of 2^20 bytes) / (avg converted back to
// seconds); one direction only, so no factor 2 here
1934 std::cout <<
"block size " << std::setw(9) << (1 << i) <<
1935 " avg " << std::setw(7) << avg <<
" us min " <<
1936 std::setw(7) << min <<
" us max " << std::setw(7) << max <<
1937 "us speed " << std::setw(9) <<
1938 (double(1 << i) / double(1 << 20) / (1e-6 * avg)) <<
1939 " MB/s" << std::endl;
1942 std::cout <<
"[PARENT]: all children had exit code 0" << std::endl;
// Benchmark 3: transfer rate from child to parent, with a child running
// `benchchildsource`.
// NOTE(review): `s`, `t1` and `t2` are declared on lines that are missing from
// this excerpt, as are the read from the pipe (around 1954-1955), the
// accumulation into `avg`/`n`, the branch taken for an empty string and the
// closing braces; the nesting of the statements below is therefore inferred
// from the embedded line numbers only.
1946 std::cout << std::endl <<
"[PARENT]: benchmark: raw transfer rate with child as source" << std::endl;
// min/max start at +/-1e42 so that the first sample replaces both
1948 double avg = 0., min = 1e42, max = -1e42;
// sample counter and last seen block size
1949 unsigned n = 0, bsz = 0;
// a single child serves every block size in this benchmark
1950 BidirMMapPipe *pipe = spawnChild(benchchildsource);
// run until the pipe tests false or reports end-of-file
1951 while (*pipe && !pipe->eof()) {
1953 ::gettimeofday(&t1, 0);
1956 if (!*pipe || pipe->eof())
break;
1958 ::gettimeofday(&t2, 0);
// elapsed time in seconds; tv_usec may become negative after the
// subtraction, which the sum with tv_sec below compensates for
1959 t2.tv_sec -= t1.tv_sec;
1960 t2.tv_usec -= t1.tv_usec;
1961 double dt = 1e-6 * double(t2.tv_usec) + double(t2.tv_sec);
// a non-empty string is a timing sample; its length is recorded as the
// block size (an empty string presumably ends a series - TODO confirm)
1962 if (std::strlen(s)) {
1964 if (dt < min) min = dt;
1965 if (dt > max) max = dt;
1967 bsz = std::strlen(s);
// seconds -> microseconds
1972 avg *= 1e6; min *= 1e6; max *= 1e6;
// speed = (block size in units of 2^20 bytes) / (avg converted back to
// seconds)
1974 std::cout <<
"block size " << std::setw(9) << bsz <<
1975 " avg " << std::setw(7) << avg <<
" us min " <<
1976 std::setw(7) << min <<
" us max " << std::setw(7) <<
1977 max <<
"us speed " << std::setw(9) <<
1978 (double(bsz) / double(1 << 20) / (1e-6 * avg)) <<
1979 " MB/s" << std::endl;
// the value returned by close() is reported as the child's exit code and,
// when non-zero, returned to the caller
1986 int retVal = pipe->close();
1987 std::cout <<
"[PARENT]: child exited with code " << retVal << std::endl;
1988 if (retVal)
return retVal;
1994 #endif // TEST_BIDIRMMAPPIPE