1 // Written in the D programming language.
2 /*
3 NYSL Version 0.9982
4 
5 A. This software is "Everyone'sWare". It means:
6   Anybody who has this software can use it as if he/she is
7   the author.
8 
9   A-1. Freeware. No fee is required.
10   A-2. You can freely redistribute this software.
11   A-3. You can freely modify this software. And the source
12       may be used in any software with no limitation.
13   A-4. When you release a modified version to public, you
14       must publish it with your name.
15 
16 B. The author is not responsible for any kind of damages or loss
17   while using or misusing this software, which is distributed
18   "AS IS". No warranty of any kind is expressed or implied.
19   You use AT YOUR OWN RISK.
20 
21 C. Copyrighted to Kazuki KOMATSU
22 
23 D. Above three clauses are applied both to source and binary
24   form of this software.
25 */
26 
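/**
This module provides inter-thread communication that differs from $(D_CODE std.concurrency).

The communication provided by this module can only send and receive values of
the types specified in advance.
Both sending and receiving are performed asynchronously.
*/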
33 module carbon.channel;
34 
35 import carbon.templates;
36 
37 import std.typetuple;
38 
39 import lock_free.dlist;
40 
41 
/**
Provides inter-thread communication.
For example, use it to transfer data between threads.

A channel keeps one `AtomicDList` queue per element type listed in `T`.
`put` appends a value to the queue of its type, `pop` removes the front
element of the queue of the requested type, and `queue` exposes the queue
itself (e.g. to test `empty` before popping).

Params:
    T = the value types that may be sent through this channel
*/
shared struct Channel(T...)
{
    /// Creates a channel, allocating one `AtomicDList` per type in `T`.
    static shared(Channel!T) opCall()
    {
        typeof(return) dst;
        foreach(i, E; T)
            dst._chs[i] = new shared AtomicDList!E();

        return dst;
    }


    /// Returns a send-only handle holding a copy of this channel value.
    shared(Sender) sender() @property
    {
        return shared(Sender)(this);
    }


    /// Returns a receive-only handle holding a copy of this channel value.
    shared(Receiver) receiver() @property
    {
        return shared(Receiver)(this);
    }


    /// Handle that only exposes the sending side of a channel.
    static shared struct Sender
    {
        /// Forwards `v` to `Channel.put`.
        void put(U)(U v)
        if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
        {
            _ch.put(v);
        }


      private:
        shared(Channel!T) _ch;
    }


    /// Handle that only exposes the receiving side of a channel.
    static shared struct Receiver
    {
        /// Forwards to `Channel.pop!U`.
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!U;
        }


        /// Forwards to `Channel.queue!U`.
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!U;
        }

      private:
        shared(Channel!T) _ch;
    }


    /**
    Appends `v` to the back of the queue that stores its type.

    The constraint accepts both `E` and `shared(E)` for every `E` in `T`,
    so the queue is selected by the type with `shared` stripped. Comparing
    `U` itself against `E` would match no queue for a `shared(E)` argument
    and the value would be dropped without any diagnostic.

    Params:
        v = the value to send; its type (ignoring `shared`) must be one of `T`
    */
    void put(U)(U v)
    if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
    {
        foreach(i, E; T){
          static if(is(CastOffShared!U == E))
            _chs[i].pushBack(v);
        }
    }


    /**
    Removes the front element of the queue for type `U`.

    Returns: whatever `AtomicDList!U.popFront` returns, a pointer to the
    shared element. NOTE(review): the result for an empty queue is defined
    by `lock_free.dlist` (presumably `null`) - confirm; the unittest in this
    module checks `queue!U.empty` before calling `pop`.
    */
    shared(U)* pop(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
          static if(is(U == E))
            return _chs[i].popFront();
        }

        // Unreachable: the template constraint guarantees that U is in T.
        assert(0);
    }


    /// Returns the queue managed by this channel for type `U`.
    shared(AtomicDList!U) queue(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
          static if(is(U == E))
            return _chs[i];
        }

        // Unreachable: the template constraint guarantees that U is in T.
        assert(0);
    }


    ///// Wait until appending new element. This function is implemented by core.sync.Condition
    //void waitNewElement()
    //{
    //    _cnd.wait();
    //}


    ///// if this channel is empty, wait until appending new element. This function is implemented by core.sync.Condition
    //void waitOneElement()
    //{
    //    if(_chs.empty) waitNewElement();
    //}


  private:
    // One queue per type in T, in the same order as T.
    AtomicDLists _chs;
    //Condition _cnd;

    alias AtomicDLists = ToTuple!(TRMap!(AtomicDList, ToTRange!T));
}
159 
160 
/**
A channel shared by `N` nodes.

For every type in `T` the channel keeps `N` `AtomicDList` queues, one per
node index. `put!i` appends to the queue of node `i`, and `pop!(i, U)`
removes the front element of node `i`'s queue for type `U`.

Params:
    N = the number of nodes
    T = the value types that may be sent through this channel
*/
shared struct NChannel(size_t N, T...)
{
    /// Creates a channel, allocating `N` queues for every type in `T`.
    static shared(NChannel!(N, T)) opCall()
    {
        typeof(return) dst;

        foreach(i, E; T){
            foreach(j; 0 .. N)
                // Must go through `dst`: this is a static function, so there
                // is no `this` whose `_chs` could be initialised.
                dst._chs[i][j] = new shared AtomicDList!E;
        }

        return dst;
    }


    /// Returns the handle for node `i`, holding a copy of this channel value.
    shared(Node!i) node(size_t i)() @property
    {
        return shared(Node!i)(this);
    }


    /**
    Handle for node `i`: it sends to the queues of other nodes and receives
    from its own queues.
    */
    static shared struct Node(size_t i)
    if(i < N)
    {
        /// Sends `v` to node `j`; `j` must differ from this node's index.
        void put(size_t j, U)(U v)
        if(j < N && is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i != j)
        {
            _ch.put!j(v);
        }


        /// Removes the front element of this node's queue for type `U`.
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!(i, U)();
        }


        /// Returns this node's queue for type `U`.
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!(i, U)();
        }


      private:
        shared(NChannel!(N, T)) _ch;
    }


    /**
    Appends `v` to the back of node `i`'s queue for the type of `v`.

    The queue is selected by the type with `shared` stripped, so that both
    `E` and `shared(E)` arguments are stored for every `E` in `T`.

    Params:
        v = the value to send; its type (ignoring `shared`) must be one of `T`
    */
    void put(size_t i, U)(U v)
    if(i < N && is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
    {
        foreach(j, E; T){
          static if(is(CastOffShared!U == E))
            _chs[j][i].pushBack(v);
        }
    }


    /**
    Removes the front element of node `i`'s queue for type `U`.

    Returns: whatever `AtomicDList!U.popFront` returns, a pointer to the
    shared element. NOTE(review): the result for an empty queue is defined
    by `lock_free.dlist` (presumably `null`) - confirm.
    */
    shared(U)* pop(size_t i, U)()
    if(i < N && isOneOfT!(U, T))
    {
        foreach(j, E; T){
          static if(is(U == E))
            return _chs[j][i].popFront();
        }

        // Unreachable: the template constraint guarantees that U is in T.
        assert(0);
    }


    /**
    Overload kept for backward compatibility with the earlier signature.
    The argument is ignored; it only lets `U` be inferred at the call site.
    */
    shared(U)* pop(size_t i, U)(shared U v)
    if(i < N && isOneOfT!(U, T))
    {
        return pop!(i, U)();
    }


    /// Returns node `i`'s queue for type `U`.
    shared(AtomicDList!U) queue(size_t i, U)() @property
    if(i < N && isOneOfT!(U, T))
    {
        foreach(j, E; T){
          static if(is(U == E))
            return _chs[j][i];
        }

        // Unreachable: the template constraint guarantees that U is in T.
        assert(0);
    }

  private:
    // For each type in T (same order), an array of N queues indexed by node.
    AtomicDLists _chs;

    alias AtomicDListN(T) = AtomicDList!T[N];
    alias AtomicDLists = ToTuple!(TRMap!(AtomicDListN, ToTRange!T));
}
249 
250 
/**
Convenience factory: creates a `Channel` that carries the types `T`.

Returns: the value produced by `Channel!T`'s static `opCall`.
*/
shared(Channel!T) channel(T...)() @property
{
    alias Result = shared(Channel!T);
    return Result();
}
257 
258 
/**
Convenience factory: creates an `NChannel` with `N` nodes that carries the types `T`.

Returns: the value produced by `NChannel!(N, T)`'s static `opCall`.
*/
shared(NChannel!(N, T)) nchannel(size_t N, T...)() @property
{
    alias Result = shared(NChannel!(N, T));
    return Result();
}
264 
///
unittest{
    import std.concurrency;
    import std.conv;

    // The element types carried by both channels in this example.
    alias Kinds = TypeTuple!(int, long, string);

    auto ch1 = channel!(int, long, string);
    auto ch2 = channel!(int, long, string);

    // Producer thread: sends 100 rounds of (int, long, string) into ch1.
    static void produce(shared ch1.Sender s)
    {
        foreach(i; 0 .. 100)
        {
            s.put(cast(int)i);
            s.put(i + (long.max >> 1) + 1);
            s.put(i.to!string);
        }
    }

    // Relay thread: moves every element from ch1 to ch2, type by type.
    static void relay(shared ch1.Receiver r,
                      shared ch2.Sender s)
    {
        foreach(i; 0 .. 100)
        {
            foreach(E; Kinds)
            {
                // Spin until the queue for E has an element to forward.
                while(r.queue!E.empty){}
                s.put!E(*r.pop!E);
            }
        }
    }

    spawn(&produce, ch1.sender);
    spawn(&relay, ch1.receiver, ch2.sender);

    // Consumer (this thread): checks that the values arrive in order.
    foreach(i; 0 .. 100)
    {
        foreach(E; Kinds)
        {
            while(ch2.queue!E.empty){}

            static if(is(E == int))
            {
                auto expected = i;
            }
            else static if(is(E == long))
            {
                auto expected = i + (long.max >> 1) + 1;
            }
            else
            {
                auto expected = to!string(i);
            }

            assert(*ch2.pop!E == expected);
        }
    }
}
313 
// Checks that a channel can be created for a std.variant Algebraic payload.
unittest{
    import std.variant;

    alias Payload = Algebraic!(int, string);

    auto ch = shared(Channel!Payload)();
}
322 
323 
324 private:
/**
Evaluates to `true` if the type `A` is exactly equal (`is(A == X)`) to one of
the elements of `T`, and to `false` otherwise, including when `T` is empty.
*/
template isOneOfT(A, T...)
{
  static if(T.length > 0)
  {
    // Check the head, then recurse on the remaining elements.
    static if(is(A == T[0]))
      enum bool isOneOfT = true;
    else
      enum bool isOneOfT = isOneOfT!(A, T[1 .. $]);
  }
  else
  {
    enum bool isOneOfT = false;
  }
}
332 
333 
/**
Aliases `T` with its `shared` qualifier removed when `T` has the form
`shared(X)`; otherwise aliases `T` unchanged.
*/
template CastOffShared(T)
{
  static if(is(T Inner == shared(Inner)))
    alias CastOffShared = Inner;
  else
    alias CastOffShared = T;
}