ython has GIL. Hence, we cannot do a fully mutlithreaded test using the API
presented in the previous section. The native Python multiprocessor libraries
rely on processes and serialization to do multi-CPU processing and
communication between jobs. This is not what we want to do here. Therefore, we
need to start working threads from C++. We add some code, see below.
01|#include <boost/python.hpp>
02|#include "PythonNonBlockingQueue.hpp"
03|#include "NonBlockingQueue.hpp"
04|#include <ots/otsConfig.hpp>
05|#include <ots/utils/toString.hpp>
06|#include <boost/function.hpp>
07|#include <boost/bind.hpp>
08|#include <ots/math/random/uniform.hpp>
09|
10|namespace ots { namespace scheduler { namespace
testNonBlockingQueue {
11|
12|template
<class Data> class WrapperTmpl : boost::noncopyable
13|{
14|public:
15|explicit
WrapperTmpl(const Data& d) : theData(d) {}
16|const
Data theData;
17|std::string
toString() const
18|{
19|std::stringstream
os;
20|os<<theData<<'\0';
21|return
os.str();
22|}
23|};
24|
25|typedef
std::string Data;
26|typedef
WrapperTmpl<Data> Wrapper;
27|typedef
ots::scheduler::NonBlockingQueue<Wrapper> Queue;
28|typedef
Queue::Node Node;
29|
41|std::pair<bool,std::string>
wrapperToString( volatile Wrapper* w )
42|{
43|const
Wrapper& ww=*const_cast<Wrapper*>(w);
44|return
std::pair<bool,std::string>(true,ww.toString());
45|}
46|std::string
queueToString( const Queue& queue )
47|{
48|return
queue.toString(wrapperToString);
49|}
50|std::string
nodeToString( const Node& node )
51|{
52|return
node.toString(wrapperToString);
53|}
54|
55|Node*
makeNode( Wrapper& w )
56|{
57|return
new Node(w);
58|}
59|
60|struct
StaticQueue
61|{
62|Queue&
queue()
63|{
64|static
Queue q;
65|return
q;
66|}
67|};
68|
69|class
TestThread
70|{
71|private:
72|boost::shared_ptr<std::vector<Node*>
> theNodes;
73|public:
74|TestThread(
const std::string& name, int N, int times )
75|:
theNodes(new std::vector<Node*>)
76|{
77|for(
int i=0; i<N; ++i )
78|{
79|Data*
d=new Data(name+boost::lexical_cast<std::string>(i));
80|Wrapper*
w=new Wrapper(*d);
81|Node*
n=new Node(*w);
82|theNodes->push_back(n);
83|}
84|boost::thread(boost::bind(&TestThread::run,*this,times));
85|}
86|TestThread(
const TestThread& th ) : theNodes(th.theNodes) {}
87|TestThread&
operator=( const TestThread& th ) { theNodes=th.theNodes; return *this;
}
88|void
run( int times )
89|{
90|std::size_t
N=theNodes->size();
91|for(
int i=0; i<times; ++i )
92|{
93|std::vector<Node*>::const_iterator
e=theNodes->end();
94|for(
std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j
)
95|{
96|double
u=ots::random::uniform();
97|if(
u<0.5)
98|StaticQueue().queue().push(**j);
99|}
100|for(
std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j
)
101|{
102|double
u=ots::random::uniform();
103|if(
u<0.5 )
104|(*j)->remove();
105|}
106|double
u=ots::random::uniform();
107|if(
u<0.5 )
108|ots::threading::SmallRandomDelay();
109|}
110|}
111|};
112|
113|void
pythonNonBlockingQueue()
114|{
115|using
namespace boost::python;
116|def("simpleTest",&simpleTest);
117|class_<Wrapper,boost::noncopyable>("Wrapper",init<std::string>())
118|.def("__str__",&Wrapper::toString)
119|;
120|class_<Queue,boost::noncopyable>("Queue")
121|.def("__str__",&queueToString)
122|.def("pop",
123|&Queue::boostPythonPop,
124|"Returning
reference to something that was previously pushed.\n"
125|"Relies
on existence of the object\n",
126|return_internal_reference<>()
127|)
128|.def("push",&Queue::push)
129|;
130|class_<StaticQueue>("StaticQueue")
131|.def("queue",
132|&StaticQueue::queue,
133|"returns
a static Queue object",
134|return_internal_reference<>()
135|)
136|;
137|class_<Node,boost::noncopyable>("Node")
138|.def("__str__",&nodeToString)
139|.def("remove",&Node::remove)
140|;
141|def("makeNode",&makeNode,return_value_policy<manage_new_object>());
142|class_<ots::threading::SmallRandomDelay,boost::noncopyable>("SmallRandomDelay")
143|;
144|class_<TestThread>("TestThread",init<std::string,int,int>())
146|;
147|}
148|
149|}}} //namespace ots,scheduler
The code is based on the previous example. The additions are explained below.
Lines 60-67. We need a static Queue to share among working threads. The
introduction of StaticQueue class is a workaround a boost::python problem. For
some reason, making queue() a stand-alone function does not go well with
return_internal_reference<>() template, line 134.
Lines 69-111. This is the working thread. We plan to instantiate several. In
the constructor 74-85 we create std::string,Wrapper,Node objects and store
them in the std::vector<Node*>.
Line 72. We use shared_ptr because we want TestThread to have pass-by-value
semantics implemented in lines 86,87 and used in the line 84.
Line 84. We start the working thread here. See boost::thread and boost::bind
manuals for explanation of syntax.
Lines 88-109. We push and pop Nodes repeatedly with some intentionally random
behavior. The ots::random::uniform() returns a simulated uniform [0,1] random
variable.
Once we have a binary library "test.pyd" containing the above code we may
start Python interpreter and perform the following session.
In [1]: import test
In [2]: test.TestThread("a",100,1000000)
Out[2]: <test.TestThread object at
0x01CBFAB0>
In [3]: test.TestThread("b",100,1000000)
Out[3]: <test.TestThread object at
0x0217EFC0>
In [4]: test.TestThread("c",100,1000000)
Out[4]: <test.TestThread object at
0x0217EF90>
In [5]: test.TestThread("d",100,1000000)
Out[5]: <test.TestThread object at
0x01D98540>
In [6]: test.TestThread("e",100,1000000)
Out[6]: <test.TestThread object at
0x0218D1B0>
In [7]: test.TestThread("f",100,1000000)
Out[7]: <test.TestThread object at
0x0218D240>
In [8]:
At this point there should be a visible workload on the CPU. The static Queue
receives heavy flow of requests. We may simultaneously experiment with it.
In [8]: q=test.StaticQueue().queue()
In [9]: print q
<Output is removed for brevity>
In [10]: s1="TestString1"
In [11]: w1=test.Wrapper(s1)
In [12]: n1=test.makeNode(w1)
In [13]: print n1
Node( TestString1 )
In [14]: q.push(n1)
In [15]: print q
At this point the TestString1 should be visible among contents of the Queue.
We may also apply pop operation, since it is not tested in the C++ code:
In [16]: L=[q.pop() for x in range(0,100000)]
In [17]: print q
The pop() operation may or may not remove TestString1 from the Queue.
|