diff --git a/lab/README.md b/lab/README.md index 7f1f97422..caf1b8282 100644 --- a/lab/README.md +++ b/lab/README.md @@ -3,6 +3,52 @@ Self-contained prototypes that validate control logic before it is wired into the engine. Not built by the library; each has its own Makefile. +## `lab/bench` — scaling probe and TPROC-* workload suite + +Benchmark drivers built against an Autoconf tree: + +```sh +cd build_unix && ../dist/configure && make # build libdb first +cd ../lab/bench && make BDB=../../build_unix # build the drivers +``` + +- **`scale_bench`** — micro multi-core scaling probe (`rrand`/`rhot`/`wrand`), + drives a shared environment from N threads and reports ops/sec plus + region-contention signals. + +- **`tproc_c` / `tproc_b` / `tproc_h`** — HammerDB-style workloads + (independently implemented; **not** the TPC benchmarks and not comparable to + TPC results): + - `tproc_c` — OLTP, the five weighted order-entry transactions. + - `tproc_b` — debit/credit, one short write transaction (the parameterized + successor to `examples/c/ex_tpcb.c`); isolates pure commit/log overhead. + - `tproc_h` — analytic: long read-only scans run concurrently with point + writers; with `-m` the scans use snapshot isolation and run unobstructed, + without it they contend with the writers. 
+ +All three share `bdb_bench.h`, which exposes a **safety-feature toggle +framework** so the same workload can be run with full ACID guarantees or with +individual protections removed, to measure their cost: + +| flag | effect | +|------|--------| +| `-X txn` | no transactions (no `DB_INIT_TXN`) | +| `-X lock` | no lock manager (no `DB_INIT_LOCK`) | +| `-X log` | no logging/recovery (no `DB_INIT_LOG`) | +| `-d sync\|wnosync\|nosync` | commit durability (default `nosync`) | +| `-m` | MVCC / snapshot isolation (`DB_MULTIVERSION`) | +| `-C` | Concurrent Data Store (`DB_INIT_CDB`) instead of full txns | +| `-c` `-t` `-S` `-s` `-i` | cache bytes, threads, scale, seconds, init | + +Example: + +```sh +./tproc_c -i -S 4 -c 268435456 # populate 4 warehouses, 256MB cache +./tproc_c -t 16 -s 30 -c 268435456 # full-ACID run, 16 threads, 30s +./tproc_c -t 16 -s 30 -X txn -X lock -X log # same workload, no safety features +./tproc_h -i -S 1 -m && ./tproc_h -t 8 -w 4 -m -s 30 # MVCC analytic vs writers +``` + ## `lab/lsm` — unified adaptive LSM controller Prototype of the two-axis adaptive controller from diff --git a/lab/bench/Makefile b/lab/bench/Makefile index 38bb3ed21..8bb383db6 100644 --- a/lab/bench/Makefile +++ b/lab/bench/Makefile @@ -1,6 +1,29 @@ -# Build against an Autoconf build tree: make BDB=../../build_unix +# Build the benchmark drivers against an Autoconf build tree: +# make BDB=../../build_unix +# +# The TPROC-* drivers are HammerDB-style workloads (not the TPC benchmarks); +# each shares bdb_bench.h for the safety-feature toggle framework. 
BDB ?= ../../build_unix CFLAGS ?= -O2 -pthread +INCLUDES = -I$(BDB) +LIBS = -L$(BDB)/.libs -ldb-5.3 + +BENCHES = scale_bench tproc_c tproc_b tproc_h + +all: $(BENCHES) + scale_bench: scale_bench.c - $(CC) $(CFLAGS) scale_bench.c -I$(BDB) -L$(BDB)/.libs -ldb-5.3 -o $@ -clean:; rm -f scale_bench + $(CC) $(CFLAGS) $(INCLUDES) scale_bench.c $(LIBS) -o $@ + +tproc_c: tproc_c.c bdb_bench.h + $(CC) $(CFLAGS) $(INCLUDES) tproc_c.c $(LIBS) -o $@ + +tproc_b: tproc_b.c bdb_bench.h + $(CC) $(CFLAGS) $(INCLUDES) tproc_b.c $(LIBS) -o $@ + +tproc_h: tproc_h.c bdb_bench.h + $(CC) $(CFLAGS) $(INCLUDES) tproc_h.c $(LIBS) -o $@ + +clean:; rm -f $(BENCHES) + +.PHONY: all clean diff --git a/lab/bench/bdb_bench.h b/lab/bench/bdb_bench.h new file mode 100644 index 000000000..55815625d --- /dev/null +++ b/lab/bench/bdb_bench.h @@ -0,0 +1,290 @@ +/*- + * See the file LICENSE for redistribution information. + * + * bdb_bench.h -- shared harness for the TPROC-* benchmark suite. + * + * These are HammerDB-style ("TPROC-C/H/B") workloads, independently + * implemented against the libdb C API; they are not the TPC benchmarks and + * produce no TPC-comparable metrics. 
+ * + * Every benchmark links this header for a common set of safety-feature + * toggles, so each workload can be run with full ACID guarantees or with + * individual protections disabled, to isolate their cost: + * + * -X txn run WITHOUT transactions (no DB_INIT_TXN; ops are autocommit + * only where the AM requires it, otherwise raw) + * -X lock run WITHOUT the lock manager (no DB_INIT_LOCK; single-thread + * or app-serialized only) + * -X log run WITHOUT logging/recovery (no DB_INIT_LOG) + * -d sync fully durable commits (default is nosync) + * -d wnosync write-nosync (flush log to OS, not disk) + * -d nosync no log sync at commit (default) + * -m use MVCC / snapshot isolation (DB_MULTIVERSION + snapshot txns) + * -C use Concurrent Data Store (DB_INIT_CDB) instead of full txns + * + * plus shared knobs: -h home, -c cachebytes, -t threads, -S scale, -s seconds, -i init, -R seed. + */ +#ifndef BDB_BENCH_H +#define BDB_BENCH_H + +#include <sys/types.h> +#include <sys/time.h> +#include <errno.h> +#include <pthread.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include <db.h> + +/* Durability levels at commit. */ +enum bb_durability { BB_NOSYNC = 0, BB_WRITE_NOSYNC, BB_SYNC }; + +/* Shared configuration parsed from argv. */ +typedef struct { + const char *home; /* environment directory */ + uint64_t cachebytes; /* mpool cache size */ + int threads; /* worker threads */ + int scale; /* warehouses (C), scale factor (H/B) */ + int seconds; /* measured run length */ + int init; /* (re)create + populate, then exit */ + unsigned seed; /* PRNG seed (-R); defaults to time(NULL) */ + + /* Safety toggles. */ + int use_txn; /* DB_INIT_TXN + per-op transactions */ + int use_lock; /* DB_INIT_LOCK */ + int use_log; /* DB_INIT_LOG */ + int use_mvcc; /* DB_MULTIVERSION + DB_TXN_SNAPSHOT readers */ + int use_cdb; /* DB_INIT_CDB (concurrent data store) */ + enum bb_durability durability; +} bb_config; + +/* Sensible defaults: full safety, nosync, all in cache, single thread. 
*/ +static void +bb_config_defaults(bb_config *c) +{ + memset(c, 0, sizeof(*c)); + c->home = "TPROCDIR"; + c->cachebytes = (uint64_t)256 * 1024 * 1024; + c->threads = 1; + c->scale = 1; + c->seconds = 10; + c->seed = (unsigned)time(NULL); + c->use_txn = 1; + c->use_lock = 1; + c->use_log = 1; + c->use_mvcc = 0; + c->use_cdb = 0; + c->durability = BB_NOSYNC; +} + +/* + * bb_getopt -- parse the shared options. A benchmark may pre-scan argv for + * its own flags; unknown flags are reported by the caller's usage(). Returns + * 0 on success, -1 on a parse error (after printing a message). + * + * Recognized: -h home -c cache -t threads -S scale -s secs -i -m -C + * -d sync|wnosync|nosync -X txn|lock|log -R seed + */ +static int +bb_getopt(int argc, char **argv, bb_config *c) +{ + int ch; + extern char *optarg; + + while ((ch = getopt(argc, argv, "h:c:t:S:s:imCd:X:R:")) != EOF) + switch (ch) { + case 'h': c->home = optarg; break; + case 'c': c->cachebytes = strtoull(optarg, NULL, 10); break; + case 't': c->threads = atoi(optarg); break; + case 'S': c->scale = atoi(optarg); break; + case 's': c->seconds = atoi(optarg); break; + case 'i': c->init = 1; break; + case 'm': c->use_mvcc = 1; break; + case 'C': c->use_cdb = 1; break; + case 'R': c->seed = (unsigned)strtoul(optarg, NULL, 10); break; + case 'd': + if (strcmp(optarg, "sync") == 0) c->durability = BB_SYNC; + else if (strcmp(optarg, "wnosync") == 0) + c->durability = BB_WRITE_NOSYNC; + else if (strcmp(optarg, "nosync") == 0) + c->durability = BB_NOSYNC; + else { fprintf(stderr, "bad -d %s\n", optarg); return -1; } + break; + case 'X': + if (strcmp(optarg, "txn") == 0) c->use_txn = 0; + else if (strcmp(optarg, "lock") == 0) c->use_lock = 0; + else if (strcmp(optarg, "log") == 0) c->use_log = 0; + else { fprintf(stderr, "bad -X %s\n", optarg); return -1; } + break; + default: return -1; + } + + /* + * Consistency: MVCC and CDB both require transactions; CDB and full + * txns are mutually exclusive. 
Resolve quietly toward the safe option + * so a partial flag set still runs (-m takes precedence over -C: snapshot transactions need DB_INIT_TXN, which a CDB environment does not have). + */ + if (c->use_mvcc) { + c->use_cdb = 0; /* otherwise the env opens CDB-only while bb_begin still calls txn_begin */ + c->use_log = c->use_txn = 1; + } + if (c->use_cdb) + c->use_txn = 0; /* CDB is its own concurrency model */ + return 0; +} + +/* Build the env open flags from the toggles: CDB excludes the lock/log/txn subsystems, and use_txn turns LOCK and LOG back on even if -X lock / -X log was given. */ +static u_int32_t +bb_env_flags(const bb_config *c) +{ + u_int32_t f = DB_CREATE | DB_INIT_MPOOL | DB_THREAD; + + if (c->use_cdb) + return f | DB_INIT_CDB; + if (c->use_lock) f |= DB_INIT_LOCK; + if (c->use_log) f |= DB_INIT_LOG; + if (c->use_txn) f |= DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG; + return f; +} + +/* Open (and configure) the environment per the safety toggles; on success stores the handle in *envp and returns 0, otherwise returns the libdb error. */ +static int +bb_env_open(bb_config *c, DB_ENV **envp) +{ + DB_ENV *env; + int ret; + + if ((ret = db_env_create(&env, 0)) != 0) { + fprintf(stderr, "db_env_create: %s\n", db_strerror(ret)); + return ret; + } + env->set_errfile(env, stderr); + env->set_errpfx(env, "tproc"); + (void)env->set_cachesize(env, (u_int32_t)(c->cachebytes >> 30), /* gbytes is in units of 1GB, not 4GB */ + (u_int32_t)(c->cachebytes & 0x3fffffff), 1); + + /* + * With locking on, enable automatic deadlock detection so a genuine + * deadlock between concurrent transactions is broken (one is chosen as + * victim and returns DB_LOCK_DEADLOCK) instead of blocking forever -- + * the workloads here intentionally contend on shared rows. 
+ */ + if (c->use_lock || c->use_txn) + (void)env->set_lk_detect(env, DB_LOCK_DEFAULT); + + if (c->use_txn) { + if (c->durability == BB_NOSYNC) + (void)env->set_flags(env, DB_TXN_NOSYNC, 1); + else if (c->durability == BB_WRITE_NOSYNC) + (void)env->set_flags(env, DB_TXN_WRITE_NOSYNC, 1); + } + + if ((ret = env->open(env, c->home, bb_env_flags(c), 0)) != 0) { + env->err(env, ret, "DB_ENV->open: %s", c->home); + (void)env->close(env, 0); + return ret; + } + *envp = env; + return 0; +} + +/* + * bb_begin / bb_commit / bb_abort -- transaction helpers that become no-ops + * when -X txn is set, so each workload's body is written once and runs in both + * modes. A snapshot (MVCC) read transaction is requested when use_mvcc and rdonly; with transactions off bb_begin stores NULL in *txnp, and bb_commit/bb_abort return 0 for a NULL txn. + */ +static int +bb_begin(const bb_config *c, DB_ENV *env, DB_TXN **txnp, int rdonly) +{ + u_int32_t flags = 0; + + *txnp = NULL; + if (!c->use_txn) + return 0; + if (c->use_mvcc && rdonly) + flags |= DB_TXN_SNAPSHOT; + if (c->durability == BB_NOSYNC) + flags |= DB_TXN_NOSYNC; + return env->txn_begin(env, NULL, txnp, flags); +} + +static int +bb_commit(DB_TXN *txn) +{ + return txn == NULL ? 0 : txn->commit(txn, 0); +} + +static int +bb_abort(DB_TXN *txn) +{ + return txn == NULL ? 0 : txn->abort(txn); +} + +/* Per-DB open flags: DB_AUTO_COMMIT when transactions are on, plus DB_MULTIVERSION when MVCC is requested. */ +static u_int32_t +bb_db_flags(const bb_config *c) +{ + u_int32_t f = DB_CREATE | DB_THREAD; + + if (c->use_txn) + f |= DB_AUTO_COMMIT; + if (c->use_mvcc) + f |= DB_MULTIVERSION; + return f; +} + +/* Wall-clock milliseconds from gettimeofday(); NOT monotonic, so a clock step during a run skews the measured elapsed time. */ +static double +bb_now_ms(void) +{ + struct timeval tv; + + (void)gettimeofday(&tv, NULL); + return (double)tv.tv_sec * 1000.0 + (double)tv.tv_usec / 1000.0; +} + +/* Cheap PRNG state (xorshift), one instance per worker, so threads don't share rand() state. */ +typedef struct { uint64_t s; } bb_rng; + +static void +bb_rng_seed(bb_rng *r, uint64_t seed) +{ + r->s = seed ? 
seed : 0x9e3779b97f4a7c15ULL; +} + +static uint64_t +bb_rng_next(bb_rng *r) +{ + uint64_t x = r->s; + + x ^= x << 13; + x ^= x >> 7; + x ^= x << 17; + return (r->s = x); +} + +/* Integer in [lo, hi] (requires lo <= hi); the span is widened to 64 bits before the +1 so [0, UINT32_MAX] cannot wrap to a zero modulus. */ +static uint32_t +bb_rand_between(bb_rng *r, uint32_t lo, uint32_t hi) +{ + return lo + (uint32_t)(bb_rng_next(r) % ((uint64_t)(hi - lo) + 1)); +} + +/* One-line summary of the active safety configuration. */ +static void +bb_print_config(const bb_config *c, const char *name) +{ + printf("# %s: threads=%d scale=%d secs=%d cache=%lluMB " + "txn=%d lock=%d log=%d mvcc=%d cdb=%d durability=%s\n", + name, c->threads, c->scale, c->seconds, + (unsigned long long)(c->cachebytes >> 20), + c->use_txn, c->use_lock, c->use_log, c->use_mvcc, c->use_cdb, + c->durability == BB_SYNC ? "sync" : + c->durability == BB_WRITE_NOSYNC ? "wnosync" : "nosync"); +} + +#endif /* BDB_BENCH_H */ diff --git a/lab/bench/tproc_b.c b/lab/bench/tproc_b.c new file mode 100644 index 000000000..845aeff19 --- /dev/null +++ b/lab/bench/tproc_b.c @@ -0,0 +1,259 @@ +/*- + * See the file LICENSE for redistribution information. + * + * tproc_b -- a HammerDB-style "TPROC-B" debit/credit workload for libdb. + * + * Independently implemented; not the TPC-B benchmark and not comparable to + * TPC results. It is the classic single-transaction money-transfer workload + * (the modern, parameterized successor to examples/c/ex_tpcb.c): each + * transaction picks a random account, teller, and branch, applies a delta to + * all three balances, and appends a history row -- one short write + * transaction touching four records. + * + * It exists alongside the richer TPROC-C so the suite has a write-only, + * minimal-logic transaction to isolate pure transaction/commit/log overhead, + * and (via the shared toggle framework) to measure that overhead with and + * without each BDB safety feature. 
+ * + * Schema: + * account key=a_id -> balance (SCALE * ACCOUNTS_PER rows) + * teller key=t_id -> balance (SCALE * TELLERS_PER rows) + * branch key=b_id -> balance (SCALE rows) + * history recno -> {a,t,b,delta} (append-only) + */ +#include "bdb_bench.h" + +#define ACCOUNTS_PER 100000 +#define TELLERS_PER 100 +#define BRANCHES_PER 1 + +static DB_ENV *g_env; +static bb_config g_cfg; +static DB *g_acct, *g_tell, *g_branch, *g_hist; +static int g_naccts, g_ntellers, g_nbranches; +static volatile int g_stop; + +typedef struct { int64_t balance; uint8_t pad[56]; } bal_rec; +typedef struct { uint32_t a, t, b; int32_t delta; } hist_rec; + +typedef struct { + int tid; + bb_rng rng; + uint64_t ok; + uint64_t retry; +} worker; + +static int +open_db(DB **dbp, const char *name, DBTYPE type, u_int32_t extra) +{ + DB *db; + int ret; + + if ((ret = db_create(&db, g_env, 0)) != 0) return ret; + if ((ret = db->open(db, NULL, name, NULL, type, + bb_db_flags(&g_cfg) | extra, 0)) != 0) { + g_env->err(g_env, ret, "open %s", name); + return ret; + } + *dbp = db; + return 0; +} + +static int +put_u32(DB *db, DB_TXN *txn, uint32_t key, void *rec, size_t sz) +{ + DBT k, d; + + memset(&k, 0, sizeof(k)); k.data = &key; k.size = sizeof(key); + memset(&d, 0, sizeof(d)); d.data = rec; d.size = (u_int32_t)sz; + return db->put(db, txn, &k, &d, 0); +} + +static int +adjust(DB *db, DB_TXN *txn, uint32_t key, int32_t delta) +{ + DBT k, d; + bal_rec b; + int ret; + + memset(&k, 0, sizeof(k)); k.data = &key; k.size = sizeof(key); + memset(&d, 0, sizeof(d)); + d.data = &b; d.ulen = sizeof(b); d.flags = DB_DBT_USERMEM; + if ((ret = db->get(db, txn, &k, &d, 0)) != 0) + return ret; + b.balance += delta; + memset(&d, 0, sizeof(d)); d.data = &b; d.size = sizeof(b); + return db->put(db, txn, &k, &d, 0); +} + +static int +populate(void) +{ + DB_TXN *txn = NULL; + bal_rec b; + uint32_t i; + int ret, n; + + if ((ret = open_db(&g_acct, "account.db", DB_BTREE, 0)) != 0) return ret; + if ((ret = open_db(&g_tell, 
"teller.db", DB_BTREE, 0)) != 0) return ret; + if ((ret = open_db(&g_branch, "branch.db", DB_BTREE, 0)) != 0) return ret; + if ((ret = open_db(&g_hist, "history.db", DB_RECNO, 0)) != 0) return ret; + + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + memset(&b, 0, sizeof(b)); b.balance = 0; + n = 0; + for (i = 0; i < (uint32_t)g_naccts; i++) { + if ((ret = put_u32(g_acct, txn, i, &b, sizeof(b))) != 0) goto err; + if (++n % 20000 == 0 && g_cfg.use_txn) { + if ((ret = bb_commit(txn)) != 0) return ret; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + } + } + for (i = 0; i < (uint32_t)g_ntellers; i++) + if ((ret = put_u32(g_tell, txn, i, &b, sizeof(b))) != 0) goto err; + for (i = 0; i < (uint32_t)g_nbranches; i++) + if ((ret = put_u32(g_branch, txn, i, &b, sizeof(b))) != 0) goto err; + return bb_commit(txn); +err: + (void)bb_abort(txn); + return ret; +} + +static int +do_txn(worker *w) +{ + DB_TXN *txn; + DBT k, d; + hist_rec h; + db_recno_t rno; + uint32_t aid, tid, bid; + int32_t delta; + int ret; + + aid = bb_rand_between(&w->rng, 0, (uint32_t)g_naccts - 1); + tid = bb_rand_between(&w->rng, 0, (uint32_t)g_ntellers - 1); + bid = bb_rand_between(&w->rng, 0, (uint32_t)g_nbranches - 1); + delta = (int32_t)bb_rand_between(&w->rng, 1, 1000) - 500; +again: + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + + ret = adjust(g_acct, txn, aid, delta); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry++; goto again; } + if (ret != 0) goto fail; + ret = adjust(g_tell, txn, tid, delta); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry++; goto again; } + if (ret != 0) goto fail; + ret = adjust(g_branch, txn, bid, delta); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry++; goto again; } + if (ret != 0) goto fail; + + h.a = aid; h.t = tid; h.b = bid; h.delta = delta; + memset(&k, 0, sizeof(k)); k.data = &rno; k.ulen = sizeof(rno); k.flags = DB_DBT_USERMEM; + memset(&d, 0, sizeof(d)); d.data = 
&h; d.size = sizeof(h); + ret = g_hist->put(g_hist, txn, &k, &d, DB_APPEND); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry++; goto again; } + if (ret != 0) goto fail; + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ok++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +static void * +worker_main(void *arg) +{ + worker *w = arg; + int ret; + + while (!g_stop) { + if ((ret = do_txn(w)) != 0 && ret != DB_LOCK_DEADLOCK) { + fprintf(stderr, "txn error: %s\n", db_strerror(ret)); + break; + } + } + return NULL; +} + +static void +usage(const char *p) +{ + fprintf(stderr, + "usage: %s [-i] [-h home] [-S scale] [-t threads] [-s secs]\n" + " [-c cachebytes] [-d sync|wnosync|nosync] [-m] [-C]\n" + " [-X txn|lock|log] [-R seed]\n", p); +} + +int +main(int argc, char **argv) +{ + pthread_t *tids; + worker *workers; + uint64_t okall, retryall; + double t0, elapsed; + int t, ret; + + bb_config_defaults(&g_cfg); + if (bb_getopt(argc, argv, &g_cfg) != 0) { usage(argv[0]); return 1; } + if (g_cfg.scale < 1) g_cfg.scale = 1; + g_naccts = g_cfg.scale * ACCOUNTS_PER; + g_ntellers = g_cfg.scale * TELLERS_PER; + g_nbranches = g_cfg.scale * BRANCHES_PER; + + if ((ret = bb_env_open(&g_cfg, &g_env)) != 0) return 1; + + if (g_cfg.init) { + bb_print_config(&g_cfg, "tproc-b populate"); + t0 = bb_now_ms(); + if ((ret = populate()) != 0) { + fprintf(stderr, "populate: %s\n", db_strerror(ret)); + return 1; + } + printf("# populated %d accounts in %.1f s\n", + g_naccts, (bb_now_ms() - t0) / 1000.0); + (void)g_env->close(g_env, 0); + return 0; + } + + if ((ret = open_db(&g_acct, "account.db", DB_BTREE, 0)) != 0 || + (ret = open_db(&g_tell, "teller.db", DB_BTREE, 0)) != 0 || + (ret = open_db(&g_branch, "branch.db", DB_BTREE, 0)) != 0 || + (ret = open_db(&g_hist, "history.db", DB_RECNO, 0)) != 0) { + fprintf(stderr, "open (did you -i first?): %s\n", db_strerror(ret)); + return 1; + } + + bb_print_config(&g_cfg, 
"tproc-b"); + + tids = calloc((size_t)g_cfg.threads, sizeof(*tids)); + workers = calloc((size_t)g_cfg.threads, sizeof(*workers)); + for (t = 0; t < g_cfg.threads; t++) { + workers[t].tid = t; + bb_rng_seed(&workers[t].rng, g_cfg.seed + (uint64_t)t * 0x100); + } + + g_stop = 0; + t0 = bb_now_ms(); + for (t = 0; t < g_cfg.threads; t++) + pthread_create(&tids[t], NULL, worker_main, &workers[t]); + usleep((useconds_t)g_cfg.seconds * 1000000); + g_stop = 1; + for (t = 0; t < g_cfg.threads; t++) + pthread_join(tids[t], NULL); + elapsed = (bb_now_ms() - t0) / 1000.0; + + okall = retryall = 0; + for (t = 0; t < g_cfg.threads; t++) { + okall += workers[t].ok; + retryall += workers[t].retry; + } + printf("tproc-b %2d threads %.0f txn/s committed=%llu retries=%llu (%.1fs)\n", + g_cfg.threads, (double)okall / elapsed, + (unsigned long long)okall, (unsigned long long)retryall, elapsed); + + free(tids); free(workers); + (void)g_env->close(g_env, 0); + return 0; +} diff --git a/lab/bench/tproc_c.c b/lab/bench/tproc_c.c new file mode 100644 index 000000000..a25892214 --- /dev/null +++ b/lab/bench/tproc_c.c @@ -0,0 +1,569 @@ +/*- + * See the file LICENSE for redistribution information. + * + * tproc_c -- a HammerDB-style "TPROC-C" OLTP workload for libdb. + * + * Independently implemented; not the TPC-C benchmark and not comparable to + * TPC results. It models the same warehouse/order business with the five + * classic weighted transactions: + * + * New-Order (45%) read-write: pick items, decrement stock, create order + * Payment (43%) read-write: update warehouse/district/customer balances + * Order-Status (4%) read-only: look up a customer's most recent order + * Delivery (4%) read-write: mark oldest new-orders delivered + * Stock-Level (4%) read-only: count district stock below a threshold + * + * The read-only transactions use snapshot isolation when -m (MVCC) is set, so + * they never block the read-write transactions. 
All five run with or without + * transactions, locking, and logging via the bdb_bench.h toggle framework. + * + * Schema (one DB per table, integer keys, fixed-size records): + * warehouse key=w_id -> w_ytd + * district key=(w_id,d_id) -> d_ytd, d_next_o_id + * customer key=(w_id,d_id*CUST_PER_DIST+c_id) -> c_balance, c_ytd, c_last_o + * stock key=(w_id,i_id) -> s_quantity + * orders key=(w_id,d_id,o_id) -> o_c_id, o_carrier, o_ol_cnt + * neworder key=(w_id,d_id,o_id) -> (presence = undelivered) + * item key=i_id -> i_price (read-only, fixed) + * + * Scale: -S warehouses. Per warehouse: DISTRICTS districts, CUST_PER_DIST + * customers/district, ITEMS stock rows; the ITEMS item rows are shared by all warehouses. + */ +#include "bdb_bench.h" + +#define DISTRICTS 10 +#define CUST_PER_DIST 300 /* compact (spec is 3000) to keep load fast */ +#define ITEMS 10000 +#define ORDERS_PER_DIST CUST_PER_DIST +#define STOCK_LOW_THRESHOLD 10 + +/* Composite integer keys. NOTE(review): mkkey() stores the three fields in host byte order, not big-endian, so on little-endian hosts a byte-wise btree comparison does not sort ids >= 256 numerically -- confirm whether ordered scans (e.g. "oldest new-order") are relied on. */ +typedef struct { uint32_t a, b, c; } key3; + +static DB_ENV *g_env; +static bb_config g_cfg; +static DB *g_wh, *g_dist, *g_cust, *g_stock, *g_ord, *g_neword, *g_item; + +static volatile int g_stop; /* set when the timer expires */ + +/* Per-transaction-type counters, summed across threads at the end. 
*/ +enum { T_NEWORDER, T_PAYMENT, T_ORDERSTATUS, T_DELIVERY, T_STOCKLEVEL, T_N }; +static const char *g_tnames[T_N] = { + "new-order", "payment", "order-status", "delivery", "stock-level" +}; + +typedef struct { + int tid; + bb_rng rng; + uint64_t ok[T_N]; + uint64_t retry[T_N]; /* deadlock/conflict retries */ + uint64_t stock_low_seen; /* sink so stock-level scan isn't elided */ +} worker; + +static void +mkkey(DBT *dbt, key3 *k, uint32_t a, uint32_t b, uint32_t c) +{ + k->a = a; k->b = b; k->c = c; + memset(dbt, 0, sizeof(*dbt)); + dbt->data = k; + dbt->size = sizeof(*k); +} + +/* ---- record bodies (fixed size, padded to one cache line) ---- */ +typedef struct { int64_t ytd; uint8_t pad[56]; } wh_rec; +typedef struct { int64_t ytd; uint32_t next_o_id; uint8_t pad[52]; } dist_rec; +typedef struct { int64_t balance, ytd; uint32_t last_o; uint8_t pad[44]; } cust_rec; +typedef struct { int32_t quantity; uint8_t pad[60]; } stock_rec; +typedef struct { uint32_t c_id, carrier, ol_cnt; uint8_t pad[52]; } ord_rec; +typedef struct { uint32_t price; uint8_t pad[60]; } item_rec; + +/* ---------------------------------------------------------------- */ +/* Population */ +/* ---------------------------------------------------------------- */ +static int +open_db(DB **dbp, const char *name) +{ + DB *db; + int ret; + + if ((ret = db_create(&db, g_env, 0)) != 0) return ret; + if ((ret = db->open(db, NULL, name, NULL, DB_BTREE, + bb_db_flags(&g_cfg), 0)) != 0) { + g_env->err(g_env, ret, "open %s", name); + return ret; + } + *dbp = db; + return 0; +} + +static int +populate(void) +{ + DBT k, d; + key3 kk; + DB_TXN *txn = NULL; + wh_rec w; dist_rec di; cust_rec cu; stock_rec st; ord_rec o; item_rec it; + int wid, did, cid, iid, oid, ret, nput; + + if ((ret = open_db(&g_wh, "warehouse.db")) != 0) return ret; + if ((ret = open_db(&g_dist, "district.db")) != 0) return ret; + if ((ret = open_db(&g_cust, "customer.db")) != 0) return ret; + if ((ret = open_db(&g_stock, "stock.db")) != 0) 
return ret; + if ((ret = open_db(&g_ord, "orders.db")) != 0) return ret; + if ((ret = open_db(&g_neword, "neworder.db")) != 0) return ret; + if ((ret = open_db(&g_item, "item.db")) != 0) return ret; + + /* Items are global and read-only at run time. */ + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + memset(&it, 0, sizeof(it)); + nput = 0; + for (iid = 0; iid < ITEMS; iid++) { + it.price = 100 + (uint32_t)(iid % 9900); + mkkey(&k, &kk, (uint32_t)iid, 0, 0); + memset(&d, 0, sizeof(d)); d.data = &it; d.size = sizeof(it); + if ((ret = g_item->put(g_item, txn, &k, &d, 0)) != 0) goto err; + if (++nput % 10000 == 0 && g_cfg.use_txn) { + if ((ret = bb_commit(txn)) != 0) return ret; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + } + } + + for (wid = 0; wid < g_cfg.scale; wid++) { + memset(&w, 0, sizeof(w)); w.ytd = 0; + mkkey(&k, &kk, (uint32_t)wid, 0, 0); + memset(&d, 0, sizeof(d)); d.data = &w; d.size = sizeof(w); + if ((ret = g_wh->put(g_wh, txn, &k, &d, 0)) != 0) goto err; + + for (iid = 0; iid < ITEMS; iid++) { + memset(&st, 0, sizeof(st)); + st.quantity = 10 + (int32_t)(iid % 90); + mkkey(&k, &kk, (uint32_t)wid, (uint32_t)iid, 0); + memset(&d, 0, sizeof(d)); d.data = &st; d.size = sizeof(st); + if ((ret = g_stock->put(g_stock, txn, &k, &d, 0)) != 0) goto err; + if (++nput % 10000 == 0 && g_cfg.use_txn) { + if ((ret = bb_commit(txn)) != 0) return ret; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + } + } + + for (did = 0; did < DISTRICTS; did++) { + memset(&di, 0, sizeof(di)); + di.next_o_id = ORDERS_PER_DIST; + mkkey(&k, &kk, (uint32_t)wid, (uint32_t)did, 0); + memset(&d, 0, sizeof(d)); d.data = &di; d.size = sizeof(di); + if ((ret = g_dist->put(g_dist, txn, &k, &d, 0)) != 0) goto err; + + /* Customers are keyed (w_id, d_id*CUST_PER_DIST + c_id, 0). */ for (cid = 0; cid < CUST_PER_DIST; cid++) { + memset(&cu, 0, sizeof(cu)); + cu.balance = -1000; cu.ytd = 1000; cu.last_o = (uint32_t)cid; + mkkey(&k, &kk, (uint32_t)wid, + (uint32_t)(did * CUST_PER_DIST + cid), 0); + memset(&d, 
0, sizeof(d)); d.data = &cu; d.size = sizeof(cu); + if ((ret = g_cust->put(g_cust, txn, &k, &d, 0)) != 0) goto err; + } + for (oid = 0; oid < ORDERS_PER_DIST; oid++) { + memset(&o, 0, sizeof(o)); + o.c_id = (uint32_t)(oid % CUST_PER_DIST); + o.carrier = 0; o.ol_cnt = 10; + mkkey(&k, &kk, (uint32_t)wid, + (uint32_t)did, (uint32_t)oid); + memset(&d, 0, sizeof(d)); d.data = &o; d.size = sizeof(o); + if ((ret = g_ord->put(g_ord, txn, &k, &d, 0)) != 0) goto err; + /* Half the orders are undelivered (new-order). */ + if (oid >= ORDERS_PER_DIST / 2) { + memset(&d, 0, sizeof(d)); d.data = ""; d.size = 0; + if ((ret = g_neword->put(g_neword, txn, + &k, &d, 0)) != 0) goto err; + } + if (++nput % 10000 == 0 && g_cfg.use_txn) { + if ((ret = bb_commit(txn)) != 0) return ret; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + } + } + } + } + if ((ret = bb_commit(txn)) != 0) return ret; + return 0; +err: + (void)bb_abort(txn); + return ret; +} + +/* ---------------------------------------------------------------- */ +/* Transactions */ +/* ---------------------------------------------------------------- */ +#define RETRY(expr) do { \ + ret = (expr); \ + if (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) { \ + (void)bb_abort(txn); txn = NULL; \ + w->retry[op]++; \ + goto again; \ + } \ +} while (0) + +static int +get_rec(DB *db, DB_TXN *txn, uint32_t a, uint32_t b, uint32_t c, + void *out, size_t outsz) +{ + DBT k, d; + key3 kk; + + mkkey(&k, &kk, a, b, c); + memset(&d, 0, sizeof(d)); + d.data = out; d.ulen = (u_int32_t)outsz; d.flags = DB_DBT_USERMEM; + return db->get(db, txn, &k, &d, 0); +} + +static int +put_rec(DB *db, DB_TXN *txn, uint32_t a, uint32_t b, uint32_t c, + void *in, size_t insz) +{ + DBT k, d; + key3 kk; + + mkkey(&k, &kk, a, b, c); + memset(&d, 0, sizeof(d)); d.data = in; d.size = (u_int32_t)insz; + return db->put(db, txn, &k, &d, 0); +} + +static int +do_new_order(worker *w) +{ + DB_TXN *txn; + dist_rec di; + stock_rec st; + ord_rec o; + 
uint32_t wid, did, oid, i, nitems; + int op = T_NEWORDER, ret; + + wid = bb_rand_between(&w->rng, 0, (uint32_t)g_cfg.scale - 1); + did = bb_rand_between(&w->rng, 0, DISTRICTS - 1); + nitems = bb_rand_between(&w->rng, 5, 15); +again: + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + + /* Allocate the next order id from the district. */ + RETRY(get_rec(g_dist, txn, wid, did, 0, &di, sizeof(di))); + if (ret != 0) goto fail; + oid = di.next_o_id++; + RETRY(put_rec(g_dist, txn, wid, did, 0, &di, sizeof(di))); + if (ret != 0) goto fail; + + /* Decrement stock for each order line. */ + for (i = 0; i < nitems; i++) { + uint32_t iid = bb_rand_between(&w->rng, 0, ITEMS - 1); + RETRY(get_rec(g_stock, txn, wid, iid, 0, &st, sizeof(st))); + if (ret != 0) goto fail; + st.quantity -= 1; + if (st.quantity < 10) st.quantity += 91; + RETRY(put_rec(g_stock, txn, wid, iid, 0, &st, sizeof(st))); + if (ret != 0) goto fail; + } + + /* Insert the order + new-order marker. */ + memset(&o, 0, sizeof(o)); + o.c_id = bb_rand_between(&w->rng, 0, CUST_PER_DIST - 1); + o.ol_cnt = nitems; + RETRY(put_rec(g_ord, txn, wid, did, oid, &o, sizeof(o))); + if (ret != 0) goto fail; + RETRY(put_rec(g_neword, txn, wid, did, oid, "", 0)); + if (ret != 0) goto fail; + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ok[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 
0 : ret; +} + +static int +do_payment(worker *w) +{ + DB_TXN *txn; + wh_rec wh; + dist_rec di; + cust_rec cu; + uint32_t wid, did, cid; + int64_t amount; + int op = T_PAYMENT, ret; + + wid = bb_rand_between(&w->rng, 0, (uint32_t)g_cfg.scale - 1); + did = bb_rand_between(&w->rng, 0, DISTRICTS - 1); + cid = bb_rand_between(&w->rng, 0, DISTRICTS * CUST_PER_DIST - 1); + amount = (int64_t)bb_rand_between(&w->rng, 1, 5000); +again: + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + + RETRY(get_rec(g_wh, txn, wid, 0, 0, &wh, sizeof(wh))); + if (ret != 0) goto fail; + wh.ytd += amount; + RETRY(put_rec(g_wh, txn, wid, 0, 0, &wh, sizeof(wh))); + if (ret != 0) goto fail; + + RETRY(get_rec(g_dist, txn, wid, did, 0, &di, sizeof(di))); + if (ret != 0) goto fail; + di.ytd += amount; + RETRY(put_rec(g_dist, txn, wid, did, 0, &di, sizeof(di))); + if (ret != 0) goto fail; + + RETRY(get_rec(g_cust, txn, wid, cid, 0, &cu, sizeof(cu))); + if (ret != 0) goto fail; + cu.balance -= amount; + cu.ytd += amount; + RETRY(put_rec(g_cust, txn, wid, cid, 0, &cu, sizeof(cu))); + if (ret != 0) goto fail; + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ok[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +static int +do_order_status(worker *w) +{ + DB_TXN *txn; + cust_rec cu; + ord_rec o; + uint32_t wid, did, cid; + int op = T_ORDERSTATUS, ret; + + wid = bb_rand_between(&w->rng, 0, (uint32_t)g_cfg.scale - 1); + did = bb_rand_between(&w->rng, 0, DISTRICTS - 1); + cid = bb_rand_between(&w->rng, 0, DISTRICTS * CUST_PER_DIST - 1); +again: + if ((ret = bb_begin(&g_cfg, g_env, &txn, 1)) != 0) return ret; /* rdonly */ + + RETRY(get_rec(g_cust, txn, wid, cid, 0, &cu, sizeof(cu))); + if (ret != 0) goto fail; + /* Look up the customer's most recent order (approx via last_o). 
*/ + ret = get_rec(g_ord, txn, wid, did, cu.last_o, &o, sizeof(o)); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); txn=NULL; w->retry[op]++; goto again; } + if (ret != 0 && ret != DB_NOTFOUND) goto fail; + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ok[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +/* Delivery: delete the first new-order row the cursor finds at or after (w,d,0) if it belongs to this district, then set the matching order's carrier; counts as ok even when nothing is pending. */ +static int +do_delivery(worker *w) +{ + DB_TXN *txn; + DBC *dbc; + DBT k, d; + key3 kk, *fk; + uint32_t wid, did; + int op = T_DELIVERY, ret; + + wid = bb_rand_between(&w->rng, 0, (uint32_t)g_cfg.scale - 1); + did = bb_rand_between(&w->rng, 0, DISTRICTS - 1); +again: + dbc = NULL; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + + /* Find the oldest new-order in this (w,d) and delete it. */ + RETRY(g_neword->cursor(g_neword, txn, &dbc, 0)); + if (ret != 0) goto fail; + mkkey(&k, &kk, wid, did, 0); + memset(&d, 0, sizeof(d)); + ret = dbc->get(dbc, &k, &d, DB_SET_RANGE); + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); txn=NULL; w->retry[op]++; goto again; } + if (ret == 0) { + fk = (key3 *)k.data; + if (fk->a == wid && fk->b == did) { + ord_rec o; + uint32_t oid = fk->c; /* copy out now: k.data is cursor-owned and must not be read after close */ + ret = dbc->del(dbc, 0); + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); txn=NULL; w->retry[op]++; goto again; } + if (ret != 0) { (void)dbc->close(dbc); goto fail; } + /* Mark the order delivered (set a carrier id). */ + (void)dbc->close(dbc); dbc = NULL; + RETRY(get_rec(g_ord, txn, wid, did, oid, &o, sizeof(o))); /* a deadlock here must abort, not commit */ + if (ret == 0) { + o.carrier = 1; + RETRY(put_rec(g_ord, txn, wid, did, oid, &o, sizeof(o))); + if (ret != 0) goto fail; + } else if (ret != DB_NOTFOUND) goto fail; + } + } else if (ret != DB_NOTFOUND) { + (void)dbc->close(dbc); + goto fail; + } + if (dbc != NULL) (void)dbc->close(dbc); + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ok[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 
0 : ret; +} + +static int +do_stock_level(worker *w) +{ + DB_TXN *txn; + DBC *dbc; + DBT k, d; + key3 kk, *fk; + stock_rec st; + uint32_t wid; + int op = T_STOCKLEVEL, ret, low; + + wid = bb_rand_between(&w->rng, 0, (uint32_t)g_cfg.scale - 1); +again: + dbc = NULL; low = 0; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 1)) != 0) return ret; /* rdonly */ + + RETRY(g_stock->cursor(g_stock, txn, &dbc, 0)); + if (ret != 0) goto fail; + mkkey(&k, &kk, wid, 0, 0); + memset(&d, 0, sizeof(d)); d.data = &st; d.ulen = sizeof(st); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_SET_RANGE); + while (ret == 0) { + fk = (key3 *)k.data; + if (fk->a != wid) break; /* left this warehouse */ + if (((stock_rec *)d.data)->quantity < STOCK_LOW_THRESHOLD) low++; + memset(&d, 0, sizeof(d)); d.data = &st; d.ulen = sizeof(st); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_NEXT); + } + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); txn=NULL; w->retry[op]++; goto again; } + (void)dbc->close(dbc); + if (ret != 0 && ret != DB_NOTFOUND) goto fail; + + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->stock_low_seen += (uint64_t)low; + w->ok[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +/* Weighted transaction mix (out of 100). 
*/ +static int +run_one(worker *w) +{ + uint32_t r = bb_rand_between(&w->rng, 0, 99); + + if (r < 45) return do_new_order(w); + if (r < 88) return do_payment(w); + if (r < 92) return do_order_status(w); + if (r < 96) return do_delivery(w); + return do_stock_level(w); +} + +static void * +worker_main(void *arg) +{ + worker *w = arg; + int ret; + + while (!g_stop) { + if ((ret = run_one(w)) != 0 && ret != DB_LOCK_DEADLOCK) { + fprintf(stderr, "txn error: %s\n", db_strerror(ret)); + break; + } + } + return NULL; +} + +static void +usage(const char *p) +{ + fprintf(stderr, + "usage: %s [-i] [-h home] [-S warehouses] [-t threads] [-s secs]\n" + " [-c cachebytes] [-d sync|wnosync|nosync] [-m] [-C]\n" + " [-X txn|lock|log] [-R seed]\n", p); +} + +int +main(int argc, char **argv) +{ + pthread_t *tids; + worker *workers; + uint64_t total[T_N], rtot[T_N], grand; + double t0, elapsed; + int i, t, ret; + + bb_config_defaults(&g_cfg); + if (bb_getopt(argc, argv, &g_cfg) != 0) { usage(argv[0]); return 1; } + if (g_cfg.scale < 1) g_cfg.scale = 1; + + if ((ret = bb_env_open(&g_cfg, &g_env)) != 0) return 1; + + if (g_cfg.init) { + bb_print_config(&g_cfg, "tproc-c populate"); + t0 = bb_now_ms(); + if ((ret = populate()) != 0) { + fprintf(stderr, "populate: %s\n", db_strerror(ret)); + return 1; + } + printf("# populated scale=%d in %.1f s\n", + g_cfg.scale, (bb_now_ms() - t0) / 1000.0); + (void)g_env->close(g_env, 0); + return 0; + } + + /* Open the (already populated) tables. 
*/ + if ((ret = open_db(&g_wh, "warehouse.db")) != 0 || + (ret = open_db(&g_dist, "district.db")) != 0 || + (ret = open_db(&g_cust, "customer.db")) != 0 || + (ret = open_db(&g_stock, "stock.db")) != 0 || + (ret = open_db(&g_ord, "orders.db")) != 0 || + (ret = open_db(&g_neword, "neworder.db")) != 0 || + (ret = open_db(&g_item, "item.db")) != 0) { + fprintf(stderr, "open (did you -i first?): %s\n", db_strerror(ret)); + return 1; + } + + bb_print_config(&g_cfg, "tproc-c"); + + tids = calloc((size_t)g_cfg.threads, sizeof(*tids)); + workers = calloc((size_t)g_cfg.threads, sizeof(*workers)); + for (t = 0; t < g_cfg.threads; t++) { + workers[t].tid = t; + bb_rng_seed(&workers[t].rng, g_cfg.seed + (uint64_t)t * 0x100); + } + + g_stop = 0; + t0 = bb_now_ms(); + for (t = 0; t < g_cfg.threads; t++) + pthread_create(&tids[t], NULL, worker_main, &workers[t]); + + /* Run for the requested wall-clock, then signal stop. */ + usleep((useconds_t)g_cfg.seconds * 1000000); + g_stop = 1; + for (t = 0; t < g_cfg.threads; t++) + pthread_join(tids[t], NULL); + elapsed = (bb_now_ms() - t0) / 1000.0; + + memset(total, 0, sizeof(total)); + memset(rtot, 0, sizeof(rtot)); + for (t = 0; t < g_cfg.threads; t++) + for (i = 0; i < T_N; i++) { + total[i] += workers[t].ok[i]; + rtot[i] += workers[t].retry[i]; + } + grand = 0; + for (i = 0; i < T_N; i++) grand += total[i]; + + printf("# txn-type committed retries\n"); + for (i = 0; i < T_N; i++) + printf("%-16s %10llu %9llu\n", g_tnames[i], + (unsigned long long)total[i], (unsigned long long)rtot[i]); + printf("tproc-c %2d threads %12.0f tpmC-like (%.0f txn/s over %.1fs)\n", + g_cfg.threads, (double)grand / elapsed * 60.0, + (double)grand / elapsed, elapsed); + + free(tids); free(workers); + (void)g_env->close(g_env, 0); + return 0; +} diff --git a/lab/bench/tproc_h.c b/lab/bench/tproc_h.c new file mode 100644 index 000000000..bbe576786 --- /dev/null +++ b/lab/bench/tproc_h.c @@ -0,0 +1,430 @@ +/*- + * See the file LICENSE for redistribution 
information. + * + * tproc_h -- a HammerDB-style "TPROC-H" analytic (OLAP) workload for libdb. + * + * Independently implemented; not the TPC-H benchmark and not comparable to + * TPC results. It models a star schema and runs long, read-mostly analytic + * scans concurrently with a trickle of point updates, to exercise the case + * MVCC is built for: long readers that must see a consistent snapshot without + * blocking (or being blocked by) the writers. + * + * Schema (one DB per table, integer keys): + * lineitem key=l_id -> { orderkey, partkey, suppkey, + * quantity, price, shipdate } (the fact) + * part key=p_id -> { retailprice, size } + * supplier key=s_id -> { acctbal, nation } + * + * Query threads (read-only) each run one of: + * Q-pricing scan lineitem in a shipdate window, sum revenue (range scan) + * Q-partagg scan lineitem, join part by partkey, sum by size bucket + * Q-suppagg scan lineitem, join supplier by suppkey, sum acctbal + * Writer threads apply point updates to part.retailprice and supplier.acctbal + * (so the analytic scans run against a moving target). + * + * With -m the query threads use snapshot isolation and never block on the + * writers. Without -m they take read locks (and serialize against writers); + * with -X txn they run with no isolation at all. This contrast is the point. 
+ */ +#include "bdb_bench.h" + +#define N_PART 20000 +#define N_SUPPLIER 2000 +#define SHIPDATE_RANGE 2557 /* ~7 years of days */ + +static DB_ENV *g_env; +static bb_config g_cfg; +static DB *g_line, *g_part, *g_supp; +static int g_lineitems; /* scale * base */ +static volatile int g_stop; + +enum { Q_PRICING, Q_PARTAGG, Q_SUPPAGG, W_UPDATE, T_N }; +static const char *g_tnames[T_N] = { + "q-pricing", "q-partagg", "q-suppagg", "w-update" +}; + +typedef struct { + int tid; + int is_writer; + bb_rng rng; + uint64_t ops[T_N]; + uint64_t rows; /* rows scanned (reader) sink */ + uint64_t retry[T_N]; +} worker; + +typedef struct { + uint32_t orderkey, partkey, suppkey; + int32_t quantity, price, shipdate; + uint8_t pad[40]; +} line_rec; +typedef struct { int32_t retailprice, size; uint8_t pad[56]; } part_rec; +typedef struct { int64_t acctbal; int32_t nation; uint8_t pad[52]; } supp_rec; + +static int +open_db(DB **dbp, const char *name) +{ + DB *db; + int ret; + + if ((ret = db_create(&db, g_env, 0)) != 0) return ret; + if ((ret = db->open(db, NULL, name, NULL, DB_BTREE, + bb_db_flags(&g_cfg), 0)) != 0) { + g_env->err(g_env, ret, "open %s", name); + return ret; + } + *dbp = db; + return 0; +} + +static int +put_u32(DB *db, DB_TXN *txn, uint32_t key, void *rec, size_t sz) +{ + DBT k, d; + + memset(&k, 0, sizeof(k)); k.data = &key; k.size = sizeof(key); + memset(&d, 0, sizeof(d)); d.data = rec; d.size = (u_int32_t)sz; + return db->put(db, txn, &k, &d, 0); +} + +static int +get_u32(DB *db, DB_TXN *txn, uint32_t key, void *out, size_t outsz) +{ + DBT k, d; + + memset(&k, 0, sizeof(k)); k.data = &key; k.size = sizeof(key); + memset(&d, 0, sizeof(d)); + d.data = out; d.ulen = (u_int32_t)outsz; d.flags = DB_DBT_USERMEM; + return db->get(db, txn, &k, &d, 0); +} + +static int +populate(void) +{ + DB_TXN *txn = NULL; + line_rec l; part_rec p; supp_rec s; + uint32_t i; + int ret, n; + + if ((ret = open_db(&g_line, "lineitem.db")) != 0) return ret; + if ((ret = open_db(&g_part, 
"part.db")) != 0) return ret; + if ((ret = open_db(&g_supp, "supplier.db")) != 0) return ret; + + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + n = 0; + + for (i = 0; i < N_PART; i++) { + memset(&p, 0, sizeof(p)); + p.retailprice = 100 + (int32_t)(i % 9000); + p.size = 1 + (int32_t)(i % 50); + if ((ret = put_u32(g_part, txn, i, &p, sizeof(p))) != 0) goto err; + } + for (i = 0; i < N_SUPPLIER; i++) { + memset(&s, 0, sizeof(s)); + s.acctbal = 1000 + (int64_t)(i % 100000); + s.nation = (int32_t)(i % 25); + if ((ret = put_u32(g_supp, txn, i, &s, sizeof(s))) != 0) goto err; + } + for (i = 0; i < (uint32_t)g_lineitems; i++) { + memset(&l, 0, sizeof(l)); + l.orderkey = i / 4; + l.partkey = i % N_PART; + l.suppkey = i % N_SUPPLIER; + l.quantity = 1 + (int32_t)(i % 50); + l.price = 100 + (int32_t)(i % 90000); + l.shipdate = (int32_t)(i % SHIPDATE_RANGE); + if ((ret = put_u32(g_line, txn, i, &l, sizeof(l))) != 0) goto err; + if (++n % 20000 == 0 && g_cfg.use_txn) { + if ((ret = bb_commit(txn)) != 0) return ret; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + } + } + return bb_commit(txn); +err: + (void)bb_abort(txn); + return ret; +} + +/* Range scan of lineitem in a shipdate window, summing revenue. 
*/ +static int +q_pricing(worker *w) +{ + DB_TXN *txn; + DBC *dbc; + DBT k, d; + line_rec l; + int32_t lo, hi; + uint64_t revenue; + int op = Q_PRICING, ret; + + lo = (int32_t)bb_rand_between(&w->rng, 0, SHIPDATE_RANGE - 200); + hi = lo + 180; + revenue = 0; +again: + dbc = NULL; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 1)) != 0) return ret; + if ((ret = g_line->cursor(g_line, txn, &dbc, 0)) != 0) goto fail; + memset(&k, 0, sizeof(k)); + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_FIRST); + while (ret == 0) { + if (l.shipdate >= lo && l.shipdate <= hi) + revenue += (uint64_t)l.price * (uint64_t)l.quantity; + w->rows++; + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_NEXT); + } + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); w->retry[op]++; goto again; } + (void)dbc->close(dbc); + if (ret != 0 && ret != DB_NOTFOUND) goto fail; + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ops[op]++; + return (revenue == ~0ULL) ? -1 : 0; /* keep revenue live */ +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +/* Scan lineitem, join part by partkey, accumulate by size. */ +static int +q_partagg(worker *w) +{ + DB_TXN *txn; + DBC *dbc; + DBT k, d; + line_rec l; + part_rec p; + uint64_t acc; + int op = Q_PARTAGG, ret, scanned; + + acc = 0; scanned = 0; +again: + dbc = NULL; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 1)) != 0) return ret; + if ((ret = g_line->cursor(g_line, txn, &dbc, 0)) != 0) goto fail; + memset(&k, 0, sizeof(k)); + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_FIRST); + /* Sample every 8th row to bound the join cost. 
*/ + while (ret == 0) { + if ((scanned++ & 7) == 0) { + if (get_u32(g_part, txn, l.partkey, &p, sizeof(p)) == 0) + acc += (uint64_t)p.retailprice * (uint64_t)l.quantity; + w->rows++; + } + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_NEXT); + } + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); w->retry[op]++; goto again; } + (void)dbc->close(dbc); + if (ret != 0 && ret != DB_NOTFOUND) goto fail; + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ops[op]++; + return (acc == ~0ULL) ? -1 : 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +/* Scan a sample of lineitem, join supplier, sum acctbal. */ +static int +q_suppagg(worker *w) +{ + DB_TXN *txn; + DBC *dbc; + DBT k, d; + line_rec l; + supp_rec s; + int64_t acc; + int op = Q_SUPPAGG, ret, scanned; + + acc = 0; scanned = 0; +again: + dbc = NULL; + if ((ret = bb_begin(&g_cfg, g_env, &txn, 1)) != 0) return ret; + if ((ret = g_line->cursor(g_line, txn, &dbc, 0)) != 0) goto fail; + memset(&k, 0, sizeof(k)); + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_FIRST); + while (ret == 0) { + if ((scanned++ & 7) == 0) { + if (get_u32(g_supp, txn, l.suppkey, &s, sizeof(s)) == 0) + acc += s.acctbal; + w->rows++; + } + memset(&d, 0, sizeof(d)); d.data = &l; d.ulen = sizeof(l); d.flags = DB_DBT_USERMEM; + ret = dbc->get(dbc, &k, &d, DB_NEXT); + } + if (ret == DB_LOCK_DEADLOCK) { (void)dbc->close(dbc); (void)bb_abort(txn); w->retry[op]++; goto again; } + (void)dbc->close(dbc); + if (ret != 0 && ret != DB_NOTFOUND) goto fail; + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ops[op]++; + return (acc == ~(int64_t)0) ? -1 : 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +/* Writer: point-update a random part price and supplier balance. 
*/ +static int +w_update(worker *w) +{ + DB_TXN *txn; + part_rec p; + supp_rec s; + uint32_t pid, sid; + int op = W_UPDATE, ret; + + pid = bb_rand_between(&w->rng, 0, N_PART - 1); + sid = bb_rand_between(&w->rng, 0, N_SUPPLIER - 1); +again: + if ((ret = bb_begin(&g_cfg, g_env, &txn, 0)) != 0) return ret; + ret = get_u32(g_part, txn, pid, &p, sizeof(p)); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry[op]++; goto again; } + if (ret != 0) goto fail; + p.retailprice += 1; + ret = put_u32(g_part, txn, pid, &p, sizeof(p)); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry[op]++; goto again; } + if (ret != 0) goto fail; + ret = get_u32(g_supp, txn, sid, &s, sizeof(s)); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry[op]++; goto again; } + if (ret != 0) goto fail; + s.acctbal += 1; + ret = put_u32(g_supp, txn, sid, &s, sizeof(s)); + if (ret == DB_LOCK_DEADLOCK) { (void)bb_abort(txn); w->retry[op]++; goto again; } + if (ret != 0) goto fail; + if ((ret = bb_commit(txn)) != 0) { txn = NULL; goto fail; } + w->ops[op]++; + return 0; +fail: + (void)bb_abort(txn); + return ret == DB_LOCK_DEADLOCK ? 0 : ret; +} + +static void * +worker_main(void *arg) +{ + worker *w = arg; + int ret; + + while (!g_stop) { + if (w->is_writer) + ret = w_update(w); + else { + uint32_t r = bb_rand_between(&w->rng, 0, 2); + ret = r == 0 ? q_pricing(w) : + r == 1 ? 
q_partagg(w) : q_suppagg(w); + } + if (ret != 0 && ret != DB_LOCK_DEADLOCK) { + fprintf(stderr, "op error: %s\n", db_strerror(ret)); + break; + } + } + return NULL; +} + +static void +usage(const char *p) +{ + fprintf(stderr, + "usage: %s [-i] [-h home] [-S scale] [-t querythreads] [-w writers]\n" + " [-s secs] [-c cachebytes] [-d sync|wnosync|nosync] [-m]\n" + " [-X txn|lock|log] [-R seed]\n" + " scale S gives ~S*250k lineitems\n", p); +} + +int +main(int argc, char **argv) +{ + pthread_t *tids; + worker *workers; + uint64_t total[T_N], rows; + double t0, elapsed; + int i, t, ret, nthreads, writers, argi; + + bb_config_defaults(&g_cfg); + + /* Pre-scan for our extra -w (writers) flag, then the shared options. */ + writers = 1; + for (argi = 1; argi < argc - 1; argi++) + if (strcmp(argv[argi], "-w") == 0) + writers = atoi(argv[argi + 1]); + /* Strip -w from argv so bb_getopt doesn't choke. */ + { + int j, k2 = 1; + char **nv = calloc((size_t)argc, sizeof(char *)); + nv[0] = argv[0]; + for (j = 1; j < argc; j++) { + if (strcmp(argv[j], "-w") == 0) { j++; continue; } + nv[k2++] = argv[j]; + } + if (bb_getopt(k2, nv, &g_cfg) != 0) { usage(argv[0]); return 1; } + free(nv); + } + if (g_cfg.scale < 1) g_cfg.scale = 1; + g_lineitems = g_cfg.scale * 250000; + + if ((ret = bb_env_open(&g_cfg, &g_env)) != 0) return 1; + + if (g_cfg.init) { + bb_print_config(&g_cfg, "tproc-h populate"); + t0 = bb_now_ms(); + if ((ret = populate()) != 0) { + fprintf(stderr, "populate: %s\n", db_strerror(ret)); + return 1; + } + printf("# populated %d lineitems in %.1f s\n", + g_lineitems, (bb_now_ms() - t0) / 1000.0); + (void)g_env->close(g_env, 0); + return 0; + } + + if ((ret = open_db(&g_line, "lineitem.db")) != 0 || + (ret = open_db(&g_part, "part.db")) != 0 || + (ret = open_db(&g_supp, "supplier.db")) != 0) { + fprintf(stderr, "open (did you -i first?): %s\n", db_strerror(ret)); + return 1; + } + + nthreads = g_cfg.threads + writers; + bb_print_config(&g_cfg, "tproc-h"); + printf("# %d 
query threads + %d writer threads\n", g_cfg.threads, writers); + + tids = calloc((size_t)nthreads, sizeof(*tids)); + workers = calloc((size_t)nthreads, sizeof(*workers)); + for (t = 0; t < nthreads; t++) { + workers[t].tid = t; + workers[t].is_writer = (t >= g_cfg.threads); + bb_rng_seed(&workers[t].rng, g_cfg.seed + (uint64_t)t * 0x100); + } + + g_stop = 0; + t0 = bb_now_ms(); + for (t = 0; t < nthreads; t++) + pthread_create(&tids[t], NULL, worker_main, &workers[t]); + usleep((useconds_t)g_cfg.seconds * 1000000); + g_stop = 1; + for (t = 0; t < nthreads; t++) + pthread_join(tids[t], NULL); + elapsed = (bb_now_ms() - t0) / 1000.0; + + memset(total, 0, sizeof(total)); + rows = 0; + for (t = 0; t < nthreads; t++) { + for (i = 0; i < T_N; i++) total[i] += workers[t].ops[i]; + rows += workers[t].rows; + } + + printf("# op-type completed\n"); + for (i = 0; i < T_N; i++) + printf("%-12s %10llu\n", g_tnames[i], + (unsigned long long)total[i]); + printf("tproc-h %d q + %d w queries/s=%.1f rows-scanned/s=%.0f (%.1fs)\n", + g_cfg.threads, writers, + (double)(total[Q_PRICING] + total[Q_PARTAGG] + total[Q_SUPPAGG]) / elapsed, + (double)rows / elapsed, elapsed); + + free(tids); free(workers); + (void)g_env->close(g_env, 0); + return 0; +}