1 // Written in the D programming language. 2 /* 3 NYSL Version 0.9982 4 5 A. This software is "Everyone'sWare". It means: 6 Anybody who has this software can use it as if he/she is 7 the author. 8 9 A-1. Freeware. No fee is required. 10 A-2. You can freely redistribute this software. 11 A-3. You can freely modify this software. And the source 12 may be used in any software with no limitation. 13 A-4. When you release a modified version to public, you 14 must publish it with your name. 15 16 B. The author is not responsible for any kind of damages or loss 17 while using or misusing this software, which is distributed 18 "AS IS". No warranty of any kind is expressed or implied. 19 You use AT YOUR OWN RISK. 20 21 C. Copyrighted to Kazuki KOMATSU 22 23 D. Above three clauses are applied both to source and binary 24 form of this software. 25 */ 26 27 /** 28 このモジュールでは、$(D_CODE std.concurrency)とは違うスレッド間通信を提供します。 29 30 このモジュールで提供するスレッド間通信は指定した型のみを送受信できるようなものです。 31 送受信ともに非同期に行われます。 32 */ 33 module carbon.channel; 34 35 import carbon.templates; 36 37 import std.typetuple; 38 39 import lock_free.dlist; 40 41 42 /** 43 スレッド間通信を提供します。 44 たとえば、スレッド間でデータを転送する場合に使用します。 45 */ 46 shared struct Channel(T...) 47 { 48 static shared(Channel!T) opCall() 49 { 50 typeof(return) dst; 51 foreach(i, E; T) 52 dst._chs[i] = new shared AtomicDList!E(); 53 54 return dst; 55 } 56 57 58 shared(Sender) sender() @property 59 { 60 return shared(Sender)(this); 61 } 62 63 64 shared(Receiver) receiver() @property 65 { 66 return shared(Receiver)(this); 67 } 68 69 70 static shared struct Sender 71 { 72 void put(U)(U v) 73 if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T)) 74 { 75 _ch.put(v); 76 } 77 78 79 private: 80 shared(Channel!T) _ch; 81 } 82 83 84 static shared struct Receiver 85 { 86 shared(U)* pop(U)() @property 87 if(isOneOfT!(U, T)) 88 { 89 return _ch.pop!U; 90 } 91 92 93 shared(AtomicDList!U) queue(U)() @property 94 if(isOneOfT!(U, T)) 95 { 96 return _ch.queue!U; 97 } 98 99 private: 100 shared(Channel!T) _ch; 101 } 102 103 104 void put(U)(U v) 105 if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T)) 106 { 107 foreach(i, E; T){ 108 static if(is(U == E)) 109 _chs[i].pushBack(v); 110 } 111 } 112 113 114 shared(U)* pop(U)() @property 115 if(isOneOfT!(U, T)) 116 { 117 foreach(i, E; T){ 118 static if(is(U == E)) 119 return _chs[i].popFront(); 120 } 121 122 assert(0); 123 } 124 125 126 /// return a queue managed this channel 127 shared(AtomicDList!U) queue(U)() @property 128 if(isOneOfT!(U, T)) 129 { 130 foreach(i, E; T){ 131 static if(is(U == E)) 132 return _chs[i]; 133 } 134 135 assert(0); 136 } 137 138 139 ///// Wait until appending new element. This function is implemented by core.sync.Condition 140 //void waitNewElement() 141 //{ 142 // _cnd.wait(); 143 //} 144 145 146 ///// if this channel is empty, wait until appending new element. This function is implemented by core.sync.Condtion 147 //void waitOneElement() 148 //{ 149 // if(_chs.empty) waitNewElement(); 150 //} 151 152 153 private: 154 AtomicDLists _chs; 155 //Condition _cnd; 156 157 alias AtomicDLists = ToTuple!(TRMap!(AtomicDList, ToTRange!T)); 158 } 159 160 161 shared struct NChannel(size_t N, T...) 162 { 163 static shared(NChannel!(N, T)) opCall() 164 { 165 typeof(return) dst; 166 167 foreach(i, E; T){ 168 foreach(j; 0 .. N) 169 _chs[i][j] = new shared AtomicDList!E; 170 } 171 172 return dst; 173 } 174 175 176 shared(Node!i) node(size_t i)() @property 177 { 178 return shared(Node!i)(this); 179 } 180 181 182 static shared struct Node(size_t i) 183 if(i < N) 184 { 185 void put(size_t j, U)(U v) 186 if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i != j) 187 { 188 _ch.put!j(v); 189 } 190 191 192 shared(U)* pop(U)() @property 193 if(isOneOfT!(U, T)) 194 { 195 return _ch.pop!i(); 196 } 197 198 199 shared(AtomicDList!U) queue(U)() @property 200 if(isOneOfT!(U, T)) 201 { 202 return _ch.queue!(i, U)(); 203 } 204 205 206 private: 207 shared(NChannel!(N, T)) _ch; 208 } 209 210 211 void put(size_t i, U)(U v) 212 if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && isOneOfT!(U, T)) 213 { 214 foreach(j, E; T){ 215 static if(is(U == E)) 216 _chs[j][i].pushBack(v); 217 } 218 } 219 220 221 shared(U)* pop(size_t i, U)(shared U v) 222 if(isOneOfT!(U, T)) 223 { 224 foreach(j, E; T){ 225 static if(is(U == E)) 226 return _chs[j][i].popFront(); 227 } 228 assert(0); 229 } 230 231 232 shared(AtomicDList!U) queue(size_t i, U)() @property 233 if(isOneOfT!(U, T)) 234 { 235 foreach(j, E; T){ 236 static if(is(U == E)) 237 return _chs[j][i]; 238 } 239 } 240 241 private: 242 AtomicDLists _chs; 243 244 alias AtomicDListN(T) = AtomicDList!T[N]; 245 alias AtomicDLists = ToTuple!(TRMap!(AtomicDListN, ToTRange!T)); 246 247 alias isOneOfT = Channel!(int).isOneOfT; 248 } 249 250 251 /** 252 */ 253 shared(Channel!T) channel(T...)() @property 254 { 255 return shared(Channel!T)(); 256 } 257 258 259 /// 260 shared(NChannel!(N, T)) nchannel(size_t N, T...)() @property 261 { 262 return shared(NChannel!(N, T))(); 263 } 264 265 /// 266 unittest{ 267 import std.concurrency; 268 import std.conv; 269 270 auto ch1 = channel!(int, long, string); 271 auto ch2 = channel!(int, long, string); 272 273 static void spawnFuncSender(shared ch1.Sender s) 274 { 275 foreach(i; 0 .. 100){ 276 s.put(cast(int)i); 277 s.put(i + (long.max >> 1) + 1); 278 s.put(to!string(i)); 279 } 280 } 281 282 283 static void spawnFuncMiddle(shared ch1.Receiver r, 284 shared ch2.Sender s) 285 { 286 foreach(i; 0 .. 100){ 287 foreach(E; TypeTuple!(int, long, string)) 288 { 289 while(r.queue!E.empty){} 290 s.put!E(*r.pop!E); 291 } 292 } 293 } 294 295 spawn(&spawnFuncSender, ch1.sender); 296 spawn(&spawnFuncMiddle, ch1.receiver, 297 ch2.sender); 298 299 foreach(i; 0 .. 100){ 300 foreach(E; TypeTuple!(int, long, string)) 301 { 302 while(ch2.queue!E.empty){} 303 304 static if(is(E == int)) 305 assert(*ch2.pop!E == i); 306 else static if(is(E == long)) 307 assert(*ch2.pop!E == i + (long.max >> 1) + 1); 308 else 309 assert(*ch2.pop!E == i.to!string); 310 } 311 } 312 } 313 314 unittest{ 315 import std.variant; 316 317 alias Msg = Algebraic!(int, string); 318 alias Ch = shared(Channel!Msg); 319 320 auto ch = Ch(); 321 } 322 323 324 private: 325 template isOneOfT(A, T...) 326 { 327 static if(T.length == 0) 328 enum bool isOneOfT = false; 329 else 330 enum bool isOneOfT = is(A == T[0]) || isOneOfT!(A, T[1 .. $]); 331 } 332 333 334 template CastOffShared(T) 335 { 336 static if(is(T == shared(U), U)) 337 alias CastOffShared = U; 338 else 339 alias CastOffShared = T; 340 }