// Written in the D programming language.
/*
NYSL Version 0.9982

A. This software is "Everyone'sWare". It means:
  Anybody who has this software can use it as if he/she is
  the author.

  A-1. Freeware. No fee is required.
  A-2. You can freely redistribute this software.
  A-3. You can freely modify this software. And the source
      may be used in any software with no limitation.
  A-4. When you release a modified version to public, you
      must publish it with your name.

B. The author is not responsible for any kind of damages or loss
  while using or misusing this software, which is distributed
  "AS IS". No warranty of any kind is expressed or implied.
  You use AT YOUR OWN RISK.

C. Copyrighted to Kazuki KOMATSU

D. Above three clauses are applied both to source and binary
  form of this software.
*/

/**
This module provides inter-thread communication that differs from
$(D_CODE std.concurrency).

The channels provided by this module can send and receive only the types
they were declared with. Both sending and receiving are asynchronous.
*/
module carbon.channel;

import carbon.templates;

import std.typetuple;

import lock_free.dlist;


/**
Provides inter-thread communication.
Use it, for example, to transfer data between threads.

A $(D Channel!(T...)) holds one $(D AtomicDList!E) queue per element type
$(D E) in $(D T); values are routed to the queue matching their type.
*/
shared struct Channel(T...)
{
    /**
    Creates a channel and allocates one queue per element type of $(D T).
    */
    static shared(Channel!T) opCall()
    {
        typeof(return) dst;
        foreach(i, E; T)
            dst._chs[i] = new shared AtomicDList!E();

        return dst;
    }


    /// Returns a send-only handle wrapping this channel.
    shared(Sender) sender() @property
    {
        return shared(Sender)(this);
    }


    /**
    Returns a receive-only handle wrapping this channel.

    The name keeps its historical spelling; $(D receiver) is an alias.
    */
    shared(Receiver) reciever() @property
    {
        return shared(Receiver)(this);
    }


    /// Correctly spelled alias of $(D reciever).
    alias receiver = reciever;


    /// Send-only view of a channel; forwards to $(D Channel.put).
    static shared struct Sender
    {
        /// Enqueues $(D v) into the queue for its (unshared) type.
        void put(U)(U v)
        if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
        {
            _ch.put(v);
        }


      private:
        shared(Channel!T) _ch;
    }


    /// Receive-only view of a channel; forwards to $(D Channel.pop) and $(D Channel.queue).
    static shared struct Receiver
    {
        /// Pops from the queue of type $(D U); see $(D Channel.pop).
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!U;
        }


        /// Returns the underlying queue of type $(D U).
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!U;
        }

      private:
        shared(Channel!T) _ch;
    }


    /**
    Appends $(D v) to the back of the queue whose element type matches $(D U).

    $(D U) may be one of $(D T) or its $(D shared)-qualified form. The match
    is done on the type with $(D shared) stripped, so a value that satisfies
    the constraint is never silently discarded.
    */
    void put(U)(U v)
    if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
    {
        foreach(i, E; T){
            // Compare the unshared type: the constraint admits shared(U) too.
            static if(is(CastOffShared!U == E))
                _chs[i].pushBack(v);
        }
    }


    /**
    Removes the front element of the queue of type $(D U) and returns whatever
    $(D AtomicDList.popFront) returns for it.

    NOTE(review): the unittest below spins on $(D queue.empty) before calling
    this, so the result for an empty queue is presumably not dereferenceable
    -- confirm against $(D lock_free.dlist).
    */
    shared(U)* pop(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
            static if(is(U == E))
                return _chs[i].popFront();
        }

        // Unreachable: the constraint guarantees some E matches U.
        assert(0);
    }


    /// Returns the underlying queue that stores elements of type $(D U).
    shared(AtomicDList!U) queue(U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(i, E; T){
            static if(is(U == E))
                return _chs[i];
        }

        // Unreachable: the constraint guarantees some E matches U.
        assert(0);
    }


  private:
    AtomicDLists _chs;

    // One AtomicDList!E per E in T, in the same order as T.
    alias AtomicDLists = ToTuple!(TRMap!(AtomicDList, ToTRange!T));

    /// True if $(D A) is exactly one of the types in $(D T).
    template isOneOfT(A, T...)
    {
      static if(T.length == 0)
        enum bool isOneOfT = false;
      else
        enum bool isOneOfT = is(A == T[0]) || isOneOfT!(A, T[1 .. $]);
    }


    /// Strips a top-level $(D shared) qualifier from $(D T), if present.
    template CastOffShared(T)
    {
      static if(is(T == shared(U), U))
        alias CastOffShared = U;
      else
        alias CastOffShared = T;
    }
}


/**
A channel connecting $(D N) nodes. For every element type in $(D T) there is
one queue per node; node $(D i) receives from its own queues and sends into
the queues of the other nodes.
*/
shared struct NChannel(size_t N, T...)
{
    /**
    Creates the channel and allocates $(D N) queues for each element type.
    */
    static shared(NChannel!(N, T)) opCall()
    {
        typeof(return) dst;

        foreach(i, E; T){
            foreach(j; 0 .. N)
                // Must go through dst: this is a static function with no `this`.
                dst._chs[i][j] = new shared AtomicDList!E;
        }

        return dst;
    }


    /// Returns the handle for node $(D i) of this channel.
    shared(Node!i) node(size_t i)() @property
    {
        return shared(Node!i)(this);
    }


    /// Endpoint $(D i) of the channel ($(D i < N)).
    static shared struct Node(size_t i)
    if(i < N)
    {
        /// Sends $(D v) to node $(D j); a node cannot send to itself.
        void put(size_t j, U)(U v)
        if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T) && i != j)
        {
            _ch.put!j(v);
        }


        /// Pops from this node's own queue of type $(D U).
        shared(U)* pop(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.pop!(i, U)();
        }


        /// Returns this node's own queue of type $(D U).
        shared(AtomicDList!U) queue(U)() @property
        if(isOneOfT!(U, T))
        {
            return _ch.queue!(i, U)();
        }


      private:
        shared(NChannel!(N, T)) _ch;
    }


    /**
    Appends $(D v) to the queue of node $(D i) for the (unshared) type of
    $(D U).
    */
    void put(size_t i, U)(U v)
    if(is(U : shared(U)) && isOneOfT!(CastOffShared!U, T))
    {
        foreach(j, E; T){
            static if(is(CastOffShared!U == E))
                _chs[j][i].pushBack(v);
        }
    }


    /**
    Removes the front element of node $(D i)'s queue of type $(D U) and
    returns the result of $(D AtomicDList.popFront).
    */
    shared(U)* pop(size_t i, U)()
    if(isOneOfT!(U, T))
    {
        foreach(j, E; T){
            static if(is(U == E))
                return _chs[j][i].popFront();
        }

        // Unreachable: the constraint guarantees some E matches U.
        assert(0);
    }


    /**
    Legacy overload kept for source compatibility: the argument is ignored
    and only serves to deduce $(D U). Prefer $(D pop!(i, U)()).
    */
    shared(U)* pop(size_t i, U)(shared U v)
    if(isOneOfT!(U, T))
    {
        return pop!(i, U)();
    }


    /// Returns node $(D i)'s queue that stores elements of type $(D U).
    shared(AtomicDList!U) queue(size_t i, U)() @property
    if(isOneOfT!(U, T))
    {
        foreach(j, E; T){
            static if(is(U == E))
                return _chs[j][i];
        }

        // Unreachable: the constraint guarantees some E matches U.
        assert(0);
    }

  private:
    AtomicDLists _chs;

    // For each E in T: a fixed array of N queues, indexed by node.
    alias AtomicDListN(T) = AtomicDList!T[N];
    alias AtomicDLists = ToTuple!(TRMap!(AtomicDListN, ToTRange!T));

    // Reuse Channel's helper templates; they do not depend on Channel's T.
    alias isOneOfT = Channel!(int).isOneOfT;
    alias CastOffShared = Channel!(int).CastOffShared;
}


/**
Creates a $(D Channel) that carries the types $(D T).
*/
shared(Channel!T) channel(T...)() @property
{
    return shared(Channel!T)();
}


/// Creates an $(D NChannel) with $(D N) nodes that carries the types $(D T).
shared(NChannel!(N, T)) nchannel(size_t N, T...)() @property
{
    return shared(NChannel!(N, T))();
}

///
unittest{
    import std.concurrency;
    import std.conv;

    auto ch1 = channel!(int, long, string);
    auto ch2 = channel!(int, long, string);

    // Producer: pushes 100 triples (int, long, string) into ch1.
    static void spawnFuncSender(shared ch1.Sender s)
    {
        foreach(i; 0 .. 100){
            s.put(cast(int)i);
            s.put(i + (long.max >> 1) + 1);
            s.put(to!string(i));
        }
    }


    // Relay: moves every value from ch1 to ch2, preserving per-type order.
    static void spawnFuncMiddle(shared ch1.Receiver r,
                                shared ch2.Sender s)
    {
        foreach(i; 0 .. 100){
            foreach(E; TypeTuple!(int, long, string))
            {
                // Busy-wait until a value of type E is available.
                while(r.queue!E.empty){}
                s.put!E(*r.pop!E);
            }
        }
    }

    spawn(&spawnFuncSender, ch1.sender);
    spawn(&spawnFuncMiddle, ch1.reciever,
                            ch2.sender);

    // Consumer: checks that the relayed values arrive in order.
    foreach(i; 0 .. 100){
        foreach(E; TypeTuple!(int, long, string))
        {
            while(ch2.queue!E.empty){}

            static if(is(E == int))
                assert(*ch2.pop!E == i);
            else static if(is(E == long))
                assert(*ch2.pop!E == i + (long.max >> 1) + 1);
            else
                assert(*ch2.pop!E == i.to!string);
        }
    }
}