如果我们简单地使用国外 DNS 服务器(如 Google 的 8.8.8.8)来解析所有域名,会带来两个问题:
对于国内网站,可能会解析到离你较远的 IP 地址(甚至是海外 IP 地址),导致访问速度变慢
对于使用了 CDN 的网站,可能无法获得最适合你所在网络环境的服务器 IP
我 11 年前的文章《一个抗污染 DNS 的搭建》介绍了一种基于权威 DNS 服务器的方案,对权威 DNS 服务器在国内的域名,使用国内出口解析;对权威 DNS 服务器在海外的域名,采用海外隧道出口解析。但这种方法目前已经不再实用,因为很多权威 DNS 服务器具有国内和国外多个 IP 地址,它们根据用户的公网 IP 分区域解析,因此简单通过权威 DNS 服务器的 IP 地址来区分国内和国外网站已经不再实用了。
本文提出的新方案不再依赖于权威 DNS 服务器的位置,而是基于 DNS 响应内容进行智能判断。它是基于一个简单的观察,DNS 污染返回的 IP 地址一般不在中国境内。那么我们只要通过国内和国外的 DNS 分别解析,只要国内 DNS 返回的结果在中国境内,就认为是合法的 DNS 查询结果。否则,就以国外 DNS 的查询结果为准。之所以要通过隧道获取国外 DNS 查询结果,是为了获取距离隧道的海外出口最近的 CDN 节点。
# 全局变量,存储 IP 网络 cn_networks = [] cn_networks_v6 = []
defget_china_ip_list(is_ipv6=False): """获取或更新中国 IP 列表""" url = CHINA_IPV6_LIST_URL if is_ipv6 else CHINA_IP_LIST_URL cache_file = CHINA_IPV6_CACHE_FILE if is_ipv6 else CHINA_IP_CACHE_FILE # 检查缓存是否存在且未过期 if os.path.exists(cache_file): file_age = time.time() - os.path.getmtime(cache_file) if file_age < CACHE_DURATION: withopen(cache_file, 'r') as f: return [line.strip() for line in f if line.strip() andnot line.startswith('#')] # 下载新列表 try: response = requests.get(url) if response.status_code == 200: ip_list = [line.strip() for line in response.text.split('\n') if line.strip() andnot line.startswith('#')] # 保存到缓存 withopen(cache_file, 'w') as f: f.write('\n'.join(ip_list)) return ip_list else: # 如果下载失败但缓存存在,使用缓存 if os.path.exists(cache_file): withopen(cache_file, 'r') as f: return [line.strip() for line in f if line.strip() andnot line.startswith('#')] else: return [] except Exception as e: print(f"获取中国 IP 列表出错: {e}") if os.path.exists(cache_file): withopen(cache_file, 'r') as f: return [line.strip() for line in f if line.strip() andnot line.startswith('#')] return []
defis_cn_ip(ip): """检查 IP 是否在中国网络列表中""" global cn_networks, cn_networks_v6 try: ip_obj = ipaddress.ip_address(ip) if ip_obj.version == 4: # 首次运行时加载中国 IPv4 列表 ifnot cn_networks: china_ip_list = get_china_ip_list(is_ipv6=False) cn_networks = [ipaddress.ip_network(net) for net in china_ip_list] # 检查 IP 是否在任何网络中 for net in cn_networks: if ip_obj in net: returnTrue elif ip_obj.version == 6: # 首次运行时加载中国 IPv6 列表 ifnot cn_networks_v6: china_ipv6_list = get_china_ip_list(is_ipv6=True) cn_networks_v6 = [ipaddress.ip_network(net) for net in china_ipv6_list] # 检查 IP 是否在任何网络中 for net in cn_networks_v6: if ip_obj in net: returnTrue returnFalse except: returnFalse
defforward_dns_request(domain, dns_server, record_type): """将 DNS 请求转发到指定的 DNS 服务器""" try: # 正确使用 dnslib 创建 DNS 请求 from dnslib import DNSQuestion record = DNSRecord() record.add_question(DNSQuestion(domain, getattr(QTYPE, record_type))) packet = record.pack() s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(TIMEOUT) s.sendto(packet, (dns_server, 53)) reply_packet, _ = s.recvfrom(MAX_PACKET_SIZE) reply = DNSRecord.parse(reply_packet) return reply except Exception as e: print(f"查询 {dns_server} 关于 {domain} ({record_type}) 出错: {e}") returnNone
defhandle_dns_request(data, client_addr, sock): """处理 DNS 请求并返回智能选择的响应""" try: request = DNSRecord.parse(data) qname = str(request.q.qname) qtype = QTYPE[request.q.qtype] print(f"查询: {qname} ({qtype}) 来自 {client_addr[0]}") # 仅对 A 和 AAAA 记录使用智能逻辑 if qtype in ('A', 'AAAA'): # 查询两个 DNS 服务器 cn_reply = forward_dns_request(qname, CHINA_DNS, qtype) foreign_reply = forward_dns_request(qname, FOREIGN_DNS, qtype) # 提取回复中的 IP cn_ips = [] if cn_reply and cn_reply.rr: cn_ips = [str(rr.rdata) for rr in cn_reply.rr if rr.rtype == getattr(QTYPE, qtype)] foreign_ips = [] if foreign_reply and foreign_reply.rr: foreign_ips = [str(rr.rdata) for rr in foreign_reply.rr if rr.rtype == getattr(QTYPE, qtype)] # 检查任何 CN IP 是否实际在中国 cn_ips_in_china = [ip for ip in cn_ips if is_cn_ip(ip)] # 智能选择逻辑 if cn_ips_in_china: # 中国 DNS 返回了中国 IP,使用 CN 结果 print(f"为 {qname} ({qtype}) 使用中国 DNS") response = cn_reply elif foreign_ips: # 中国 DNS 没有返回中国 IP,但国外 DNS 有结果 # 可能是 DNS 污染,使用国外结果 print(f"为 {qname} ({qtype}) 使用国外 DNS") response = foreign_reply elif cn_ips: # 默认使用中国 DNS 结果(如果有) print(f"默认为 {qname} ({qtype}) 使用中国 DNS") response = cn_reply else: # 如果以上都失败,使用国外 DNS 或空响应 print(f"默认为 {qname} ({qtype}) 使用国外 DNS") response = foreign_reply if foreign_reply else cn_reply else: # 对于非 A/AAAA 记录,直接转发到国外 DNS response = forward_dns_request(qname, FOREIGN_DNS, qtype) ifnot response: response = forward_dns_request(qname, CHINA_DNS, qtype) # 如果有响应,发送它 if response: # 保持原始请求 ID response.header.id = request.header.id sock.sendto(response.pack(), client_addr) else: # 创建空响应 response = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=0, ra=1), q=request.q) sock.sendto(response.pack(), client_addr) except Exception as e: print(f"处理请求出错: {e}")