// Written in the D programming language.
/*
NYSL Version 0.9982

A. This software is "Everyone'sWare". It means:
  Anybody who has this software can use it as if he/she is
  the author.

  A-1. Freeware. No fee is required.
  A-2. You can freely redistribute this software.
  A-3. You can freely modify this software. And the source
      may be used in any software with no limitation.
  A-4. When you release a modified version to public, you
      must publish it with your name.

B. The author is not responsible for any kind of damages or loss
  while using or misusing this software, which is distributed
  "AS IS". No warranty of any kind is expressed or implied.
  You use AT YOUR OWN RISK.

C. Copyrighted to Kazuki KOMATSU

D. Above three clauses are applied both to source and binary
  form of this software.
*/

/**
This module provides inter-thread communication that differs from
$(D_CODE std.concurrency).

The channels provided here can only send and receive values of the types
they were declared with. Both sending and receiving are asynchronous.
*/
module carbon.channel;

import carbon.templates;

import std.typetuple;

import lock_free.dlist;


/**
Provides inter-thread communication.
Use it, for example, to transfer data between threads.

A channel keeps one $(D AtomicDList!E) queue for every type $(D E) in $(D T).
Create instances with $(D Channel!T()) (static $(D opCall)) or with the
$(D channel) helper so that every queue is allocated.
*/
shared struct Channel(T...)
{
    /// Creates a channel and allocates one queue per type in $(D T).
    static shared(Channel!T) opCall()
    {
        typeof(return) dst;
        foreach(i, E; T)
            dst._chs[i] = new shared AtomicDList!E();

        return dst;
    }


    /// Returns a handle that only exposes $(D put).
    shared(Sender) sender() @property
    {
        return shared(Sender)(this);
    }


    /// Returns a handle that only exposes $(D pop) and $(D queue).
    shared(Receiver) receiver() @property
    {
        return shared(Receiver)(this);
    }


    /// Historical (misspelled) name of $(D receiver); kept so existing callers still compile.
    shared(Receiver) reciever() @property
    {
        return receiver;
    }


    /**
    Send-only view of a channel.

    It stores its own copy of the $(D Channel) value.
    NOTE(review): the copies are presumably backed by the same queues because
    the slots are filled via $(D new); confirm against $(D lock_free.dlist).
    */
    static shared struct Sender
    {
        /// Forwards $(D v) to $(D Channel.put).
        void put(U)(U v)
        if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
        {
            _ch.put(v);
        }


      private:
        shared(Channel!T) _ch;
    }


    /**
    Receive-only view of a channel.

    It stores its own copy of the $(D Channel) value, like $(D Sender).
    */
    static shared struct Receiver
    {
        /// Forwards to $(D Channel.pop).
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!U;
        }


        /// Forwards to $(D Channel.queue).
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!U;
        }

      private:
        shared(Channel!T) _ch;
    }


    /**
    Appends $(D v) to the queue that stores its type.

    $(D U) may be one of $(D T) or the $(D shared)-qualified form of one of
    $(D T); the qualifier is stripped before the queue is selected.
    */
    void put(U)(U v)
    if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
    {
        foreach(i, E; T){
            // Compare the unqualified type: the constraint admits shared(E),
            // and matching on U itself would silently drop such values.
            static if(is(CastOffShared!U == E))
                _chs[i].pushBack(v);
        }
    }


    /**
    Removes the front element of the queue for $(D U) and returns the result
    of $(D popFront).

    NOTE(review): behaviour on an empty queue is defined by
    $(D AtomicDList.popFront) and is not visible here; the unittest below
    waits for $(D queue!U.empty) to become false before calling this.
    */
    shared(U)* pop(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
            static if(is(U == E))
                return _chs[i].popFront();
        }

        assert(0);
    }


    /// Returns the underlying queue that stores values of type $(D U).
    shared(AtomicDList!U) queue(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
            static if(is(U == E))
                return _chs[i];
        }

        assert(0);
    }


  private:
    // One queue per element type, in the order of T.
    AtomicDLists _chs;

    alias AtomicDLists = ToTuple!(TRMap!(AtomicDList, ToTRange!T));

    // True if A is exactly one of the types in T.
    template isOneOfT(A, T...){
      static if(T.length == 0)
        enum bool isOneOfT = false;
      else
        enum bool isOneOfT = is(A == T[0]) || isOneOfT!(A, T[1 .. $]);
    }


    // Strips a top-level `shared` qualifier from T, if present.
    template CastOffShared(T)
    {
      static if(is(T == shared(U), U))
        alias CastOffShared = U;
      else
        alias CastOffShared = T;
    }
}


/**
A channel shared by $(D N) nodes.

For every type $(D E) in $(D T) there are $(D N) queues, one per node.
Node $(D i) reads from its own queues and writes into the queues of the
other nodes. Create instances with $(D NChannel!(N, T)()) or the
$(D nchannel) helper so that every queue is allocated.
*/
shared struct NChannel(size_t N, T...)
{
    /// Creates a channel and allocates $(D N) queues per type in $(D T).
    static shared(NChannel!(N, T)) opCall()
    {
        typeof(return) dst;

        foreach(i, E; T){
            foreach(j; 0 .. N)
                // Must go through dst: this is a static function, there is no `this`.
                dst._chs[i][j] = new shared AtomicDList!E;
        }

        return dst;
    }


    /// Returns the handle for node $(D i).
    shared(Node!i) node(size_t i)() @property
    {
        return shared(Node!i)(this);
    }


    /**
    View of the channel from node $(D i).

    It stores its own copy of the $(D NChannel) value.
    */
    static shared struct Node(size_t i)
    if(i < N)
    {
        /// Sends $(D v) to node $(D j); a node cannot send to itself.
        void put(size_t j, U)(U v)
        if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i != j && j < N)
        {
            _ch.put!j(v);
        }


        /// Pops the front element of this node's queue for $(D U).
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!(i, U)();
        }


        /// Returns this node's queue for $(D U).
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!(i, U)();
        }


      private:
        shared(NChannel!(N, T)) _ch;
    }


    /**
    Appends $(D v) to the queue of node $(D i) that stores its type.

    As with $(D Channel.put), $(D U) may carry a $(D shared) qualifier.
    */
    void put(size_t i, U)(U v)
    if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i < N)
    {
        foreach(j, E; T){
            // Match on the unqualified type so shared(E) is not silently dropped.
            static if(is(CastOffShared!U == E))
                _chs[j][i].pushBack(v);
        }
    }


    /**
    Removes the front element of node $(D i)'s queue for $(D U) and returns
    the result of $(D popFront).

    NOTE(review): behaviour on an empty queue is defined by
    $(D AtomicDList.popFront) and is not visible here.
    */
    shared(U)* pop(size_t i, U)() @property
    if(isOneOfT!(U, T) && i < N)
    {
        foreach(j, E; T){
            static if(is(U == E))
                return _chs[j][i].popFront();
        }

        assert(0);
    }


    /// Legacy overload; the argument was never used.
    deprecated("the argument is ignored; call pop!(i, U)() instead")
    shared(U)* pop(size_t i, U)(shared U v)
    if(isOneOfT!(U, T) && i < N)
    {
        return pop!(i, U)();
    }


    /// Returns node $(D i)'s underlying queue for values of type $(D U).
    shared(AtomicDList!U) queue(size_t i, U)() @property
    if(isOneOfT!(U, T) && i < N)
    {
        foreach(j, E; T){
            static if(is(U == E))
                return _chs[j][i];
        }

        assert(0);
    }

  private:
    // For each element type (outer index) one queue per node (inner index).
    AtomicDLists _chs;

    alias AtomicDListN(T) = AtomicDList!T[N];
    alias AtomicDLists = ToTuple!(TRMap!(AtomicDListN, ToTRange!T));

    // Reuse the helper templates declared inside Channel.
    alias isOneOfT = Channel!(int).isOneOfT;
    alias CastOffShared = Channel!(int).CastOffShared;
}


/**
Creates a $(D Channel) that carries the types $(D T).
*/
shared(Channel!T) channel(T...)() @property
{
    return shared(Channel!T)();
}


/// Creates an $(D NChannel) with $(D N) nodes that carries the types $(D T).
shared(NChannel!(N, T)) nchannel(size_t N, T...)() @property
{
    return shared(NChannel!(N, T))();
}

///
unittest{
    import std.concurrency;
    import std.conv;

    auto ch1 = channel!(int, long, string);
    auto ch2 = channel!(int, long, string);

    // Producer: pushes 100 triples (int, long, string) into ch1.
    static void spawnFuncSender(shared ch1.Sender s)
    {
        foreach(i; 0 .. 100){
            s.put(cast(int)i);
            s.put(i + (long.max >> 1) + 1);
            s.put(to!string(i));
        }
    }


    // Relay: moves every value from ch1 to ch2, type by type.
    static void spawnFuncMiddle(shared ch1.Receiver r,
                                shared ch2.Sender s)
    {
        foreach(i; 0 .. 100){
            foreach(E; TypeTuple!(int, long, string))
            {
                // Spin until a value of type E is available.
                while(r.queue!E.empty){}
                s.put!E(*r.pop!E);
            }
        }
    }

    spawn(&spawnFuncSender, ch1.sender);
    spawn(&spawnFuncMiddle, ch1.reciever,
                            ch2.sender);

    // Consumer: the values must come out of ch2 in the order they were sent.
    foreach(i; 0 .. 100){
        foreach(E; TypeTuple!(int, long, string))
        {
            while(ch2.queue!E.empty){}

            static if(is(E == int))
                assert(*ch2.pop!E == i);
            else static if(is(E == long))
                assert(*ch2.pop!E == i + (long.max >> 1) + 1);
            else
                assert(*ch2.pop!E == i.to!string);
        }
    }
}