Python wrapper of etcd_client built with PyO3.
pip install etcd_client
from etcd_client import EtcdClient
etcd = EtcdClient(['http:://127.0.0.1:2379'])
Actual connection establishment with Etcd's gRPC channel will be done when you call EtcdClient.connect()
.
async def main():
async with etcd.connect() as communicator:
await communicator.put('testkey'.encode(), 'testvalue'.encode())
value = await communicator.get('testkey'.encode())
print(bytes(value).decode()) # testvalue
EtcdCommunicator.get_prefix(prefix)
will return a tuple of list containing all key-values with given key prefix.
async def main():
async with etcd.connect() as communicator:
await communicator.put('/testdir'.encode(), 'root'.encode())
await communicator.put('/testdir/1'.encode(), '1'.encode())
await communicator.put('/testdir/2'.encode(), '2'.encode())
await communicator.put('/testdir/2/3'.encode(), '3'.encode())
test_dir = await communicator.get_prefix('/testdir'.encode())
for resp in test_dir:
# ['/testdir', 'root']
# ['/testdir/1', '1']
# ['/testdir/2', '2']
# ['/testdir/2/3', '3']
print([bytes(v).decode() for v in resp])
Just like EtcdClient.connect()
, you can easilly use etcd lock by calling EtcdClient.with_lock(lock_opts)
.
async def first():
async with etcd.with_lock(
EtcdLockOption(
lock_name='foolock'.encode(),
)
) as communicator:
value = await communicator.get('testkey'.encode())
print('first:', bytes(value).decode(), end=' | ')
async def second():
await asyncio.sleep(0.1)
async with etcd.with_lock(
EtcdLockOption(
lock_name='foolock'.encode(),
)
) as communicator:
value = await communicator.get('testkey'.encode())
print('second:', bytes(value).decode())
async with etcd.connect() as communicator:
await communicator.put('testkey'.encode(), 'testvalue'.encode())
await asyncio.gather(first(), second()) # first: testvalue | second: testvalue
Adding timeout
parameter to EtcdClient.with_lock()
call will add a timeout to lock acquiring process.
async def first():
async with etcd.with_lock(
EtcdLockOption(
lock_name='foolock'.encode(),
)
) as communicator:
value = await communicator.get('testkey'.encode())
print('first:', bytes(value).decode(), end=' | ')
async def second():
await asyncio.sleep(0.1)
async with etcd.with_lock(
EtcdLockOption(
lock_name='foolock'.encode(),
timeout=5.0,
)
) as communicator:
value = await communicator.get('testkey'.encode())
print('second:', bytes(value).decode())
async with etcd.connect() as communicator:
await communicator.put('testkey'.encode(), 'testvalue'.encode())
await asyncio.gather(first(), second()) # first: testvalue | second: testvalue
Adding ttl
parameter to EtcdClient.with_lock()
call will force lock to be released after given seconds.
async def first():
async with etcd.with_lock(
EtcdLockOption(lock_name="foolock".encode(), ttl=5)
) as communicator:
await asyncio.sleep(10)
async def second():
start = time.time()
async with etcd.with_lock(
EtcdLockOption(lock_name="foolock".encode(), ttl=5)
) as communicator:
print(f"acquired lock after {time.time() - start} seconds")
# 'second' acquired lock after 5.247947931289673 seconds
done, _ = await asyncio.wait([
asyncio.create_task(first()),
asyncio.create_task(second())
], return_when=asyncio.FIRST_COMPLETED)
for task in done:
print(task.result())
You can watch changes on key with EtcdCommunicator.watch(key)
.
async def watch():
async with etcd.connect() as communicator:
async for event in communicator.watch('testkey'.encode()):
print(event.event, bytes(event.value).decode())
async def update():
await asyncio.sleep(0.1)
async with etcd.connect() as communicator:
await communicator.put('testkey'.encode(), '1'.encode())
await communicator.put('testkey'.encode(), '2'.encode())
await communicator.put('testkey'.encode(), '3'.encode())
await communicator.put('testkey'.encode(), '4'.encode())
await communicator.put('testkey'.encode(), '5'.encode())
await asyncio.gather(watch(), update())
# WatchEventType.PUT 1
# WatchEventType.PUT 2
# WatchEventType.PUT 3
# WatchEventType.PUT 4
# WatchEventType.PUT 5
Watching changes on keys with specific prefix can be also done by EtcdCommunicator.watch_prefix(key_prefix)
.
async def watch():
async with etcd.connect() as communicator:
async for event in communicator.watch_prefix('/testdir'.encode()):
print(event.event, bytes(event.key).decode(), bytes(event.value).decode())
async def update():
await asyncio.sleep(0.1)
async with etcd.connect() as communicator:
await communicator.put('/testdir'.encode(), '1'.encode())
await communicator.put('/testdir/foo'.encode(), '2'.encode())
await communicator.put('/testdir/bar'.encode(), '3'.encode())
await communicator.put('/testdir/foo/baz'.encode(), '4'.encode())
await asyncio.gather(watch(), update())
# WatchEventType.PUT /testdir 1
# WatchEventType.PUT /testdir/foo 2
# WatchEventType.PUT /testdir/bar 3
# WatchEventType.PUT /testdir/foo/baz 4
You can run etcd transaction by calling EtcdCommunicator.txn(txn)
.
Constructing compare operations can be done by comparing Compare
instance.
from etcd_client import Compare, CompareOp
compares = [
Compare.value('cmpkey1'.encode(), CompareOp.EQUAL, 'foo'.encode()),
Compare.value('cmpkey2'.encode(), CompareOp.GREATER, 'bar'.encode()),
]
async with etcd.connect() as communicator:
await communicator.put('cmpkey1'.encode(), 'foo'.encode())
await communicator.put('cmpkey2'.encode(), 'baz'.encode())
await communicator.put('successkey'.encode(), 'asdf'.encode())
compares = [
Compare.value('cmpkey1'.encode(), CompareOp.EQUAL, 'foo'.encode()),
Compare.value('cmpkey2'.encode(), CompareOp.GREATER, 'bar'.encode()),
]
res = await communicator.txn(Txn().when(compares).and_then([TxnOp.get('successkey'.encode())]))
print(res) # TODO: Need to write response type bindings.
- The Rust development environment (the 2021 edition or later) using
rustup
or your package manager - The Python development environment (3.10 or later) using
pyenv
or your package manager
First, create a virtualenv (either using the standard venv package, pyenv, or whatever your favorite). Then, install the PEP-517 build toolchain and run it.
pip install -U pip build setuptools
python -m build --sdist --wheel
It will automatically install build dependencies like
maturin
and build the wheel and source
distributions under the dist/
directory.
(TODO: run maturin for an editable setup)