Manage secrets with sops
This commit is contained in:
50
libs/bwsops.py
Normal file
50
libs/bwsops.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from bundlewrap.exceptions import FaultUnavailable
|
||||
from bundlewrap.utils import Fault
|
||||
import json
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
|
||||
class BwSops:
|
||||
def __init__(self, file):
|
||||
self.file = Path(file)
|
||||
self._secrets = None
|
||||
|
||||
def _fetch_secrets(self):
|
||||
try:
|
||||
p = subprocess.run(["sops", "decrypt", str(self.file)], capture_output=True, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise FaultUnavailable(f"SOPS: \n{e.stderr}")
|
||||
|
||||
self._secrets = json.loads(p.stdout)
|
||||
|
||||
def _get(self, path):
|
||||
if self._secrets is None:
|
||||
self._fetch_secrets()
|
||||
|
||||
out = self._secrets
|
||||
|
||||
_path_traversed = []
|
||||
|
||||
try:
|
||||
for name in path:
|
||||
if not isinstance(out, dict):
|
||||
raise KeyError(name)
|
||||
out = out[name]
|
||||
_path_traversed.append(name)
|
||||
except KeyError as e:
|
||||
raise FaultUnavailable(
|
||||
f"SOPS secret not found: {self.file}:{'/'.join(path)}\n"
|
||||
f" {e.args[0]} not in {'/'.join(_path_traversed)}"
|
||||
)
|
||||
|
||||
return out
|
||||
|
||||
def get(self, path):
|
||||
if isinstance(path, str):
|
||||
path = path.split("/")
|
||||
|
||||
return Fault(
|
||||
f"SOPS {self.file}:{'/'.join(path)}",
|
||||
self._get,
|
||||
path=path,
|
||||
)
|
Reference in New Issue
Block a user