From f892ccc9f2981e875b64f9cf2e33d1bb5e1ed23a Mon Sep 17 00:00:00 2001 From: Bron Gondwana Date: Wed, 24 Dec 2008 23:26:29 +1100 Subject: [PATCH] skiplist nested transactions rather than silently corrupting as we used to do, or aborting with an assertion failure as we do now, it is actually quite trivial to make the skiplist database just stack "virtual transactions" on top of the current one. On the plus side, you do actually get fine-grained rollback. You can rollback a sub transaction and then proceed with the original transaction cleanly. On the negative side, if you forget to shut down the original transaction you could keep the file locked forever. This is implemented with a simple stack of transactions. You need to commit or rollback the topmost transaction BEFORE doing anything with the nested transactions - but on the nice side, the nested code doesn't even need to know it's within a transaction - so you can write transactional code without having to care if your caller is already transactional. It if that makes sense. EXAMPLE IN PSEUDOCODE (as in, I haven't checked that I'm using the APIs sanely of filled in all the details!) int foo (struct db* d) { struct txn *t; ... OURDB->set(d, key, keylen, val, vallen, &t); r = mumble(db); if (r) { OURDB->rollback(d, &t); } else { OURDB->commit(d, &t); } } int mumble (struct db* d) { struct txn *t; ... OURDB->fetch(d, key, keylen, &val, &vallen, &t); if (vallen) { char *newval = frob(val, vallen); OURDB->set(d, key, keylen, newval, strlen(newval), &t); OURDB->commit(d, t); } else { OURDB->rollback(d, t); } return r; } See how mumble() could be called without caring about foo(), and yet can enjoy the use of transactions. This is the API simplification this patch can offer! =================================================================== --- lib/cyrusdb_skiplist.c | 105 +++++++++++++++++++++++++++++++----------------- 1 files changed, 68 insertions(+), 37 deletions(-) diff --git a/lib/cyrusdb_skiplist.c b/lib/cyrusdb_skiplist.c index 88b27a0..90ead62 100644 --- a/lib/cyrusdb_skiplist.c +++ b/lib/cyrusdb_skiplist.c @@ -152,6 +152,7 @@ struct txn { to on abort */ unsigned logstart; unsigned logend; /* where to write to continue this txn */ + struct txn *next_txn; }; struct db { @@ -465,17 +466,29 @@ static int newtxn(struct db *db, struct txn **tidptr) /* is this file safe to append to? * * If it isn't, we need to run recovery. */ - if (SAFE_TO_APPEND(db)) { - int r = recovery(db, RECOVERY_FORCE | RECOVERY_CALLER_LOCKED); - if (r) return r; + if (!db->current_txn) { + if (SAFE_TO_APPEND(db)) { + int r = recovery(db, RECOVERY_FORCE | RECOVERY_CALLER_LOCKED); + if (r) return r; + } } /* create the transaction */ tid = xmalloc(sizeof(struct txn)); - tid->syncfd = -1; - tid->logstart = db->map_size; -/* assert(t->logstart != -1);*/ + if (db->current_txn) { + tid->syncfd = db->current_txn->syncfd; + /* after the current end of the active txn */ + tid->logstart = db->current_txn->logend; + } + else { + tid->syncfd = -1; + /* at the end of the file */ + tid->logstart = db->map_size; + } tid->logend = tid->logstart; + + /* stitch it into the list */ + tid->next_txn = db->current_txn; db->current_txn = tid; /* pass it back out */ @@ -723,12 +736,12 @@ static int lock_or_refresh(struct db *db, struct txn **tidptr) update_lock(db, *tidptr); } else { - /* check that the DB isn't in a transaction */ - assert(db->current_txn == NULL); - - /* grab a r/w lock */ - if ((r = write_lock(db, NULL)) < 0) { - return r; + /* grab a r/w lock unless we're already + * in a transaction */ + if (db->current_txn == NULL) { + if ((r = write_lock(db, NULL)) < 0) { + return r; + } } /* start the transaction */ @@ -744,8 +757,14 @@ static int dispose_db(struct db *db) { if (!db) return 0; + while (db->current_txn) { + syslog(LOG_ERR, "skiplist: closed while in transaction, aborting %s", + db->fname); + /* abort will set current_txn = current_txn->next for us */ + myabort(db, db->current_txn); + } if (db->lock_status) { - syslog(LOG_ERR, "skiplist: closed while still locked"); + syslog(LOG_ERR, "skiplist: closed while still locked %s", db->fname); unlock(db); } if (db->fname) { @@ -805,6 +824,7 @@ static int myopen(const char *fname, int flags, struct db **ret) db->curlevel = 0; db->is_open = 0; db->lock_status = UNLOCKED; + db->current_txn = NULL; /* grab a read lock, only reading the header */ r = read_lock(db); @@ -987,10 +1007,9 @@ int myfetch(struct db *db, if (data) *data = NULL; if (datalen) *datalen = 0; - /* Hacky workaround: - * - * If no transaction was passed, but we're in a transaction, - * then just do the read within that transaction. + /* If we're in a transaction, just do the read + * within that transaction unless we explicitly + * need a new transaction. */ if (!tidptr && db->current_txn != NULL) { tidptr = &(db->current_txn); @@ -1063,10 +1082,9 @@ int myforeach(struct db *db, assert(db != NULL); assert(prefixlen >= 0); - /* Hacky workaround: - * - * If no transaction was passed, but we're in a transaction, - * then just do the read within that transaction. + /* If we're in a transaction, just do the read + * within that transaction unless we explicitly + * need a new transaction. */ if (!tidptr && db->current_txn != NULL) { tidptr = &(db->current_txn); @@ -1441,6 +1459,15 @@ int mycommit(struct db *db, struct txn *tid) assert(myconsistent(db, tid, 1) == 0); } + /* if this is a nested transaction, just update the + * parent transaction with these changes */ + if (tid->next_txn) { + assert(tid->next_txn->logend == tid->logstart); + tid->next_txn->logend = tid->logend; + r = 0; + goto done; + } + /* verify that we did something this txn */ if (tid->logstart == tid->logend) { /* empty txn, done */ @@ -1474,10 +1501,11 @@ int mycommit(struct db *db, struct txn *tid) done: if (!r) - db->current_txn = NULL; + db->current_txn = tid->next_txn; /* consider checkpointing */ - if (!r && tid->logend > (2 * db->logstart + SKIPLIST_MINREWRITE)) { + if (!r && !db->current_txn && + tid->logend > (2 * db->logstart + SKIPLIST_MINREWRITE)) { r = mycheckpoint(db, 1); } @@ -1495,13 +1523,15 @@ int mycommit(struct db *db, struct txn *tid) db->fname); } } else { - /* release the write lock */ - if ((r = unlock(db)) < 0) { - return r; - } + if (!db->current_txn) { + /* release the write lock */ + if ((r = unlock(db)) < 0) { + return r; + } - /* must close this after releasing the lock */ - closesyncfd(db, tid); + /* must close this after releasing the lock */ + closesyncfd(db, tid); + } /* free tid */ free(tid); @@ -1596,20 +1626,21 @@ int myabort(struct db *db, struct txn *tid) } db->map_size = tid->logstart; + db->current_txn = tid->next_txn; - /* release the write lock */ - if ((r = unlock(db)) < 0) { - return r; - } + if (!db->current_txn) { + /* release the write lock */ + if ((r = unlock(db)) < 0) { + return r; + } - /* must close this after releasing the lock */ - closesyncfd(db, tid); + /* must close this after releasing the lock */ + closesyncfd(db, tid); + } /* free the tid */ free(tid); - db->current_txn = NULL; - return 0; } -- 1.5.6.5