1 module carbon.actor;
2 
3 import core.thread;
4 import core.atomic;
5 
6 import std.concurrency;
7 import std.traits;
8 
9 import carbon.traits;
10 
11 
/**
UDA used to mark the event handlers that an Actor has.
*/
enum ThreadEvent;
16 
17 
/**
Compile-time description of a single event handler: the name of the handler
together with the list of its parameter types.
*/
struct ThreadEventMethod(Params...)
{
    /// Name of the handler member.
    string identifier;

    /// Parameter types of the handler.
    alias Parameters = Params;
}
23 
24 
/**
Returns every Actor event handler of T (every member tagged with @ThreadEvent)
as a sequence of ThreadEventMethod values.
*/
template ThreadEventMethods(T)
{
    import std.meta : AliasSeq;
    import std.traits : Parameters;

    // Walks the list of member names one at a time and keeps only the
    // members that carry the ThreadEvent UDA.
    template collectHandlers(names...)
    {
      static if(names.length > 0)
      {
        // Resolve the member name to the actual symbol of T.
        mixin("alias member = T." ~ names[0] ~ ";");

        static if(!hasUDA!(member, ThreadEvent))
          alias collectHandlers = collectHandlers!(names[1 .. $]);
        else
          alias collectHandlers = AliasSeq!(
              ThreadEventMethod!(Parameters!member)(names[0]),
              collectHandlers!(names[1 .. $]));
      }
      else
        alias collectHandlers = AliasSeq!();
    }

    alias ThreadEventMethods = collectHandlers!(__traits(allMembers, T));
}
50 
///
unittest
{
    // No tagged member: nothing is reported.
    static struct TestActor2
    {
        void foo(string) {}
    }

    static assert(ThreadEventMethods!TestActor2.length == 0);


    // One tagged member: reported with its name and parameter types.
    static struct TestActor1
    {
        @ThreadEvent void foo(string) {}
    }

    static assert(ThreadEventMethods!TestActor1.length == 1);
    static assert(ThreadEventMethods!TestActor1[0].identifier == "foo");
    static assert(is(ThreadEventMethods!TestActor1[0].Parameters[0] == string));


    // Tagged and untagged members mixed: only the tagged ones are counted.
    static struct TestActor3
    {
        @ThreadEvent void foo() {}
        @ThreadEvent void bar(string) {}

        void hoge(int) {}
    }

    static assert(ThreadEventMethods!TestActor3.length == 2);
}
80 
81 
/**
Checks whether type T is an actor, i.e. it has at least one @ThreadEvent
handler and `isEnd` can be used as an `if` condition.
*/
template isActor(T)
{
    enum bool isActor = (ThreadEventMethods!T.length > 0)
        && is(typeof((T t){ if(t.isEnd) {} }));
}
89 
90 
/**
Checks whether type T is an actor that has `onUpdate`, together with a
`maxInterval` that is usable as a Duration.
*/
template isIncessantActor(T)
{
    enum bool isIncessantActor = isActor!T
        && is(typeof((T t){
               Duration dur = t.maxInterval;
               t.onUpdate();
           }));
}
98 
99 
/**
Checks whether type T is an actor that has `onResurrection`, callable with
both an Exception and an Error.
*/
template isPhoenixActor(T)
{
    enum bool isPhoenixActor = isActor!T
        && is(typeof((T t){
               Exception ex;
               Error err;

               t.onResurrection(ex);
               t.onResurrection(err);
           }));
}
110 
111 
// Message type sent to an actor thread: `identifier` names the handler to
// invoke, `values` carries the arguments for it.
private
template ActorEventMedia(string identifier, Params...)
{
    struct ActorEventMedia
    {
        Params values;
    }
}
117 
118 
/**
Return value of runActor and runPhoenixActor.

For every @ThreadEvent handler of A, a method of the same name and parameter
list is generated; calling it sends the arguments to the actor thread.
*/
struct ActorConnection(A)
if(isActor!A)
{
    mixin(generateMethods());

    /// Atomically reads the flag this connection points to.
    @property bool isDestroyed() const
    {
        return atomicLoad(*_isDestroyed);
    }

  private:
    Tid _tid;
    shared(bool)* _isDestroyed;

  static:
    // Builds the source code of one forwarding method per event handler.
    string generateMethods()
    {
        import std.format : format;

        string code;

        foreach(m; ThreadEventMethods!A)
            code ~= format(q{
                void %1$s(Parameters!(A.%1$s) params)
                {
                    ActorEventMedia!("%1$s", Parameters!(A.%1$s)) media;
                    media.values = params;
                    _tid.send(media);
                }
            }, m.identifier);

        return code;
    }
}
154 
155 
/**
Starts actor A on another thread and returns an ActorConnection to it.

Params:
    params = arguments forwarded to the constructor of A on the new thread
*/
ActorConnection!A runActor(A, Params...)(Params params)
{
    // Shared between the new thread and the returned connection.
    auto flag = new shared(bool);
    auto tid = spawn(&runActorImpl!(A, Params), flag, params);

    return ActorConnection!A(tid, flag);
}
164 
165 
///
unittest
{
    import core.atomic;

    // Counter shared between the test thread and the actor thread.
    static
    final synchronized class SharedCounter
    {
        this() {}

        private int count;

        void inc()
        {
            atomicOp!"+="(count, 1);
        }

        int value()
        {
            return count;
        }
    }


    // Actor that bumps the counter on every `inc` event and ends once the
    // counter exceeds 2.
    static struct TestActor
    {
        this(shared(SharedCounter) counter)
        {
            this.counter = counter;
        }

        @ThreadEvent void inc() { counter.inc(); }

        bool isEnd() { return counter.value > 2; }

        shared(SharedCounter) counter;
    }

    auto scnt = new shared SharedCounter();
    auto con = runActor!TestActor(scnt);

    // The fourth event is sent after the actor has ended, so the counter
    // stays at 3.
    foreach(expected; [1, 2, 3, 3])
    {
        con.inc();
        Thread.sleep(dur!"msecs"(100));
        assert(scnt.value == expected);
    }
}
212 
213 
// Thread entry point used by runActor.
//
// Constructs an A from `params`, then keeps dispatching incoming
// ActorEventMedia messages to the matching handlers of the actor until
// `isEnd` becomes true.  For an incessant actor the wait for a message is
// limited to `maxInterval`, and `onUpdate` is called after every wait.
// `*destroyedFlag` is set to true when this function is left.
private
void runActorImpl(A, Params...)(shared(bool)* destroyedFlag, Params params)
if(isActor!A)
{
    // However this function is left, report it through the shared flag.
    scope(exit)
        atomicStore(*destroyedFlag, true);

  static if(is(A == class))
    A obj = new A(params);
  else static if(is(A == struct))
    A obj = A(params);
  else static assert(0, "runActorImpl: A must be a class or a struct");

  static if(isIncessantActor!A)
  {
    immutable Duration timeoutDur = obj.maxInterval;

    while(!obj.isEnd)
    {
        // The generated handlers refer to the actor instance by the name `obj`.
        mixin(`receiveTimeout(timeoutDur, ` ~ generateActorHandles!A() ~ `);`);
        obj.onUpdate();
    }
  }
  else
  {
    while(!obj.isEnd)
    {
        // The generated handlers refer to the actor instance by the name `obj`.
        mixin(`receive(` ~ generateActorHandles!A() ~ `);`);
    }
  }


  // onDestroy is optional; it is only called when the actor provides it.
  static if(is(typeof((){ obj.onDestroy(); })))
    obj.onDestroy();
}
250 
251 
/**
Like runActor, starts actor A on another thread, but when an exception is
thrown inside A the actor recovers immediately.

Params:
    params = arguments forwarded to the constructor of A on the new thread
*/
ActorConnection!A runPhoenixActor(A, Params...)(Params params)
{
    // Shared between the new thread and the returned connection.
    auto flag = new shared(bool);
    auto tid = spawn(&runPhoenixActorImpl!(A, Params), flag, params);

    return ActorConnection!A(tid, flag);
}
260 
261 
///
unittest
{
    import core.atomic;

    // Counter shared between the test thread and the actor thread.
    static
    final synchronized class SharedCounter
    {
        this() {}

        private int count;

        void inc()
        {
            atomicOp!"+="(count, 1);
        }

        int value()
        {
            return count;
        }
    }


    // Actor whose `inc` handler always throws after bumping c1; every
    // resurrection bumps c2.  It ends once c1 exceeds 2.
    static struct TestActor
    {
        import std.exception;

        this(shared(SharedCounter) c1, shared(SharedCounter) c2)
        {
            this.c1 = c1;
            this.c2 = c2;
        }

        @ThreadEvent void inc() { c1.inc(); enforce(false); }

        bool isEnd() { return c1.value > 2; }

        void onResurrection(Throwable) { c2.inc(); }

        shared(SharedCounter) c1, c2;
    }

    auto scnt1 = new shared SharedCounter(),
         scnt2 = new shared SharedCounter();
    auto con = runPhoenixActor!TestActor(scnt1, scnt2);

    // Each of the first three events is handled once and followed by
    // exactly one resurrection.
    foreach(expected; [1, 2, 3])
    {
        con.inc();
        Thread.sleep(dur!"msecs"(100));
        assert(scnt1.value == expected);
        assert(scnt2.value == expected);
    }

    // The fourth event is sent after the actor has ended.
    con.inc();
    Thread.sleep(dur!"msecs"(100));
    assert(scnt1.value == 3);
    assert(scnt2.value > 0);
}
317 
318 
319 
// Thread entry point used by runPhoenixActor.
//
// Constructs an A from `params` and dispatches incoming ActorEventMedia
// messages to the handlers of the actor until `isEnd` becomes true.  Any
// Error or Exception thrown during one iteration is handed to
// `onResurrection` and the loop continues.  `*destroyedFlag` is set to true
// when this function is left.
private
void runPhoenixActorImpl(A, Params...)(shared(bool)* destroyedFlag, Params params)
if(isPhoenixActor!A)
{
    scope(exit)
        atomicStore(*destroyedFlag, true);

  static if(is(A == struct))
    A obj = A(params);
  else static if(is(A == class))
    A obj = new A(params);
  else static assert(0);

  // Only incessant actors wait with a timeout.
  static if(isIncessantActor!A)
    immutable Duration timeoutDur = obj.maxInterval;

    for(;;)
    {
        try
        {
            if(obj.isEnd) break;

          // The generated handlers refer to the actor instance by the name `obj`.
          static if(isIncessantActor!A)
          {
            mixin(`receiveTimeout(timeoutDur, ` ~ generateActorHandles!A() ~ `);`);
            obj.onUpdate();
          }
          else
          {
            mixin(`receive(` ~ generateActorHandles!A() ~ `);`);
          }
        }
        catch(Error err) obj.onResurrection(err);
        catch(Exception ex) obj.onResurrection(ex);
    }

  // onDestroy is optional; it is only called when the actor provides it.
  static if(is(typeof((){ obj.onDestroy(); })))
    obj.onDestroy();
}
367 
368 
// Builds the source text of the message handlers passed to receive /
// receiveTimeout: one delegate literal per event handler of A, each followed
// by a comma.  The generated code expects the actor instance to be in scope
// under the name `obj`.
private
string generateActorHandles(A)()
{
    import std.format : format;

    string handlers;

    foreach(m; ThreadEventMethods!A)
        handlers ~= format(q{(ActorEventMedia!("%1$s", Parameters!(A.%1$s)) params) { obj.%1$s(params.values); },}, m.identifier);

    return handlers;
}