1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#!/usr/bin/env python3
# standalone_smart_dns.py - 轻量级 DNS 转发服务器,具有防污染能力

import ipaddress
import json
import os
import socket
import tempfile
import threading
import time

import requests
from dnslib import DNSRecord, DNSHeader, RR, QTYPE, A, AAAA

# Configuration
LISTEN_IP = "127.0.0.1"  # Local address the server binds to
LISTEN_PORT = 53  # Local port the server binds to
CHINA_DNS = "114.114.114.114"  # Upstream resolver used for the "China" answer
FOREIGN_DNS = "1.1.1.1"  # Upstream resolver used for the "foreign" answer
CHINA_IP_LIST_URL = "https://raw.githubusercontent.com/mayaxcn/china-ip-list/master/chnroute.txt"
CHINA_IPV6_LIST_URL = "https://raw.githubusercontent.com/mayaxcn/china-ip-list/master/chnroute_v6.txt"
# Keep the cache files in the platform's temp directory: a hard-coded "/tmp" is
# not guaranteed to exist on Windows, in which case the cache can never be written.
CHINA_IP_CACHE_FILE = os.path.join(tempfile.gettempdir(), ".china_ip_list.txt")
CHINA_IPV6_CACHE_FILE = os.path.join(tempfile.gettempdir(), ".china_ipv6_list.txt")
CACHE_DURATION = 7 * 24 * 60 * 60  # 7 days, in seconds
MAX_PACKET_SIZE = 1024  # Buffer size (bytes) passed to recvfrom()
TIMEOUT = 5  # Timeout (seconds) for upstream DNS sockets

# Module-level caches of parsed networks, filled lazily by is_cn_ip()
cn_networks = []
cn_networks_v6 = []

def _parse_ip_list(lines):
    """Return the stripped, non-empty, non-comment entries found in *lines*."""
    stripped = (line.strip() for line in lines)
    return [entry for entry in stripped if entry and not entry.startswith('#')]


def _read_cached_ip_list(cache_file):
    """Return the entries stored in *cache_file*, or [] if it cannot be read."""
    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            return _parse_ip_list(f)
    except OSError:
        return []


def get_china_ip_list(is_ipv6=False):
    """Return the China IP list, from the local cache or a fresh download.

    Args:
        is_ipv6: When true, use the IPv6 list URL and cache file; otherwise
            the IPv4 ones.

    Returns:
        A list of non-empty, non-comment lines from the list (the caller
        passes them to ``ipaddress.ip_network``). Returns [] when neither a
        download nor a cached copy is available.
    """
    url = CHINA_IPV6_LIST_URL if is_ipv6 else CHINA_IP_LIST_URL
    cache_file = CHINA_IPV6_CACHE_FILE if is_ipv6 else CHINA_IP_CACHE_FILE

    # Serve from the cache while it is younger than CACHE_DURATION.
    try:
        cache_is_fresh = time.time() - os.path.getmtime(cache_file) < CACHE_DURATION
    except OSError:
        cache_is_fresh = False  # No cache file yet (or it cannot be stat'ed)
    if cache_is_fresh:
        cached = _read_cached_ip_list(cache_file)
        if cached:
            return cached

    # Download a new copy. The timeout matters: requests waits indefinitely
    # by default, which would stall the calling request-handler thread.
    ip_list = []
    try:
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code == 200:
            ip_list = _parse_ip_list(response.text.splitlines())
    except requests.RequestException as e:
        print(f"获取中国 IP 列表出错: {e}")

    if not ip_list:
        # Download failed or was empty: fall back to a (possibly stale) cache.
        return _read_cached_ip_list(cache_file)

    # Refresh the cache. A failed write must not discard the list that was
    # just downloaded, so it is reported and otherwise ignored.
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(ip_list))
    except OSError as e:
        print(f"写入缓存 {cache_file} 失败: {e}")

    return ip_list

def _load_networks(entries):
    """Parse CIDR strings into network objects, skipping malformed entries."""
    networks = []
    for entry in entries:
        try:
            # strict=False accepts entries that have host bits set instead of
            # rejecting them.
            networks.append(ipaddress.ip_network(entry, strict=False))
        except ValueError:
            # One bad line must not invalidate the rest of the list.
            continue
    return networks


def is_cn_ip(ip):
    """Return True if *ip* falls inside one of the China networks.

    The IPv4 / IPv6 network lists are loaded through ``get_china_ip_list`` the
    first time they are needed and kept in the module-level ``cn_networks`` /
    ``cn_networks_v6`` lists. While a list is still empty (for example because
    the download failed) every call tries to load it again.

    Args:
        ip: IP address as a string.

    Returns:
        True when the address is inside a listed network. False otherwise,
        including when *ip* is not a valid IP address or the list cannot be
        loaded.
    """
    global cn_networks, cn_networks_v6

    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError:
        return False

    try:
        if ip_obj.version == 4:
            if not cn_networks:
                cn_networks = _load_networks(get_china_ip_list(is_ipv6=False))
            networks = cn_networks
        else:
            if not cn_networks_v6:
                cn_networks_v6 = _load_networks(get_china_ip_list(is_ipv6=True))
            networks = cn_networks_v6
    except (OSError, ValueError) as e:
        # Loading is best effort: report the problem and treat the address as
        # not being in China rather than failing the DNS request.
        print(f"加载中国 IP 列表出错: {e}")
        return False

    return any(ip_obj in net for net in networks)

def forward_dns_request(domain, dns_server, record_type):
    """Send a single-question DNS query over UDP and return the parsed reply.

    Args:
        domain: Name to query.
        dns_server: IP address of the upstream server; port 53 is used.
        record_type: Record type name looked up on ``QTYPE`` (e.g. "A").

    Returns:
        The parsed ``DNSRecord`` reply, or None if anything fails (building
        the query, socket error, timeout, unparsable reply). Failures are
        printed, not raised.
    """
    try:
        # DNSQuestion is not part of the file-level dnslib import.
        from dnslib import DNSQuestion

        query = DNSRecord()
        query.add_question(DNSQuestion(domain, getattr(QTYPE, record_type)))
        packet = query.pack()

        # The context manager closes the socket on every path; without it each
        # query left an open socket behind.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(TIMEOUT)
            s.sendto(packet, (dns_server, 53))
            reply_packet, _ = s.recvfrom(MAX_PACKET_SIZE)

        return DNSRecord.parse(reply_packet)
    except Exception as e:
        print(f"查询 {dns_server} 关于 {domain} ({record_type}) 出错: {e}")
        return None

def _answer_ips(reply, qtype):
    """Return the rdata strings of the *qtype* records in *reply* ([] if none)."""
    if not (reply and reply.rr):
        return []
    wanted = getattr(QTYPE, qtype)
    return [str(rr.rdata) for rr in reply.rr if rr.rtype == wanted]


def _choose_reply(qname, qtype, cn_reply, foreign_reply):
    """Pick which upstream reply to relay for an A/AAAA query.

    Order of preference:
      1. the China reply, if it contains at least one address inside China;
      2. the foreign reply, if it contains any address (the China answer is
         then suspected of being polluted);
      3. the China reply, if it contains any address;
      4. whichever reply exists, foreign first (may be None).
    """
    cn_ips = _answer_ips(cn_reply, qtype)
    foreign_ips = _answer_ips(foreign_reply, qtype)

    if any(is_cn_ip(ip) for ip in cn_ips):
        print(f"为 {qname} ({qtype}) 使用中国 DNS")
        return cn_reply
    if foreign_ips:
        print(f"为 {qname} ({qtype}) 使用国外 DNS")
        return foreign_reply
    if cn_ips:
        print(f"默认为 {qname} ({qtype}) 使用中国 DNS")
        return cn_reply
    print(f"默认为 {qname} ({qtype}) 使用国外 DNS")
    return foreign_reply if foreign_reply else cn_reply


def handle_dns_request(data, client_addr, sock):
    """Resolve one client query and send the chosen reply back over *sock*.

    A and AAAA queries are sent to both CHINA_DNS and FOREIGN_DNS and the
    reply is selected by ``_choose_reply``. Every other record type goes to
    FOREIGN_DNS first and to CHINA_DNS only if that fails.

    Args:
        data: Raw DNS request bytes received from the client.
        client_addr: Address the request came from; the reply is sent there.
        sock: The server's UDP socket.

    Any exception is printed and swallowed; in that case no reply is sent.
    """
    try:
        request = DNSRecord.parse(data)
        qname = str(request.q.qname)
        qtype = QTYPE[request.q.qtype]

        print(f"查询: {qname} ({qtype}) 来自 {client_addr[0]}")

        if qtype in ('A', 'AAAA'):
            cn_reply = forward_dns_request(qname, CHINA_DNS, qtype)
            foreign_reply = forward_dns_request(qname, FOREIGN_DNS, qtype)
            response = _choose_reply(qname, qtype, cn_reply, foreign_reply)
        else:
            response = (forward_dns_request(qname, FOREIGN_DNS, qtype)
                        or forward_dns_request(qname, CHINA_DNS, qtype))

        if response:
            # The upstream reply carries the ID of our own query; restore the
            # client's ID so the client accepts it.
            response.header.id = request.header.id
        else:
            # Both upstreams failed. Answer SERVFAIL rather than an empty
            # NOERROR reply, which would tell the client the name has no
            # records of this type.
            from dnslib import RCODE
            response = DNSRecord(
                DNSHeader(id=request.header.id, qr=1, aa=0, ra=1,
                          rcode=RCODE.SERVFAIL),
                q=request.q,
            )
        sock.sendto(response.pack(), client_addr)

    except Exception as e:
        print(f"处理请求出错: {e}")

def run_dns_server():
    """Bind a UDP socket to LISTEN_IP:LISTEN_PORT and serve requests forever.

    Each datagram is handed to ``handle_dns_request`` on its own thread. Only
    UDP is served; there is no TCP listener. If the socket cannot be created
    or bound, or the receive loop fails, the error is printed and the
    function returns.
    """
    try:
        # The context manager closes the socket however the loop ends.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
            udp_sock.bind((LISTEN_IP, LISTEN_PORT))
            print(f"DNS 服务器监听 {LISTEN_IP}:{LISTEN_PORT} (UDP)")

            while True:
                try:
                    data, addr = udp_sock.recvfrom(MAX_PACKET_SIZE)
                except ConnectionResetError:
                    # On Windows, a reply sent to a client that has already
                    # gone away can surface here on the next recvfrom(); it
                    # says nothing about the listening socket, so keep serving.
                    continue
                # Daemon threads: an in-flight upstream query must not keep
                # the process alive after the server loop has ended.
                threading.Thread(
                    target=handle_dns_request,
                    args=(data, addr, udp_sock),
                    daemon=True,
                ).start()

    except Exception as e:
        print(f"服务器错误: {e}")

if __name__ == "__main__":
    # Best-effort dependency bootstrap.
    # NOTE: dnslib and requests are also imported at the top of this file, so
    # when either is missing the script stops there with ImportError before
    # this guard runs. Install them up front with:
    #     python -m pip install dnslib requests
    try:
        import dnslib
    except ImportError:
        import subprocess
        import sys

        print("安装必要的包...")
        # pip.main() is no longer available in current pip releases; running
        # pip as a subprocess of this interpreter is the supported way.
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "dnslib", "requests"]
        )
        import dnslib

    print("启动智能 DNS 服务器...")
    run_dns_server()

3. 配置 Windows 使用本地 DNS

  1. 打开“控制面板” -> “网络和共享中心” -> “更改适配器设置”
  2. 右键点击当前使用的网络连接,选择“属性”
  3. 双击“Internet 协议版本 4 (TCP/IPv4)”
  4. 选择“使用下面的 DNS 服务器地址”,并输入 127.0.0.1

4. 启动 DNS 服务器

在命令提示符中运行:

    python C:\SmartDNS\standalone_smart_dns.py

结论

通过本文介绍的方案,用户可以在本地搭建一个轻量级的智能 DNS 分流系统,有效解决 DNS 污染问题,并根据网络环境自动选择最佳的解析结果。该方案不仅适用于 macOS,也可以在 Windows 系统上实现,提供了灵活的跨平台解决方案。

3. Create a Windows Service

To run a Python script as a Windows service, we need to use the NSSM (Non-Sucking Service Manager) tool:

  1. Download NSSM

  2. Extract it to a suitable directory, such as C:\SmartDNS\nssm

  3. Run the command prompt as an administrator, then execute:

    cd C:\SmartDNS\nssm\win64
    nssm.exe install SmartDNS
  4. In the configuration window that appears, set:

    • Path: C:\Windows\System32\python.exe (replace with the actual location of python.exe on your system; run "where python" to find it)
    • Startup directory: C:\SmartDNS
    • Arguments: C:\SmartDNS\standalone_smart_dns.py
    • Set the service display name and description in the “Details” tab
    • Choose “Local System account” in the “Log on” tab
    • Click “Install service”

Note: In Windows, port 53 is a privileged port, and the Python script needs to run with administrator privileges to bind to this port. The above service setup using “Local System account” can solve this issue. If you still encounter permission issues, consider changing the listening port in the script to a non-privileged port (such as 5353), and then use firewall rules to forward requests from UDP port 53 to 5353.

  5. Start the service:
    sc start SmartDNS

4. Configure the System to Use Local DNS

  1. Open Control Panel > Network and Internet > Network and Sharing Center
  2. Click on the current active network connection
  3. Click “Properties”
  4. Select “Internet Protocol Version 4 (TCP/IPv4)” and click “Properties”
  5. Choose “Use the following DNS server addresses”
  6. Set the preferred DNS server to “127.0.0.1”
  7. Click “OK” to save the settings

5. Verification Test

Execute in the command prompt:

    nslookup baidu.com 127.0.0.1
    nslookup google.com 127.0.0.1

If the settings are correct, the domain names should resolve correctly.

6. Restore Default Settings

If you need to restore the default settings:

  1. Stop the DNS Service:

    sc stop SmartDNS
    sc delete SmartDNS
  2. Restore Default DNS Settings:

    • Open Control Panel > Network and Internet > Network and Sharing Center
    • Click on the current active network connection
    • Click “Properties”
    • Select “Internet Protocol Version 4 (TCP/IPv4)” and click “Properties”
    • Choose “Obtain DNS server address automatically”
    • Click “OK” to save the settings

Conclusion

The combination of local anti-pollution DNS and Layer 3 Tunnel provides us with an elegant solution that avoids DNS pollution issues while ensuring the best speed for accessing both domestic and international websites. This solution is particularly suitable for users who need to access both domestic and international resources simultaneously.

When you use both local anti-pollution DNS and a Layer 3 tunnel (configured with regional routing), you will gain the following advantages:

  1. Pollution-free Resolution: All domain names can obtain the correct IP address
  2. Efficient Access:
    • DNS queries for domestic websites go directly through the local network, obtaining the IP most suitable for your network environment
    • DNS queries for international websites go through the tunnel, avoiding pollution and obtaining CDN IPs close to the tunnel exit
    • Direct connection for domestic websites, fast speed
    • Tunnel connection for international websites, stable and reliable
  3. Fully Automatic Traffic Splitting: The system automatically determines which route to take, without the need to manually switch DNS or proxies

Comments