8
27
2024
24

使用 nftables 屏蔽大量 IP

本来我是用 iptables 来屏蔽恶意IP地址的。之所以不使用 ipset,是因为我不想永久屏蔽这些 IP。iptables 规则有命中计数,所以我可以根据最近是否命中来删除「已经变得正常、或者分配给了正常人使用」的 IP。但 iptables 规则有个问题是,它是 O(n) 的时间复杂度。对于反 spam 来说,几千上万条规则问题不大,而且很多 spam 来源是机房的固定 IP。但是以文件下载为主、要反刷下行流量的用途,一万条规则能把下载速率限制在 12MiB/s 左右,整个 CPU 核的时间都消耗在 softirq 上了。perf top 一看,时间都消耗在 ipt_do_table 函数里了。

行吧,临时先加补丁先:

iptables -I INPUT -m state --state RELATED,ESTABLISHED -j ACCEPT

这样让已建立的连接跳过后边上万条规则,就可以让正常的下载速度快起来了。

此时性能已经够用了。但是呢,还是时不时需要我手动操作一下,删除计数为零的规则、清零计数、合并恶意 IP 太多的网段。倒不是这些工作自动化起来有困难(好吧,让我用 Python 3.3 来实现可能是有些不便以至于至今我都没有动手),但是这台服务器上有新工具 nftables 可用,为什么不趁机试试看呢?

于是再次读了读 nft 的手册页,意外地发现,它竟然有个东西十分契合我的需求:它的 set 支持超时!于是开虚拟机对着文档调了半天规则,最终得到如下规则定义:

destroy table inet blocker

table inet blocker {
    set spam_ips {
        type ipv4_addr
        timeout 2d
        flags timeout, dynamic
    }
    set spam_ips6 {
        type ipv6_addr
        timeout 2d
        flags timeout, dynamic
    }

    chain input {
        type filter hook input priority 0; policy accept;

        ct state established,related accept
        ip saddr @spam_ips tcp dport { 80, 443 } update @spam_ips { ip saddr timeout 2d } drop
        ip6 saddr @spam_ips6 tcp dport { 80, 443 } update @spam_ips6 { ip6 saddr timeout 2d } drop
    }
}

nftables 是自己创建 table 的,不用和别人「共用一张桌子然后打架」啦。然后定义了两个动态的、支持超时的、默认超时时间是两天的 set。nftables 的 table 可以同时支持 IPv4 和 IPv6,但是规则和 set 不行,所以得写两份。在 chain 定义中设置 hook,就跟 iptables 的默认 chain 一样可以拿到包啦。然后,已建立的连接不用检查了,因为恶意 IP 还没学会连接复用。接下来,如果源 IP 位于 set 内并且是访问 HTTP(S) 的话,就更新 set 的超时时间,然后丢弃包。限制端口是为了避免万一哪天把自己给屏蔽掉了。nftables 的规则后边可以写多个操作,挺直观、易于理解的。

然后让自己的恶意 IP 识别脚本用 nft add element inet blocker spam_ips "{ $IP }" 这样的命令向 set 里添加要屏蔽的 IP 就可以啦。两天不再有请求过来的 IP 会被自动解除屏蔽,很适合国内的三大运营商的动态 IP 呢。

跑了几天,被屏蔽的 IP 数量稳定在 26k—28k 之间。有昼夜周期,凌晨零点多和早上六七点是爆发期,晚间是静默期。性能非常好,softirq 最高占用不到 10%。

nftables 也很好用。虽然 nft 的手册页有点难懂,多看几遍、了解其写作结构之后就好很多了。不过要是支持 IP 地址到 counter 的动态 map 就好了——我想统计各 IP 的流量。nftables 还自带 Python 绑定,虽说这 API 走 JSON 感觉怪怪的,libnftables-json(5) 这文档没有超链接也很难使用,但至少弄明白之后能用。我用来写了个简单的统计脚本:

#!/usr/bin/python3

import os
from math import log10
from itertools import groupby

import nftables

def show_set(nft, name):
  ret, r, error = nft.json_cmd({'nftables': [{'list': {'set': {'family': 'inet', 'table': 'blocker', 'name': name}}}]})
  if ret != 0:
    raise Exception(ret, error)
  try:
    elements = r['nftables'][1]['set']['elem']
  except KeyError: # empty set
    return
  ips = [(x['elem']['val'], x['elem']['expires']) for x in elements]
  ips.sort(key=lambda x: x[1])

  histo = []
  total = len(ips)
  for k, g in groupby(ips, key=lambda x: x[1] // 3600):
    count = sum(1 for _ in g)
    histo.append((k, count))
  max_count = max(x[1] for x in histo)
  w_count = int(log10(max_count)) + 1
  w = os.get_terminal_size().columns - 5 - w_count
  count_per_char = max_count / w
  # count_per_char = total / w
  print(f'>> Histogram for {name} (total {total}) <<')
  for hour, count in histo:
    print(f'{hour:2}: {f'{{:{w_count}}}'.format(count)} {'*' * int(round(count / count_per_char))}')
  print()

if __name__ == '__main__':
  nft = nftables.Nftables()
  show_set(nft, 'spam_ips6')
  show_set(nft, 'spam_ips')

最后,我本来想谴责用无辜开源设施来刷下行流量的行为的,但俗话说「人为财死」,算了。还是谴责一下运营商不顾社会责任、为了私利将压力转嫁给无辜群众好了。自私又短视的人类啊,总有一天会将互联网上的所有好东西都逼死,最后谁也得不到好处。

12
25
2018
7

Ant Design 彩蛋事件之我见

事情是这样的:一个叫「Ant Design」的大概有挺多人用的前端框架加入了一个彩蛋,在12月25日这天会更改按钮样式并修改提示文字为无意义的消息。

由于使用广泛,此事搞得比较严重,听说有程序员因此被开除,还有不少老板在跟甲方解释。事发之后,作者发布了道歉,并称「这完全是我个人的一意孤行且愚蠢的决定」。

确实很愚蠢,因为当事人似乎还没有意识到自己为什么「愚蠢」。在程序库中加入未预期的行为,是十分不负责任的表现。

库应当提供机制而非策略,并且具有良好定义的行为。软件中彩蛋这种东西由来已久,为什么这次影响这么大呢?其根本原因不在于它是开源软件,也不在于使用广泛,而是在于——它是库。库能不能提供彩蛋呢?是可以的,只要它是以 opt-in 的形式提供的,并且有文档明确其行为,使用方需要显式启用就没有任何问题。库的作者不需要知道圣诞节还可能在1月,也不需要知道代码是运行在哪个国家,他的职责应当是提供清晰明确的行为,而不是某天给你耍个花招。只有最终面向用户的应用才知道什么样的彩蛋对于它的用户是合适的,所以决定权在于应用。

至于那些在 issue 下边滥骂的人,真是给中国人丢脸唉。

PS: 关于「洋节」,shell909090 有篇文章《关于抵制洋节》。关于阿里巴巴,shell909090 还有篇《最近的阿里月饼事件》。

PPS: 每当国外发生这种令程序员们关注的事件的时候,Internet ArchiveArchive.is 上都有会很多的存档来保留历史,而国内发生这种事件,却并没有多少人去存档。大概有保存意思的人们也只是自私地本地截图保存了吧。

4
30
2016
11

愛される花 愛されぬ花

很早就听过这首歌,但现在才知道这首歌讲的是什么。很喜欢这首歌的歌词,不过像《ひとり上手》一样,好悲伤啊。

找到正确的对照版歌词并不是那么容易的,因为大陆网站莫名其妙地对日文汉字进行了「简化」。标注读音的就更难得了,所以我专门制作了标注汉字读音的中日对照版歌词,有兴趣的朋友可以看看:愛される花 愛されぬ花.pdf。感谢 farseerfc 进行校对。

我没有在 YouTube 上找到中岛美雪演唱的版本,所以只好用网易云音乐这个歌词并不正确版本啦: http://music.163.com/song?id=624802

泱泱大国,连周边国家的文字都不能写对,我也是醉了……

Category: 未分类 | Tags: 音乐 日语 中国特色
4
17
2016
6

用 nfqueue + Python 回复 IPv6 DNS 请求

发生了这么一件事:服务器访问某些 URL 时经常会花费好几秒的时间。重现并分析 strace 记录之后,发现是 DNS 的 AAAA 记录的问题。

情况是这样的:CentOS 6 默认启用了 IPv6,于是 glibc 就会同时进行 A 和 AAAA 记录查询。然后呢,上游 DNS 是运营商的,不给力,经常在解析 AAAA 记录时花费几秒甚至十几秒,然后超时或者返回个 SERVFAIL。

不过咱在天朝,也没有 IPv6 网络可用,所以就禁用 IPv6 吧。通过 sysctl 禁用 IPv6 无效。glibc 是通过创建 IPv6 套接字的方式来决定是否进行 AAAA 请求的。查了一下,禁用掉 ipv6 这个内核模块就可以了。可是,rmmod ipv6 报告模块正在使用中。

lsof -nPi | grep -i ipv6

会列出几个正在使用 IPv6 套接字的进程。重启服务,或者重启机器太麻烦了,也不知道会不会有进程会起不来……于是我想起了歪心思:既然是因为 AAAA 记录回应慢,而咱并不使用这个回应,那就及时伪造一个呗。

于是上 nfqueue。DNS 是基于 UDP 的,所以处理起来也挺简单。用的是 netfilterqueue 这个库。DNS 解析用的是 dnslib

因为 53 端口已经被 DNS 服务器占用了,所以想从这个地址发送回应还得用底层的方法。尝试过 scapy,然而包总是发不出去,Wireshark 显示 MAC 地址没有正确填写……实在弄不明白要怎么做,干脆用 RAW 套接字好了。man 7 raw 之后发现挺简单的嘛,因为它直接就支持 UDP 协议,不用自己处理 IP 头。UDP 头还是要自己处理的,8 字节,其中最麻烦的校验和可以填全零=w= 至于解析 nfqueue 那边过来的 IP 包,我只需要源地址就可以了,所以就直接从相应的偏移取了~(其实想想,好像 dnslib 也不需要呢~)

代码如下(Gist 上也放了一份):

#!/usr/bin/env python3

import socket
import struct
import traceback
import subprocess
import time
import signal

import dnslib
from dnslib import DNSRecord
from netfilterqueue import NetfilterQueue

AAAA = dnslib.QTYPE.reverse['AAAA']
udpsock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
PORT = 53

def handle_packet(pkt):
  s = time.time()
  try:
    ip = pkt.get_payload()
    # 28 = 20B IPv4 header + 8B UDP header
    dns = DNSRecord.parse(ip[28:])
    if dns.q.qtype == AAAA:
      ret = dns.reply()
      src = socket.inet_ntoa(ip[12:16])
      sport = struct.unpack('!H', ip[20:22])[0]
      p = ret.pack()
      # print(ret, p)
      checksum = 0
      p = struct.pack('!HHHH', PORT, sport, len(p) + 8, checksum) + p
      udpsock.sendto(p, (src, sport))
      pkt.drop()
    else:
      pkt.accept()
  except KeyboardInterrupt:
    pkt.accept()
    raise
  except Exception:
    traceback.print_exc()
    pkt.accept()
  e = time.time()
  print('%.3fms' % ((e - s) * 1000))

def main():
  nfqueue = NetfilterQueue()
  nfqueue.bind(1, handle_packet)
  try:
    nfqueue.run()
  except KeyboardInterrupt:
    print()

def quit(signum, sigframe):
  raise KeyboardInterrupt

if __name__ == '__main__':
  signal.signal(signal.SIGTERM, quit)
  signal.signal(signal.SIGQUIT, quit)
  signal.signal(signal.SIGHUP, quit)
  subprocess.check_call(['iptables', '-I', 'INPUT', '-p', 'udp', '-m', 'udp', '--dport', str(PORT), '-j', 'NFQUEUE', '--queue-num', '1'])
  try:
    main()
  finally:
    subprocess.check_call(['iptables', '-D', 'INPUT', '-p', 'udp', '-m', 'udp', '--dport', str(PORT), '-j', 'NFQUEUE', '--queue-num', '1'])

写好之后、准备部署前,我还担心了一下 Python 的执行效率——要是请求太多处理不过来就麻烦了,得搞多进程呢。看了一下,一个包只有 3ms 的处理时间。然后发现 Python 其实也没有那么慢嘛,绝大部分时候不到 1ms 就搞定了~

部署到咱的 DNS 服务器上之后,AAAA 记录回应迅速,再也不会慢了~

调试网络程序,Wireshark 就是好用!

PS: 后来有人告诉我改 gai.conf 也可以。我试了一下,如下设置并没有阻止 glibc 请求 AAAA 记录——它压根就没读这个文件!

precedence ::ffff:0:0/96  100

PPS: 我还发现发送这两个 DNS 请求,glibc 2.12 用了两次 sendto,但是 Arch Linux 上的 glibc 2.23 只用了一次 sendmmsg~所以大家还是尽量升级吧,有好处的呢。

Category: 网络 | Tags: python DNS iptables 中国特色
12
5
2015
21

博客「被入侵」之谜底

2015年9月13日,我的博客流量突然少了很多:

来自百度的访问量下降

因为博客突然被百度标记为「安全联盟提醒您:该页面可能已被非法篡改!」(后边那个峰的流量来源于知乎,与此事件并无关联。)

然而我一直未发现任何异常。我使用了 Google 站长工具,Google 也没说有任何异常。所以我以为这不过是百度又发了什么神经。毕竟它也说,「有一个网友举报」。「安全联盟」那里可以申请解封,但是需要出卖手机号等隐私。更神奇的是,它要求你修正问题,却连是什么问题都不清楚。「安全联盟」的客服表示,这事他们也不清楚。

然而几天前,又有网友报告我的博客跳转到了奇怪的页面。第一次有人报告时我只当用户系统或者网络的问题,虽然我也疑惑,恶意软件或者 ISP 插广告不至于插 CJB 的广告啊。但是对方并没有能力来调查此事。这次总算是遇到了一个会抓包的读者了。于是让报告者 @xuboying 帮忙抓包,这才真相大白。

简单地说,确实有页面在传输过程中被篡改了。至于是不是非法的,就得问「有关部门」了。

事情是这个样子的:我有一篇文章嵌入了 GitHub Gist。而 GitHub Gist 的域名 gist.github.com 被污染,其中一个污染 IP 为 216.234.179.13。这大概是 CJB 的源服务器地址。

本来呢,嵌入 GitHub Gist 的代码是这样子:

<script src="https://gist.github.com/lilydjwg/0bfa6807b88e6d39a995.js"></script>

当解析到 216.234.179.13 之后,最奇妙的事情发生了:

  1. CJB 使用了自签名、过期、弱密钥的证书。火狐会嫌它太弱而不可覆盖地拒绝,其它主流浏览器也会因为证书问题而报错。小米浏览器会询问用户是否接受有问题的证书(普通用户哪里懂这个啊,估计大都会选择接受吧)。一旦接受,则进入下一步。
  2. 该服务器在访问时会返回一句复合了 HTML 和 JavaScript 的脚本,修改window.location到 CJB 的主页。而引入 GitHub Gist 的方式恰好是 JavaScript 脚本,于是它得到执行,跳转到 CJB 主页去了……(不过现在只会返回空白页了。)

后来百度取消了那个「被非法篡改」的提示,不过权重依然很低,不注意看根本找不到我博客唉。

知道是 GitHub gist 的原因之后就可以很快找到遇到同样问题的人了,比如:

Category: 网络 | Tags: 网络 博客 安全 中国特色
10
10
2015
6

评知乎的「友善度」制度

最近有朋友贴了知乎上关于 wintercn 的「狗日的知乎」项目。我一向不喜欢粗话,但是此处例外。我不讨论 wintercn 的行为本身,而想看看这被众人骂的「友善度」制度是怎么回事。

知乎的「友善度」制度跟某国的一些社会制度有得一拼:这是屁股指挥大脑的又一次胜利。

首先,我们来看,「友善度」制度的目的何在?

作为一个问答论坛网站,知乎上必然会存在相冲突的观点。人非圣贤,这些具有相冲突观点的人必然会吵起来。知乎觉得这样子太不和谐了,于是搞了个「友善度」出来,希望借此来管制这些不「友善」的言论。

你骂人了扣分,说脏话了也扣分,让某人觉得不爽了也扣分。「友善度」太低在知乎上就会被限制了。

张三是一个普通人。有一次跟别人激烈辩论导致表示「友善度」的星星少了不少。他想尽快恢复「友善度」。怎么办呢?答案不是作出优质的回答、让很多人佩服、感激,然后点赞。那是 StackOverflow。知乎的答案是去举报别人。举报这一行为本身是不友善的,所以在 StackOverflow 你踩别人自己也是要掉 rep 的,这样子你在踩的时候就会慎重了。然而那只是墙外的花罢了。在知乎是这样子的:

知乎上的举报战

(图片来源于知乎网友,标注亦为其所加。)

「反正举报失败不扣友善度」,而成功了「友善度」会涨。这就是不要钱的彩票啊,中奖率还挺高的。

于是,张三在知乎四处游荡。看到李四和王五争论起来了呢。不管他们在争论什么,也不管他们的意见如何,看到稍微过激的言论就去举报。反正也没什么损失嘛。

李四很奇怪:好好的讨论怎么就被删掉了呢?但是李四是个友善的人,才不屑于互相打小报告那一套。然而李四因为观点鲜明,得罪了不少小人,最终被知乎禁言啦。他的墓志铭写着:「这是一个高尚的人」。

制度,不仅要看它做了什么,更重要的是它鼓励了什么

更新:知乎也默默地实行「选择性要求登录」策略了。文中引用到的回答不登陆看不到,难怪没有被删除。

Category: 未分类 | Tags: 知乎 中国特色
4
28
2015
15

再见,莫名其妙的知乎!

没想到会这样决定不再在知乎上产生新内容。虽然网站体验做得很差劲,其实知乎里有些人还是很不错的。

做出此决定的原因是知乎新实行的「不透明的审查制度」。

我在一个回答下边做出了可能会令某些人心里产生不适的内容:

更讽刺的是提问者叫 @不求人666 233

提问者名为「不求人666」,却提出了一个非常容易找到答案的问题,所以我才那么说。你若是见到亲友手上拿着钱包,却翻箱倒框地寻找,也是会大笑的是吧?

但是知乎认为这是「不友善行为」。什么叫「不友善行为」呢,知乎关于此规范的定义如下:

轻蔑:贬低、轻视他人及其劳动成果。

对别人做出的搞笑行为大笑是贬低还是轻视呢?那如果有人提出一个很基础的问题,我叫他去好好再读一遍基础教程,他就觉得自己受到了轻视,就可以来借此限制我在知乎上的行为了?

诽谤:捏造、散布虚假事实,损害他人名誉。

难道这个叫「不求人666」的用户不叫「不求人666」?

嘲讽:以比喻、夸张、侮辱性的手法对他人或其行为进行揭露或描述,以此来激怒他人。

哪里有比喻和夸张呢?我笑点低也不行吗?小小地嘲笑一下算「侮辱性的手法」?

挑衅:以不友好的方式激怒他人,意图使对方对自己的言论作出回应,蓄意制造事端。

什么是「不友善行为」?知乎答:「就是不友好的回应」。

羞辱:贬低他人的能力、行为、生理或身份特征,让对方难堪。

有么?名字是用户自己起的。

谩骂:以不文明的语言对他人进行负面评价。

「讽刺」这个词不文明么?

歧视:针对他人的民族、种族、宗教、性取向、性别、年龄、地域、生理特征等身份或者归类的攻击。


威胁:许诺以不良的后果来迫使他人服从自己的意志。

我通过「申诉」询问知乎管理员我的评论违反了哪一条。结果是:

知乎:不友善内容是指经知乎内部确认的不友善内容

如果真违反了哪一条规范,你要扣什么东西就扣吧。可是你好歹也告诉我到底违反了啥呀?


有位民警在大街上看着一个人不顺眼,于是上前:「你违法了,我要拘留你。」

那人问:「我违反什么法了?」

民警:「根据《XXX 法》,以下行为违法:一、闯红灯;二、随地乱扔垃圾;三、打架斗殴;四、其它违法行为。哦,你违反了这里的第四点。跟我走吧!」


2015年6月11日更新:另见《哔~知乎,离开的Q&A》

Category: 未分类 | Tags: 知乎 中国特色
4
28
2015
2

为什么你不应该安装12306的证书

中华人民共和国铁道部铁路客户服务中心在其首页上说:

为保障您顺畅购票,请下载安装根证书。

真的是这样吗?

安装该根证书意味着什么?

当然意味着你在12306网站上买票时不会遇到证书错误了。但是除此之外呢?

安装之后,你的计算机系统(或者浏览器)会信任该「CA 机构」所签名的所有证书。根据安装说明,该证书不仅能签名用于标识网站的身份的证书,还能签名应用程序,即 .exe 和 .dll 文件。

这意味着,如果铁道部没有按照规范正确管理该根证书对应的私钥的话,一旦被滥用,或者被攻击盗用,那么:

  1. 攻击者可以实施中间人攻击,伪装成支付宝、网上银行、微信、网页邮箱等网站,获取你的登录和隐私信息,篡改网页内容,以你的身份提交转账和订单、与你的亲友同事聊天等。

  2. 攻击者可以为病毒和木马签名。拥有一个受系统信任的签名,恶意软件更容易在用户不知不觉间潜入。

  3. 这是一个 SHA1 签名的证书。因为其安全性比较低,各大浏览器厂商将逐渐淘汰该类证书

  4. 当然还有一些其它的用法,毕竟这个证书的权限非常大

而铁道部能按照规范正确管理该根证书对应的私钥吗?很多人连工信部(CNNIC)的根证书都不信任呢。而工信部,即使出现过下级机构违规使用证书以至于 Google 和 Mozilla 都不再信任之,至少工信部曾经得到过各操作系统和浏览器厂商的信任,不管做没做到,人家至少知道应该怎么做。而铁道部呢?根本不是干这行的,也从未在这方面做过多少努力,连正规的 HTTPS 都不知道用,你觉得如何呢?

那要怎么用12306呢?

火狐有个功能,叫「添加证书例外」。在遇到不被信任的网站证书的时候,如果你确知你没有被骗,你可以为该网站添加例外,如图所示:

为 12306 添加证书例外

添加例外之后,火狐将这个证书这个网站关联起来。也就是说,如果12306突然换证书了,你会得到错误消息;如果别的网站也使用铁道部签名的证书,你也会得到错误消息。而这些错误消息,绝大部分是在告诉你有人正在进行中间人攻击。

HTTPS 保证了端到端的数据安全性(私密性、完整性),使得你即使在公共场合上网,或者本地网络有不可信的人时也可以安心上网。请不要随意破坏这份安全。

其它链接

11
29
2014
6

APEC 蓝与北京灰

APEC 蓝:

北京灰:

这不是月亮:

在玩过山车的空气质量:

6
17
2014
3

Google IP 可用性检测脚本

需要 Python 3.4+,一个参数用来选择测试搜索服务还是 GAE 服务。测试 GAE 服务的话需要先修改开头的两个变量。从标准输入读取 IP 地址或者 IP 段(形如 192.168.0.0/16)列表,每行一个。可用 IP 输出到标准输出。实时测试结果输出到标准错误。50 线程并发。

#!/usr/bin/env python3

import sys
from ipaddress import IPv4Network
import http.client as client
from concurrent.futures import ThreadPoolExecutor
import argparse
import ssl
import socket

# 先按自己的情况修改以下几行
APP_ID = 'your_id_here'
APP_PATH = '/fetch.py'

context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations('/etc/ssl/certs/ca-certificates.crt')

class HTTPSConnection(client.HTTPSConnection):
  def __init__(self, *args, hostname=None, **kwargs):
    self._hostname = hostname
    super().__init__(*args, **kwargs)

  def connect(self):
    super(client.HTTPSConnection, self).connect()

    if self._tunnel_host:
      server_hostname = self._tunnel_host
    else:
      server_hostname = self._hostname or self.host
      sni_hostname = server_hostname if ssl.HAS_SNI else None

    self.sock = self._context.wrap_socket(self.sock,
                                          server_hostname=sni_hostname)
    if not self._context.check_hostname and self._check_hostname:
      try:
        ssl.match_hostname(self.sock.getpeercert(), server_hostname)
      except Exception:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise

def check_ip_p(ip, func):
  if func(ip):
    print(ip, flush=True)

def check_for_gae(ip):
  return _check(APP_ID + '.appspot.com', APP_PATH, ip)

def check_for_search(ip):
  return _check('www.google.com', '/', ip)

def _check(host, path, ip):
  for chance in range(1,-1,-1):
    try:
      conn = HTTPSConnection(
        ip, timeout = 5,
        context = context,
        hostname = host,
      )
      conn.request('GET', path, headers = {
        'Host': host,
      })
      response = conn.getresponse()
      if response.status < 400:
        print('GOOD:', ip, file=sys.stderr)
      else:
        raise Exception('HTTP Error %s %s' % (
          response.status, response.reason))
      return True
    except KeyboardInterrupt:
      raise
    except Exception as e:
      if isinstance(e, ssl.CertificateError):
        print('WARN: %s is not Google\'s!' % ip, file=sys.stderr)
        chance = 0
      if chance == 0:
        print('BAD :', ip, e, file=sys.stderr)
        return False
      else:
        print('RE  :', ip, e, file=sys.stderr)

def main():
  parser = argparse.ArgumentParser(description='Check Google IPs')
  parser.add_argument('service', choices=['search', 'gae'],
                      help='service to check')
  args = parser.parse_args()
  func = globals()['check_for_' + args.service]

  count = 0
  with ThreadPoolExecutor(max_workers=50) as executor:
    for l in sys.stdin:
      l = l.strip()
      if '/' in l:
        for ip in IPv4Network(l).hosts():
          executor.submit(check_ip_p, str(ip), func)
          count += 1
      else:
        executor.submit(check_ip_p, l, func)
        count += 1
  print('%d IP checked.' % count)

if __name__ == '__main__':
  main()

脚本下载地址


2014年9月3日重要更新:由于失误,之前的脚本没有检查 SSL/TLS 证书,所以将错误的 IP 认为是可用的。现已更新。

Category: python | Tags: python google 网络 中国特色

Mastodon | Theme: Aeros 2.0 by TheBuckmaker.com