-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
4 changed files
with
254 additions
and
80 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,104 +1,155 @@ | ||
import requests | ||
import logging | ||
|
||
from .utils import CustomExceptionServerError | ||
from datetime import datetime | ||
from django.conf import settings | ||
from typing import List | ||
from cloudflare import Cloudflare | ||
|
||
|
||
ZONE_IDS = { | ||
"localcert.net.": "ab2d04b0ccf31906dd87900f0db11f73", | ||
"localhostcert.net.": "ac1335db9f052915b076c0de09e06443", | ||
PDNS_API_BASE_URL = f"http://{settings.LOCALCERT_PDNS_SERVER_IP}:{settings.LOCALCERT_PDNS_API_PORT}/api/v1" | ||
PDNS_HEADERS = { | ||
"X-API-Key": settings.LOCALCERT_PDNS_API_KEY, | ||
"accept": "application/json", | ||
} | ||
|
||
|
||
client = Cloudflare(api_token=os.environ.get("CLOUDFLARE_TOKEN")) | ||
def pdns_create_zone(zone: str): | ||
assert zone.endswith(".") | ||
|
||
logging.debug(f"[PDNS] Create {zone}") | ||
|
||
# TODO: Some records are set by wildcard, hardcode these | ||
def pdns_describe_domain(domain: str) -> dict: | ||
assert domain.endswith(".") | ||
logging.debug(f"[PDNS] Describe {domain}") | ||
# Create zone in pdns | ||
resp = requests.post( | ||
PDNS_API_BASE_URL + "/servers/localhost/zones", | ||
headers=PDNS_HEADERS, | ||
json={ | ||
"name": zone, | ||
"kind": "Native", | ||
}, | ||
) | ||
json_resp = resp.json() | ||
|
||
if "error" in json_resp.keys(): | ||
raise CustomExceptionServerError(json_resp["error"]) # pragma: no cover | ||
|
||
# success | ||
return | ||
|
||
|
||
# TODO use the targeted name/type | ||
def pdns_describe_domain(zone_name: str) -> dict: | ||
assert zone_name.endswith(".") | ||
|
||
logging.debug(f"[PDNS] Describe {zone_name}") | ||
|
||
# TODO: newer pdns versions can filter by name/type | ||
resp = requests.get( | ||
f"{PDNS_API_BASE_URL}/servers/localhost/zones/{zone_name}", | ||
headers=PDNS_HEADERS, | ||
) | ||
if resp.status_code != requests.codes.ok: | ||
raise CustomExceptionServerError( | ||
f"Unable to describe domain, PDNS error code: {resp.status_code}" | ||
) # pragma: no cover | ||
|
||
return resp.json() | ||
|
||
for k, v in ZONE_IDS.items(): | ||
if domain.endswith(f".{k}") | ||
zone_id = v | ||
break | ||
else: | ||
# Ooops | ||
return {} | ||
|
||
# CF doesn't use trailing dot | ||
domain = domain[:-1] | ||
|
||
# Two lookups: | ||
# <domain>.<zone> (exact) | ||
# *.<domain>.<zone> (endswith) | ||
results = client.dns.records.list( | ||
zone_id=zone_id, | ||
name={"endswith": f".{domain}"}, | ||
type="TXT", | ||
).result | ||
r2 = client.dns.records.list( | ||
zone_id=zone_id, | ||
name={"exact": domain}, | ||
type="TXT", | ||
).result | ||
results.extend(r2) | ||
|
||
rrsets = [] | ||
for result in results: | ||
rrset.append({ | ||
"type": "TXT", | ||
"name": result.name, | ||
"content": result.content, | ||
"ttl": result.ttl, | ||
}) | ||
return { "rrsets": rrsets } | ||
def pdns_delete_rrset(zone_name: str, rr_name: str, rrtype: str): | ||
assert zone_name.endswith(".") | ||
assert rr_name.endswith(zone_name) | ||
assert rrtype == "TXT" | ||
|
||
logging.debug(f"[PDNS] Delete {zone_name} {rr_name} {rrtype}") | ||
|
||
resp = requests.patch( | ||
f"{PDNS_API_BASE_URL}/servers/localhost/zones/{zone_name}", | ||
headers=PDNS_HEADERS, | ||
json={ | ||
"rrsets": [ | ||
{ | ||
"name": rr_name, | ||
"type": "TXT", | ||
"changetype": "DELETE", | ||
}, | ||
], | ||
}, | ||
) | ||
|
||
if resp.status_code != requests.codes.no_content: | ||
raise CustomExceptionServerError(f"{resp.status_code}") # pragma: no cover | ||
|
||
# success | ||
return | ||
|
||
|
||
def pdns_replace_rrset( | ||
zone_name: str, rr_name: str, rr_type: str, ttl: int, record_contents: List[str] | ||
): | ||
""" | ||
record_contents - Records from least recently added | ||
""" | ||
assert rr_name.endswith(".") | ||
assert rr_name.endswith(zone_name) | ||
assert rr_type == "TXT" | ||
|
||
# CF doesn't use trailing dot | ||
rr_name = rr_name[:-1] | ||
|
||
# Collect the existing content | ||
zone_id = ZONE_IDS[zone_name] | ||
results = client.dns.records.list( | ||
zone_id=zone_id, | ||
name=rr_name, | ||
type=rr_type, | ||
).result | ||
|
||
for record in results: | ||
if record.content not in record_contents: | ||
# Delete records that are no longer needed | ||
client.dns.records.delete( | ||
zone_id=zone_id, | ||
dns_record_id=record.id, | ||
) | ||
else: | ||
# Don't alter records that already exist | ||
record_contents.remove(record.content) | ||
|
||
for content in record_contents: | ||
# Create anything that's new | ||
client.dns.records.create( | ||
zone_id=zone_id, | ||
name=rr_name, | ||
type=rr_type, | ||
content=content, | ||
) | ||
assert rr_type in ["TXT", "A", "MX", "NS", "SOA"] | ||
|
||
logging.debug( | ||
f"[PDNS] Replace {zone_name} {rr_name} {rr_type} {ttl} {record_contents}" | ||
) | ||
|
||
records = [ | ||
{ | ||
"content": content, | ||
"disabled": False, | ||
} | ||
for content in record_contents | ||
] | ||
comments = [ | ||
{ | ||
"content": f"{record_contents[idx]} : {idx}", | ||
"account": "", | ||
"modified_at": int(datetime.now().timestamp()), | ||
} | ||
for idx in range(len(record_contents)) | ||
] | ||
|
||
resp = requests.patch( | ||
f"{PDNS_API_BASE_URL}/servers/localhost/zones/{zone_name}", | ||
headers=PDNS_HEADERS, | ||
json={ | ||
"rrsets": [ | ||
{ | ||
"name": rr_name, | ||
"type": rr_type, | ||
"changetype": "REPLACE", | ||
"ttl": ttl, | ||
"records": records, | ||
"comments": comments, | ||
}, | ||
], | ||
}, | ||
) | ||
|
||
if resp.status_code != requests.codes.no_content: | ||
raise CustomExceptionServerError( | ||
f"{resp.status_code}: {resp.content.decode('utf-8')}" | ||
) # pragma: no cover | ||
|
||
# success | ||
return | ||
|
||
|
||
def pdns_get_stats(): | ||
resp = requests.get( | ||
f"{PDNS_API_BASE_URL}/servers/localhost/statistics", | ||
headers=PDNS_HEADERS, | ||
) | ||
|
||
if resp.status_code != 200: # pragma: no cover | ||
logging.error(f"{resp.status_code}: {resp.content.decode('utf-8')}") | ||
return {} | ||
|
||
# success | ||
return resp.json() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.