Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


sqlite - Share a dict with multiple Python scripts

I'd like a single dict-like (key/value) database to be accessible from multiple Python scripts running at the same time.

If script1.py updates d[2839], then script2.py should see the modified value when it queries d[2839] a few seconds later.

What would be a Pythonic solution for this?

Note: I'm on Windows, and the dict will hold at most 1M items (keys and values are both integers).


1 Reply


Most embedded datastores other than SQLite aren't optimized for concurrent access. I was curious about SQLite's concurrent performance too, so I ran a benchmark:

import time
import sqlite3
import random
import multiprocessing


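class Store:
    """Integer key/value store backed by a single SQLite file."""

    def __init__(self, filename='kv.db'):
        # Wait up to 60 s for a lock held by another process before raising an error.
        self.conn = sqlite3.connect(filename, timeout=60)
        # In WAL mode readers do not block the writer and the writer does not block readers.
        self.conn.execute('pragma journal_mode=wal')
        self.conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
        self.conn.commit()

    def get(self, key):
        # A cursor object is always truthy, so test the fetched row instead.
        row = self.conn.execute('select value from "kv" where key=?', (key,)).fetchone()
        # Return None for a missing key.
        return row[0] if row is not None else None

    def set(self, key, value):
        # "replace into" inserts the row or overwrites an existing one.
        self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
        # Commit after every write so other processes can see the change right away.
        self.conn.commit()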


def worker(n):
    d = [random.randint(0, 1<<31) for _ in range(n)]
    s = Store()
    for i in d:
        s.set(i, i)
    random.shuffle(d)
    for i in d:
        s.get(i)


def test(c):
    n = 5000
    start = time.time()
    ps = []
    for _ in range(c):
        p = multiprocessing.Process(target=worker, args=(n,))
        p.start()
        ps.append(p)
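    # Wait for every worker to finish.
    for p in ps:
        p.join()
    cost = time.time() - start
    # Each worker does n writes followed by n reads; one write/read pair counts as one request.
    print(f'{c:<12d}{cost:<8.2f}{n/cost:<24.2f}{n*c/cost:<14.2f}')


def main():
    print(f'{"concurrency":<12}{"time(s)":<8}{"per process TPS(r/s)":<24}{"total TPS(r/s)":<14}')
    for c in range(1, 9):
        test(c)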


if __name__ == '__main__':
    main()

Result on my 4-core macOS machine (SSD volume):

concurrency time(s) per process TPS(r/s)    total TPS(r/s)
1           0.65    7638.43                 7638.43
2           1.30    3854.69                 7709.38
3           1.83    2729.32                 8187.97
4           2.43    2055.25                 8221.01
5           3.07    1629.35                 8146.74
6           3.87    1290.63                 7743.78
7           4.80    1041.73                 7292.13
8           5.37    931.27                  7450.15

Result on an 8-core Windows Server 2012 cloud server (SSD volume):

concurrency     time(s) per process TPS(r/s)    total TPS(r/s)
1               4.12    1212.14                 1212.14
2               7.87    634.93                  1269.87
3               14.06   355.56                  1066.69
4               15.84   315.59                  1262.35
5               20.19   247.68                  1238.41
6               24.52   203.96                  1223.73
7               29.94   167.02                  1169.12
8               34.98   142.92                  1143.39

It turns out that overall throughput stays roughly constant regardless of concurrency, and that SQLite was slower on the Windows server than on the macOS machine in this test. Hope this is helpful.
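
To tie the benchmark back to the question, here is a minimal sketch of a dict-style wrapper over the same table layout. The names SharedDict and demo are hypothetical and not part of the benchmark above. Two connections opened in one process stand in for script1.py and script2.py, and a temporary directory is assumed for the database file.

```python
import os
import sqlite3
import tempfile


class SharedDict:
    """Hypothetical dict-style wrapper over a SQLite key/value table."""

    def __init__(self, filename):
        self.conn = sqlite3.connect(filename, timeout=60)
        self.conn.execute('pragma journal_mode=wal')
        self.conn.execute(
            'create table if not exists "kv" '
            '(key integer primary key, value integer) without rowid')
        self.conn.commit()

    def __getitem__(self, key):
        row = self.conn.execute(
            'select value from "kv" where key=?', (key,)).fetchone()
        if row is None:
            # Mirror dict behaviour for a missing key.
            raise KeyError(key)
        return row[0]

    def __setitem__(self, key, value):
        self.conn.execute(
            'replace into "kv" (key, value) values (?,?)', (key, value))
        # Commit so that other connections can see the new value.
        self.conn.commit()

    def close(self):
        self.conn.close()


def demo():
    # Assumption: a throwaway file in a temporary directory.
    filename = os.path.join(tempfile.mkdtemp(), 'kv.db')
    script1 = SharedDict(filename)  # stands in for script1.py
    script2 = SharedDict(filename)  # stands in for script2.py
    script1[2839] = 42
    value = script2[2839]           # read through the other connection
    script1.close()
    script2.close()
    return value


if __name__ == '__main__':
    print(demo())
```

In real use each script would create its own SharedDict on the same file path. Raising KeyError for a missing key is a design choice made here to match dict semantics.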


Because SQLite's write lock is database-wide, you can get more TPS by partitioning the data across multiple database files:

class MultiDBStore():

    def __init__(self, buckets=5):
        self.buckets = buckets
        self.conns = []
        for n in range(buckets):
            conn = sqlite3.connect(f'kv_{n}.db', timeout=60)
            conn.execute('pragma journal_mode=wal')
            conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
            conn.commit()
            self.conns.append(conn)

    def _get_conn(self, key):
        assert isinstance(key, int)
        return self.conns[key % self.buckets]

    def get(self, key):
        # A cursor object is always truthy, so test the fetched row instead.
        row = self._get_conn(key).execute('select value from "kv" where key=?', (key,)).fetchone()
        return row[0] if row is not None else None

    def set(self, key, value):
        conn = self._get_conn(key)
        conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
        conn.commit()

Result on my Mac with 20 partitions:

concurrency time(s) per process TPS(r/s)    total TPS(r/s)
1           2.07    4837.17                 4837.17
2           2.51    3980.58                 7961.17
3           3.28    3047.68                 9143.03
4           4.02    2486.76                 9947.04
5           4.44    2249.94                 11249.71
6           4.76    2101.26                 12607.58
7           5.25    1903.69                 13325.82
8           5.71    1752.46                 14019.70

At higher concurrency, total TPS is higher than with a single database file (14019.70 versus 7450.15 at concurrency 8 on the same Mac).
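
As a small illustration of how the key-to-file routing behaves, the sketch below writes a few keys and lists which keys end up in which file. The helper names (open_buckets, put, keys_per_bucket, demo) and the temporary directory are assumptions for the example. It uses the same key % buckets rule as the partitioned store.

```python
import os
import sqlite3
import tempfile


def open_buckets(directory, buckets):
    # One connection per database file, as in the partitioned store.
    conns = []
    for n in range(buckets):
        conn = sqlite3.connect(os.path.join(directory, f'kv_{n}.db'), timeout=60)
        conn.execute('pragma journal_mode=wal')
        conn.execute(
            'create table if not exists "kv" '
            '(key integer primary key, value integer) without rowid')
        conn.commit()
        conns.append(conn)
    return conns


def put(conns, key, value):
    # Route the key to a file by taking it modulo the number of buckets.
    conn = conns[key % len(conns)]
    conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
    conn.commit()


def keys_per_bucket(conns):
    # List the keys stored in each file, in bucket order.
    return [sorted(row[0] for row in conn.execute('select key from "kv"'))
            for conn in conns]


def demo():
    conns = open_buckets(tempfile.mkdtemp(), buckets=3)
    for key in range(7):
        put(conns, key, key * 10)
    layout = keys_per_bucket(conns)
    for conn in conns:
        conn.close()
    return layout


if __name__ == '__main__':
    print(demo())  # [[0, 3, 6], [1, 4], [2, 5]]
```

Since a given key always maps to the same file, writers working on keys in different buckets take different write locks. Every script must use the same bucket count, or keys will be looked up in the wrong file.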

