1 """Plugin for authenticating directly out of Varnish's VCL."""
9 from zope.interface import implementer, provider
11 from acme import challenges
13 from certbot import errors
14 from certbot import interfaces
15 from certbot.plugins import common
# Module-level logger, named after this module.
18 logger = logging.getLogger(__name__)
def vcl_recv_line(achall):
    """Return the VCL statement that intercepts one challenge request.

    The statement compares ``req.url`` with the challenge path, built from
    ``achall.URI_ROOT_PATH`` and the encoded challenge token, and answers a
    match with ``synth(999, "Challenge")``.

    :param achall: annotated challenge; must provide ``URI_ROOT_PATH`` and
        ``chall.encode("token")``.
    :returns: one newline-terminated line of VCL, tagged with the plugin's
        marker comment.
    """
    # The host is deliberately not checked: the request may reach us
    # through a redirect.
    root_path = achall.URI_ROOT_PATH
    token = achall.chall.encode("token")
    template = 'if (req.url == "/%s/%s") { return (synth(999, "Challenge")); } # Added by certbot Varnish plugin for authentication\n'
    return template % (root_path, token)
def vcl_synth_line(validation):
    """Return the VCL statement that serves the challenge validation.

    The statement reacts to the synthetic status 999: it rewrites the status
    to 200, sets the ``Content-Type`` header to ``text/plain``, uses
    ``validation`` as the synthetic body and delivers the response.

    :param validation: text to serve as the response body.
    :returns: one newline-terminated line of VCL, tagged with the plugin's
        marker comment.
    """
    template = 'if (resp.status == 999) { set resp.status = 200; set resp.http.Content-Type = "text/plain"; synthetic("%s"); return (deliver); } # Added by certbot Varnish plugin for authentication\n'
    return template % validation
@implementer(interfaces.IAuthenticator)
@provider(interfaces.IPluginFactory)
class Authenticator(common.Plugin):
    """Authenticator that solves http-01 challenges inside Varnish's VCL.

    For each challenge the plugin rewrites ``/etc/varnish/default.vcl`` so
    that Varnish itself answers the challenge URL with a synthetic
    response, reloads ``varnish.service`` through ``systemctl`` and then
    self-verifies the response.  ``cleanup`` writes the file back as it was
    before the first modification and reloads Varnish again.

    NOTE(review): every line written by the plugin carries the marker
    comment, and marked lines are dropped whenever the file is rewritten.
    Consequently only the most recently performed challenge is present in
    the VCL at any one time -- confirm that this is acceptable when several
    challenges are passed to a single ``perform`` call.
    """

    description = "Manual configuration, authentication via Varnish VCL"

    # VCL file that is rewritten for every challenge.
    _VCL_PATH = "/etc/varnish/default.vcl"
    # Substring present in every line this plugin writes; used to recognise
    # (and drop) lines left behind by an earlier rewrite.
    _MARKER = "# Added by certbot Varnish plugin"
    # Opening lines of existing subroutines the challenge lines hook into.
    # Raw strings: "\s" in a normal string literal is an invalid escape.
    _RECV_SUB = re.compile(r"^sub\s+vcl_recv\s+{\s*$")
    _SYNTH_SUB = re.compile(r"^sub\s+vcl_synth\s+{\s*$")

    def __init__(self, *args, **kwargs):
        super(Authenticator, self).__init__(*args, **kwargs)
        # Lines of the VCL file as they were before this plugin first
        # rewrote it; None while there is nothing to restore.
        self.old_content = None

    def prepare(self):  # pylint: disable=missing-docstring,no-self-use
        pass  # pragma: no cover

    def more_info(self):  # pylint: disable=missing-docstring,no-self-use
        # NOTE(review): the original body of this method is not visible in
        # the reviewed extract; the text below was written from the
        # behaviour of this class -- confirm the intended wording.
        return ("This plugin solves http-01 challenges by temporarily "
                "adding rules to /etc/varnish/default.vcl and reloading "
                "varnish.service; the file is restored during cleanup.")

    def get_chall_pref(self, domain):
        # pylint: disable=missing-docstring,no-self-use,unused-argument
        return [challenges.HTTP01]

    def perform(self, achalls):
        """Perform every challenge in turn.

        :param achalls: iterable of annotated challenges.
        :returns: list with one entry per challenge, in order: the
            challenge response, or ``None`` when self-verification failed.
        """
        return [self._perform_single(achall) for achall in achalls]

    def _perform_single(self, achall):
        """Install one challenge in the VCL, reload Varnish and self-verify.

        :param achall: annotated challenge to answer.
        :returns: the challenge response when ``simple_verify`` succeeds,
            otherwise ``None``.
        """
        response, validation = achall.response_and_validation()

        with open(self._VCL_PATH) as vcl:
            content = vcl.readlines()

        # Snapshot the file only once.  On later challenges ``content``
        # already holds lines written for the previous challenge, and
        # storing it would make cleanup() restore a modified file.
        if getattr(self, "old_content", None) is None:
            self.old_content = content

        new_content = self._insert_challenge(
            content, vcl_recv_line(achall), vcl_synth_line(validation))

        with open(self._VCL_PATH, "w") as vcl:
            vcl.writelines(new_content)

        self._reload_varnish()

        if response.simple_verify(
                achall.chall, achall.domain,
                achall.account_key.public_key(), self.config.http01_port):
            return response

        logger.error(
            "Self-verify of challenge failed, authorization abandoned!")
        return None

    @classmethod
    def _insert_challenge(cls, content, recv_line, synth_line):
        """Return ``content`` with stale plugin lines removed and new ones added.

        ``recv_line`` is inserted directly after every ``sub vcl_recv {``
        line and ``synth_line`` directly after every ``sub vcl_synth {``
        line.  A subroutine that does not exist is appended at the end of
        the file, with every appended line tagged by the marker comment.

        :param content: lines of the current VCL file.
        :param recv_line: line to place inside ``vcl_recv``.
        :param synth_line: line to place inside ``vcl_synth``.
        :returns: new list of lines; ``content`` itself is not modified.
        """
        new_content = []
        found_vcl_recv = False
        found_vcl_synth = False

        for line in content:
            if cls._MARKER in line:
                # Don't include this line; left by a previous run.
                continue
            new_content.append(line)
            if cls._RECV_SUB.search(line):
                new_content.append(recv_line)
                found_vcl_recv = True
            if cls._SYNTH_SUB.search(line):
                new_content.append(synth_line)
                found_vcl_synth = True

        appended = []
        if not found_vcl_recv:
            appended.append("sub vcl_recv { # Added by certbot Varnish plugin for authentication\n")
            appended.append(recv_line)
            appended.append("} # Added by certbot Varnish plugin for authentication\n")
        if not found_vcl_synth:
            appended.append("sub vcl_synth { # Added by certbot Varnish plugin for authentication\n")
            appended.append(synth_line)
            appended.append("} # Added by certbot Varnish plugin for authentication\n")

        # A final line without a newline would otherwise be glued to the
        # first appended line (fatal if that final line is a comment).
        if appended and new_content and not new_content[-1].endswith("\n"):
            new_content[-1] += "\n"

        return new_content + appended

    @staticmethod
    def _reload_varnish():
        """Reload ``varnish.service`` and log a non-zero exit status.

        :returns: exit status of the ``systemctl`` call.
        """
        status = subprocess.call(["systemctl", "reload", "varnish.service"])
        if status != 0:
            # Best effort: report the failure instead of discarding it.
            logger.error(
                "Reloading varnish.service failed with exit status %s",
                status)
        return status

    def cleanup(self, achalls):
        # pylint: disable=unused-argument
        """Restore the VCL file saved by ``perform`` and reload Varnish.

        Does nothing when no snapshot exists, i.e. when the file was never
        rewritten by this instance.
        """
        old_content = getattr(self, "old_content", None)
        if old_content is None:
            return

        with open(self._VCL_PATH, "w") as vcl:
            vcl.writelines(old_content)
        # The snapshot has been written back; a later perform() must take
        # a fresh one.
        self.old_content = None

        self._reload_varnish()