1 """Plugin for authenticating directly out of Varnish's VCL."""
8 from zope.interface import implementer, provider
10 from acme import challenges
12 from certbot import errors
13 from certbot import interfaces
14 from certbot.plugins import common
17 logger = logging.getLogger(__name__)
19 def vcl_recv_line(achall):
20 # Don't bother checking for the right host, we could be coming in through a redirect.
21 return 'if (req.url == "/%s/%s") { return (synth(999, "Challenge")); } # Added by certbot Varnish plugin for authentication\n' % (achall.URI_ROOT_PATH, achall.chall.encode("token"))
23 def vcl_synth_line(validation):
24 return 'if (resp.status == 999) { set resp.status = 200; set resp.http.Content-Type = "text/plain"; synthetic("%s"); return (deliver); } # Added by certbot Varnish plugin for authentication\n' % (validation);
26 @implementer(interfaces.IAuthenticator)
27 @provider(interfaces.IPluginFactory)
28 class Authenticator(common.Plugin):
31 description = "Manual configuration, authentication via Varnish VCL"
33 def __init__(self, *args, **kwargs):
34 super(Authenticator, self).__init__(*args, **kwargs)
37 def prepare(self): # pylint: disable=missing-docstring,no-self-use
38 pass # pragma: no cover
40 def more_info(self): # pylint: disable=missing-docstring,no-self-use
43 def get_chall_pref(self, domain):
44 # pylint: disable=missing-docstring,no-self-use,unused-argument
45 return [challenges.HTTP01]
47 def perform(self, achalls): # pylint: disable=missing-docstring
49 for achall in achalls:
50 responses.append(self._perform_single(achall))
54 def add_parser_arguments(cls, add):
57 def _perform_single(self, achall):
58 # same path for each challenge response would be easier for
59 # users, but will not work if multiple domains point at the
60 # same server: default command doesn't support virtual hosts
61 response, validation = achall.response_and_validation()
63 with open("/etc/varnish/default.vcl") as vcl:
64 content = vcl.readlines()
66 self.old_content = content
68 found_vcl_recv = False
69 found_vcl_synth = False
72 if re.search("# Added by certbot Varnish plugin", line):
73 # Don't include this line; left by a previous run.
75 new_content.append(line)
76 if re.search("^sub\s+vcl_recv\s+{\s*$", line):
77 new_content.append(vcl_recv_line(achall))
79 if re.search("^sub\s+vcl_synth\s+{\s*$", line):
80 new_content.append(vcl_synth_line(validation))
81 found_vcl_synth = True
83 if not found_vcl_recv:
84 new_content.append("sub vcl_recv { # Added by certbot Varnish plugin for authentication\n")
85 new_content.append(vcl_recv_line(achall))
86 new_content.append("} # Added by certbot Varnish plugin for authentication\n")
88 if not found_vcl_synth:
89 new_content.append("sub vcl_synth { # Added by certbot Varnish plugin for authentication\n")
90 new_content.append(vcl_synth_line(validation))
91 new_content.append("} # Added by certbot Varnish plugin for authentication\n")
93 with open("/etc/varnish/default.vcl", "w") as vcl:
94 vcl.writelines(new_content)
96 subprocess.call(["systemctl", "reload", "varnish.service"])
98 if response.simple_verify(
99 achall.chall, achall.domain,
100 achall.account_key.public_key(), self.config.http01_port):
104 "Self-verify of challenge failed, authorization abandoned!")
107 def cleanup(self, achalls):
108 # pylint: disable=missing-docstring,no-self-use,unused-argument
109 with open("/etc/varnish/default.vcl", "w") as vcl:
110 vcl.writelines(self.old_content)
112 subprocess.call(["systemctl", "reload", "varnish.service"])