摘要

AI自动化流量正在进入表单、登录和注册等关键入口。对App来说,虚假注册和活动滥用不能只靠手机号或验证码判断。本文围绕IP欺诈风险查询,拆解App如何结合IP画像与行为规则做注册安全防护。

一、为什么App注册安全要重新看

App注册接口通常是增长入口,也是风控入口。新用户注册、注册送券、活动报名、首次登录、首次领权益,这些动作看似分散,但背后是依赖同一个问题:当前请求是否像一个正常用户行为。

CNNIC第57次《中国互联网络发展状况统计报告》显示,截至2025年12月,我国网民规模达11.25亿,互联网普及率达80.1%。线上应用规模越大,App注册、登录和活动接口承受的请求量也越复杂。

DataDome《2025 Global Bot Security Report》披露,2025年AI自动化流量中,64%触达表单,23%触达登录页,5%触达结账流程。对App来说,注册表单、登录入口和活动页面都需要更细的风险判断。

二、虚假注册与活动滥用的常见信号

App注册风险信号识别-异常注册与活动滥用判断示意图

第一,短时间多次注册。

同一IP、同一设备、同一城市来源在短时间内频繁提交注册请求,通常不应直接放行。系统可以先进入观察、限频或二次验证。

第二,注册城市与活动城市不一致。

如果活动只面向特定城市,但注册IP城市长期不在活动区域,系统可以降低自动通过置信度。这里不是按城市直接拦截,而是把位置作为风控分的一部分。

第三,新设备、新账号、新IP同时出现。

单个新信号不一定代表风险,但多个新信号叠加时,就适合触发额外核验。例如新设备首次注册,同时risk_score偏高,就不应直接发放活动权益。

第四,访问环境与用户行为不匹配。

通过IP欺诈风险查询,App可以获取usage_type、risk_score、risk_level、risk_tag等字段。如果访问环境与普通App注册场景不匹配,或风险等级偏高,应进入二次验证或人工复核。

三、IP画像与行为规则如何结合

单独看IP画像并不够,单独看行为规则也容易误伤。更稳妥的方式,是把IP画像、设备、手机号、注册频次、活动城市、账号历史一起放进同一套评分规则。

IP数据云这类服务可以通过IP数据接口返回城市、应用场景、风险评分、风险等级等字段。App后端拿到这些字段后,不需要把它们直接暴露给前端,而是用于服务端风险判断。

IP画像与行为规则结合-App注册风控接入链路示意图

下面示例演示注册接口如何结合IP画像和行为规则,判断是否放行、二次验证或人工复核。

Python

import os
import time
import uuid
import requests
from flask import Flask, request, jsonify, g

app = Flask(__name__)

IP_API_URL = os.getenv("IP_RISK_API_URL", "https://api.ipdatacloud.com/v2/query")
API_KEY = os.getenv("IPDATACLOUD_API_KEY", "")
TIMEOUT = (3, 5)
CACHE_TTL_SECONDS = int(os.getenv("IP_PROFILE_CACHE_TTL", "600"))

TRUST_FORWARD_HEADERS = os.getenv("TRUST_FORWARD_HEADERS", "false").lower() == "true"
TRUSTED_PROXY_IPS = {
    item.strip()
    for item in os.getenv("TRUSTED_PROXY_IPS", "").split(",")
    if item.strip()
}

DATACENTER_USAGE_TYPES = {"IDC", "CDN", "DNS"}
HIGH_RISK_LEVELS = {"high", "critical", "高风险"}

session = requests.Session()
profile_cache = {}


def clamp(value, min_value, max_value):
    return max(min_value, min(max_value, value))


def to_int(value, default=0):
    try:
        return int(float(value))
    except (TypeError, ValueError):
        return default


def to_bool(value):
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y"}
    if isinstance(value, (int, float)):
        return value != 0
    return False


def normalize_city(city):
    city = str(city or "").strip()
    for suffix in ("市", "地区", "盟", "自治州"):
        if city.endswith(suffix):
            return city[: -len(suffix)]
    return city


def normalize_score(value):
    try:
        score = float(value)
    except (TypeError, ValueError):
        return 0.0
    return clamp(score, 0.0, 100.0)


def normalize_tags(value):
    if not value:
        return []
    if isinstance(value, list):
        return [str(item).strip() for item in value if str(item).strip()]
    if isinstance(value, str):
        return [item.strip() for item in value.split(",") if item.strip()]
    return []


def get_client_ip():
    remote_ip = request.remote_addr or ""
    if TRUST_FORWARD_HEADERS and remote_ip in TRUSTED_PROXY_IPS:
        forwarded = request.headers.get("X-Forwarded-For", "")
        if forwarded:
            return forwarded.split(",")[0].strip()
    return remote_ip


def get_cached_profile(ip):
    cached = profile_cache.get(ip)
    if not cached:
        return None
    expires_at, profile = cached
    if expires_at < time.time():
        profile_cache.pop(ip, None)
        return None
    return profile


def query_ip_profile(ip):
    if not ip:
        return None

    cached = get_cached_profile(ip)
    if cached:
        return cached

    try:
        response = session.get(
            IP_API_URL,
            params={"ip": ip, "key": API_KEY},
            timeout=TIMEOUT
        )
        response.raise_for_status()
        result = response.json()
    except (requests.RequestException, ValueError):
        return None

    if not isinstance(result, dict) or result.get("code") != 200:
        return None

    data = result.get("data") or {}
    if not isinstance(data, dict):
        return None

    risk_score = data.get("risk_score")
    if risk_score is None:
        risk_score = data.get("score")

    profile = {
        "city": normalize_city(data.get("city", "")),
        "usage_type": str(data.get("usage_type", "")).upper(),
        "risk_score": normalize_score(risk_score),
        "risk_level": str(data.get("risk_level", "")).lower(),
        "risk_tag": normalize_tags(data.get("risk_tag"))
    }
    profile_cache[ip] = (time.time() + CACHE_TTL_SECONDS, profile)
    return profile


def assess_register_risk(profile, payload):
    if not profile:
        return "extra_verify", ["ip profile unavailable"]

    points = 0
    reasons = []

    activity_city = normalize_city(payload.get("activity_city", ""))
    recent_register_count = clamp(
        to_int(payload.get("recent_register_count"), 0),
        0,
        100
    )
    device_is_new = to_bool(payload.get("device_is_new", False))
    phone_is_new = to_bool(payload.get("phone_is_new", False))

    if activity_city and profile["city"] and activity_city != profile["city"]:
        points += 20
        reasons.append("city mismatch")

    if recent_register_count >= 3:
        points += 25
        reasons.append("frequent registration")

    if device_is_new:
        points += 15
        reasons.append("new device")

    if phone_is_new:
        points += 15
        reasons.append("new phone")

    if profile["usage_type"] in DATACENTER_USAGE_TYPES:
        points += 25
        reasons.append("usage type mismatch")

    if profile["risk_tag"]:
        points += 20
        reasons.append("risk tag exists")

    if profile["risk_score"] >= 80 or profile["risk_level"] in HIGH_RISK_LEVELS:
        points += 50
        reasons.append("high ip risk")
    elif profile["risk_score"] >= 60:
        points += 25
        reasons.append("medium ip risk")

    if points >= 80:
        return "manual_review", reasons
    if points >= 40:
        return "extra_verify", reasons
    return "pass", reasons


def safe_profile(profile):
    if not profile:
        return None
    return {
        "city": profile.get("city", ""),
        "usage_type": profile.get("usage_type", ""),
        "risk_score": profile.get("risk_score", 0),
        "risk_level": profile.get("risk_level", ""),
        "risk_tag_count": len(profile.get("risk_tag", []))
    }


@app.route("/api/app/register/risk-check", methods=["POST"])
def register_risk_check():
    g.request_id = str(uuid.uuid4())
    payload = request.get_json(silent=True) or {}
    client_ip = get_client_ip()

    profile = query_ip_profile(client_ip)
    decision, reasons = assess_register_risk(profile, payload)

    app.logger.info({
        "event": "register_risk_check",
        "request_id": g.request_id,
        "decision": decision,
        "reasons": reasons,
        "ip_profile": safe_profile(profile)
    })

    return jsonify({
        "code": 200,
        "data": {
            "decision": decision,
            "request_id": g.request_id
        }
    })


if __name__ == "__main__":
    if not API_KEY:
        print("请设置环境变量 IPDATACLOUD_API_KEY")
        exit(1)
    app.run(port=8080, debug=False)

四、落地建议

App注册风控不应只依赖一个字段。低风险请求可以正常注册;中风险请求进入短信验证、图形验证或延迟发放权益;高风险请求进入人工复核或限制关键动作。

IP数据云的作用,是把访问IP转化为可解释的画像字段,帮助App在注册、登录和活动领取环节做更细的风险分层。实际生产中,还需要接入设备ID、手机号实名时间、账号历史、活动规则、接口频率和人工复核流程。

需要注意,生产环境不建议把完整风险原因直接返回给前端。更稳妥的方式是在客户端只返回decision和request_id,在服务端日志中保留脱敏画像和原因,方便审计与复盘。

总结

AI流量背景下,App注册安全的重点不是简单拦截更多请求,而是识别哪些请求需要额外核验。通过IP欺诈风险查询与行为规则结合,App可以在虚假注册、异常权益领取和活动接口防护之间建立更稳的判断框架,减少误伤,也降低注册入口被消耗的风险。

数据来源

  • • CNNIC:第57次《中国互联网络发展状况统计报告》
  • • DataDome:2025 Global Bot Security Report
  • • Arkose Labs:Threat Actor Behavior Analysis
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐