]> git.sesse.net Git - letsencrypt-varnish-plugin/blob - varnish.py
Small update for certbot in bookworm.
[letsencrypt-varnish-plugin] / varnish.py
1 """Plugin for authenticating directly out of Varnish's VCL."""
2 import os
3 import logging
4 import re
5 import subprocess
6
7 import zope.interface
8 from zope.interface import implementer, provider
9
10 from acme import challenges
11
12 from certbot import errors
13 from certbot import interfaces
14 from certbot.plugins import common
15
16
17 logger = logging.getLogger(__name__)
18
19 def vcl_recv_line(achall):
20     # Don't bother checking for the right host, we could be coming in through a redirect.
21     return 'if (req.url == "/%s/%s") { return (synth(999, "Challenge")); }  # Added by certbot Varnish plugin for authentication\n' % (achall.URI_ROOT_PATH, achall.chall.encode("token"))
22
23 def vcl_synth_line(validation):
24     return 'if (resp.status == 999) { set resp.status = 200; set resp.http.Content-Type = "text/plain"; synthetic("%s"); return (deliver); }  # Added by certbot Varnish plugin for authentication\n' % (validation);
25
26 @implementer(interfaces.IAuthenticator)
27 @provider(interfaces.IPluginFactory)
28 class Authenticator(common.Plugin):
29     hidden = True
30
31     description = "Manual configuration, authentication via Varnish VCL"
32
33     def __init__(self, *args, **kwargs):
34         super(Authenticator, self).__init__(*args, **kwargs)
35         self._httpd = None
36
37     def prepare(self):  # pylint: disable=missing-docstring,no-self-use
38         pass  # pragma: no cover
39
40     def more_info(self):  # pylint: disable=missing-docstring,no-self-use
41         return ("")
42
43     def get_chall_pref(self, domain):
44         # pylint: disable=missing-docstring,no-self-use,unused-argument
45         return [challenges.HTTP01]
46
47     def perform(self, achalls):  # pylint: disable=missing-docstring
48         responses = []
49         for achall in achalls:
50             responses.append(self._perform_single(achall))
51         return responses
52
53     @classmethod
54     def add_parser_arguments(cls, add):
55         pass
56
57     def _perform_single(self, achall):
58         # same path for each challenge response would be easier for
59         # users, but will not work if multiple domains point at the
60         # same server: default command doesn't support virtual hosts
61         response, validation = achall.response_and_validation()
62
63         with open("/etc/varnish/default.vcl") as vcl:
64             content = vcl.readlines()
65
66         self.old_content = content
67
68         found_vcl_recv = False
69         found_vcl_synth = False
70         new_content = []
71         for line in content:
72             if re.search("# Added by certbot Varnish plugin", line):
73                 # Don't include this line; left by a previous run.
74                 continue
75             new_content.append(line)
76             if re.search("^sub\s+vcl_recv\s+{\s*$", line):
77                 new_content.append(vcl_recv_line(achall))
78                 found_vcl_recv = True
79             if re.search("^sub\s+vcl_synth\s+{\s*$", line):
80                 new_content.append(vcl_synth_line(validation))
81                 found_vcl_synth = True
82
83         if not found_vcl_recv:
84             new_content.append("sub vcl_recv {  # Added by certbot Varnish plugin for authentication\n")
85             new_content.append(vcl_recv_line(achall))
86             new_content.append("}  # Added by certbot Varnish plugin for authentication\n")
87
88         if not found_vcl_synth:
89             new_content.append("sub vcl_synth {  # Added by certbot Varnish plugin for authentication\n")
90             new_content.append(vcl_synth_line(validation))
91             new_content.append("}  # Added by certbot Varnish plugin for authentication\n")
92
93         with open("/etc/varnish/default.vcl", "w") as vcl:
94             vcl.writelines(new_content)
95
96         subprocess.call(["systemctl", "reload", "varnish.service"])
97
98         if response.simple_verify(
99                 achall.chall, achall.domain,
100                 achall.account_key.public_key(), self.config.http01_port):
101             return response
102         else:
103             logger.error(
104                 "Self-verify of challenge failed, authorization abandoned!")
105             return None
106
107     def cleanup(self, achalls):
108         # pylint: disable=missing-docstring,no-self-use,unused-argument
109         with open("/etc/varnish/default.vcl", "w") as vcl:
110             vcl.writelines(self.old_content)
111
112         subprocess.call(["systemctl", "reload", "varnish.service"])