/*
** 2005 December 14
**
** The author disclaims copyright to this source code.  In place of
** a legal notice, here is a blessing:
**
**    May you do good and not evil.
**    May you find forgiveness for yourself and forgive others.
**    May you share freely, never taking more than you give.
**
*************************************************************************
**
** $Id: test_async.c,v 1.48 2008/09/26 20:02:50 drh Exp $
**
** This file contains an example implementation of an asynchronous IO
** backend for SQLite.
**
** WHAT IS ASYNCHRONOUS I/O?
**
** With asynchronous I/O, write requests are handled by a separate thread
** running in the background.  This means that the thread that initiates
** a database write does not have to wait for (sometimes slow) disk I/O
** to occur.  The write seems to happen very quickly, though in reality
** it is happening at its usual slow pace in the background.
**
** Asynchronous I/O appears to give better responsiveness, but at a price.
** You lose the Durable property.  With the default I/O backend of SQLite,
** once a write completes, you know that the information you wrote is
** safely on disk.  With the asynchronous I/O, this is not the case.  If
** your program crashes or if a power loss occurs after the database
** write but before the asynchronous write thread has completed, then the
** database change might never make it to disk and the next user of the
** database might not see your change.
**
** You lose Durability with asynchronous I/O, but you still retain the
** other parts of ACID:  Atomic, Consistent, and Isolated.  Many
** applications get along fine without the Durability.
sl@0: ** sl@0: ** HOW IT WORKS sl@0: ** sl@0: ** Asynchronous I/O works by creating a special SQLite "vfs" structure sl@0: ** and registering it with sqlite3_vfs_register(). When files opened via sl@0: ** this vfs are written to (using sqlite3OsWrite()), the data is not sl@0: ** written directly to disk, but is placed in the "write-queue" to be sl@0: ** handled by the background thread. sl@0: ** sl@0: ** When files opened with the asynchronous vfs are read from sl@0: ** (using sqlite3OsRead()), the data is read from the file on sl@0: ** disk and the write-queue, so that from the point of view of sl@0: ** the vfs reader the OsWrite() appears to have already completed. sl@0: ** sl@0: ** The special vfs is registered (and unregistered) by calls to sl@0: ** function asyncEnable() (see below). sl@0: ** sl@0: ** LIMITATIONS sl@0: ** sl@0: ** This demonstration code is deliberately kept simple in order to keep sl@0: ** the main ideas clear and easy to understand. Real applications that sl@0: ** want to do asynchronous I/O might want to add additional capabilities. sl@0: ** For example, in this demonstration if writes are happening at a steady sl@0: ** stream that exceeds the I/O capability of the background writer thread, sl@0: ** the queue of pending write operations will grow without bound until we sl@0: ** run out of memory. Users of this technique may want to keep track of sl@0: ** the quantity of pending writes and stop accepting new write requests sl@0: ** when the buffer gets to be too big. sl@0: ** sl@0: ** LOCKING + CONCURRENCY sl@0: ** sl@0: ** Multiple connections from within a single process that use this sl@0: ** implementation of asynchronous IO may access a single database sl@0: ** file concurrently. From the point of view of the user, if all sl@0: ** connections are from within a single process, there is no difference sl@0: ** between the concurrency offered by "normal" SQLite and SQLite sl@0: ** using the asynchronous backend. 
**
** If connections from within multiple processes may access the
** database file, the ENABLE_FILE_LOCKING symbol (see below) must be
** defined. If it is not defined, then no locks are established on
** the database file. In this case, if multiple processes access
** the database file, corruption will quickly result.
**
** If ENABLE_FILE_LOCKING is defined (the default), then connections
** from within multiple processes may access a single database file
** without risking corruption. However concurrency is reduced as
** follows:
**
**   * When a connection using asynchronous IO begins a database
**     transaction, the database is locked immediately. However the
**     lock is not released until after all relevant operations
**     in the write-queue have been flushed to disk. This means
**     (for example) that the database may remain locked for some
**     time after a "COMMIT" or "ROLLBACK" is issued.
**
**   * If an application using asynchronous IO executes transactions
**     in quick succession, other database users may be effectively
**     locked out of the database. This is because when a BEGIN
**     is executed, a database lock is established immediately. But
**     when the corresponding COMMIT or ROLLBACK occurs, the lock
**     is not released until the relevant part of the write-queue
**     has been flushed through. As a result, if a COMMIT is followed
**     by a BEGIN before the write-queue is flushed through, the database
**     is never unlocked, preventing other processes from accessing
**     the database.
**
** Defining ENABLE_FILE_LOCKING when using an NFS or other remote
** file-system may slow things down, as synchronous round-trips to the
** server may be required to establish database file locks.
sl@0: */ sl@0: #define ENABLE_FILE_LOCKING sl@0: sl@0: #ifndef SQLITE_AMALGAMATION sl@0: # include "sqlite3.h" sl@0: # include sl@0: # include sl@0: #endif sl@0: #include "tcl.h" sl@0: sl@0: /* sl@0: ** This test uses pthreads and hence only works on unix and with sl@0: ** a threadsafe build of SQLite. sl@0: */ sl@0: #if SQLITE_OS_UNIX && SQLITE_THREADSAFE sl@0: sl@0: /* sl@0: ** This demo uses pthreads. If you do not have a pthreads implementation sl@0: ** for your operating system, you will need to recode the threading sl@0: ** logic. sl@0: */ sl@0: #include sl@0: #include sl@0: sl@0: /* Useful macros used in several places */ sl@0: #define MIN(x,y) ((x)<(y)?(x):(y)) sl@0: #define MAX(x,y) ((x)>(y)?(x):(y)) sl@0: sl@0: /* Forward references */ sl@0: typedef struct AsyncWrite AsyncWrite; sl@0: typedef struct AsyncFile AsyncFile; sl@0: typedef struct AsyncFileData AsyncFileData; sl@0: typedef struct AsyncFileLock AsyncFileLock; sl@0: typedef struct AsyncLock AsyncLock; sl@0: sl@0: /* Enable for debugging */ sl@0: static int sqlite3async_trace = 0; sl@0: # define ASYNC_TRACE(X) if( sqlite3async_trace ) asyncTrace X sl@0: static void asyncTrace(const char *zFormat, ...){ sl@0: char *z; sl@0: va_list ap; sl@0: va_start(ap, zFormat); sl@0: z = sqlite3_vmprintf(zFormat, ap); sl@0: va_end(ap); sl@0: fprintf(stderr, "[%d] %s", (int)pthread_self(), z); sl@0: sqlite3_free(z); sl@0: } sl@0: sl@0: /* sl@0: ** THREAD SAFETY NOTES sl@0: ** sl@0: ** Basic rules: sl@0: ** sl@0: ** * Both read and write access to the global write-op queue must be sl@0: ** protected by the async.queueMutex. As are the async.ioError and sl@0: ** async.nFile variables. sl@0: ** sl@0: ** * The async.pLock list and all AsyncLock and AsyncFileLock sl@0: ** structures must be protected by the async.lockMutex mutex. sl@0: ** sl@0: ** * The file handles from the underlying system are not assumed to sl@0: ** be thread safe. 
**
**     * See the last two paragraphs under "The Writer Thread" for
**       an assumption to do with file-handle synchronization by the Os.
**
** Deadlock prevention:
**
**     There are three mutexes used by the system: the "writer" mutex,
**     the "queue" mutex and the "lock" mutex. Rules are:
**
**     * It is illegal to block on the writer mutex when any other mutex
**       is held, and
**
**     * It is illegal to block on the queue mutex when the lock mutex
**       is held.
**
**     i.e. mutexes must be grabbed in the order "writer", "queue", "lock".
**
** File system operations (invoked by SQLite thread):
**
**     xOpen
**     xDelete
**     xFileExists
**
** File handle operations (invoked by SQLite thread):
**
**         asyncWrite, asyncClose, asyncTruncate, asyncSync
**
**     The operations above add an entry to the global write-op list. They
**     prepare the entry, acquire the async.queueMutex momentarily while
**     list pointers are manipulated to insert the new entry, then release
**     the mutex and signal the writer thread to wake up in case it happens
**     to be asleep.
**
**
**         asyncRead, asyncFileSize.
**
**     Read operations. Both of these read from the underlying file
**     first, then adjust their result based on pending writes in the
**     write-op queue.   So async.queueMutex is held for the duration
**     of these operations to prevent other threads from changing the
**     queue in mid operation.
**
**
**         asyncLock, asyncUnlock, asyncCheckReservedLock
**
**     These primitives implement in-process locking using a hash table
**     on the file name.  Files are locked correctly for connections coming
**     from the same process.  But other processes cannot see these locks
**     and will therefore not honor them.
**
**
** The writer thread:
**
**     The async.writerMutex is used to make sure there is only
**     a single writer thread running at a time.
**
**     Inside the writer thread is a loop that works like this:
**
**         WHILE (write-op list is not empty)
**             Do IO operation at head of write-op list
**             Remove entry from head of write-op list
**         END WHILE
**
**     The async.queueMutex is always held during the
**     "write-op list is not empty" test, and when the entry is removed
**     from the head of the write-op list. Sometimes it is held for the
**     interim period (while the IO is performed), and sometimes it is
**     relinquished. It is relinquished if (a) the IO op is an
**     ASYNC_CLOSE or (b) when the file handle was opened, two of
**     the underlying system's handles were opened on the same
**     file-system entry.
**
**     If condition (b) above is true, then one file-handle
**     (AsyncFile.pBaseRead) is used exclusively by sqlite threads to read the
**     file, the other (AsyncFile.pBaseWrite) by sqlite3_async_flush()
**     threads to perform write() operations. This means that read
**     operations are not blocked by asynchronous writes (although
**     asynchronous writes may still be blocked by reads).
**
**     This assumes that the OS keeps two handles open on the same file
**     properly in sync. That is, any read operation that starts after a
**     write operation on the same file system entry has completed returns
**     data consistent with the write. We also assume that if one thread
**     reads a file while another is writing it all bytes other than the
**     ones actually being written contain valid data.
**
**     If the above assumptions are not true, set the preprocessor symbol
**     SQLITE_ASYNC_TWO_FILEHANDLES to 0.
sl@0: */ sl@0: sl@0: #ifndef SQLITE_ASYNC_TWO_FILEHANDLES sl@0: /* #define SQLITE_ASYNC_TWO_FILEHANDLES 0 */ sl@0: #define SQLITE_ASYNC_TWO_FILEHANDLES 1 sl@0: #endif sl@0: sl@0: /* sl@0: ** State information is held in the static variable "async" defined sl@0: ** as the following structure. sl@0: ** sl@0: ** Both async.ioError and async.nFile are protected by async.queueMutex. sl@0: */ sl@0: static struct TestAsyncStaticData { sl@0: pthread_mutex_t lockMutex; /* For access to aLock hash table */ sl@0: pthread_mutex_t queueMutex; /* Mutex for access to write operation queue */ sl@0: pthread_mutex_t writerMutex; /* Prevents multiple writer threads */ sl@0: pthread_cond_t queueSignal; /* For waking up sleeping writer thread */ sl@0: pthread_cond_t emptySignal; /* Notify when the write queue is empty */ sl@0: AsyncWrite *pQueueFirst; /* Next write operation to be processed */ sl@0: AsyncWrite *pQueueLast; /* Last write operation on the list */ sl@0: AsyncLock *pLock; /* Linked list of all AsyncLock structures */ sl@0: volatile int ioDelay; /* Extra delay between write operations */ sl@0: volatile int writerHaltWhenIdle; /* Writer thread halts when queue empty */ sl@0: volatile int writerHaltNow; /* Writer thread halts after next op */ sl@0: int ioError; /* True if an IO error has occured */ sl@0: int nFile; /* Number of open files (from sqlite pov) */ sl@0: } async = { sl@0: PTHREAD_MUTEX_INITIALIZER, sl@0: PTHREAD_MUTEX_INITIALIZER, sl@0: PTHREAD_MUTEX_INITIALIZER, sl@0: PTHREAD_COND_INITIALIZER, sl@0: PTHREAD_COND_INITIALIZER, sl@0: }; sl@0: sl@0: /* Possible values of AsyncWrite.op */ sl@0: #define ASYNC_NOOP 0 sl@0: #define ASYNC_WRITE 1 sl@0: #define ASYNC_SYNC 2 sl@0: #define ASYNC_TRUNCATE 3 sl@0: #define ASYNC_CLOSE 4 sl@0: #define ASYNC_DELETE 5 sl@0: #define ASYNC_OPENEXCLUSIVE 6 sl@0: #define ASYNC_UNLOCK 7 sl@0: sl@0: /* Names of opcodes. Used for debugging only. sl@0: ** Make sure these stay in sync with the macros above! 
sl@0: */ sl@0: static const char *azOpcodeName[] = { sl@0: "NOOP", "WRITE", "SYNC", "TRUNCATE", "CLOSE", "DELETE", "OPENEX", "UNLOCK" sl@0: }; sl@0: sl@0: /* sl@0: ** Entries on the write-op queue are instances of the AsyncWrite sl@0: ** structure, defined here. sl@0: ** sl@0: ** The interpretation of the iOffset and nByte variables varies depending sl@0: ** on the value of AsyncWrite.op: sl@0: ** sl@0: ** ASYNC_NOOP: sl@0: ** No values used. sl@0: ** sl@0: ** ASYNC_WRITE: sl@0: ** iOffset -> Offset in file to write to. sl@0: ** nByte -> Number of bytes of data to write (pointed to by zBuf). sl@0: ** sl@0: ** ASYNC_SYNC: sl@0: ** nByte -> flags to pass to sqlite3OsSync(). sl@0: ** sl@0: ** ASYNC_TRUNCATE: sl@0: ** iOffset -> Size to truncate file to. sl@0: ** nByte -> Unused. sl@0: ** sl@0: ** ASYNC_CLOSE: sl@0: ** iOffset -> Unused. sl@0: ** nByte -> Unused. sl@0: ** sl@0: ** ASYNC_DELETE: sl@0: ** iOffset -> Contains the "syncDir" flag. sl@0: ** nByte -> Number of bytes of zBuf points to (file name). sl@0: ** sl@0: ** ASYNC_OPENEXCLUSIVE: sl@0: ** iOffset -> Value of "delflag". sl@0: ** nByte -> Number of bytes of zBuf points to (file name). sl@0: ** sl@0: ** ASYNC_UNLOCK: sl@0: ** nByte -> Argument to sqlite3OsUnlock(). sl@0: ** sl@0: ** sl@0: ** For an ASYNC_WRITE operation, zBuf points to the data to write to the file. sl@0: ** This space is sqlite3_malloc()d along with the AsyncWrite structure in a sl@0: ** single blob, so is deleted when sqlite3_free() is called on the parent sl@0: ** structure. sl@0: */ sl@0: struct AsyncWrite { sl@0: AsyncFileData *pFileData; /* File to write data to or sync */ sl@0: int op; /* One of ASYNC_xxx etc. 
*/ sl@0: sqlite_int64 iOffset; /* See above */ sl@0: int nByte; /* See above */ sl@0: char *zBuf; /* Data to write to file (or NULL if op!=ASYNC_WRITE) */ sl@0: AsyncWrite *pNext; /* Next write operation (to any file) */ sl@0: }; sl@0: sl@0: /* sl@0: ** An instance of this structure is created for each distinct open file sl@0: ** (i.e. if two handles are opened on the one file, only one of these sl@0: ** structures is allocated) and stored in the async.aLock hash table. The sl@0: ** keys for async.aLock are the full pathnames of the opened files. sl@0: ** sl@0: ** AsyncLock.pList points to the head of a linked list of AsyncFileLock sl@0: ** structures, one for each handle currently open on the file. sl@0: ** sl@0: ** If the opened file is not a main-database (the SQLITE_OPEN_MAIN_DB is sl@0: ** not passed to the sqlite3OsOpen() call), or if ENABLE_FILE_LOCKING is sl@0: ** not defined at compile time, variables AsyncLock.pFile and sl@0: ** AsyncLock.eLock are never used. Otherwise, pFile is a file handle sl@0: ** opened on the file in question and used to obtain the file-system sl@0: ** locks required by database connections within this process. sl@0: ** sl@0: ** See comments above the asyncLock() function for more details on sl@0: ** the implementation of database locking used by this backend. sl@0: */ sl@0: struct AsyncLock { sl@0: char *zFile; sl@0: int nFile; sl@0: sqlite3_file *pFile; sl@0: int eLock; sl@0: AsyncFileLock *pList; sl@0: AsyncLock *pNext; /* Next in linked list headed by async.pLock */ sl@0: }; sl@0: sl@0: /* sl@0: ** An instance of the following structure is allocated along with each sl@0: ** AsyncFileData structure (see AsyncFileData.lock), but is only used if the sl@0: ** file was opened with the SQLITE_OPEN_MAIN_DB. 
sl@0: */ sl@0: struct AsyncFileLock { sl@0: int eLock; /* Internally visible lock state (sqlite pov) */ sl@0: int eAsyncLock; /* Lock-state with write-queue unlock */ sl@0: AsyncFileLock *pNext; sl@0: }; sl@0: sl@0: /* sl@0: ** The AsyncFile structure is a subclass of sqlite3_file used for sl@0: ** asynchronous IO. sl@0: ** sl@0: ** All of the actual data for the structure is stored in the structure sl@0: ** pointed to by AsyncFile.pData, which is allocated as part of the sl@0: ** sqlite3OsOpen() using sqlite3_malloc(). The reason for this is that the sl@0: ** lifetime of the AsyncFile structure is ended by the caller after OsClose() sl@0: ** is called, but the data in AsyncFileData may be required by the sl@0: ** writer thread after that point. sl@0: */ sl@0: struct AsyncFile { sl@0: sqlite3_io_methods *pMethod; sl@0: AsyncFileData *pData; sl@0: }; sl@0: struct AsyncFileData { sl@0: char *zName; /* Underlying OS filename - used for debugging */ sl@0: int nName; /* Number of characters in zName */ sl@0: sqlite3_file *pBaseRead; /* Read handle to the underlying Os file */ sl@0: sqlite3_file *pBaseWrite; /* Write handle to the underlying Os file */ sl@0: AsyncFileLock lock; /* Lock state for this handle */ sl@0: AsyncLock *pLock; /* AsyncLock object for this file system entry */ sl@0: AsyncWrite close; sl@0: }; sl@0: sl@0: /* sl@0: ** The following async_XXX functions are debugging wrappers around the sl@0: ** corresponding pthread_XXX functions: sl@0: ** sl@0: ** pthread_mutex_lock(); sl@0: ** pthread_mutex_unlock(); sl@0: ** pthread_mutex_trylock(); sl@0: ** pthread_cond_wait(); sl@0: ** sl@0: ** It is illegal to pass any mutex other than those stored in the sl@0: ** following global variables of these functions. sl@0: ** sl@0: ** async.queueMutex sl@0: ** async.writerMutex sl@0: ** async.lockMutex sl@0: ** sl@0: ** If NDEBUG is defined, these wrappers do nothing except call the sl@0: ** corresponding pthreads function. 
If NDEBUG is not defined, then the sl@0: ** following variables are used to store the thread-id (as returned sl@0: ** by pthread_self()) currently holding the mutex, or 0 otherwise: sl@0: ** sl@0: ** asyncdebug.queueMutexHolder sl@0: ** asyncdebug.writerMutexHolder sl@0: ** asyncdebug.lockMutexHolder sl@0: ** sl@0: ** These variables are used by some assert() statements that verify sl@0: ** the statements made in the "Deadlock Prevention" notes earlier sl@0: ** in this file. sl@0: */ sl@0: #ifndef NDEBUG sl@0: sl@0: static struct TestAsyncDebugData { sl@0: pthread_t lockMutexHolder; sl@0: pthread_t queueMutexHolder; sl@0: pthread_t writerMutexHolder; sl@0: } asyncdebug = {0, 0, 0}; sl@0: sl@0: /* sl@0: ** Wrapper around pthread_mutex_lock(). Checks that we have not violated sl@0: ** the anti-deadlock rules (see "Deadlock prevention" above). sl@0: */ sl@0: static int async_mutex_lock(pthread_mutex_t *pMutex){ sl@0: int iIdx; sl@0: int rc; sl@0: pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); sl@0: pthread_t *aHolder = (pthread_t *)(&asyncdebug); sl@0: sl@0: /* The code in this 'ifndef NDEBUG' block depends on a certain alignment sl@0: * of the variables in TestAsyncStaticData and TestAsyncDebugData. The sl@0: * following assert() statements check that this has not been changed. sl@0: * sl@0: * Really, these only need to be run once at startup time. sl@0: */ sl@0: assert(&(aMutex[0])==&async.lockMutex); sl@0: assert(&(aMutex[1])==&async.queueMutex); sl@0: assert(&(aMutex[2])==&async.writerMutex); sl@0: assert(&(aHolder[0])==&asyncdebug.lockMutexHolder); sl@0: assert(&(aHolder[1])==&asyncdebug.queueMutexHolder); sl@0: assert(&(aHolder[2])==&asyncdebug.writerMutexHolder); sl@0: sl@0: assert( pthread_self()!=0 ); sl@0: sl@0: for(iIdx=0; iIdx<3; iIdx++){ sl@0: if( pMutex==&aMutex[iIdx] ) break; sl@0: sl@0: /* This is the key assert(). 
Here we are checking that if the caller sl@0: * is trying to block on async.writerMutex, neither of the other two sl@0: * mutex are held. If the caller is trying to block on async.queueMutex, sl@0: * lockMutex is not held. sl@0: */ sl@0: assert(!pthread_equal(aHolder[iIdx], pthread_self())); sl@0: } sl@0: assert(iIdx<3); sl@0: sl@0: rc = pthread_mutex_lock(pMutex); sl@0: if( rc==0 ){ sl@0: assert(aHolder[iIdx]==0); sl@0: aHolder[iIdx] = pthread_self(); sl@0: } sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Wrapper around pthread_mutex_unlock(). sl@0: */ sl@0: static int async_mutex_unlock(pthread_mutex_t *pMutex){ sl@0: int iIdx; sl@0: int rc; sl@0: pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); sl@0: pthread_t *aHolder = (pthread_t *)(&asyncdebug); sl@0: sl@0: for(iIdx=0; iIdx<3; iIdx++){ sl@0: if( pMutex==&aMutex[iIdx] ) break; sl@0: } sl@0: assert(iIdx<3); sl@0: sl@0: assert(pthread_equal(aHolder[iIdx], pthread_self())); sl@0: aHolder[iIdx] = 0; sl@0: rc = pthread_mutex_unlock(pMutex); sl@0: assert(rc==0); sl@0: sl@0: return 0; sl@0: } sl@0: sl@0: /* sl@0: ** Wrapper around pthread_mutex_trylock(). sl@0: */ sl@0: static int async_mutex_trylock(pthread_mutex_t *pMutex){ sl@0: int iIdx; sl@0: int rc; sl@0: pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); sl@0: pthread_t *aHolder = (pthread_t *)(&asyncdebug); sl@0: sl@0: for(iIdx=0; iIdx<3; iIdx++){ sl@0: if( pMutex==&aMutex[iIdx] ) break; sl@0: } sl@0: assert(iIdx<3); sl@0: sl@0: rc = pthread_mutex_trylock(pMutex); sl@0: if( rc==0 ){ sl@0: assert(aHolder[iIdx]==0); sl@0: aHolder[iIdx] = pthread_self(); sl@0: } sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Wrapper around pthread_cond_wait(). 
sl@0: */ sl@0: static int async_cond_wait(pthread_cond_t *pCond, pthread_mutex_t *pMutex){ sl@0: int iIdx; sl@0: int rc; sl@0: pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); sl@0: pthread_t *aHolder = (pthread_t *)(&asyncdebug); sl@0: sl@0: for(iIdx=0; iIdx<3; iIdx++){ sl@0: if( pMutex==&aMutex[iIdx] ) break; sl@0: } sl@0: assert(iIdx<3); sl@0: sl@0: assert(pthread_equal(aHolder[iIdx],pthread_self())); sl@0: aHolder[iIdx] = 0; sl@0: rc = pthread_cond_wait(pCond, pMutex); sl@0: if( rc==0 ){ sl@0: aHolder[iIdx] = pthread_self(); sl@0: } sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Assert that the mutex is held by the current thread. sl@0: */ sl@0: static void assert_mutex_is_held(pthread_mutex_t *pMutex){ sl@0: int iIdx; sl@0: pthread_mutex_t *aMutex = (pthread_mutex_t *)(&async); sl@0: pthread_t *aHolder = (pthread_t *)(&asyncdebug); sl@0: sl@0: for(iIdx=0; iIdx<3; iIdx++){ sl@0: if( pMutex==&aMutex[iIdx] ) break; sl@0: } sl@0: assert(iIdx<3); sl@0: assert( aHolder[iIdx]==pthread_self() ); sl@0: } sl@0: sl@0: /* Call our async_XX wrappers instead of selected pthread_XX functions */ sl@0: #define pthread_mutex_lock async_mutex_lock sl@0: #define pthread_mutex_unlock async_mutex_unlock sl@0: #define pthread_mutex_trylock async_mutex_trylock sl@0: #define pthread_cond_wait async_cond_wait sl@0: sl@0: #else /* if defined(NDEBUG) */ sl@0: sl@0: #define assert_mutex_is_held(X) /* A no-op when not debugging */ sl@0: sl@0: #endif /* !defined(NDEBUG) */ sl@0: sl@0: /* sl@0: ** Add an entry to the end of the global write-op list. pWrite should point sl@0: ** to an AsyncWrite structure allocated using sqlite3_malloc(). The writer sl@0: ** thread will call sqlite3_free() to free the structure after the specified sl@0: ** operation has been completed. sl@0: ** sl@0: ** Once an AsyncWrite structure has been added to the list, it becomes the sl@0: ** property of the writer thread and must not be read or modified by the sl@0: ** caller. 
sl@0: */ sl@0: static void addAsyncWrite(AsyncWrite *pWrite){ sl@0: /* We must hold the queue mutex in order to modify the queue pointers */ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: sl@0: /* Add the record to the end of the write-op queue */ sl@0: assert( !pWrite->pNext ); sl@0: if( async.pQueueLast ){ sl@0: assert( async.pQueueFirst ); sl@0: async.pQueueLast->pNext = pWrite; sl@0: }else{ sl@0: async.pQueueFirst = pWrite; sl@0: } sl@0: async.pQueueLast = pWrite; sl@0: ASYNC_TRACE(("PUSH %p (%s %s %d)\n", pWrite, azOpcodeName[pWrite->op], sl@0: pWrite->pFileData ? pWrite->pFileData->zName : "-", pWrite->iOffset)); sl@0: sl@0: if( pWrite->op==ASYNC_CLOSE ){ sl@0: async.nFile--; sl@0: } sl@0: sl@0: /* Drop the queue mutex */ sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: sl@0: /* The writer thread might have been idle because there was nothing sl@0: ** on the write-op queue for it to do. So wake it up. */ sl@0: pthread_cond_signal(&async.queueSignal); sl@0: } sl@0: sl@0: /* sl@0: ** Increment async.nFile in a thread-safe manner. sl@0: */ sl@0: static void incrOpenFileCount(){ sl@0: /* We must hold the queue mutex in order to modify async.nFile */ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: if( async.nFile==0 ){ sl@0: async.ioError = SQLITE_OK; sl@0: } sl@0: async.nFile++; sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: } sl@0: sl@0: /* sl@0: ** This is a utility function to allocate and populate a new AsyncWrite sl@0: ** structure and insert it (via addAsyncWrite() ) into the global list. sl@0: */ sl@0: static int addNewAsyncWrite( sl@0: AsyncFileData *pFileData, sl@0: int op, sl@0: sqlite3_int64 iOffset, sl@0: int nByte, sl@0: const char *zByte sl@0: ){ sl@0: AsyncWrite *p; sl@0: if( op!=ASYNC_CLOSE && async.ioError ){ sl@0: return async.ioError; sl@0: } sl@0: p = sqlite3_malloc(sizeof(AsyncWrite) + (zByte?nByte:0)); sl@0: if( !p ){ sl@0: /* The upper layer does not expect operations like OsWrite() to sl@0: ** return SQLITE_NOMEM. 
This is partly because under normal conditions sl@0: ** SQLite is required to do rollback without calling malloc(). So sl@0: ** if malloc() fails here, treat it as an I/O error. The above sl@0: ** layer knows how to handle that. sl@0: */ sl@0: return SQLITE_IOERR; sl@0: } sl@0: p->op = op; sl@0: p->iOffset = iOffset; sl@0: p->nByte = nByte; sl@0: p->pFileData = pFileData; sl@0: p->pNext = 0; sl@0: if( zByte ){ sl@0: p->zBuf = (char *)&p[1]; sl@0: memcpy(p->zBuf, zByte, nByte); sl@0: }else{ sl@0: p->zBuf = 0; sl@0: } sl@0: addAsyncWrite(p); sl@0: return SQLITE_OK; sl@0: } sl@0: sl@0: /* sl@0: ** Close the file. This just adds an entry to the write-op list, the file is sl@0: ** not actually closed. sl@0: */ sl@0: static int asyncClose(sqlite3_file *pFile){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: sl@0: /* Unlock the file, if it is locked */ sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: p->lock.eLock = 0; sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: sl@0: addAsyncWrite(&p->close); sl@0: return SQLITE_OK; sl@0: } sl@0: sl@0: /* sl@0: ** Implementation of sqlite3OsWrite() for asynchronous files. Instead of sl@0: ** writing to the underlying file, this function adds an entry to the end of sl@0: ** the global AsyncWrite list. Either SQLITE_OK or SQLITE_NOMEM may be sl@0: ** returned. sl@0: */ sl@0: static int asyncWrite( sl@0: sqlite3_file *pFile, sl@0: const void *pBuf, sl@0: int amt, sl@0: sqlite3_int64 iOff sl@0: ){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: return addNewAsyncWrite(p, ASYNC_WRITE, iOff, amt, pBuf); sl@0: } sl@0: sl@0: /* sl@0: ** Read data from the file. First we read from the filesystem, then adjust sl@0: ** the contents of the buffer based on ASYNC_WRITE operations in the sl@0: ** write-op queue. sl@0: ** sl@0: ** This method holds the mutex from start to finish. 
sl@0: */ sl@0: static int asyncRead( sl@0: sqlite3_file *pFile, sl@0: void *zOut, sl@0: int iAmt, sl@0: sqlite3_int64 iOffset sl@0: ){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: int rc = SQLITE_OK; sl@0: sqlite3_int64 filesize; sl@0: int nRead; sl@0: sqlite3_file *pBase = p->pBaseRead; sl@0: sl@0: /* Grab the write queue mutex for the duration of the call */ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: sl@0: /* If an I/O error has previously occurred in this virtual file sl@0: ** system, then all subsequent operations fail. sl@0: */ sl@0: if( async.ioError!=SQLITE_OK ){ sl@0: rc = async.ioError; sl@0: goto asyncread_out; sl@0: } sl@0: sl@0: if( pBase->pMethods ){ sl@0: rc = pBase->pMethods->xFileSize(pBase, &filesize); sl@0: if( rc!=SQLITE_OK ){ sl@0: goto asyncread_out; sl@0: } sl@0: nRead = MIN(filesize - iOffset, iAmt); sl@0: if( nRead>0 ){ sl@0: rc = pBase->pMethods->xRead(pBase, zOut, nRead, iOffset); sl@0: ASYNC_TRACE(("READ %s %d bytes at %d\n", p->zName, nRead, iOffset)); sl@0: } sl@0: } sl@0: sl@0: if( rc==SQLITE_OK ){ sl@0: AsyncWrite *pWrite; sl@0: char *zName = p->zName; sl@0: sl@0: for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ sl@0: if( pWrite->op==ASYNC_WRITE && ( sl@0: (pWrite->pFileData==p) || sl@0: (zName && pWrite->pFileData->zName==zName) sl@0: )){ sl@0: int iBeginOut = (pWrite->iOffset-iOffset); sl@0: int iBeginIn = -iBeginOut; sl@0: int nCopy; sl@0: sl@0: if( iBeginIn<0 ) iBeginIn = 0; sl@0: if( iBeginOut<0 ) iBeginOut = 0; sl@0: nCopy = MIN(pWrite->nByte-iBeginIn, iAmt-iBeginOut); sl@0: sl@0: if( nCopy>0 ){ sl@0: memcpy(&((char *)zOut)[iBeginOut], &pWrite->zBuf[iBeginIn], nCopy); sl@0: ASYNC_TRACE(("OVERREAD %d bytes at %d\n", nCopy, iBeginOut+iOffset)); sl@0: } sl@0: } sl@0: } sl@0: } sl@0: sl@0: asyncread_out: sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Truncate the file to nByte bytes in length. 
This just adds an entry to sl@0: ** the write-op list, no IO actually takes place. sl@0: */ sl@0: static int asyncTruncate(sqlite3_file *pFile, sqlite3_int64 nByte){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: return addNewAsyncWrite(p, ASYNC_TRUNCATE, nByte, 0, 0); sl@0: } sl@0: sl@0: /* sl@0: ** Sync the file. This just adds an entry to the write-op list, the sl@0: ** sync() is done later by sqlite3_async_flush(). sl@0: */ sl@0: static int asyncSync(sqlite3_file *pFile, int flags){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: return addNewAsyncWrite(p, ASYNC_SYNC, 0, flags, 0); sl@0: } sl@0: sl@0: /* sl@0: ** Read the size of the file. First we read the size of the file system sl@0: ** entry, then adjust for any ASYNC_WRITE or ASYNC_TRUNCATE operations sl@0: ** currently in the write-op list. sl@0: ** sl@0: ** This method holds the mutex from start to finish. sl@0: */ sl@0: int asyncFileSize(sqlite3_file *pFile, sqlite3_int64 *piSize){ sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: int rc = SQLITE_OK; sl@0: sqlite3_int64 s = 0; sl@0: sqlite3_file *pBase; sl@0: sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: sl@0: /* Read the filesystem size from the base file. If pBaseRead is NULL, this sl@0: ** means the file hasn't been opened yet. In this case all relevant data sl@0: ** must be in the write-op queue anyway, so we can omit reading from the sl@0: ** file-system. 
sl@0: */ sl@0: pBase = p->pBaseRead; sl@0: if( pBase->pMethods ){ sl@0: rc = pBase->pMethods->xFileSize(pBase, &s); sl@0: } sl@0: sl@0: if( rc==SQLITE_OK ){ sl@0: AsyncWrite *pWrite; sl@0: for(pWrite=async.pQueueFirst; pWrite; pWrite = pWrite->pNext){ sl@0: if( pWrite->op==ASYNC_DELETE sl@0: && p->zName sl@0: && strcmp(p->zName, pWrite->zBuf)==0 sl@0: ){ sl@0: s = 0; sl@0: }else if( pWrite->pFileData && ( sl@0: (pWrite->pFileData==p) sl@0: || (p->zName && pWrite->pFileData->zName==p->zName) sl@0: )){ sl@0: switch( pWrite->op ){ sl@0: case ASYNC_WRITE: sl@0: s = MAX(pWrite->iOffset + (sqlite3_int64)(pWrite->nByte), s); sl@0: break; sl@0: case ASYNC_TRUNCATE: sl@0: s = MIN(s, pWrite->iOffset); sl@0: break; sl@0: } sl@0: } sl@0: } sl@0: *piSize = s; sl@0: } sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Lock or unlock the actual file-system entry. sl@0: */ sl@0: static int getFileLock(AsyncLock *pLock){ sl@0: int rc = SQLITE_OK; sl@0: AsyncFileLock *pIter; sl@0: int eRequired = 0; sl@0: sl@0: if( pLock->pFile ){ sl@0: for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ sl@0: assert(pIter->eAsyncLock>=pIter->eLock); sl@0: if( pIter->eAsyncLock>eRequired ){ sl@0: eRequired = pIter->eAsyncLock; sl@0: assert(eRequired>=0 && eRequired<=SQLITE_LOCK_EXCLUSIVE); sl@0: } sl@0: } sl@0: sl@0: if( eRequired>pLock->eLock ){ sl@0: rc = pLock->pFile->pMethods->xLock(pLock->pFile, eRequired); sl@0: if( rc==SQLITE_OK ){ sl@0: pLock->eLock = eRequired; sl@0: } sl@0: } sl@0: else if( eRequiredeLock && eRequired<=SQLITE_LOCK_SHARED ){ sl@0: rc = pLock->pFile->pMethods->xUnlock(pLock->pFile, eRequired); sl@0: if( rc==SQLITE_OK ){ sl@0: pLock->eLock = eRequired; sl@0: } sl@0: } sl@0: } sl@0: sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Return the AsyncLock structure from the global async.pLock list sl@0: ** associated with the file-system entry identified by path zName sl@0: ** (a string of nName bytes). 
If no such structure exists, return 0. sl@0: */ sl@0: static AsyncLock *findLock(const char *zName, int nName){ sl@0: AsyncLock *p = async.pLock; sl@0: while( p && (p->nFile!=nName || memcmp(p->zFile, zName, nName)) ){ sl@0: p = p->pNext; sl@0: } sl@0: return p; sl@0: } sl@0: sl@0: /* sl@0: ** The following two methods - asyncLock() and asyncUnlock() - are used sl@0: ** to obtain and release locks on database files opened with the sl@0: ** asynchronous backend. sl@0: */ sl@0: static int asyncLock(sqlite3_file *pFile, int eLock){ sl@0: int rc = SQLITE_OK; sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: sl@0: if( p->zName ){ sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: if( p->lock.eLockpLock; sl@0: AsyncFileLock *pIter; sl@0: assert(pLock && pLock->pList); sl@0: for(pIter=pLock->pList; pIter; pIter=pIter->pNext){ sl@0: if( pIter!=&p->lock && ( sl@0: (eLock==SQLITE_LOCK_EXCLUSIVE && pIter->eLock>=SQLITE_LOCK_SHARED) || sl@0: (eLock==SQLITE_LOCK_PENDING && pIter->eLock>=SQLITE_LOCK_RESERVED) || sl@0: (eLock==SQLITE_LOCK_RESERVED && pIter->eLock>=SQLITE_LOCK_RESERVED) || sl@0: (eLock==SQLITE_LOCK_SHARED && pIter->eLock>=SQLITE_LOCK_PENDING) sl@0: )){ sl@0: rc = SQLITE_BUSY; sl@0: } sl@0: } sl@0: if( rc==SQLITE_OK ){ sl@0: p->lock.eLock = eLock; sl@0: p->lock.eAsyncLock = MAX(p->lock.eAsyncLock, eLock); sl@0: } sl@0: assert(p->lock.eAsyncLock>=p->lock.eLock); sl@0: if( rc==SQLITE_OK ){ sl@0: rc = getFileLock(pLock); sl@0: } sl@0: } sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: } sl@0: sl@0: ASYNC_TRACE(("LOCK %d (%s) rc=%d\n", eLock, p->zName, rc)); sl@0: return rc; sl@0: } sl@0: static int asyncUnlock(sqlite3_file *pFile, int eLock){ sl@0: int rc = SQLITE_OK; sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: if( p->zName ){ sl@0: AsyncFileLock *pLock = &p->lock; sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: pLock->eLock = MIN(pLock->eLock, eLock); sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: rc = addNewAsyncWrite(p, 
ASYNC_UNLOCK, 0, eLock, 0); sl@0: } sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** This function is called when the pager layer first opens a database file sl@0: ** and is checking for a hot-journal. sl@0: */ sl@0: static int asyncCheckReservedLock(sqlite3_file *pFile, int *pResOut){ sl@0: int ret = 0; sl@0: AsyncFileLock *pIter; sl@0: AsyncFileData *p = ((AsyncFile *)pFile)->pData; sl@0: sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: for(pIter=p->pLock->pList; pIter; pIter=pIter->pNext){ sl@0: if( pIter->eLock>=SQLITE_LOCK_RESERVED ){ sl@0: ret = 1; sl@0: } sl@0: } sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: sl@0: ASYNC_TRACE(("CHECK-LOCK %d (%s)\n", ret, p->zName)); sl@0: *pResOut = ret; sl@0: return SQLITE_OK; sl@0: } sl@0: sl@0: /* sl@0: ** sqlite3_file_control() implementation. sl@0: */ sl@0: static int asyncFileControl(sqlite3_file *id, int op, void *pArg){ sl@0: switch( op ){ sl@0: case SQLITE_FCNTL_LOCKSTATE: { sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: *(int*)pArg = ((AsyncFile*)id)->pData->lock.eLock; sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: return SQLITE_OK; sl@0: } sl@0: } sl@0: return SQLITE_ERROR; sl@0: } sl@0: sl@0: /* sl@0: ** Return the device characteristics and sector-size of the device. It sl@0: ** is not tricky to implement these correctly, as this backend might sl@0: ** not have an open file handle at this point. 
sl@0: */ sl@0: static int asyncSectorSize(sqlite3_file *pFile){ sl@0: return 512; sl@0: } sl@0: static int asyncDeviceCharacteristics(sqlite3_file *pFile){ sl@0: return 0; sl@0: } sl@0: sl@0: static int unlinkAsyncFile(AsyncFileData *pData){ sl@0: AsyncFileLock **ppIter; sl@0: int rc = SQLITE_OK; sl@0: sl@0: if( pData->zName ){ sl@0: AsyncLock *pLock = pData->pLock; sl@0: for(ppIter=&pLock->pList; *ppIter; ppIter=&((*ppIter)->pNext)){ sl@0: if( (*ppIter)==&pData->lock ){ sl@0: *ppIter = pData->lock.pNext; sl@0: break; sl@0: } sl@0: } sl@0: if( !pLock->pList ){ sl@0: AsyncLock **pp; sl@0: if( pLock->pFile ){ sl@0: pLock->pFile->pMethods->xClose(pLock->pFile); sl@0: } sl@0: for(pp=&async.pLock; *pp!=pLock; pp=&((*pp)->pNext)); sl@0: *pp = pLock->pNext; sl@0: sqlite3_free(pLock); sl@0: }else{ sl@0: rc = getFileLock(pLock); sl@0: } sl@0: } sl@0: sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Open a file. sl@0: */ sl@0: static int asyncOpen( sl@0: sqlite3_vfs *pAsyncVfs, sl@0: const char *zName, sl@0: sqlite3_file *pFile, sl@0: int flags, sl@0: int *pOutFlags sl@0: ){ sl@0: static sqlite3_io_methods async_methods = { sl@0: 1, /* iVersion */ sl@0: asyncClose, /* xClose */ sl@0: asyncRead, /* xRead */ sl@0: asyncWrite, /* xWrite */ sl@0: asyncTruncate, /* xTruncate */ sl@0: asyncSync, /* xSync */ sl@0: asyncFileSize, /* xFileSize */ sl@0: asyncLock, /* xLock */ sl@0: asyncUnlock, /* xUnlock */ sl@0: asyncCheckReservedLock, /* xCheckReservedLock */ sl@0: asyncFileControl, /* xFileControl */ sl@0: asyncSectorSize, /* xSectorSize */ sl@0: asyncDeviceCharacteristics /* xDeviceCharacteristics */ sl@0: }; sl@0: sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: AsyncFile *p = (AsyncFile *)pFile; sl@0: int nName = 0; sl@0: int rc = SQLITE_OK; sl@0: int nByte; sl@0: AsyncFileData *pData; sl@0: AsyncLock *pLock = 0; sl@0: char *z; sl@0: int isExclusive = (flags&SQLITE_OPEN_EXCLUSIVE); sl@0: sl@0: /* If zName is NULL, then the upper layer is requesting an 
anonymous file */ sl@0: if( zName ){ sl@0: nName = strlen(zName)+1; sl@0: } sl@0: sl@0: nByte = ( sl@0: sizeof(AsyncFileData) + /* AsyncFileData structure */ sl@0: 2 * pVfs->szOsFile + /* AsyncFileData.pBaseRead and pBaseWrite */ sl@0: nName /* AsyncFileData.zName */ sl@0: ); sl@0: z = sqlite3_malloc(nByte); sl@0: if( !z ){ sl@0: return SQLITE_NOMEM; sl@0: } sl@0: memset(z, 0, nByte); sl@0: pData = (AsyncFileData*)z; sl@0: z += sizeof(pData[0]); sl@0: pData->pBaseRead = (sqlite3_file*)z; sl@0: z += pVfs->szOsFile; sl@0: pData->pBaseWrite = (sqlite3_file*)z; sl@0: pData->close.pFileData = pData; sl@0: pData->close.op = ASYNC_CLOSE; sl@0: sl@0: if( zName ){ sl@0: z += pVfs->szOsFile; sl@0: pData->zName = z; sl@0: pData->nName = nName; sl@0: memcpy(pData->zName, zName, nName); sl@0: } sl@0: sl@0: if( !isExclusive ){ sl@0: rc = pVfs->xOpen(pVfs, zName, pData->pBaseRead, flags, pOutFlags); sl@0: if( rc==SQLITE_OK && ((*pOutFlags)&SQLITE_OPEN_READWRITE) ){ sl@0: rc = pVfs->xOpen(pVfs, zName, pData->pBaseWrite, flags, 0); sl@0: } sl@0: } sl@0: sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: sl@0: if( zName && rc==SQLITE_OK ){ sl@0: pLock = findLock(pData->zName, pData->nName); sl@0: if( !pLock ){ sl@0: int nByte = pVfs->szOsFile + sizeof(AsyncLock) + pData->nName + 1; sl@0: pLock = (AsyncLock *)sqlite3_malloc(nByte); sl@0: if( pLock ){ sl@0: memset(pLock, 0, nByte); sl@0: #ifdef ENABLE_FILE_LOCKING sl@0: if( flags&SQLITE_OPEN_MAIN_DB ){ sl@0: pLock->pFile = (sqlite3_file *)&pLock[1]; sl@0: rc = pVfs->xOpen(pVfs, zName, pLock->pFile, flags, 0); sl@0: if( rc!=SQLITE_OK ){ sl@0: sqlite3_free(pLock); sl@0: pLock = 0; sl@0: } sl@0: } sl@0: #endif sl@0: if( pLock ){ sl@0: pLock->nFile = pData->nName; sl@0: pLock->zFile = &((char *)(&pLock[1]))[pVfs->szOsFile]; sl@0: memcpy(pLock->zFile, pData->zName, pLock->nFile); sl@0: pLock->pNext = async.pLock; sl@0: async.pLock = pLock; sl@0: } sl@0: }else{ sl@0: rc = SQLITE_NOMEM; sl@0: } sl@0: } sl@0: } sl@0: sl@0: if( rc==SQLITE_OK ){ 
sl@0: p->pMethod = &async_methods; sl@0: p->pData = pData; sl@0: sl@0: /* Link AsyncFileData.lock into the linked list of sl@0: ** AsyncFileLock structures for this file. sl@0: */ sl@0: if( zName ){ sl@0: pData->lock.pNext = pLock->pList; sl@0: pLock->pList = &pData->lock; sl@0: pData->zName = pLock->zFile; sl@0: } sl@0: }else{ sl@0: if( pData->pBaseRead->pMethods ){ sl@0: pData->pBaseRead->pMethods->xClose(pData->pBaseRead); sl@0: } sl@0: if( pData->pBaseWrite->pMethods ){ sl@0: pData->pBaseWrite->pMethods->xClose(pData->pBaseWrite); sl@0: } sl@0: sqlite3_free(pData); sl@0: } sl@0: sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: sl@0: if( rc==SQLITE_OK ){ sl@0: incrOpenFileCount(); sl@0: pData->pLock = pLock; sl@0: } sl@0: sl@0: if( rc==SQLITE_OK && isExclusive ){ sl@0: rc = addNewAsyncWrite(pData, ASYNC_OPENEXCLUSIVE, (sqlite3_int64)flags,0,0); sl@0: if( rc==SQLITE_OK ){ sl@0: if( pOutFlags ) *pOutFlags = flags; sl@0: }else{ sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: unlinkAsyncFile(pData); sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: sqlite3_free(pData); sl@0: } sl@0: } sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Implementation of sqlite3OsDelete. Add an entry to the end of the sl@0: ** write-op queue to perform the delete. sl@0: */ sl@0: static int asyncDelete(sqlite3_vfs *pAsyncVfs, const char *z, int syncDir){ sl@0: return addNewAsyncWrite(0, ASYNC_DELETE, syncDir, strlen(z)+1, z); sl@0: } sl@0: sl@0: /* sl@0: ** Implementation of sqlite3OsAccess. This method holds the mutex from sl@0: ** start to finish. 
sl@0: */ sl@0: static int asyncAccess( sl@0: sqlite3_vfs *pAsyncVfs, sl@0: const char *zName, sl@0: int flags, sl@0: int *pResOut sl@0: ){ sl@0: int rc; sl@0: int ret; sl@0: AsyncWrite *p; sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: sl@0: assert(flags==SQLITE_ACCESS_READWRITE sl@0: || flags==SQLITE_ACCESS_READ sl@0: || flags==SQLITE_ACCESS_EXISTS sl@0: ); sl@0: sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: rc = pVfs->xAccess(pVfs, zName, flags, &ret); sl@0: if( rc==SQLITE_OK && flags==SQLITE_ACCESS_EXISTS ){ sl@0: for(p=async.pQueueFirst; p; p = p->pNext){ sl@0: if( p->op==ASYNC_DELETE && 0==strcmp(p->zBuf, zName) ){ sl@0: ret = 0; sl@0: }else if( p->op==ASYNC_OPENEXCLUSIVE sl@0: && p->pFileData->zName sl@0: && 0==strcmp(p->pFileData->zName, zName) sl@0: ){ sl@0: ret = 1; sl@0: } sl@0: } sl@0: } sl@0: ASYNC_TRACE(("ACCESS(%s): %s = %d\n", sl@0: flags==SQLITE_ACCESS_READWRITE?"read-write": sl@0: flags==SQLITE_ACCESS_READ?"read":"exists" sl@0: , zName, ret) sl@0: ); sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: *pResOut = ret; sl@0: return rc; sl@0: } sl@0: sl@0: /* sl@0: ** Fill in zPathOut with the full path to the file identified by zPath. sl@0: */ sl@0: static int asyncFullPathname( sl@0: sqlite3_vfs *pAsyncVfs, sl@0: const char *zPath, sl@0: int nPathOut, sl@0: char *zPathOut sl@0: ){ sl@0: int rc; sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: rc = pVfs->xFullPathname(pVfs, zPath, nPathOut, zPathOut); sl@0: sl@0: /* Because of the way intra-process file locking works, this backend sl@0: ** needs to return a canonical path. The following block assumes the sl@0: ** file-system uses unix style paths. sl@0: */ sl@0: if( rc==SQLITE_OK ){ sl@0: int iIn; sl@0: int iOut = 0; sl@0: int nPathOut = strlen(zPathOut); sl@0: sl@0: for(iIn=0; iIn/../" with "" */ sl@0: if( iOut>0 && iIn<=(nPathOut-4) sl@0: && zPathOut[iIn]=='/' && zPathOut[iIn+1]=='.' sl@0: && zPathOut[iIn+2]=='.' 
&& zPathOut[iIn+3]=='/' sl@0: ){ sl@0: iIn += 3; sl@0: iOut--; sl@0: for( ; iOut>0 && zPathOut[iOut-1]!='/'; iOut--); sl@0: continue; sl@0: } sl@0: sl@0: zPathOut[iOut++] = zPathOut[iIn]; sl@0: } sl@0: zPathOut[iOut] = '\0'; sl@0: } sl@0: sl@0: return rc; sl@0: } sl@0: static void *asyncDlOpen(sqlite3_vfs *pAsyncVfs, const char *zPath){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: return pVfs->xDlOpen(pVfs, zPath); sl@0: } sl@0: static void asyncDlError(sqlite3_vfs *pAsyncVfs, int nByte, char *zErrMsg){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: pVfs->xDlError(pVfs, nByte, zErrMsg); sl@0: } sl@0: static void *asyncDlSym( sl@0: sqlite3_vfs *pAsyncVfs, sl@0: void *pHandle, sl@0: const char *zSymbol sl@0: ){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: return pVfs->xDlSym(pVfs, pHandle, zSymbol); sl@0: } sl@0: static void asyncDlClose(sqlite3_vfs *pAsyncVfs, void *pHandle){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: pVfs->xDlClose(pVfs, pHandle); sl@0: } sl@0: static int asyncRandomness(sqlite3_vfs *pAsyncVfs, int nByte, char *zBufOut){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: return pVfs->xRandomness(pVfs, nByte, zBufOut); sl@0: } sl@0: static int asyncSleep(sqlite3_vfs *pAsyncVfs, int nMicro){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: return pVfs->xSleep(pVfs, nMicro); sl@0: } sl@0: static int asyncCurrentTime(sqlite3_vfs *pAsyncVfs, double *pTimeOut){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)pAsyncVfs->pAppData; sl@0: return pVfs->xCurrentTime(pVfs, pTimeOut); sl@0: } sl@0: sl@0: static sqlite3_vfs async_vfs = { sl@0: 1, /* iVersion */ sl@0: sizeof(AsyncFile), /* szOsFile */ sl@0: 0, /* mxPathname */ sl@0: 0, /* pNext */ sl@0: "async", /* zName */ sl@0: 0, /* pAppData */ sl@0: asyncOpen, /* xOpen */ sl@0: asyncDelete, /* xDelete */ sl@0: asyncAccess, /* xAccess */ sl@0: asyncFullPathname, /* xFullPathname */ sl@0: 
asyncDlOpen, /* xDlOpen */ sl@0: asyncDlError, /* xDlError */ sl@0: asyncDlSym, /* xDlSym */ sl@0: asyncDlClose, /* xDlClose */ sl@0: asyncRandomness, /* xDlError */ sl@0: asyncSleep, /* xDlSym */ sl@0: asyncCurrentTime /* xDlClose */ sl@0: }; sl@0: sl@0: /* sl@0: ** Call this routine to enable or disable the sl@0: ** asynchronous IO features implemented in this file. sl@0: ** sl@0: ** This routine is not even remotely threadsafe. Do not call sl@0: ** this routine while any SQLite database connections are open. sl@0: */ sl@0: static void asyncEnable(int enable){ sl@0: if( enable ){ sl@0: if( !async_vfs.pAppData ){ sl@0: async_vfs.pAppData = (void *)sqlite3_vfs_find(0); sl@0: async_vfs.mxPathname = ((sqlite3_vfs *)async_vfs.pAppData)->mxPathname; sl@0: sqlite3_vfs_register(&async_vfs, 1); sl@0: } sl@0: }else{ sl@0: if( async_vfs.pAppData ){ sl@0: sqlite3_vfs_unregister(&async_vfs); sl@0: async_vfs.pAppData = 0; sl@0: } sl@0: } sl@0: } sl@0: sl@0: /* sl@0: ** This procedure runs in a separate thread, reading messages off of the sl@0: ** write queue and processing them one by one. sl@0: ** sl@0: ** If async.writerHaltNow is true, then this procedure exits sl@0: ** after processing a single message. sl@0: ** sl@0: ** If async.writerHaltWhenIdle is true, then this procedure exits when sl@0: ** the write queue is empty. sl@0: ** sl@0: ** If both of the above variables are false, this procedure runs sl@0: ** indefinately, waiting for operations to be added to the write queue sl@0: ** and processing them in the order in which they arrive. sl@0: ** sl@0: ** An artifical delay of async.ioDelay milliseconds is inserted before sl@0: ** each write operation in order to simulate the effect of a slow disk. sl@0: ** sl@0: ** Only one instance of this procedure may be running at a time. 
sl@0: */ sl@0: static void *asyncWriterThread(void *pIsStarted){ sl@0: sqlite3_vfs *pVfs = (sqlite3_vfs *)(async_vfs.pAppData); sl@0: AsyncWrite *p = 0; sl@0: int rc = SQLITE_OK; sl@0: int holdingMutex = 0; sl@0: sl@0: if( pthread_mutex_trylock(&async.writerMutex) ){ sl@0: return 0; sl@0: } sl@0: (*(int *)pIsStarted) = 1; sl@0: while( async.writerHaltNow==0 ){ sl@0: int doNotFree = 0; sl@0: sqlite3_file *pBase = 0; sl@0: sl@0: if( !holdingMutex ){ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: } sl@0: while( (p = async.pQueueFirst)==0 ){ sl@0: pthread_cond_broadcast(&async.emptySignal); sl@0: if( async.writerHaltWhenIdle ){ sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: break; sl@0: }else{ sl@0: ASYNC_TRACE(("IDLE\n")); sl@0: pthread_cond_wait(&async.queueSignal, &async.queueMutex); sl@0: ASYNC_TRACE(("WAKEUP\n")); sl@0: } sl@0: } sl@0: if( p==0 ) break; sl@0: holdingMutex = 1; sl@0: sl@0: /* Right now this thread is holding the mutex on the write-op queue. sl@0: ** Variable 'p' points to the first entry in the write-op queue. In sl@0: ** the general case, we hold on to the mutex for the entire body of sl@0: ** the loop. sl@0: ** sl@0: ** However in the cases enumerated below, we relinquish the mutex, sl@0: ** perform the IO, and then re-request the mutex before removing 'p' from sl@0: ** the head of the write-op queue. The idea is to increase concurrency with sl@0: ** sqlite threads. sl@0: ** sl@0: ** * An ASYNC_CLOSE operation. sl@0: ** * An ASYNC_OPENEXCLUSIVE operation. For this one, we relinquish sl@0: ** the mutex, call the underlying xOpenExclusive() function, then sl@0: ** re-aquire the mutex before seting the AsyncFile.pBaseRead sl@0: ** variable. sl@0: ** * ASYNC_SYNC and ASYNC_WRITE operations, if sl@0: ** SQLITE_ASYNC_TWO_FILEHANDLES was set at compile time and two sl@0: ** file-handles are open for the particular file being "synced". 
sl@0: */ sl@0: if( async.ioError!=SQLITE_OK && p->op!=ASYNC_CLOSE ){ sl@0: p->op = ASYNC_NOOP; sl@0: } sl@0: if( p->pFileData ){ sl@0: pBase = p->pFileData->pBaseWrite; sl@0: if( sl@0: p->op==ASYNC_CLOSE || sl@0: p->op==ASYNC_OPENEXCLUSIVE || sl@0: (pBase->pMethods && (p->op==ASYNC_SYNC || p->op==ASYNC_WRITE) ) sl@0: ){ sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: holdingMutex = 0; sl@0: } sl@0: if( !pBase->pMethods ){ sl@0: pBase = p->pFileData->pBaseRead; sl@0: } sl@0: } sl@0: sl@0: switch( p->op ){ sl@0: case ASYNC_NOOP: sl@0: break; sl@0: sl@0: case ASYNC_WRITE: sl@0: assert( pBase ); sl@0: ASYNC_TRACE(("WRITE %s %d bytes at %d\n", sl@0: p->pFileData->zName, p->nByte, p->iOffset)); sl@0: rc = pBase->pMethods->xWrite(pBase, (void *)(p->zBuf), p->nByte, p->iOffset); sl@0: break; sl@0: sl@0: case ASYNC_SYNC: sl@0: assert( pBase ); sl@0: ASYNC_TRACE(("SYNC %s\n", p->pFileData->zName)); sl@0: rc = pBase->pMethods->xSync(pBase, p->nByte); sl@0: break; sl@0: sl@0: case ASYNC_TRUNCATE: sl@0: assert( pBase ); sl@0: ASYNC_TRACE(("TRUNCATE %s to %d bytes\n", sl@0: p->pFileData->zName, p->iOffset)); sl@0: rc = pBase->pMethods->xTruncate(pBase, p->iOffset); sl@0: break; sl@0: sl@0: case ASYNC_CLOSE: { sl@0: AsyncFileData *pData = p->pFileData; sl@0: ASYNC_TRACE(("CLOSE %s\n", p->pFileData->zName)); sl@0: if( pData->pBaseWrite->pMethods ){ sl@0: pData->pBaseWrite->pMethods->xClose(pData->pBaseWrite); sl@0: } sl@0: if( pData->pBaseRead->pMethods ){ sl@0: pData->pBaseRead->pMethods->xClose(pData->pBaseRead); sl@0: } sl@0: sl@0: /* Unlink AsyncFileData.lock from the linked list of AsyncFileLock sl@0: ** structures for this file. Obtain the async.lockMutex mutex sl@0: ** before doing so. 
sl@0: */ sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: rc = unlinkAsyncFile(pData); sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: sl@0: if( !holdingMutex ){ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: holdingMutex = 1; sl@0: } sl@0: assert_mutex_is_held(&async.queueMutex); sl@0: async.pQueueFirst = p->pNext; sl@0: sqlite3_free(pData); sl@0: doNotFree = 1; sl@0: break; sl@0: } sl@0: sl@0: case ASYNC_UNLOCK: { sl@0: AsyncFileData *pData = p->pFileData; sl@0: int eLock = p->nByte; sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: pData->lock.eAsyncLock = MIN( sl@0: pData->lock.eAsyncLock, MAX(pData->lock.eLock, eLock) sl@0: ); sl@0: assert(pData->lock.eAsyncLock>=pData->lock.eLock); sl@0: rc = getFileLock(pData->pLock); sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: break; sl@0: } sl@0: sl@0: case ASYNC_DELETE: sl@0: ASYNC_TRACE(("DELETE %s\n", p->zBuf)); sl@0: rc = pVfs->xDelete(pVfs, p->zBuf, (int)p->iOffset); sl@0: break; sl@0: sl@0: case ASYNC_OPENEXCLUSIVE: { sl@0: int flags = (int)p->iOffset; sl@0: AsyncFileData *pData = p->pFileData; sl@0: ASYNC_TRACE(("OPEN %s flags=%d\n", p->zBuf, (int)p->iOffset)); sl@0: assert(pData->pBaseRead->pMethods==0 && pData->pBaseWrite->pMethods==0); sl@0: rc = pVfs->xOpen(pVfs, pData->zName, pData->pBaseRead, flags, 0); sl@0: assert( holdingMutex==0 ); sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: holdingMutex = 1; sl@0: break; sl@0: } sl@0: sl@0: default: assert(!"Illegal value for AsyncWrite.op"); sl@0: } sl@0: sl@0: /* If we didn't hang on to the mutex during the IO op, obtain it now sl@0: ** so that the AsyncWrite structure can be safely removed from the sl@0: ** global write-op queue. 
sl@0: */ sl@0: if( !holdingMutex ){ sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: holdingMutex = 1; sl@0: } sl@0: /* ASYNC_TRACE(("UNLINK %p\n", p)); */ sl@0: if( p==async.pQueueLast ){ sl@0: async.pQueueLast = 0; sl@0: } sl@0: if( !doNotFree ){ sl@0: assert_mutex_is_held(&async.queueMutex); sl@0: async.pQueueFirst = p->pNext; sl@0: sqlite3_free(p); sl@0: } sl@0: assert( holdingMutex ); sl@0: sl@0: /* An IO error has occured. We cannot report the error back to the sl@0: ** connection that requested the I/O since the error happened sl@0: ** asynchronously. The connection has already moved on. There sl@0: ** really is nobody to report the error to. sl@0: ** sl@0: ** The file for which the error occured may have been a database or sl@0: ** journal file. Regardless, none of the currently queued operations sl@0: ** associated with the same database should now be performed. Nor should sl@0: ** any subsequently requested IO on either a database or journal file sl@0: ** handle for the same database be accepted until the main database sl@0: ** file handle has been closed and reopened. sl@0: ** sl@0: ** Furthermore, no further IO should be queued or performed on any file sl@0: ** handle associated with a database that may have been part of a sl@0: ** multi-file transaction that included the database associated with sl@0: ** the IO error (i.e. a database ATTACHed to the same handle at some sl@0: ** point in time). sl@0: */ sl@0: if( rc!=SQLITE_OK ){ sl@0: async.ioError = rc; sl@0: } sl@0: sl@0: if( async.ioError && !async.pQueueFirst ){ sl@0: pthread_mutex_lock(&async.lockMutex); sl@0: if( 0==async.pLock ){ sl@0: async.ioError = SQLITE_OK; sl@0: } sl@0: pthread_mutex_unlock(&async.lockMutex); sl@0: } sl@0: sl@0: /* Drop the queue mutex before continuing to the next write operation sl@0: ** in order to give other threads a chance to work with the write queue. 
sl@0: */ sl@0: if( !async.pQueueFirst || !async.ioError ){ sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: holdingMutex = 0; sl@0: if( async.ioDelay>0 ){ sl@0: pVfs->xSleep(pVfs, async.ioDelay); sl@0: }else{ sl@0: sched_yield(); sl@0: } sl@0: } sl@0: } sl@0: sl@0: pthread_mutex_unlock(&async.writerMutex); sl@0: return 0; sl@0: } sl@0: sl@0: /************************************************************************** sl@0: ** The remaining code defines a Tcl interface for testing the asynchronous sl@0: ** IO implementation in this file. sl@0: ** sl@0: ** To adapt the code to a non-TCL environment, delete or comment out sl@0: ** the code that follows. sl@0: */ sl@0: sl@0: /* sl@0: ** sqlite3async_enable ?YES/NO? sl@0: ** sl@0: ** Enable or disable the asynchronous I/O backend. This command is sl@0: ** not thread-safe. Do not call it while any database connections sl@0: ** are open. sl@0: */ sl@0: static int testAsyncEnable( sl@0: void * clientData, sl@0: Tcl_Interp *interp, sl@0: int objc, sl@0: Tcl_Obj *CONST objv[] sl@0: ){ sl@0: if( objc!=1 && objc!=2 ){ sl@0: Tcl_WrongNumArgs(interp, 1, objv, "?YES/NO?"); sl@0: return TCL_ERROR; sl@0: } sl@0: if( objc==1 ){ sl@0: Tcl_SetObjResult(interp, Tcl_NewBooleanObj(async_vfs.pAppData!=0)); sl@0: }else{ sl@0: int en; sl@0: if( Tcl_GetBooleanFromObj(interp, objv[1], &en) ) return TCL_ERROR; sl@0: asyncEnable(en); sl@0: } sl@0: return TCL_OK; sl@0: } sl@0: sl@0: /* sl@0: ** sqlite3async_halt "now"|"idle"|"never" sl@0: ** sl@0: ** Set the conditions at which the writer thread will halt. 
sl@0: */ sl@0: static int testAsyncHalt( sl@0: void * clientData, sl@0: Tcl_Interp *interp, sl@0: int objc, sl@0: Tcl_Obj *CONST objv[] sl@0: ){ sl@0: const char *zCond; sl@0: if( objc!=2 ){ sl@0: Tcl_WrongNumArgs(interp, 1, objv, "\"now\"|\"idle\"|\"never\""); sl@0: return TCL_ERROR; sl@0: } sl@0: zCond = Tcl_GetString(objv[1]); sl@0: if( strcmp(zCond, "now")==0 ){ sl@0: async.writerHaltNow = 1; sl@0: pthread_cond_broadcast(&async.queueSignal); sl@0: }else if( strcmp(zCond, "idle")==0 ){ sl@0: async.writerHaltWhenIdle = 1; sl@0: async.writerHaltNow = 0; sl@0: pthread_cond_broadcast(&async.queueSignal); sl@0: }else if( strcmp(zCond, "never")==0 ){ sl@0: async.writerHaltWhenIdle = 0; sl@0: async.writerHaltNow = 0; sl@0: }else{ sl@0: Tcl_AppendResult(interp, sl@0: "should be one of: \"now\", \"idle\", or \"never\"", (char*)0); sl@0: return TCL_ERROR; sl@0: } sl@0: return TCL_OK; sl@0: } sl@0: sl@0: /* sl@0: ** sqlite3async_delay ?MS? sl@0: ** sl@0: ** Query or set the number of milliseconds of delay in the writer sl@0: ** thread after each write operation. The default is 0. By increasing sl@0: ** the memory delay we can simulate the effect of slow disk I/O. sl@0: */ sl@0: static int testAsyncDelay( sl@0: void * clientData, sl@0: Tcl_Interp *interp, sl@0: int objc, sl@0: Tcl_Obj *CONST objv[] sl@0: ){ sl@0: if( objc!=1 && objc!=2 ){ sl@0: Tcl_WrongNumArgs(interp, 1, objv, "?MS?"); sl@0: return TCL_ERROR; sl@0: } sl@0: if( objc==1 ){ sl@0: Tcl_SetObjResult(interp, Tcl_NewIntObj(async.ioDelay)); sl@0: }else{ sl@0: int ioDelay; sl@0: if( Tcl_GetIntFromObj(interp, objv[1], &ioDelay) ) return TCL_ERROR; sl@0: async.ioDelay = ioDelay; sl@0: } sl@0: return TCL_OK; sl@0: } sl@0: sl@0: /* sl@0: ** sqlite3async_start sl@0: ** sl@0: ** Start a new writer thread. 
sl@0: */ sl@0: static int testAsyncStart( sl@0: void * clientData, sl@0: Tcl_Interp *interp, sl@0: int objc, sl@0: Tcl_Obj *CONST objv[] sl@0: ){ sl@0: pthread_t x; sl@0: int rc; sl@0: volatile int isStarted = 0; sl@0: rc = pthread_create(&x, 0, asyncWriterThread, (void *)&isStarted); sl@0: if( rc ){ sl@0: Tcl_AppendResult(interp, "failed to create the thread", 0); sl@0: return TCL_ERROR; sl@0: } sl@0: pthread_detach(x); sl@0: while( isStarted==0 ){ sl@0: sched_yield(); sl@0: } sl@0: return TCL_OK; sl@0: } sl@0: sl@0: /* sl@0: ** sqlite3async_wait sl@0: ** sl@0: ** Wait for the current writer thread to terminate. sl@0: ** sl@0: ** If the current writer thread is set to run forever then this sl@0: ** command would block forever. To prevent that, an error is returned. sl@0: */ sl@0: static int testAsyncWait( sl@0: void * clientData, sl@0: Tcl_Interp *interp, sl@0: int objc, sl@0: Tcl_Obj *CONST objv[] sl@0: ){ sl@0: int cnt = 10; sl@0: if( async.writerHaltNow==0 && async.writerHaltWhenIdle==0 ){ sl@0: Tcl_AppendResult(interp, "would block forever", (char*)0); sl@0: return TCL_ERROR; sl@0: } sl@0: sl@0: while( cnt-- && !pthread_mutex_trylock(&async.writerMutex) ){ sl@0: pthread_mutex_unlock(&async.writerMutex); sl@0: sched_yield(); sl@0: } sl@0: if( cnt>=0 ){ sl@0: ASYNC_TRACE(("WAIT\n")); sl@0: pthread_mutex_lock(&async.queueMutex); sl@0: pthread_cond_broadcast(&async.queueSignal); sl@0: pthread_mutex_unlock(&async.queueMutex); sl@0: pthread_mutex_lock(&async.writerMutex); sl@0: pthread_mutex_unlock(&async.writerMutex); sl@0: }else{ sl@0: ASYNC_TRACE(("NO-WAIT\n")); sl@0: } sl@0: return TCL_OK; sl@0: } sl@0: sl@0: sl@0: #endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */ sl@0: sl@0: /* sl@0: ** This routine registers the custom TCL commands defined in this sl@0: ** module. This should be the only procedure visible from outside sl@0: ** of this module. 
sl@0: */ sl@0: int Sqlitetestasync_Init(Tcl_Interp *interp){ sl@0: #if SQLITE_OS_UNIX && SQLITE_THREADSAFE sl@0: Tcl_CreateObjCommand(interp,"sqlite3async_enable",testAsyncEnable,0,0); sl@0: Tcl_CreateObjCommand(interp,"sqlite3async_halt",testAsyncHalt,0,0); sl@0: Tcl_CreateObjCommand(interp,"sqlite3async_delay",testAsyncDelay,0,0); sl@0: Tcl_CreateObjCommand(interp,"sqlite3async_start",testAsyncStart,0,0); sl@0: Tcl_CreateObjCommand(interp,"sqlite3async_wait",testAsyncWait,0,0); sl@0: Tcl_LinkVar(interp, "sqlite3async_trace", sl@0: (char*)&sqlite3async_trace, TCL_LINK_INT); sl@0: #endif /* SQLITE_OS_UNIX and SQLITE_THREADSAFE */ sl@0: return TCL_OK; sl@0: }