1 // Written in the D programming language. 2 /* 3 NYSL Version 0.9982 4 5 A. This software is "Everyone'sWare". It means: 6 Anybody who has this software can use it as if he/she is 7 the author. 8 9 A-1. Freeware. No fee is required. 10 A-2. You can freely redistribute this software. 11 A-3. You can freely modify this software. And the source 12 may be used in any software with no limitation. 13 A-4. When you release a modified version to public, you 14 must publish it with your name. 15 16 B. The author is not responsible for any kind of damages or loss 17 while using or misusing this software, which is distributed 18 "AS IS". No warranty of any kind is expressed or implied. 19 You use AT YOUR OWN RISK. 20 21 C. Copyrighted to Kazuki KOMATSU 22 23 D. Above three clauses are applied both to source and binary 24 form of this software. 25 */ 26 27 /** 28 このモジュールは、信号処理のための試験的なモジュールです。 29 このモジュールで定義されているストリームは、Rangeに拡張を加える事で大量のデータを処理することに最適です。 30 また、状況によってはSIMD最適化を狙えるよう設計になっています。 31 */ 32 module carbon.stream; 33 34 import carbon.math, 35 carbon.functional, 36 carbon.nonametype; 37 38 import std.algorithm, 39 std.container, 40 std.file, 41 std.functional, 42 std.math, 43 std.range, 44 std.stdio, 45 std.traits; 46 47 48 /** 49 入力ストリームは、`read`メソッドに与えられたバッファに要素を格納可能な型です。 50 実行時には、次のような特徴を満たさなければいけません。 51 52 + `stream.empty`は、そのストリームからノンブロッキングで一要素でも取り出せる場合には`false`となる。 53 + `stream.front`は、そのストリームからノンブロッキングで一要素取り出す。 54 + `stream.popFront`は、そのストリームをノンブロッキングでひとつ進める。 55 + `stream.length`が有効である場合、この値はノンブロッキングで読み出すことができる要素数となる。 56 + `stream.read(buf)`は、ノンブロッキングで処理しなければいけない。 57 + `stream.read(buf)`は、bufのうち先頭から読み込みに成功した要素数だけのスライスを返す。 58 - `stream.length >= buf.length`のとき、常に、`buf.length == stream.read(buf).length`となる。 59 //+ `stream.closed`は、いくら待ってもそれ以上要素が取り出せない状況で`true`となる。 60 // - `stream.closed`が真の場合、`stream.empty`であり、`lenght`が有効であれば`stream.length == 0`を満たす。 61 + `stream.fetch()`は、`stream.empty`が偽になるまで、 62 もしくはそのストリームからは今後一切データを取り出せないと判断した時点で処理を返す。 63 `stream.fetch()`は、待機後の`stream.empty()`を返す。 64 + `stream`が無限レンジの場合、`stream.read(buf)`は常に`buf`の全要素に読み込む。 65 */ 66 enum bool isInputStream(T) = isInputRange!T && is(typeof((T t){ 67 alias E = Unqual!(ElementType!T); 68 while(!t.empty || !t.fetch()) E[] var = t.read(new E[1024]); 69 })); 70 71 72 /** 73 入力ストリームから、ストリーム終端になるまで可能な限り読み出します。 74 */ 75 E[] fillBuffer(S, E)(ref S s, E[] buf) 76 if(isInputStream!S) 77 { 78 E[] rem = buf; 79 while(rem.length && (!s.empty || !s.fetch())) 80 rem = s.read(rem); 81 82 return buf[0 .. $ - rem.length]; 83 } 84 85 86 /** 87 演算可能なストリームとは、`readOp`メソッドに与えられたバッファ上に、計算した要素を格納可能な入力ストリームです。 88 これは、次のような実行時条件を満たしている必要があります。 89 90 + `stream.readOp!"op"(buf)`は、`a op= b`という演算をbuf上で行う。 91 つまり、 92 - op : "+"のとき、streamから読み出された値を、bufに加える演算を表す。 93 - op : "-"のとき、streamから読み出された値を、bufから引く演算を表す。 94 + op : ""のときは、`stream.read(buf)`と等価です。 95 + `stream.readOp!func(buf)`は、`func(a, b)`という演算結果をbufに格納する 96 - `stream.readOp!((a, b) => b)(buf)`は、`stream.read(buf)`と等価です。 97 - `stream.readOp!((a, b) => a + b)(buf)`は、`stream.readOp!"+"(buf)`と等価です。 98 */ 99 enum bool isInplaceComputableStream(T, alias op = "", U = Unqual!(ElementType!T)) = isInputStream!T && 100 is(typeof((T t, U[] buf){ 101 buf = t.readOp!op(buf); // as read(buf) 102 })); 103 104 105 /** 106 出力ストリームは、`t.write`メソッドに与えられたバッファを出力可能な型です。 107 実行時には次のような条件を満たしている必要があります。 108 109 + `stream.write(buf)`は、ノンブロッキングでバッファを出力し、まだ出力できていないバッファを返す。 110 + `stream.fill`が偽であれば、一要素でも`stream.write`でノンブロッキング出力可能である 111 + `stream.flush()`は、ブロッキング処理で`t.fill`が偽になるか、 112 もしくはそれ以上出力不可能だと判断した時点で、`t.fill`を返す。 113 */ 114 enum bool isOutputStream(T, E) = 115 is(typeof((T t, E e){ 116 E[] writeData = new E[1024]; 117 while(writeData.length && (!t.fill || !t.flush())) writeData = t.write(writeData); 118 })); 119 120 121 /** 122 */ 123 E[] fillBufferOp(alias op, S, E)(ref S s, E[] buffer) 124 { 125 auto rem = buffer; 126 while(rem.length && (!s.empty || !s.fetch())) 127 rem = s.readOp!op(rem); 128 return buffer[0 .. $ - rem.length]; 129 } 130 131 132 /** 133 内部にバッファを持つような入力ストリームです。 134 135 + `stream.availables`は内部のバッファを返す。 136 + `stream.consume(size)`は、内部バッファの先頭から`size`要素だけを捨てる。 137 */ 138 enum bool isBufferedInputStream(T) = isInputStream!T && 139 is(typeof((T t){ 140 alias E = ElementType!T; 141 const(E)[] buf = t.availables; 142 t.consume(buf.length); 143 })); 144 145 146 /** 147 内部にバッファを持つような出力ストリームです。 148 149 + `stream.buffer`は内部のバッファを返す。 150 + `stream.flush(size)`は、内部バッファの先頭から`size`要素だけ出力する。 151 */ 152 enum bool isBufferedOutputStream(T) = isOutputStream!T && 153 is(typeof((T t){ 154 auto buf = t.buffer; 155 static assert(is(typeof(buf) : E[], E)); 156 static assert(is(typeof(buf) == Unqual!(typeof(buf)))); 157 t.flush(buf.length); 158 })); 159 160 161 /** 162 InplaceComputableStreamを作成する際に有用な、std.functional.binaryFunの拡張です 163 */ 164 auto ref A binaryFunExt(alias op, A, B)(auto ref A a, auto ref B b) 165 if(is(typeof(mixin(`a` ~ op ~ `=b`)))) 166 { 167 mixin(`a` ~ op ~ "=b;"); 168 return a; 169 } 170 171 172 /// ditto 173 auto ref A binaryFunExt(alias func, A, B)(auto ref A a, auto ref B b) 174 if(is(typeof(a = naryFun!func(a, forward!b)))) 175 { 176 a = naryFun!func(a, forward!b); 177 return a; 178 } 179 180 181 /** 182 レンジをisInplaceComputableStreamに変換します 183 */ 184 auto toStream(R)(R range) 185 if(isInputRange!R && !isInputStream!R) 186 { 187 alias E = Unqual!(ElementType!R); 188 189 static struct InputStreamRange() 190 { 191 R range; 192 alias range this; 193 alias closed = range.empty; 194 bool fetch(){ return range.empty; } 195 196 T[] readOp(alias func, T)(T[] buf) 197 { 198 auto p = buf.ptr; 199 const end = () @trusted { return p + buf.length; }(); 200 while(p != end && !range.empty){ 201 binaryFunExt!func(*p, range.front); 202 range.popFront(); 203 () @trusted { ++p; }(); 204 } 205 206 return () @trusted { return buf[0 .. cast(size_t)(p - buf.ptr)]; }(); 207 } 208 209 T[] read(T)(T[] buf) { return readOp!""(buf); } 210 } 211 212 213 return InputStreamRange!()(range); 214 } 215 216 217 /** 218 正確なComplexNCO 219 */ 220 auto preciseComplexNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc 221 { 222 static struct PreciseComplexNCO() 223 { 224 creal front() const @property { return this[0]; } 225 void popFront() { _theta += _dt2PI * _freq; } 226 enum bool empty = false; 227 PreciseComplexNCO!() save() const @property { return this; } 228 creal opIndex(size_t i) const @property { return std.math.expi(_theta + i * _dt2PI * _freq); } 229 struct OpDollar{} enum OpDollar opDollar = OpDollar(); 230 PreciseComplexNCO!() opSlice() const { return this; } 231 PreciseComplexNCO!() opSlice(size_t i, OpDollar) const { PreciseComplexNCO!() dst = this; dst._theta += i * _dt2PI * _freq; return dst; } 232 auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); } 233 234 bool fetch() { return false; } 235 236 T[] readOp(alias op, T)(T[] buf) 237 { 238 auto p = buf.ptr; 239 const end = () @trusted { return p + buf.length; }(); 240 while(p != end){ 241 binaryFunExt!op(*p, std.math.expi(_theta)); 242 _theta += _dt2PI * _freq; 243 () @trusted { ++p; }(); 244 } 245 246 return buf; 247 } 248 249 250 Cpx[] read(Cpx)(Cpx[] buf){ return readOp!""(buf); } 251 252 @property 253 { 254 real freq() const @property { return _freq; } 255 void freq(real f) @property { _freq = f; } 256 257 real deltaT() const @property { return _dt2PI / 2 / PI; } 258 void deltaT(real dt) @property { _dt2PI = dt * 2 * PI; } 259 } 260 261 private: 262 real _freq; 263 real _dt2PI; 264 real _theta; 265 } 266 267 return PreciseComplexNCO!()(freq, deltaT * 2 * PI, theta); 268 } 269 270 271 /// 272 unittest 273 { 274 auto sig1 = preciseComplexNCO(1000, 0.001, 0); 275 static assert(isInputStream!(typeof(sig1))); 276 static assert(isInplaceComputableStream!(typeof(sig1))); 277 static assert(is(ElementType!(typeof(sig1)) == creal)); 278 279 assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im)) 280 (sig1[0 .. 4], cast(creal[])[1, 1, 1, 1])); 281 282 sig1 = preciseComplexNCO(1, 0.25, 0); 283 assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im)) 284 (sig1[0 .. 4], cast(creal[])[1, 0+1i, -1, -1i])); 285 286 sig1 = preciseComplexNCO(1000, 10.0L^^-6, std.math.E); 287 auto buf = sig1[0 .. 1024].array; 288 assert(sig1.readOp!"-"(buf).length == buf.length); 289 assert(equal!"a == 0"(buf, buf)); 290 291 sig1.readOp!"a+b"(buf); 292 } 293 294 295 /** 296 Lookup-Table方式の高速な局部発振器を提供します。 297 テンプレートパラメータの`func`には周期2PIの周期関数を与えることが出来ます。 298 たとえば、`std.math.expi`を与えれば複素発振器となり、`std.math.sin`であれば正弦波を出力します。 299 300 この局部発振器は周波数の動的な制御が可能なので、信号の周波数をフィードバック制御する際に用いることができます。 301 Lookup-Tableは、テンプレートパラメータ毎にプログラムの初期化時に生成されるので、 302 最初の初期化が終われば、実行コストはテーブルの参照のみになり、高速にアクセス可能です。 303 また、テーブル長を伸ばしても初期化コストが増加するだけで、実行コストはあまり大きくならないことも特徴です。 304 */ 305 template lutNCO(alias func, size_t divN) 306 if(isPowOf2(divN)) 307 { 308 alias E = typeof(func(0.0)); 309 immutable(E[]) table; 310 311 shared static this() 312 { 313 E[divN] t; 314 foreach(i, ref e; t) 315 e = func(i * 2 * PI / divN); 316 317 table = t.dup; 318 } 319 320 321 struct LutNCO() 322 { 323 E front() const @property { return table[cast(ptrdiff_t)(_phase) & (divN - 1)]; } 324 void popFront() { _phase += _freq * _deltaT * divN; _phase %= divN; } 325 enum bool empty = false; 326 LutNCO save() const @property { return this; } 327 E opIndex(size_t i) const { return table[cast(ptrdiff_t)(_phase + i * _freq * _deltaT * divN) & (divN - 1)]; } 328 struct OpDollar{} enum opDollar = OpDollar(); 329 LutNCO opSlice() const { return this; } 330 LutNCO opSlice(size_t i, OpDollar) const { auto dst = this[]; dst._phase += i * _freq * _deltaT; return dst; } 331 auto opSlice(size_t i, size_t j) const { return this[i .. $].take(j - i); } 332 333 real freq() const @property { return _freq; } 334 void freq(real f) @property { _freq = f; } 335 real deltaT() const @property { return _deltaT; } 336 void deltaT(real t) @property { _deltaT = t; } 337 338 bool fetch() { return false; } 339 340 T[] readOp(alias op, T)(T[] buf) 341 if(is(typeof(binaryFunExt!op(buf[0], table[0])))) 342 { 343 immutable dph = _freq * _deltaT * divN; 344 auto p = buf.ptr; 345 const end = () @trusted { return p + buf.length; }(); 346 while(p != end){ 347 binaryFunExt!op(*p, table[cast(ptrdiff_t)(_phase) & (divN - 1)]); 348 _phase += dph; 349 () @trusted { ++p; }(); 350 } 351 352 return buf[0 .. $]; 353 } 354 355 356 T[] read(T)(T[] buf) @safe { return readOp!""(buf); } 357 358 private: 359 real _phase; 360 real _freq; 361 real _deltaT; 362 } 363 364 365 LutNCO!() lutNCO(real freq, real deltaT, real theta = 0) pure nothrow @safe @nogc 366 { 367 return LutNCO!()(theta, freq, deltaT); 368 } 369 } 370 371 372 /// 373 unittest{ 374 auto sig1 = lutNCO!(std.math.expi, 4)(1, 0.25, 0); 375 static assert(isInputStream!(typeof(sig1))); 376 static assert(isInplaceComputableStream!(typeof(sig1))); 377 static assert(is(ElementType!(typeof(sig1)) == creal)); 378 379 assert(equal!((a, b) => approxEqual(a.re, b.re) && approxEqual(a.im, b.im)) 380 (sig1[0 .. 1024], preciseComplexNCO(1, 0.25, 0)[0 .. 1024])); 381 382 sig1 = lutNCO!(std.math.expi, 4)(1000, 10.0L^^-6, std.math.E); 383 auto buf = sig1[0 .. 1024].array; 384 assert(sig1.readOp!"-"(buf).length == buf.length); 385 assert(equal!"a == 0"(buf, buf)); 386 } 387 388 389 /** 390 永遠とその配列をループし続けるストリーム 391 */ 392 auto repeatStream(E)(const E[] array) 393 in{ 394 assert(array.length); 395 } 396 body{ 397 static struct RepeatStream() 398 { 399 const(E) front() const @property { return _arr[_pos]; } 400 enum empty = false; 401 void popFront() { ++_pos; _pos %= _arr.length; } 402 RepeatStream!() save() const @property { return this; } 403 const(E) opIndex(size_t i) const { return _arr[_pos % $]; } 404 RepeatStream!() opSlice(){ return this; } 405 struct OpDollar{} enum opDollar = OpDollar(); 406 RepeatStream!() opSlice(size_t i, OpDollar){ typeof(return) dst; dst._pos = (_pos + i) % _arr.length; return dst; } 407 auto opSlice(size_t i, size_t j){ return this[i .. $].take(j - i); } 408 409 bool fetch() { return false; } 410 411 U[] readOp(alias op, U)(U[] buf) 412 { 413 auto rem = buf; 414 while(rem.length){ 415 auto minL = min(rem.length, _arr.length - _pos); 416 417 static if(is(typeof({ mixin(`rem[]` ~ op ~ `=_arr[];`); }))) 418 mixin(`rem[0 .. minL] `~ op ~ "= _arr[_pos .. _pos + minL];"); 419 else{ 420 auto rem_ptr = rem.ptr, 421 buf_ptr = () @trusted { return _arr.ptr + _pos; }(); 422 const rem_end = () @trusted { return rem_ptr + minL; }(); 423 424 while(rem_ptr != rem_end){ 425 binaryFunExt!op(*rem_ptr, *buf_ptr); 426 () @trusted { ++rem_ptr; ++buf_ptr; }(); 427 } 428 } 429 _pos += minL; 430 _pos %= _arr.length; 431 rem = rem[minL .. $]; 432 } 433 434 return buf; 435 } 436 437 438 U[] read(U)(U[] buf) { return readOp!""(buf); } 439 440 441 const(E)[] availables() @property { return _arr[_pos .. $]; } 442 void consume(size_t n) { 443 _pos += n; 444 _pos %= _arr.length; 445 } 446 447 private: 448 const(E)[] _arr; 449 size_t _pos; 450 } 451 452 453 return RepeatStream!()(array, 0); 454 } 455 456 457 /// ditto 458 auto repeatStream(E)(E element) 459 if(!is(E : T[], T)) 460 { 461 static struct RepeatStream() 462 { 463 enum bool empty = false; 464 auto front() const @property { return _e; } 465 void popFront() {} 466 auto save() @property { return this; } 467 auto save() const @property { return .repeatStream(_e); } 468 auto save() immutable @property { return .repeatStream(_e); } 469 auto opIndex(size_t) const @property { return _e; } 470 RepeatStream!() opSlice() const { return this; } 471 struct OpDollar{} enum opDollar = OpDollar(); 472 RepeatStream!() opSlice(size_t, OpDollar) const { return this; } 473 auto opSlice(size_t i , size_t j) const { return this.take(j - i); } 474 475 bool fetch() { return false; } 476 477 478 U[] readOp(alias op, U)(U[] buf) 479 { 480 static if(is(typeof({ mixin(`buf[] ` ~ op ~ "= _e;"); }))) 481 mixin(`buf[] ` ~ op ~ "= _e;"); 482 else 483 { 484 auto p = buf.ptr, 485 e = () @trusted { return p + buf.length; }(); 486 487 while(p != e){ 488 binaryFunExt!op(*p, _e); 489 () @trusted { ++p; }(); 490 } 491 } 492 493 return buf; 494 } 495 496 497 U[] read(U)(U[] buf) 498 { 499 return readOp!""(buf); 500 } 501 502 private: 503 E _e; 504 } 505 506 507 return RepeatStream!()(element); 508 } 509 510 /// 511 unittest{ 512 int[] arr = [0, 1, 2, 3, 4, 5, 6, 7]; 513 auto rs = arr.repeatStream; 514 static assert(isInfinite!(typeof(rs))); 515 static assert(isInputStream!(typeof(rs))); 516 static assert(isInplaceComputableStream!(typeof(rs))); 517 static assert(isBufferedInputStream!(typeof(rs))); 518 519 520 int[] buf1 = new int[24]; 521 assert(rs.readOp!"+"(buf1) == arr ~ arr ~ arr); 522 523 short[] buf2 = new short[17]; 524 assert(rs.readOp!"cast(short)b"(buf2) == buf1[0 .. 17]); 525 } 526 527 /// 528 unittest{ 529 auto rs = 1.repeatStream; 530 static assert(isInfinite!(typeof(rs))); 531 static assert(isInputStream!(typeof(rs))); 532 static assert(isInplaceComputableStream!(typeof(rs))); 533 534 assert(rs.read(new int[4]) == [1, 1, 1, 1]); 535 } 536 537 538 /** 539 一度に巨大なファイルを読み込むことに特化したバッファ持ち入力ストリームです。 540 */ 541 auto rawFileStream(T)(string filename, size_t bufferSize = 1024 * 1024) 542 if(is(Unqual!T == T)) 543 { 544 static struct RawFileStream() 545 { 546 private void refill() 547 { 548 _pos = 0; 549 if(!_file.isOpen){ 550 _buffer = null; 551 return; 552 } 553 554 _pos = 0; 555 immutable beforeSize = _buffer.length; 556 _buffer = _file.rawRead(_buffer); 557 if(_buffer.length != beforeSize) 558 _file.detach(); 559 } 560 561 562 /** 入力レンジのプリミティブ 563 */ 564 T front() const @property { return _buffer[_pos]; } 565 bool empty() const @property { return _buffer.length == _pos; } /// ditto 566 void popFront() { ++_pos; } /// ditto 567 size_t length() const @property { return _buffer.length - _pos; } /// ditto 568 569 570 /** 571 入力ストリームのプリミティブ 572 */ 573 bool fetch() 574 { 575 if(!this.empty) return false; 576 refill(); 577 return this.empty; 578 } 579 580 581 /// ditto 582 E[] readOp(alias func, E)(E[] buf) 583 if(is(typeof(binaryFunExt!func(buf[0], _buffer[0])))) 584 { 585 immutable minL = min(buf.length, _buffer.length - _pos); 586 static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`)))) 587 mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];"); 588 else 589 { 590 auto rem_ptr = buf.ptr, 591 buf_ptr = () @trusted { return _buffer.ptr + _pos; }(); 592 const rem_end = () @trusted { return rem_ptr + minL; }(); 593 594 while(rem_ptr != rem_end){ 595 binaryFunExt!func(*rem_ptr, *buf_ptr); 596 () @trusted { ++rem_ptr; ++buf_ptr; }(); 597 } 598 } 599 _pos += minL; 600 return buf[0 .. minL]; 601 } 602 603 604 /// ditto 605 E[] read(E)(E[] buf) 606 if(isAssignable!(E, T)) 607 { return readOp!""(buf); } 608 609 610 /** 611 バッファ持ち入力レンジのプリミティブ 612 */ 613 const(T)[] availables() const @property { return _buffer[_pos .. $]; } 614 615 616 /** 617 バッファ持ち入力レンジのプリミティブ 618 */ 619 void consume(size_t n) 620 { 621 while(n && !this.empty){ 622 if(n >= _buffer.length - _pos){ 623 n -= _buffer.length - _pos; 624 if(_file.isOpen) 625 refill(); 626 else{ 627 _buffer = null; 628 _pos = 0; 629 } 630 } 631 else{ 632 _pos += n; 633 n = 0; 634 } 635 } 636 } 637 638 639 private: 640 T[] _buffer; 641 size_t _pos; 642 File _file; 643 } 644 645 646 auto dst = RawFileStream!()(new T[bufferSize], 0,File(filename)); 647 dst.refill(); 648 return dst; 649 } 650 651 /// 652 unittest{ 653 immutable fname = "testData.dat"; 654 std.file.write(fname, cast(ubyte[])[0, 1, 2, 3, 4, 5]); // 6byte 655 scope(exit) 656 std.file.remove(fname); 657 658 auto sig1 = rawFileStream!int(fname); 659 static assert(isInputStream!(typeof(sig1))); 660 static assert(isInplaceComputableStream!(typeof(sig1))); 661 static assert(isBufferedInputStream!(typeof(sig1))); 662 663 assert(sig1.availables.length == 1); 664 sig1.consume(1); 665 assert(sig1.empty && sig1.fetch()); 666 } 667 668 /// 669 unittest{ 670 immutable fname = "testData.dat"; 671 std.file.write(fname, [0, 1, 2, 3, 4, 5]); // 24byte 672 scope(exit) 673 std.file.remove(fname); 674 675 auto sig1 = rawFileStream!int(fname); 676 assert(sig1.availables == [0, 1, 2, 3, 4, 5]); 677 sig1.consume(6); 678 assert(sig1.empty && sig1.fetch()); 679 } 680 681 682 /** 683 二つの信号の積を返します。 684 2つ目の信号は演算による理想信号でなければいけません。 685 */ 686 auto mixer(Sg1, Sg2)(Sg1 sg1, Sg2 sg2) 687 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "*") && isInfinite!Sg2) 688 { 689 static struct Mixer() 690 { 691 auto ref front() const @property { return _sg1.front * _sg2.front; } 692 693 static if(isInfinite!Sg1) 694 enum bool empty = false; 695 else 696 bool empty() const @property { return _sg1.empty; } 697 698 void popFront() { _sg1.popFront(); _sg2.popFront(); } 699 700 bool fetch() 701 { 702 static if(isInfinite!Sg1) 703 return false; 704 else 705 { 706 if(!_sg1.empty) return false; 707 //_sg2.fetch(); 708 return _sg1.fetch(); 709 } 710 } 711 712 E[] read(E)(E[] buf) 713 { 714 return readOp!""(buf); 715 } 716 717 718 E[] readOp(alias op, E)(E[] buf) 719 if((op == "*" || op == "/") 720 && isInplaceComputableStream!(Sg1, op) 721 && isInplaceComputableStream!(Sg2, op)) 722 { 723 return _sg2.readOp!op(_sg1.readOp!op(buf)); 724 } 725 726 727 E[] readOp(alias op : "", E)(E[] buf) 728 { 729 return _sg2.readOp!"*"(_sg1.read(buf)); 730 } 731 732 733 private: 734 Sg1 _sg1; 735 Sg2 _sg2; 736 } 737 738 739 return Mixer!()(sg1, sg2); 740 } 741 742 743 /// 744 unittest 745 { 746 auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream; 747 auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream; 748 auto mx1 = arr1.mixer(arr2); 749 static assert(isInfinite!(typeof(mx1))); 750 static assert(isInputStream!(typeof(mx1))); 751 752 int[] buf1 = new int[16]; 753 assert(mx1.read(buf1) == [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1]); 754 755 auto arr3 = [0, 0, 0, 0, 1, 1, 1, 1].repeatStream; 756 auto mx2 = mx1.mixer(arr3); 757 758 int[] buf2 = new int[16]; 759 assert(mx2.read(buf2) == [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1]); 760 } 761 762 763 /** 764 二つの信号を足します。 765 2つ目の信号は、演算により生成された理想信号でなければいけません。 766 */ 767 auto adder(Sg1, Sg2)(Sg1 sg1, Sg2 sg2) 768 if(isInputStream!Sg1 && isInplaceComputableStream!(Sg2, "+") && isInfinite!Sg2) 769 { 770 static struct Adder() 771 { 772 auto ref front() const @property { return _sg1.front + _sg2.front; } 773 774 static if(isInfinite!Sg1) 775 enum bool empty = false; 776 else 777 bool empty() const @property { return _sg1.empty; } 778 779 void popFront() { _sg1.popFront(); _sg2.popFront(); } 780 781 bool fetch() { 782 static if(isInfinite!Sg1) 783 return false; 784 else 785 { 786 if(!_sg1.empty) return false; 787 //_sg2.fetch(); 788 return _sg1.fetch(); 789 } 790 } 791 792 E[] read(E)(E[] buf) 793 { 794 return readOp!""(buf); 795 } 796 797 798 E[] readOp(alias op, E)(E[] buf) 799 if((op == "+" || op == "-") 800 && isInplaceComputableStream!(Sg1, op) 801 && isInplaceComputableStream!(Sg2, op)) 802 { 803 return _sg2.readOp!op(_sg1.readOp!op(buf)); 804 } 805 806 807 E[] readOp(alias op : "", E)(E[] buf) 808 { 809 return _sg2.readOp!"+"(_sg1.read(buf)); 810 } 811 812 813 private: 814 Sg1 _sg1; 815 Sg2 _sg2; 816 } 817 818 819 return Adder!()(sg1, sg2); 820 } 821 822 823 /// 824 unittest 825 { 826 auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream; 827 auto arr2 = [0, 0, 1, 1, 0, 0, 1, 1].repeatStream; 828 auto mx1 = arr1.adder(arr2); 829 static assert(isInfinite!(typeof(mx1))); 830 static assert(isInputStream!(typeof(mx1))); 831 832 int[] buf1 = new int[16]; 833 assert(mx1.read(buf1) == [0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2]); 834 } 835 836 /// 837 unittest 838 { 839 auto arr1 = [0, 1, 0, 1, 0, 1, 0, 1].repeatStream; 840 auto buf1 = arr1.adder(arr1).adder(arr1).read(new int[16]); 841 auto buf2 = arr1.amplifier(3).read(new int[16]); 842 843 assert(buf1 == buf2); 844 } 845 846 847 /** 848 積分器です。 849 積分するサンプル量が大きい場合の使用に適しています。 850 */ 851 auto accumulator(E)(size_t integN) 852 { 853 static struct Accumulator 854 { 855 E[] write(E[] buf) 856 { 857 while(buf.length){ 858 immutable n = min(_integN - _cnt, buf.length); 859 auto ptr = buf.ptr; 860 const end = buf.ptr + n; 861 auto val = _buffer.back; 862 while(ptr != end){ 863 val += *ptr; 864 ++ptr; 865 } 866 867 _cnt += n; 868 _buffer.back = val; 869 buf = buf[n .. $]; 870 if(_cnt == _integN){ 871 _buffer ~= cast(E)0; 872 _cnt = 0; 873 ++_avaN; 874 } 875 } 876 877 return buf; 878 } 879 880 881 enum bool fill = false; 882 bool flush() { return false; } 883 884 885 auto opSlice(){ return _buffer[].take(_avaN); } 886 887 E front() const @property { return _buffer.front; } 888 void popFront() { _buffer.removeFront(); --_avaN; } 889 bool empty() const @property { return _avaN == 0; } 890 891 892 private: 893 DList!E _buffer; 894 size_t _cnt; 895 size_t _integN; 896 size_t _avaN; 897 } 898 899 900 return Accumulator(DList!E(cast(E)0), 0, integN, 0); 901 } 902 903 /// 904 unittest 905 { 906 auto acc = accumulator!int(4); 907 static assert(isOutputStream!(typeof(acc), int)); 908 909 assert(acc.write([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).empty); 910 assert(equal(acc[], [0+1+2+3, 4+5+6+7])); 911 912 acc.write([10, 11]); 913 assert(equal(acc[], [0+1+2+3, 4+5+6+7, 8+9+10+11])); 914 } 915 916 917 /** 918 積分器です。 919 たとえば、連続する少数の個数のサンプルの和を取るような用途に適しています。 920 */ 921 auto accumulator(size_t N, Sg)(Sg sg, size_t bufSize = 1024 * 1024) 922 { 923 alias E = Unqual!(ElementType!Sg); 924 return accumulator(sg, new E[bufSize]); 925 } 926 927 928 /// ditto 929 auto accumulator(size_t N, Sg, E)(Sg sg, E[] buffer) 930 if(isInputStream!Sg && is(ElementType!Sg : E)) 931 { 932 static struct Accumulator() 933 { 934 auto front() const @property { return availables[0]; } 935 void popFront() { consume(1); } 936 bool empty() const @property { return _pos == _end; } 937 938 bool fetch() 939 { 940 if(!this.empty) return false; 941 auto p = _buffer.ptr + _buffer.length - _remN, 942 f = _buffer.ptr; 943 const e = _buffer.ptr + _buffer.length; 944 while(p != e) { *f = *p; ++f; ++p; } 945 _pos = 0; 946 _end = 0; 947 948 while(this.empty && (!_sg.empty || !_sg.fetch())){ 949 auto buf = _sg.read(_buffer[_remN .. $]); 950 immutable size_t len = (_remN + buf.length) / N; 951 _end += len; 952 953 auto pp = _buffer.ptr, 954 ff = _buffer.ptr; 955 auto ee = _buffer.ptr + _end; 956 957 while(pp != ee){ 958 size_t s = 1; 959 if(pp != _buffer.ptr) { *pp = 0; s = 0; } 960 961 ff += s; 962 foreach(i; s .. N){ 963 *pp += *ff; 964 ++ff; 965 } 966 967 ++pp; 968 } 969 _remN = (_remN + buf.length) - _end * N; 970 } 971 972 return this.empty; 973 } 974 975 976 T[] readOp(alias func, T)(T[] buf) 977 { 978 immutable minL = min(buf.length, _end - _pos); 979 static if(is(typeof(mixin(`buf[]` ~ func ~ `= _buffer[]`)))) 980 mixin(`buf[0 .. minL] ` ~ func ~ "= _buffer[_pos .. _pos + minL];"); 981 else 982 { 983 auto rem_ptr = buf.ptr, 984 buf_ptr = () @trusted { return _buffer.ptr + _pos; }(); 985 const rem_end = () @trusted { return rem_ptr + minL; }(); 986 987 while(rem_ptr != rem_end){ 988 binaryFunExt!func(*rem_ptr, *buf_ptr); 989 () @trusted { ++rem_ptr; ++buf_ptr; }(); 990 } 991 } 992 _pos += minL; 993 return buf[0 .. minL]; 994 } 995 996 997 T[] read(T)(T[] buf) { return readOp!""(buf); } 998 999 1000 const(E)[] availables() const @property { return _buffer[_pos .. _end]; } 1001 void consume(size_t n) 1002 { 1003 while(n && (!this.empty || !this.fetch())){ 1004 if(n <= _end - _pos){ 1005 _pos += n; 1006 n = 0; 1007 }else{ 1008 n -= _end - _pos; 1009 _pos = _end; 1010 } 1011 } 1012 } 1013 1014 1015 private: 1016 Sg _sg; 1017 E[] _buffer; 1018 size_t _pos; 1019 size_t _end; 1020 size_t _remN; 1021 } 1022 1023 1024 return Accumulator!()(sg, buffer, 0, 0, 0); 1025 } 1026 1027 /// 1028 unittest 1029 { 1030 auto arr1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].repeatStream; 1031 auto acc = accumulator!4(arr1, new int[14]); 1032 static assert(isBufferedInputStream!(typeof(acc))); 1033 1034 int[] buf = new int[3]; 1035 assert(!acc.empty || !acc.fetch()); 1036 assert(acc.read(buf) == [0+1+2+3, 4+5+6+7, 8+9+0+1]); 1037 assert(!acc.empty || !acc.fetch()); 1038 assert(acc.read(buf) == [2+3+4+5, 6+7+8+9, 0+1+2+3]); 1039 } 1040 1041 /+ ベンチマーク, rangeの3倍程度の性能 1042 unittest { 1043 static real[] vector; 1044 vector = new real[1024 * 1024]; 1045 foreach(ref e; vector) e = 0; 1046 1047 1048 void funcStream() 1049 { 1050 alias sin128LutNCO = lutNCO!(std.math.sin, 128); 1051 1052 auto mx = mixer(sin128LutNCO(6.5E6L, 1E-6L, 0), 1053 sin128LutNCO(6.6E6L, 1E-6L, 0)); 1054 1055 mx.readOp!"*"(vector); 1056 } 1057 1058 1059 void funcRange() 1060 { 1061 alias sin128LutNCO = lutNCO!(std.math.sin, 128); 1062 1063 auto mx = zip(sin128LutNCO(6.5E6L, 1E-6L, 0), sin128LutNCO(6.6E6L, 1E-6L, 0)) 1064 .map!"a[0]*a[1]"; 1065 1066 auto ptr = vector.ptr, end = ptr + vector.length; 1067 while(ptr != end){ 1068 *ptr *= mx.front; 1069 mx.popFront(); 1070 ++ptr; 1071 } 1072 } 1073 1074 1075 import std.datetime; 1076 import std.container; 1077 auto ts = benchmark!(funcStream, funcRange)(26); 1078 1079 writefln("%(\n%s%)", ts[].map!"a.usecs"); 1080 } 1081 +/ 1082 1083 1084 auto amplifier(Sg, F)(Sg sg, F gain) 1085 { 1086 static struct Amplifier() 1087 { 1088 auto front() const @property { return _sg.front * _gain; } 1089 void popFront() { _sg.popFront(); } 1090 bool empty() const { return _sg.empty; } 1091 bool fetch() { return _sg.fetch(); } 1092 1093 1094 E[] readOp(string op, E)(E[] buf) 1095 if((op == "*" || op == "/") && isInplaceComputableStream!(Sg, op, E)) 1096 { 1097 buf = _sg.readOp!op(buf); 1098 mixin("buf[]" ~ op ~ "= _gain;"); 1099 1100 return buf; 1101 } 1102 1103 1104 E[] read(E)(E[] buf) 1105 { 1106 buf = _sg.read(buf); 1107 buf[] *= _gain; 1108 1109 return buf; 1110 } 1111 1112 1113 F gain() const @property pure nothrow @safe @nogc { return _gain; } 1114 void gain(F g) @property pure nothrow @safe @nogc { _gain = g; } 1115 1116 1117 private: 1118 Sg _sg; 1119 F _gain; 1120 } 1121 1122 1123 return Amplifier!()(sg, gain); 1124 } 1125 1126 /// 1127 unittest 1128 { 1129 auto arr4 = [0, 1, 2, 3].repeatStream, 1130 amped = arr4.amplifier(4); 1131 1132 assert(amped.read(new int[4]) == [0, 4, 8, 12]); 1133 static assert(isInplaceComputableStream!(typeof(arr4))); 1134 static assert(isInplaceComputableStream!(typeof(amped), "*")); 1135 static assert(isInplaceComputableStream!(typeof(amped), "/")); 1136 1137 auto arr = [3, 4, 1, 2]; 1138 assert(amped.readOp!"*"(arr) == [0, 16, 8, 24]); 1139 } 1140 1141 1142 auto selector(Sg...)(Sg sgs) 1143 { 1144 static struct Selector() 1145 { 1146 import std.string : format; 1147 1148 1149 auto front() const @property 1150 { 1151 switch(_selectIndex){ 1152 foreach(i, E; Sg) 1153 mixin(format("case %s: return _sgs[%s].front;", i, i)); 1154 default: assert(0); 1155 } 1156 } 1157 1158 1159 void popFront() 1160 { 1161 switch(_selectIndex){ 1162 foreach(i, E; Sg) 1163 mixin(format("case %s: _sgs[%s].popFront();", i, i)); 1164 default: assert(0); 1165 } 1166 } 1167 1168 1169 bool empty() const @property 1170 { 1171 switch(_selectIndex){ 1172 foreach(i, E; Sg) 1173 mixin(format("case %s: return _sgs[%s].empty;", i, i)); 1174 default: assert(0); 1175 } 1176 } 1177 1178 1179 bool fetch() 1180 { 1181 switch(_selectIndex){ 1182 foreach(i, E; Sg) 1183 mixin(format("case %s: return _sgs[%s].fetch();", i, i)); 1184 default: assert(0); 1185 } 1186 } 1187 1188 1189 E[] read(E)(E[] buf) 1190 { 1191 switch(_selectIndex){ 1192 foreach(i, E; Sg) 1193 mixin(format("case %s: return _sgs[%s].read(buf);", i, i)); 1194 default: assert(0); 1195 } 1196 } 1197 1198 1199 E[] readOp(alias op, E)(E[] buf) 1200 { 1201 switch(_selectIndex){ 1202 foreach(i, E; Sg) 1203 mixin(format("case %s: return _sgs[%s].readOp!op(buf);", i, i)); 1204 default: assert(0); 1205 } 1206 } 1207 1208 1209 void select(size_t i) 1210 in{ 1211 assert(i < Sg.length); 1212 } 1213 body{ 1214 _selectIndex = i; 1215 } 1216 1217 1218 void select(size_t i)() 1219 { 1220 static assert(i < Sg.length); 1221 _selectIndex = i; 1222 } 1223 1224 1225 private: 1226 Sg _sgs; 1227 size_t _selectIndex; 1228 } 1229 1230 1231 return Selector!()(sgs, 0); 1232 } 1233 1234 /// 1235 unittest 1236 { 1237 auto arr1 = 0.repeatStream, 1238 arr2 = [0, 1].repeatStream, 1239 arr3 = [0, 1, 2].repeatStream, 1240 arr4 = [0, 1, 2, 3].repeatStream; 1241 1242 auto slt = selector(arr1, arr2, arr3, arr4); 1243 assert(slt.read(new int[4]) == [0, 0, 0, 0]); 1244 slt.select(1); // or slt.select!1; 1245 assert(slt.read(new int[4]) == [0, 1, 0, 1]); 1246 slt.select(2); // or slt.select!2; 1247 assert(slt.read(new int[4]) == [0, 1, 2, 0]); 1248 slt.select(3); // or slt.select!3; 1249 assert(slt.read(new int[4]) == [0, 1, 2, 3]); 1250 } 1251 1252 1253 /** 1254 FIRフィルタを構成します。 1255 1256 FIRフィルタの一般形は、z変換すれば次のような式で表すことができます。 1257 H[z] = Σ(k[m]*z^(-m)) 1258 1259 この関数には、各タップの係数`k[m]`を指定することで任意のFIRフィルタを構築することができます。 1260 */ 1261 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps) 1262 if(isInputStream!Sg) 1263 { 1264 return firFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E[](taps.length)); 1265 } 1266 1267 1268 /// 1269 auto firFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] tap, E[] buf) 1270 if(isInputStream!Sg && is(E == Unqual!E)) 1271 in{ 1272 assert(tap.length == buf.length); 1273 } 1274 body{ 1275 static struct FIRFiltered() 1276 { 1277 E front() const @property @trusted 1278 { 1279 E a = 0; 1280 1281 { 1282 auto p1 = _tap[$ - 1 .. $].ptr, 1283 p2 = _buf[_idx .. $].ptr, 1284 e2 = _buf[$ .. $].ptr; 1285 1286 while(p2 != e2){ 1287 a = naryFun!reduceFn(a, *p2, *p1); 1288 --p1; 1289 ++p2; 1290 } 1291 } 1292 1293 { 1294 auto p1 = _tap.ptr + _idx-1, 1295 p2 = _buf[0 .. _idx].ptr, 1296 e2 = _buf[_idx .. $].ptr; 1297 1298 while(p2 != e2){ 1299 a = naryFun!reduceFn(a, *p2, *p1); 1300 --p1; 1301 ++p2; 1302 } 1303 } 1304 1305 return a; 1306 } 1307 1308 1309 void popFront() 1310 { 1311 _buf[_idx] = _sg.front; 1312 ++_idx; 1313 _idx %= _buf.length; 1314 _sg.popFront(); 1315 } 1316 1317 static if(isInfinite!Sg) 1318 enum bool empty = false; 1319 else 1320 bool empty() const @property { return _sg.empty; } 1321 1322 1323 bool fetch() { return _sg.fetch(); } 1324 1325 1326 U[] readOp(alias op, U)(U[] buf) 1327 { 1328 auto p = buf.ptr, 1329 e = buf[$ .. $].ptr; 1330 1331 while(p != e && !this.empty){ 1332 binaryFunExt!op(*p, this.front); 1333 this.popFront(); 1334 () @trusted { ++p; }(); 1335 } 1336 1337 return () @trusted { return buf[0 .. p - buf.ptr]; }(); 1338 } 1339 1340 1341 U[] read(U)(U[] buf) 1342 { 1343 return readOp!""(buf); 1344 } 1345 1346 1347 private: 1348 Sg _sg; 1349 const(E)[] _tap; 1350 E[] _buf; 1351 size_t _idx; 1352 } 1353 1354 1355 return FIRFiltered!()(sg, tap, buf, 0); 1356 } 1357 1358 /// 1359 unittest 1360 { 1361 auto arr = [0, 1, 2, 3].repeatStream, 1362 flt1 = arr.firFilter([0, 1]); 1363 1364 assert(flt1.read(new int[8]) == [0, 0, 0*1+1*0, 1, 2, 3, 0, 1]); 1365 1366 auto flt2 = arr.firFilter([1, 2]); 1367 assert(flt2.read(new int[6]) == [0, 0, 0*2+1*1, 1*2+2*1, 2*2+3*1, 3*2+0*1]); 1368 } 1369 1370 1371 /* 1372 IIRフィルタを構成します。 1373 1374 IIRフィルタの一般形は、z変換すれば次のような式で表すことができます。 1375 H(z) = 1/Σ(k[m]*z^(-m)) 1376 1377 この関数には、各タップの係数`k[m]`を指定することで任意のIIRフィルタを構築することができます。 1378 */ 1379 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps) 1380 //if(isInputStream!Sg) 1381 //{ 1382 // return iirFilter!(reduceFn, Sg, Unqual!E)(sg, taps, new Unqual!E()(taps.length)); 1383 //} 1384 1385 ///// 1386 //auto iirFilter(alias reduceFn = "a+b*c", Sg, E)(Sg sg, const E[] taps, E[] buf) 1387 //if(isInputStream!Sg && is(E == Unqual!E)) 1388 //{ 1389 1390 //} 1391 1392 1393 /** 1394 信号の絶対値の最大値を`limit`にするように線形に振幅を小さく、もしくは大きくします。 1395 */ 1396 auto normalizer(Sg, E)(Sg sg, E limit) 1397 if(isFloatingPoint!E) 1398 { 1399 static struct Normalizer() 1400 { 1401 E front() @property 1402 { 1403 auto f = _sg.front, 1404 a = abs(f); 1405 1406 if(f == 0) return 0; 1407 1408 if(a > _max){ 1409 _max = a; 1410 1411 return _lim; 1412 } 1413 1414 return f / _max * _lim; 1415 } 1416 1417 1418 auto empty() const @property { return _sg.empty; } 1419 void popFront() { _sg.popFront(); } 1420 1421 bool fetch() { return _sg.fetch(); } 1422 1423 1424 U[] read(U)(U[] buf) 1425 { 1426 buf = _sg.read(buf); 1427 1428 auto p = buf.ptr, 1429 e = buf[$ .. $].ptr; 1430 1431 while(p != e){ 1432 immutable v = abs(*p); 1433 if(v == 0) 1434 *p = 0; 1435 else{ 1436 if(v > _max) 1437 _max = v; 1438 1439 *p *= _lim / _max; 1440 } 1441 1442 () @trusted { ++p; }(); 1443 } 1444 1445 return buf; 1446 } 1447 1448 1449 U[] readOp(alias op : "", U)(U[] buf) 1450 { 1451 return this.read(buf); 1452 } 1453 1454 1455 E[] readOp(alias op, E)(E[] buf) 1456 { 1457 auto p = buf.ptr, 1458 e = buf[$ .. $].ptr; 1459 1460 while(p != e && !this.empty){ 1461 binaryFunExt!op(*p, this.front); 1462 this.popFront(); 1463 () @trusted { ++p; }(); 1464 } 1465 1466 return buf; 1467 } 1468 1469 1470 private: 1471 Sg _sg; 1472 E _lim; 1473 E _max; 1474 } 1475 1476 return Normalizer!()(sg, limit, 0); 1477 } 1478 1479 /// 1480 unittest 1481 { 1482 auto arr = [0, 1, -2, 3].repeatStream, 1483 nlz = arr.normalizer(1.5); 1484 1485 assert(equal!approxEqual(nlz.read(new float[8]), [0, 1.5, -1.5, 1.5, 0, 0.5, -1.0, 1.5])); 1486 }