51 lines
1.4 KiB
Python
51 lines
1.4 KiB
Python
|
from bundlewrap.exceptions import FaultUnavailable
|
||
|
from bundlewrap.utils import Fault
|
||
|
import json
|
||
|
from pathlib import Path
|
||
|
import subprocess
|
||
|
|
||
|
class BwSops:
|
||
|
def __init__(self, file):
|
||
|
self.file = Path(file)
|
||
|
self._secrets = None
|
||
|
|
||
|
def _fetch_secrets(self):
|
||
|
try:
|
||
|
p = subprocess.run(["sops", "decrypt", str(self.file)], capture_output=True, check=True)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
raise FaultUnavailable(f"SOPS: \n{e.stderr}")
|
||
|
|
||
|
self._secrets = json.loads(p.stdout)
|
||
|
|
||
|
def _get(self, path):
|
||
|
if self._secrets is None:
|
||
|
self._fetch_secrets()
|
||
|
|
||
|
out = self._secrets
|
||
|
|
||
|
_path_traversed = []
|
||
|
|
||
|
try:
|
||
|
for name in path:
|
||
|
if not isinstance(out, dict):
|
||
|
raise KeyError(name)
|
||
|
out = out[name]
|
||
|
_path_traversed.append(name)
|
||
|
except KeyError as e:
|
||
|
raise FaultUnavailable(
|
||
|
f"SOPS secret not found: {self.file}:{'/'.join(path)}\n"
|
||
|
f" {e.args[0]} not in {'/'.join(_path_traversed)}"
|
||
|
)
|
||
|
|
||
|
return out
|
||
|
|
||
|
def get(self, path):
|
||
|
if isinstance(path, str):
|
||
|
path = path.split("/")
|
||
|
|
||
|
return Fault(
|
||
|
f"SOPS {self.file}:{'/'.join(path)}",
|
||
|
self._get,
|
||
|
path=path,
|
||
|
)
|