1 module carbon.actor; 2 3 import core.thread; 4 import core.atomic; 5 6 import std.concurrency; 7 import std.traits; 8 9 import carbon.traits; 10 11 12 /** 13 Actorが有するイベントハンドラを示すためのUDAです. 14 */ 15 enum ThreadEvent; 16 17 18 struct ThreadEventMethod(Params...) 19 { 20 alias Parameters = Params; 21 string identifier; 22 } 23 24 25 /** 26 Tがもつ,すべてのActorイベントハンドラをThreadEventMethod型にして返します. 27 */ 28 template ThreadEventMethods(T) 29 { 30 import std.meta : AliasSeq; 31 import std.traits : Parameters; 32 33 template ThreadEventMethodsImpl(Members...) 34 { 35 static if(Members.length == 0) 36 alias ThreadEventMethodsImpl = AliasSeq!(); 37 else 38 { 39 mixin(`alias method = T.` ~ Members[0] ~ ";"); 40 41 static if(hasUDA!(method, ThreadEvent)) 42 alias ThreadEventMethodsImpl = AliasSeq!(ThreadEventMethod!(Parameters!method)(Members[0]), ThreadEventMethodsImpl!(Members[1 .. $])); 43 else 44 alias ThreadEventMethodsImpl = ThreadEventMethodsImpl!(Members[1 .. $]); 45 } 46 } 47 48 alias ThreadEventMethods = ThreadEventMethodsImpl!(__traits(allMembers, T)); 49 } 50 51 /// 52 unittest 53 { 54 static struct TestActor1 { @ThreadEvent void foo(string) {} } 55 56 static assert(ThreadEventMethods!TestActor1.length == 1); 57 static assert(ThreadEventMethods!TestActor1[0].identifier == "foo"); 58 static assert(is(ThreadEventMethods!TestActor1[0].Parameters[0] == string)); 59 60 61 static struct TestActor2 { void foo(string) {} } 62 63 static assert(ThreadEventMethods!TestActor2.length == 0); 64 65 66 static struct TestActor3 67 { 68 @ThreadEvent 69 void foo() {} 70 71 @ThreadEvent 72 void bar(string) {} 73 74 void hoge(int) {} 75 } 76 77 78 static assert(ThreadEventMethods!TestActor3.length == 2); 79 } 80 81 82 /** 83 型Tがアクターかどうかチェックします. 84 */ 85 enum bool isActor(T) = (ThreadEventMethods!T.length > 0) && 86 is(typeof((T t){ 87 if(t.isEnd) {} 88 })); 89 90 91 /** 92 型Tが,onUpdateを持つアクターかどうかチェックします. 93 */ 94 enum bool isIncessantActor(T) = isActor!T && is(typeof((T t){ 95 Duration dur = t.maxInterval; 96 t.onUpdate(); 97 })); 98 99 100 /** 101 型Tが,onResurrectionを持つアクターかどうかチェックします. 102 */ 103 enum bool isPhoenixActor(T) = isActor!T && is(typeof((T t){ 104 Exception ex; 105 t.onResurrection(ex); 106 107 Error err; 108 t.onResurrection(err); 109 })); 110 111 112 private 113 struct ActorEventMedia(string identifier, Params...) 114 { 115 Params values; 116 } 117 118 119 /** 120 runActorおよびrunPhoenixActorの返り値です. 121 */ 122 struct ActorConnection(A) 123 if(isActor!A) 124 { 125 mixin(generateMethods); 126 127 bool isDestroyed() const @property { return atomicLoad(*_isDestroyed); } 128 129 private: 130 Tid _tid; 131 shared(bool)* _isDestroyed; 132 133 static: 134 string generateMethods() 135 { 136 import std.array; 137 import std.format; 138 139 auto app = appender!string; 140 141 foreach(m; ThreadEventMethods!A) 142 app.formattedWrite(q{ 143 void %1$s(Parameters!(A.%1$s) params) 144 { 145 ActorEventMedia!("%1$s", Parameters!(A.%1$s)) media; 146 media.values = params; 147 _tid.send(media); 148 } 149 }, m.identifier); 150 151 return app.data; 152 } 153 } 154 155 156 /** 157 アクターAを別スレッドで起動し,ActorConnectionを返します. 158 */ 159 ActorConnection!A runActor(A, Params...)(Params params) 160 { 161 shared(bool)* destroyedFlag = new shared bool; 162 return ActorConnection!A(spawn(&(runActorImpl!(A, Params)), destroyedFlag, params), destroyedFlag); 163 } 164 165 166 /// 167 unittest 168 { 169 import core.atomic; 170 // write a test of runActor 171 172 static 173 final synchronized class SharedCounter 174 { 175 this() {} 176 177 private int count; 178 179 void inc() { atomicOp!"+="(count, 1); } 180 int value() { return count; } 181 } 182 183 184 static struct TestActor 185 { 186 this(shared(SharedCounter) counter) { this.counter = counter; } 187 188 @ThreadEvent void inc() { counter.inc(); } 189 bool isEnd() { return counter.value > 2; } 190 191 shared(SharedCounter) counter; 192 } 193 194 auto scnt = new shared SharedCounter(); 195 auto con = runActor!TestActor(scnt); 196 con.inc(); 197 Thread.sleep(dur!"msecs"(100)); 198 assert(scnt.value == 1); 199 200 con.inc(); 201 Thread.sleep(dur!"msecs"(100)); 202 assert(scnt.value == 2); 203 204 con.inc(); 205 Thread.sleep(dur!"msecs"(100)); 206 assert(scnt.value == 3); 207 208 con.inc(); 209 Thread.sleep(dur!"msecs"(100)); 210 assert(scnt.value == 3); 211 } 212 213 214 // 215 private 216 void runActorImpl(A, Params...)(shared(bool)* destroyedFlag, Params params) 217 if(isActor!A) 218 { 219 scope(exit) 220 atomicStore(*destroyedFlag, true); 221 222 static if(is(A == class)) 223 A obj = new A(params); 224 else static if(is(A == struct)) 225 A obj = A(params); 226 else static assert(0); 227 228 static if(isIncessantActor!A) 229 { 230 immutable Duration timeoutDur = obj.maxInterval; 231 232 while(!obj.isEnd) 233 { 234 mixin(`receiveTimeout(timeoutDur, ` ~ generateActorHandles!A() ~ `);`); 235 t.onUpdate(); 236 } 237 } 238 else 239 { 240 while(!obj.isEnd) 241 { 242 mixin(`receive(` ~ generateActorHandles!A() ~ `);`); 243 } 244 } 245 246 247 static if(is(typeof((){ obj.onDestroy(); }))) 248 obj.onDestroy(); 249 } 250 251 252 /** 253 runActorと同様に,アクターAを別スレッドで起動しますが,Aで例外が飛んだ場合,ただちに復帰します. 254 */ 255 ActorConnection!A runPhoenixActor(A, Params...)(Params params) 256 { 257 shared(bool)* destroyedFlag = new shared bool; 258 return ActorConnection!A(spawn(&(runPhoenixActorImpl!(A, Params)), destroyedFlag, params), destroyedFlag); 259 } 260 261 262 /// 263 unittest 264 { 265 import core.atomic; 266 // write a test of runActor 267 268 static 269 final synchronized class SharedCounter 270 { 271 this() {} 272 273 private int count; 274 275 void inc() { atomicOp!"+="(count, 1); } 276 int value() { return count; } 277 } 278 279 280 static struct TestActor 281 { 282 import std.exception; 283 284 this(shared(SharedCounter) c1, shared(SharedCounter) c2) { this.c1 = c1; this.c2 = c2; } 285 286 @ThreadEvent void inc() { c1.inc(); enforce(false); } 287 bool isEnd() { return c1.value > 2; } 288 void onResurrection(Throwable) { c2.inc(); } 289 290 shared(SharedCounter) c1, c2; 291 } 292 293 auto scnt1 = new shared SharedCounter(), 294 scnt2 = new shared SharedCounter(); 295 auto con = runPhoenixActor!TestActor(scnt1, scnt2); 296 297 con.inc(); 298 Thread.sleep(dur!"msecs"(100)); 299 assert(scnt1.value == 1); 300 assert(scnt2.value == 1); 301 302 con.inc(); 303 Thread.sleep(dur!"msecs"(100)); 304 assert(scnt1.value == 2); 305 assert(scnt2.value == 2); 306 307 con.inc(); 308 Thread.sleep(dur!"msecs"(100)); 309 assert(scnt1.value == 3); 310 assert(scnt2.value == 3); 311 312 con.inc(); 313 Thread.sleep(dur!"msecs"(100)); 314 assert(scnt1.value == 3); 315 assert(scnt2.value > 0); 316 } 317 318 319 320 // 321 private 322 void runPhoenixActorImpl(A, Params...)(shared(bool)* destroyedFlag, Params params) 323 if(isPhoenixActor!A) 324 { 325 scope(exit) 326 atomicStore(*destroyedFlag, true); 327 328 static if(is(A == class)) 329 A obj = new A(params); 330 else static if(is(A == struct)) 331 A obj = A(params); 332 else static assert(0); 333 334 static if(isIncessantActor!A) 335 { 336 immutable Duration timeoutDur = obj.maxInterval; 337 338 while(1) 339 { 340 try{ 341 if(obj.isEnd) break; 342 343 mixin(`receiveTimeout(timeoutDur, ` ~ generateActorHandles!A() ~ `);`); 344 obj.onUpdate(); 345 } 346 catch(Error err) obj.onResurrection(err); 347 catch(Exception err) obj.onResurrection(err); 348 } 349 } 350 else 351 { 352 while(1) 353 { 354 try{ 355 if(obj.isEnd) break; 356 357 mixin(`receive(` ~ generateActorHandles!A() ~ `);`); 358 } 359 catch(Error err) obj.onResurrection(err); 360 catch(Exception err) obj.onResurrection(err); 361 } 362 } 363 364 static if(is(typeof((){ obj.onDestroy(); }))) 365 obj.onDestroy(); 366 } 367 368 369 private 370 string generateActorHandles(A)() 371 { 372 import std.array; 373 import std.format; 374 375 auto app = appender!string; 376 377 foreach(m; ThreadEventMethods!A) 378 app.formattedWrite(q{(ActorEventMedia!("%1$s", Parameters!(A.%1$s)) params) { obj.%1$s(params.values); },}, m.identifier); 379 380 return app.data; 381 }