1 // Written in the D programming language.
2 /*
3 NYSL Version 0.9982
4 
5 A. This software is "Everyone'sWare". It means:
6   Anybody who has this software can use it as if he/she is
7   the author.
8 
9   A-1. Freeware. No fee is required.
10   A-2. You can freely redistribute this software.
11   A-3. You can freely modify this software. And the source
12       may be used in any software with no limitation.
13   A-4. When you release a modified version to public, you
14       must publish it with your name.
15 
16 B. The author is not responsible for any kind of damages or loss
17   while using or misusing this software, which is distributed
18   "AS IS". No warranty of any kind is expressed or implied.
19   You use AT YOUR OWN RISK.
20 
21 C. Copyrighted to Kazuki KOMATSU
22 
23 D. Above three clauses are applied both to source and binary
24   form of this software.
25 */
26 
27 /**
28 このモジュールでは、$(D_CODE std.concurrency)とは違うスレッド間通信を提供します。
29 
30 このモジュールで提供するスレッド間通信は指定した型のみを送受信できるようなものです。
31 送受信ともに非同期に行われます。
32 */
33 module carbon.channel;
34 
35 import carbon.templates;
36 
37 import std.typetuple;
38 
39 import lock_free.dlist;
40 
41 
42 /**
43 スレッド間通信を提供します。
44 たとえば、スレッド間でデータを転送する場合に使用します。
45 */
46 shared struct Channel(T...)
47 {
48     static shared(Channel!T) opCall()
49     {
50         typeof(return) dst;
51         foreach(i, E; T)
52             dst._chs[i] = new shared AtomicDList!E();
53 
54         return dst;
55     }
56 
57 
58     shared(Sender) sender() @property
59     {
60         return shared(Sender)(this);
61     }
62 
63 
64     shared(Receiver) reciever() @property
65     {
66         return shared(Receiver)(this);
67     }
68 
69 
70     static shared struct Sender
71     {
72         void put(U)(U v)
73         if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
74         {
75             _ch.put(v);
76         }
77 
78 
79       private:
80         shared(Channel!T) _ch;
81     }
82 
83 
84     static shared struct Receiver
85     {
86         shared(U)* pop(U)() @property
87         if(isOneOfT!(U, T))
88         {
89             return _ch.pop!U;
90         }
91 
92 
93         shared(AtomicDList!U) queue(U)() @property
94         if(isOneOfT!(U, T))
95         {
96             return _ch.queue!U;
97         }
98 
99       private:
100         shared(Channel!T) _ch;
101     }
102 
103 
104     void put(U)(U v)
105     if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
106     {
107         foreach(i, E; T){
108           static if(is(U == E))
109             _chs[i].pushBack(v);
110         }
111     }
112 
113 
114     shared(U)* pop(U)() @property
115     if(isOneOfT!(U, T))
116     {
117         foreach(i, E; T){
118           static if(is(U == E))
119             return _chs[i].popFront();
120         }
121 
122         assert(0);
123     }
124 
125 
126     shared(AtomicDList!U) queue(U)() @property
127     if(isOneOfT!(U, T))
128     {
129         foreach(i, E; T){
130           static if(is(U == E))
131             return _chs[i];
132         }
133 
134         assert(0);
135     }
136 
137 
138   private:
139     AtomicDLists _chs;
140 
141     alias AtomicDLists = ToTuple!(TRMap!(AtomicDList, ToTRange!T));
142 
143     template isOneOfT(A, T...)
144     {
145       static if(T.length == 0)
146         enum bool isOneOfT = false;
147       else
148         enum bool isOneOfT = is(A == T[0]) || isOneOfT!(A, T[1 .. $]);
149     }
150 
151 
152     template CastOffShared(T)
153     {
154       static if(is(T == shared(U), U))
155         alias CastOffShared = U;
156       else
157         alias CastOffShared = T;
158     }
159 }
160 
161 
162 shared struct NChannel(size_t N, T...)
163 {
164     static shared(NChannel!(N, T)) opCall()
165     {
166         typeof(return) dst;
167 
168         foreach(i, E; T){
169             foreach(j; 0 .. N)
170                 _chs[i][j] = new shared AtomicDList!E;
171         }
172 
173         return dst;
174     }
175 
176 
177     shared(Node!i) node(size_t i)() @property
178     {
179         return shared(Node!i)(this);
180     }
181 
182 
183     static shared struct Node(size_t i)
184     if(i < N)
185     {
186         void put(size_t j, U)(U v)
187         if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i != j)
188         {
189             _ch.put!j(v);
190         }
191 
192 
193         shared(U)* pop(U)() @property
194         if(isOneOfT!(U, T))
195         {
196             return _ch.pop!i();
197         }
198 
199 
200         shared(AtomicDList!U) queue(U)() @property
201         if(isOneOfT!(U, T))
202         {
203             return _ch.queue!(i, U)();
204         }
205 
206 
207       private:
208         shared(NChannel!(N, T)) _ch;
209     }
210 
211 
212     void put(size_t i, U)(U v)
213     if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && isOneOfT!(U, T))
214     {
215         foreach(j, E; T){
216           static if(is(U == E))
217             _chs[j][i].pushBack(v);
218         }
219     }
220 
221 
222     shared(U)* pop(size_t i, U)(shared U v)
223     if(isOneOfT!(U, T))
224     {
225         foreach(j, E; T){
226           static if(is(U == E))
227             return _chs[j][i].popFront();
228         }
229         assert(0);
230     }
231 
232 
233     shared(AtomicDList!U) queue(size_t i, U)() @property
234     if(isOneOfT!(U, T))
235     {
236         foreach(j, E; T){
237           static if(is(U == E))
238             return _chs[j][i];
239         }
240     }
241 
242   private:
243     AtomicDLists _chs;
244 
245     alias AtomicDListN(T) = AtomicDList!T[N];
246     alias AtomicDLists = ToTuple!(TRMap!(AtomicDListN, ToTRange!T));
247 
248     alias isOneOfT = Channel!(int).isOneOfT;
249 }
250 
251 
252 /**
253 */
254 shared(Channel!T) channel(T...)() @property
255 {
256     return shared(Channel!T)();
257 }
258 
259 
260 ///
261 shared(NChannel!(N, T)) nchannel(size_t N, T...)() @property
262 {
263     return shared(NChannel!(N, T))();
264 }
265 
266 ///
267 unittest{
268     import std.concurrency;
269     import std.conv;
270 
271     auto ch1 = channel!(int, long, string);
272     auto ch2 = channel!(int, long, string);
273 
274     static void spawnFuncSender(shared ch1.Sender s)
275     {
276         foreach(i; 0 .. 100){
277             s.put(cast(int)i);
278             s.put(i + (long.max >> 1) + 1);
279             s.put(to!string(i));
280         }
281     }
282 
283 
284     static void spawnFuncMiddle(shared ch1.Receiver r,
285                                 shared ch2.Sender s)
286     {
287         foreach(i; 0 .. 100){
288             foreach(E; TypeTuple!(int, long, string))
289             {
290                 while(r.queue!E.empty){}
291                 s.put!E(*r.pop!E);
292             }
293         }
294     }
295 
296     spawn(&spawnFuncSender, ch1.sender);
297     spawn(&spawnFuncMiddle, ch1.reciever,
298                             ch2.sender);
299 
300     foreach(i; 0 .. 100){
301         foreach(E; TypeTuple!(int, long, string))
302         {
303             while(ch2.queue!E.empty){}
304 
305           static if(is(E == int))
306             assert(*ch2.pop!E == i);
307           else static if(is(E == long))
308             assert(*ch2.pop!E == i + (long.max >> 1) + 1);
309           else
310             assert(*ch2.pop!E == i.to!string);
311         }
312     }
313 }