sl@0: // Copyright (c) 2007-2009 Nokia Corporation and/or its subsidiary(-ies). sl@0: // All rights reserved. sl@0: // This component and the accompanying materials are made available sl@0: // under the terms of the License "Eclipse Public License v1.0" sl@0: // which accompanies this distribution, and is available sl@0: // at the URL "http://www.eclipse.org/legal/epl-v10.html". sl@0: // sl@0: // Initial Contributors: sl@0: // Nokia Corporation - initial contribution. sl@0: // sl@0: // Contributors: sl@0: // sl@0: // Description: sl@0: // e32test\nkernsa\fastbuf.cpp sl@0: // sl@0: // sl@0: sl@0: #include sl@0: sl@0: template sl@0: class WaitFreePipe sl@0: { sl@0: public: sl@0: static WaitFreePipe* New(TInt aSize); sl@0: ~WaitFreePipe(); sl@0: void InitReader(); sl@0: void InitWriter(); sl@0: void Read(T& aOut); sl@0: TInt Write(const T& aIn); sl@0: inline TUint32 Waits() {return iWaits;} sl@0: inline void ResetWaits() {iWaits = 0;} sl@0: private: sl@0: WaitFreePipe(); sl@0: private: sl@0: T* volatile iWrite; sl@0: T* volatile iRead; sl@0: T* iBase; sl@0: T* iEnd; sl@0: NRequestStatus* volatile iStat; sl@0: NThread* iReader; sl@0: volatile TUint32 iWaits; sl@0: }; sl@0: sl@0: template sl@0: WaitFreePipe::WaitFreePipe() sl@0: : iStat(0), sl@0: iReader(0), sl@0: iWaits(0) sl@0: { sl@0: } sl@0: sl@0: template sl@0: WaitFreePipe::~WaitFreePipe() sl@0: { sl@0: free(iBase); sl@0: } sl@0: sl@0: template sl@0: WaitFreePipe* WaitFreePipe::New(TInt aSize) sl@0: { sl@0: WaitFreePipe* p = new WaitFreePipe; sl@0: if (!p) sl@0: return 0; sl@0: p->iBase = (T*)malloc(aSize * sizeof(T)); sl@0: if (!p->iBase) sl@0: { sl@0: delete p; sl@0: return 0; sl@0: } sl@0: p->iEnd = p->iBase + aSize; sl@0: p->iWrite = p->iBase; sl@0: p->iRead = p->iBase; sl@0: return p; sl@0: } sl@0: sl@0: template sl@0: void WaitFreePipe::InitWriter() sl@0: { sl@0: } sl@0: sl@0: template sl@0: void WaitFreePipe::InitReader() sl@0: { sl@0: iReader = NKern::CurrentThread(); sl@0: } sl@0: sl@0: template sl@0: void WaitFreePipe::Read(T& aOut) sl@0: { sl@0: while (iRead == iWrite) sl@0: { sl@0: NRequestStatus s; sl@0: s = KRequestPending; sl@0: // make sure set to KRequestPending is seen before iStat write sl@0: __e32_atomic_store_ord_ptr(&iStat, &s); sl@0: // make sure writer sees our request status before we check for buffer empty again sl@0: if (iRead != iWrite) sl@0: RequestComplete(iReader, (NRequestStatus*&)iStat, 0); sl@0: WaitForRequest(s); sl@0: ++iWaits; sl@0: } sl@0: aOut = *iRead; sl@0: T* new_read = iRead + 1; sl@0: if (new_read == iEnd) sl@0: new_read = iBase; sl@0: // make sure read of data value is observed before update of read pointer sl@0: __e32_atomic_store_rel_ptr(&iRead, new_read); sl@0: } sl@0: sl@0: template sl@0: TInt WaitFreePipe::Write(const T& aIn) sl@0: { sl@0: T* new_write = iWrite + 1; sl@0: if (new_write == iEnd) sl@0: new_write = iBase; sl@0: if (new_write == iRead) sl@0: return KErrOverflow; // buffer full sl@0: *iWrite = aIn; sl@0: // make sure data is seen before updated write pointer sl@0: __e32_atomic_store_ord_ptr(&iWrite, new_write); sl@0: if (iStat) sl@0: RequestComplete(iReader, (NRequestStatus*&)iStat, 0); sl@0: return KErrNone; sl@0: } sl@0: sl@0: sl@0: struct SPipeTest sl@0: { sl@0: WaitFreePipe* iPipe; sl@0: TUint64 iTotalWrites; sl@0: TUint64 iTotalReads; sl@0: volatile TUint32 iWrites; sl@0: volatile TUint32 iReads; sl@0: TUint32 iMeasure; sl@0: volatile TUint32 iReadTime; sl@0: volatile TUint32 iWriteTime; sl@0: volatile TBool iStop; sl@0: }; sl@0: sl@0: void PipeWriterThread(TAny* aPtr) sl@0: { sl@0: SPipeTest& a = *(SPipeTest*)aPtr; sl@0: a.iPipe->InitWriter(); sl@0: TUint32 seed[2] = {1,0}; sl@0: TUint32 seqs[2] = {3,0}; sl@0: TInt r; sl@0: while (!a.iStop) sl@0: { sl@0: TUint32 x = random(seqs); sl@0: do { sl@0: r = a.iPipe->Write(x); sl@0: if (r != KErrNone) sl@0: fcfspin(2*a.iWriteTime); sl@0: } while (r != KErrNone); sl@0: ++a.iTotalWrites; sl@0: ++a.iWrites; sl@0: while (a.iWrites>=a.iMeasure) sl@0: {} sl@0: TUint32 time = random(seed) % a.iWriteTime; sl@0: fcfspin(time); sl@0: } sl@0: } sl@0: sl@0: void PipeReaderThread(TAny* aPtr) sl@0: { sl@0: SPipeTest& a = *(SPipeTest*)aPtr; sl@0: TUint32 seed[2] = {2,0}; sl@0: TUint32 seqs[2] = {3,0}; sl@0: a.iPipe->InitReader(); sl@0: a.iPipe->ResetWaits(); sl@0: while (!a.iStop) sl@0: { sl@0: TUint32 x = random(seqs); sl@0: TUint32 y; sl@0: a.iPipe->Read(y); sl@0: TEST_RESULT(x==y, "Wrong value"); sl@0: ++a.iTotalReads; sl@0: ++a.iReads; sl@0: if (a.iReads < a.iMeasure) sl@0: { sl@0: TUint32 time = random(seed) % a.iReadTime; sl@0: fcfspin(time); sl@0: continue; sl@0: } sl@0: TUint32 w = a.iPipe->Waits(); sl@0: TUint32 wr = (w<<4)/a.iMeasure; sl@0: TEST_PRINT3("%d waits out of %d (wr=%d)", w, a.iMeasure, wr); sl@0: TUint32 rt = a.iReadTime; sl@0: TUint32 wt = a.iWriteTime; sl@0: switch (wr) sl@0: { sl@0: case 0: sl@0: a.iReadTime = rt>>1; sl@0: a.iWriteTime = wt<<1; sl@0: break; sl@0: case 1: sl@0: case 2: sl@0: case 3: sl@0: a.iReadTime = rt - (rt>>2); sl@0: a.iWriteTime = wt + (wt>>2); sl@0: break; sl@0: case 4: sl@0: case 5: sl@0: case 6: sl@0: a.iReadTime = rt - (rt>>3); sl@0: a.iWriteTime = wt + (wt>>3); sl@0: break; sl@0: case 7: sl@0: case 8: sl@0: // ok sl@0: break; sl@0: case 9: sl@0: case 10: sl@0: case 11: sl@0: a.iReadTime = rt - (rt>>3); sl@0: a.iWriteTime = wt + (wt>>3); sl@0: break; sl@0: case 12: sl@0: case 13: sl@0: case 14: sl@0: a.iReadTime = rt + (rt>>2); sl@0: a.iWriteTime = wt - (wt>>2); sl@0: break; sl@0: case 15: sl@0: case 16: sl@0: a.iReadTime = rt<<1; sl@0: a.iWriteTime = wt>>1; sl@0: break; sl@0: } sl@0: TEST_PRINT4("RT: %d->%d WT: %d->%d", rt, a.iReadTime, wt, a.iWriteTime); sl@0: a.iPipe->ResetWaits(); sl@0: a.iReads = 0; sl@0: a.iWrites = 0; sl@0: } sl@0: } sl@0: sl@0: void DoPipeTest() sl@0: { sl@0: SPipeTest a; sl@0: memclr(&a, sizeof(a)); sl@0: a.iPipe = WaitFreePipe::New(1024); sl@0: TEST_OOM(a.iPipe); sl@0: a.iMeasure = 131072; sl@0: a.iReadTime = 1024; sl@0: a.iWriteTime = 1024; sl@0: sl@0: NFastSemaphore exitSem(0); sl@0: NThread* reader = CreateThreadSignalOnExit("Reader", &PipeReaderThread, 11, &a, 0, -1, &exitSem, 0); sl@0: TEST_OOM(reader); sl@0: NThread* writer = CreateThreadSignalOnExit("Writer", &PipeWriterThread, 11, &a, 0, -1, &exitSem, 1); sl@0: TEST_OOM(writer); sl@0: sl@0: while (a.iTotalWrites < 0x01000000u) sl@0: NKern::Sleep(1000); sl@0: a.iStop = TRUE; sl@0: sl@0: NKern::FSWait(&exitSem); sl@0: NKern::FSWait(&exitSem); sl@0: } sl@0: sl@0: void TestWaitFreePipe() sl@0: { sl@0: DoPipeTest(); sl@0: } sl@0: sl@0: