[Dune] (no subject)
Jö Fahlke
jorrit at jorrit.de
Wed Nov 10 01:16:49 CET 2010
From 6846c8b3fe16490e9f79ca1c0f8ddae407fb4889 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6=20Fahlke?= <jorrit at jorrit.de>
Date: Tue, 9 Nov 2010 23:32:21 +0100
Subject: [PATCH] [oparbufstream] Stream adaptor class for parallel debugging.
This class buffers output in the processes. On command it will send the
buffered output to the root process which writes it to an underlying stream in
rank order.
---
dune/common/Makefile.am | 1 +
dune/common/parallelstream.hh | 171 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 172 insertions(+), 0 deletions(-)
create mode 100644 dune/common/parallelstream.hh
diff --git a/dune/common/Makefile.am b/dune/common/Makefile.am
index 8309ab8..d1b9c1e 100644
--- a/dune/common/Makefile.am
+++ b/dune/common/Makefile.am
@@ -59,6 +59,7 @@ commoninclude_HEADERS = \
mpihelper.hh \
mpitraits.hh \
nullptr.hh \
+ parallelstream.hh \
parametertree.hh \
path.hh \
polyallocator.hh \
diff --git a/dune/common/parallelstream.hh b/dune/common/parallelstream.hh
new file mode 100644
index 0000000..86ac572
--- /dev/null
+++ b/dune/common/parallelstream.hh
@@ -0,0 +1,171 @@
+// -*- tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*-
+// vi: set ts=8 sw=2 et sts=2:
+
+#ifndef DUNE_COMMON_PARALLELSTREAM_HH
+#define DUNE_COMMON_PARALLELSTREAM_HH
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <ostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace Dune{
+ /** @addtogroup Common
+ *
+ * @{
+ */
+ /**
+ * @file
+ * @brief Streams for use in parallel programs
+ * @author Jö Fahlke
+ */
+
+ //! Stream adaptor for buffered parallel output
+ /**
+ * This stream adaptor collects output in a buffer on the process. Once a
+ * chunk of output is ready, the program can call the member method
+ * parflush() to gather all output in the root process. The root process
+ * will then write the data to the underlying stream in rank order. After a
+ * call to parflush(), the stream is ready to receive new output.
+ *
+ * \note The stream is meant for debugging with little output per process
+ * and a small number of processes. Due to limitations in
+ * CollectiveCommunication, it has to allocate a buffer of size
+ * number_of_processes*max_size_of_per_process_output, which can
+ * easily exhaust the available memory.
+ *
+ * \tparam Comm Type of CollectiveCommunication to use.
+ * \tparam charT Character type.
+ * \tparam traits Character traits.
+ * \tparam Allocator Allocator for characters.
+ *
+ * \sa oparbufstream, woparbufstream
+ */
+  template <class Comm, class charT, class traits = std::char_traits<charT>,
+            class Allocator = std::allocator<charT> >
+  class basic_oparbufstream :
+    public std::basic_ostringstream<charT, traits, Allocator>
+  {
+    typedef std::basic_ostringstream<charT, traits, Allocator> Base;
+    typedef std::basic_string<charT, traits, Allocator> String;
+
+    //! communication object used for gather()/broadcast(); held by reference
+    const Comm &comm;
+    //! stream the gathered output is written to; held by reference
+    std::basic_ostream<charT, traits> &backend;
+    //! rank of the process that collects and writes the output
+    int root;
+
+    //! pointer to the first element of v, or a null pointer if v is empty
+    /**
+     * Used instead of &v[0], which must not be evaluated for an empty vector.
+     */
+    template<class T>
+    static T *dataOrNull(std::vector<T> &v)
+    { return v.empty() ? 0 : &v[0]; }
+
+  public:
+    //! constructor
+    /**
+     * \param comm_    The CollectiveCommunication object to use.
+     * \param backend_ The backend stream to write output to when parflush()
+     *                 is called.  This must be provided on all processes, but
+     *                 is only written to on the root process.
+     * \param root_    Rank of the process which is to be considered root.
+     *
+     * Both comm_ and backend_ are stored as references, so they have to
+     * outlive this object.
+     */
+    basic_oparbufstream(const Comm &comm_,
+                        std::basic_ostream<charT, traits> &backend_,
+                        int root_ = 0) :
+      comm(comm_), backend(backend_), root(root_)
+    { }
+
+    //! destructor
+    /**
+     * Calls parflush() to emit any output that is still buffered.  Exceptions
+     * raised by that final flush are swallowed, since a destructor must not
+     * let exceptions escape.  Call parflush() explicitly beforehand if errors
+     * have to be detected.
+     */
+    ~basic_oparbufstream() {
+      try { parflush(); }
+      catch(...) { }
+    }
+
+    //! flush output to the root process
+    /**
+     * Takes the locally buffered output, empties the local buffer and hands
+     * the output to the root process, which writes the chunks of all ranks
+     * to the backend stream in rank order.  Afterwards the stream state of
+     * the root's backend is broadcast and installed as the state of this
+     * stream on every process.
+     *
+     * \note This calls comm.gather() and comm.broadcast(), so presumably it
+     *       has to be invoked by all processes of the communicator.
+     */
+    void parflush() {
+      const String my_str = this->str();
+      // Clear the underlying stringstream's buffer now, so we are in a sane
+      // state if an exception is thrown later on.  An empty string of the
+      // right character type is used because a narrow "" literal does not
+      // convert to a wide string.
+      this->str(String());
+
+      const int procs = comm.size();
+
+      // Step 1: let the root learn the output size of every rank.
+      // NOTE(review): assumes comm.size() >= 1, so the vector is non-empty.
+      std::vector<std::size_t> all_size(procs);
+      std::size_t my_size = my_str.size();
+      comm.gather(&my_size, &all_size[0], 1, root);
+
+      // Step 2: agree on a common chunk size, since gather() needs chunks of
+      // equal length from every rank.
+      std::size_t max_size = 0;
+      if(comm.rank() == root)
+        max_size = *std::max_element(all_size.begin(), all_size.end());
+      comm.broadcast(&max_size, 1, root);
+
+      // Step 3: pad the local output with charT() up to the common size.
+      std::vector<charT> my_data(max_size, charT());
+      std::copy(my_str.begin(), my_str.end(), my_data.begin());
+
+      // Step 4: collect all chunks on the root; only the root needs a
+      // receive buffer, other ranks pass a null pointer.
+      std::vector<charT> all_data;
+      if(comm.rank() == root)
+        all_data.resize(max_size*procs);
+      comm.gather(dataOrNull(my_data), dataOrNull(all_data), max_size, root);
+
+      // Step 5: the root writes the unpadded part of each chunk in rank
+      // order; empty chunks produce no output and are skipped.
+      if(comm.rank() == root)
+        for(int r = 0; r < procs; ++r)
+          if(all_size[r] != 0)
+            backend.write(&all_data[0]+r*max_size, all_size[r]);
+
+      // Step 6: propagate the root's backend state to every rank.
+      typename Base::iostate state = backend.rdstate();
+      comm.broadcast(&state, 1, root);
+      this->clear(state);
+    }
+  };
+
+ //! Stream adaptor for buffered parallel output -- narrow version
+ /**
+ * This stream adaptor collects output in a buffer on the process. Once a
+ * chunk of output is ready, the program can call the member method
+ * parflush() to gather all output in the root process. The root process
+ * will then write the data to the underlying stream in rank order. After a
+ * call to parflush(), the stream is ready to receive new output.
+ *
+ * \note The stream is meant for debugging with little output per process
+ * and a small number of processes. Due to limitations in
+ * CollectiveCommunication, it has to allocate a buffer of size
+ * number_of_processes*max_size_of_per_process_output, which can
+ * easily exhaust the available memory.
+ *
+ * \tparam Comm Type of CollectiveCommunication to use.
+ *
+ * \sa basic_oparbufstream, woparbufstream
+ */
+  template<class Comm>
+  struct oparbufstream : public basic_oparbufstream<Comm, char>
+  {
+  private:
+    //! the narrow-character stream this struct derives from
+    typedef basic_oparbufstream<Comm, char> Parent;
+
+  public:
+    //! constructor
+    /**
+     * All arguments are passed on unchanged to the constructor of
+     * basic_oparbufstream<Comm, char>.
+     *
+     * \param comm_    Communication object handed to the base class.
+     * \param backend_ Narrow output stream handed to the base class.
+     * \param root_    Root rank handed to the base class; defaults to 0.
+     */
+    oparbufstream(const Comm &comm_, std::ostream &backend_, int root_ = 0)
+      : Parent(comm_, backend_, root_)
+    { }
+  };
+
+ //! Stream adaptor for buffered parallel output -- wide version
+ /**
+ * This stream adaptor collects output in a buffer on the process. Once a
+ * chunk of output is ready, the program can call the member method
+ * parflush() to gather all output in the root process. The root process
+ * will then write the data to the underlying stream in rank order. After a
+ * call to parflush(), the stream is ready to receive new output.
+ *
+ * \note The stream is meant for debugging with little output per process
+ * and a small number of processes. Due to limitations in
+ * CollectiveCommunication, it has to allocate a buffer of size
+ * number_of_processes*max_size_of_per_process_output, which can
+ * easily exhaust the available memory.
+ *
+ * \tparam Comm Type of CollectiveCommunication to use.
+ *
+ * \sa basic_oparbufstream, oparbufstream
+ */
+  template<class Comm>
+  struct woparbufstream : public basic_oparbufstream<Comm, wchar_t>
+  {
+  private:
+    //! the wide-character stream this struct derives from
+    typedef basic_oparbufstream<Comm, wchar_t> Parent;
+
+  public:
+    //! constructor
+    /**
+     * All arguments are passed on unchanged to the constructor of
+     * basic_oparbufstream<Comm, wchar_t>.
+     *
+     * \param comm_    Communication object handed to the base class.
+     * \param backend_ Wide output stream handed to the base class.
+     * \param root_    Root rank handed to the base class; defaults to 0.
+     */
+    woparbufstream(const Comm &comm_, std::wostream &backend_, int root_ = 0)
+      : Parent(comm_, backend_, root_)
+    { }
+  };
+
+ /** @} */
+}
+
+#endif // DUNE_COMMON_PARALLELSTREAM_HH
--
1.7.2.3
More information about the Dune
mailing list