NTX Paydocs
Webhooks

实现

完整示例

import { timingSafeEqual } from 'node:crypto';

import express from 'express';

/**
 * Shape of the JSON body that the `/webhooks/pix` route reads from `req.body`.
 * This is a compile-time description only; TypeScript does not check the
 * incoming body against it at runtime.
 */
interface PixWebhookPayload {
  // Event kind; the route handler switches on this value to pick a handler.
  event: 'CashIn' | 'CashOut' | 'CashInReversal' | 'CashOutReversal';
  status: 'PENDING' | 'CONFIRMED' | 'ERROR';
  transactionType: 'PIX';
  movementType: 'CREDIT' | 'DEBIT';
  // Used by the route handler as the idempotency key.
  transactionId: string;
  externalId: string | null;
  endToEndId: string;
  pixKey: string | null;
  // The example handlers log `finalAmount` for CashIn / CashOutReversal and
  // `originalAmount` for CashOut / CashInReversal.
  // NOTE(review): currency unit and how feeAmount relates to the other two
  // amounts are not shown in this file — confirm against the API reference.
  feeAmount: number;
  originalAmount: number;
  finalAmount: number;
  // NOTE(review): presumably a timestamp string; the format is not shown here — confirm.
  processingDate: string;
  errorCode: string | null;
  errorMessage: string | null;
  // Optional: these two keys may be absent from the body.
  counterpart?: Counterpart;
  parentTransaction?: ParentTransaction;
  metadata: Record<string, unknown>;
}

/**
 * The other party of a transaction, as referenced by
 * `PixWebhookPayload.counterpart` and `ParentTransaction.counterpart`.
 */
interface Counterpart {
  name: string;
  // NOTE(review): presumably a tax/identity document number — confirm.
  document: string;
  // Bank details; every field may be null.
  bank: {
    bankISPB: string | null;
    bankName: string | null;
    bankCode: string | null;
    accountBranch: string | null;
    accountNumber: string | null;
  };
}

/**
 * Transaction referenced by `PixWebhookPayload.parentTransaction`.
 * NOTE(review): presumably the original transaction that a reversal event
 * refers to (see the refund-related fields) — confirm.
 */
interface ParentTransaction {
  transactionId: string;
  // NOTE(review): non-nullable here, while PixWebhookPayload.externalId is
  // `string | null` — confirm which one is right.
  externalId: string;
  endToEndId: string;
  processingDate: string;
  wasTotalRefunded: boolean;
  remainingAmountForRefund: number;
  metadata: Record<string, unknown>;
  counterpart: Counterpart;
}

// Express application that hosts the webhook route. express.json() parses
// JSON request bodies so the route can read them from req.body.
const app = express();
app.use(express.json());

/**
 * Express middleware that authenticates a request with HTTP Basic credentials.
 *
 * The expected username and password are read from the `WEBHOOK_USER` and
 * `WEBHOOK_PASS` environment variables. The request is answered with 401 when
 * the header is missing or not `Basic`, when the decoded credentials do not
 * match, or when either environment variable is unset or empty (fail closed).
 *
 * @param req - Incoming request; only `headers.authorization` is read.
 * @param res - Response used to send the 401 on failure.
 * @param next - Called with no arguments once the credentials are accepted.
 */
function validateBasicAuth(
  req: express.Request,
  res: express.Response,
  next: express.NextFunction
) {
  const authHeader = req.headers.authorization;

  if (!authHeader || !authHeader.startsWith('Basic ')) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const encoded = authHeader.slice('Basic '.length).trim();
  const decoded = Buffer.from(encoded, 'base64').toString('utf8');

  // Split on the FIRST colon only: the user-id cannot contain ':' but the
  // password can, so everything after the first colon is the password.
  const separator = decoded.indexOf(':');

  const expectedUser = process.env.WEBHOOK_USER;
  const expectedPass = process.env.WEBHOOK_PASS;

  // Without a colon there is no password part; without configured
  // credentials nothing may authenticate.
  if (separator === -1 || !expectedUser || !expectedPass) {
    return res.status(401).json({ error: 'Invalid credentials' });
  }

  // Constant-time comparison; timingSafeEqual requires equal-length inputs,
  // so the length check comes first.
  const matches = (actual: string, expected: string): boolean => {
    const actualBytes = Buffer.from(actual, 'utf8');
    const expectedBytes = Buffer.from(expected, 'utf8');
    return (
      actualBytes.length === expectedBytes.length &&
      timingSafeEqual(actualBytes, expectedBytes)
    );
  };

  // Evaluate both comparisons before combining them so a wrong username does
  // not skip the password comparison.
  const userOk = matches(decoded.slice(0, separator), expectedUser);
  const passOk = matches(decoded.slice(separator + 1), expectedPass);

  if (!userOk || !passOk) {
    return res.status(401).json({ error: 'Invalid credentials' });
  }

  next();
}

// In-memory idempotency guard: ids of transactions this process has already
// handled (or is currently handling). It lives only in this process and is
// never pruned, so it is suitable for an example, not for production.
const processedTransactions = new Set<string>();

/**
 * PIX webhook endpoint.
 *
 * Flow: authenticate (validateBasicAuth) -> validate that the body carries a
 * usable `transactionId` -> acknowledge with 200 -> skip duplicates ->
 * dispatch on `event`.
 *
 * A body without a non-empty string `transactionId` is answered with 400 and
 * is never added to the idempotency set; previously such a body stored
 * `undefined` in the set (or threw after the response had been sent).
 */
app.post('/webhooks/pix', validateBasicAuth, async (req, res) => {
  const body: unknown = req.body;
  const transactionId =
    typeof body === 'object' && body !== null
      ? (body as { transactionId?: unknown }).transactionId
      : undefined;

  if (typeof transactionId !== 'string' || transactionId === '') {
    res.status(400).json({ error: 'Invalid payload' });
    return;
  }

  // Only the idempotency key is checked at runtime; the remaining fields are
  // trusted to follow the declared shape.
  const payload = body as PixWebhookPayload;

  // Respond quickly (webhook requires response within 10s)
  res.status(200).json({ acknowledged: true });

  // Check idempotency
  if (processedTransactions.has(transactionId)) {
    console.log(`Transaction ${transactionId} already processed`);
    return;
  }

  // Mark as processed before the async work so a concurrent duplicate is skipped
  processedTransactions.add(transactionId);

  // Process asynchronously (the response has already been sent)
  try {
    switch (payload.event) {
      case 'CashIn':
        await handleCashIn(payload);
        break;
      case 'CashOut':
        await handleCashOut(payload);
        break;
      case 'CashInReversal':
        await handleCashInReversal(payload);
        break;
      case 'CashOutReversal':
        await handleCashOutReversal(payload);
        break;
      default:
        // The body is untrusted, so a value outside the declared union can
        // still arrive at runtime; surface it instead of dropping it silently.
        console.warn(`Unhandled event type: ${String(payload.event)}`);
    }
  } catch (error: unknown) {
    console.error(`Error processing ${payload.event}:`, error);
    // Un-mark so a later delivery of the same transaction is processed again.
    // The 200 above has already been sent, so the sender will not retry
    // because of this failure.
    processedTransactions.delete(transactionId);
  }
});

/** Logs the `finalAmount` carried by a CashIn notification. */
async function handleCashIn(payload: PixWebhookPayload): Promise<void> {
  const { finalAmount } = payload;
  const line = `[CashIn] Received: R$ ${finalAmount}`;
  console.log(line);
}

/** Logs the `originalAmount` carried by a CashOut notification. */
async function handleCashOut(payload: PixWebhookPayload): Promise<void> {
  const { originalAmount } = payload;
  const line = `[CashOut] Sent: R$ ${originalAmount}`;
  console.log(line);
}

/** Logs the `originalAmount` carried by a CashInReversal notification. */
async function handleCashInReversal(payload: PixWebhookPayload): Promise<void> {
  const { originalAmount } = payload;
  const line = `[CashInReversal] Refunded: R$ ${originalAmount}`;
  console.log(line);
}

/** Logs the `finalAmount` carried by a CashOutReversal notification. */
async function handleCashOutReversal(payload: PixWebhookPayload): Promise<void> {
  const { finalAmount } = payload;
  const line = `[CashOutReversal] Returned: R$ ${finalAmount}`;
  console.log(line);
}

// Start accepting HTTP connections on port 3000.
app.listen(3000);
from flask import Flask, request, jsonify
from functools import wraps
import base64
import os
from typing import Dict, Any, Optional
from dataclasses import dataclass

# Flask application that hosts the webhook route.
app = Flask(__name__)
# Transaction ids already handled by this process; the route consults it to
# skip duplicate deliveries. In-memory only, so it is empty after a restart.
processed_transactions: set = set()

@dataclass
class PixWebhookPayload:
    """Typed view over the webhook body fields that this example reads.

    Attributes use snake_case names; ``from_dict`` fills them from the
    camelCase keys of the decoded JSON body.
    """

    event: str
    status: str
    transaction_id: str
    external_id: Optional[str]
    end_to_end_id: str
    fee_amount: float
    original_amount: float
    final_amount: float
    counterpart: Optional[Dict[str, Any]]
    parent_transaction: Optional[Dict[str, Any]]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'PixWebhookPayload':
        """Build a payload from a decoded JSON object.

        Keys that are absent yield ``None``, except for the three amounts,
        which fall back to ``0``. No type or presence validation is done.
        """
        # attribute name -> JSON key, for fields whose fallback is None
        nullable_keys = {
            'event': 'event',
            'status': 'status',
            'transaction_id': 'transactionId',
            'external_id': 'externalId',
            'end_to_end_id': 'endToEndId',
            'counterpart': 'counterpart',
            'parent_transaction': 'parentTransaction',
        }
        # attribute name -> JSON key, for amounts whose fallback is 0
        amount_keys = {
            'fee_amount': 'feeAmount',
            'original_amount': 'originalAmount',
            'final_amount': 'finalAmount',
        }

        values = {attr: data.get(key) for attr, key in nullable_keys.items()}
        values.update((attr, data.get(key, 0)) for attr, key in amount_keys.items())
        return cls(**values)

def require_basic_auth(f):
    """Decorator that protects a Flask view with HTTP Basic authentication.

    The expected credentials are read from the ``WEBHOOK_USER`` and
    ``WEBHOOK_PASS`` environment variables on every request. The wrapped view
    runs only when the ``Authorization`` header carries matching credentials;
    otherwise a ``(json, 401)`` response is returned. Unset or empty
    environment variables reject every request.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    import hmac  # local import: only needed for the constant-time comparison

    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Basic '):
            return jsonify({'error': 'Unauthorized'}), 401

        try:
            encoded = auth_header[len('Basic '):].strip()
            credentials = base64.b64decode(encoded).decode('utf-8')
            # Split on the FIRST colon only: the password may contain ':'.
            username, password = credentials.split(':', 1)
        except ValueError:
            # Covers invalid base64 (binascii.Error), invalid UTF-8
            # (UnicodeDecodeError) and a missing colon (unpacking error);
            # all three are ValueError subclasses.
            return jsonify({'error': 'Invalid auth header'}), 401

        expected_user = os.environ.get('WEBHOOK_USER')
        expected_pass = os.environ.get('WEBHOOK_PASS')
        if not expected_user or not expected_pass:
            return jsonify({'error': 'Invalid credentials'}), 401

        # Compare as bytes (compare_digest rejects non-ASCII str) and evaluate
        # both comparisons before combining them.
        user_ok = hmac.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
        pass_ok = hmac.compare_digest(password.encode('utf-8'), expected_pass.encode('utf-8'))
        if not (user_ok and pass_ok):
            return jsonify({'error': 'Invalid credentials'}), 401

        return f(*args, **kwargs)
    return decorated

@app.route('/webhooks/pix', methods=['POST'])
@require_basic_auth
def handle_pix_webhook():
    """Receive a PIX webhook, skip duplicates, and log the event amount.

    Returns:
        ``({'acknowledged': True}, 200)`` for a new or already-seen
        transaction, or ``({'error': 'Invalid payload'}, 400)`` when the body
        is not a JSON object carrying a non-empty string ``transactionId``.
    """
    # silent=True yields None instead of raising when the body is not valid
    # JSON, so every malformed body takes the same 400 path below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid payload'}), 400

    transaction_id = data.get('transactionId')
    if not isinstance(transaction_id, str) or not transaction_id:
        # Without a usable id the delivery cannot be deduplicated.
        return jsonify({'error': 'Invalid payload'}), 400

    payload = PixWebhookPayload.from_dict(data)

    # Idempotency: a transaction id that was seen before is acknowledged
    # again but not processed again.
    if payload.transaction_id in processed_transactions:
        return jsonify({'acknowledged': True}), 200
    processed_transactions.add(payload.transaction_id)

    if payload.event == 'CashIn':
        print(f"[CashIn] R$ {payload.final_amount:.2f}")
    elif payload.event == 'CashOut':
        print(f"[CashOut] R$ {payload.original_amount:.2f}")
    elif payload.event == 'CashInReversal':
        print(f"[CashInReversal] R$ {payload.original_amount:.2f}")
    elif payload.event == 'CashOutReversal':
        print(f"[CashOutReversal] R$ {payload.final_amount:.2f}")
    else:
        # Surface unknown event types instead of dropping them silently.
        print(f"Unhandled event type: {payload.event!r}")
    return jsonify({'acknowledged': True}), 200

# When executed as a script, serve the app with Flask's built-in server on all
# network interfaces, port 3000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3000)
<?php

// Credentials the sender must present via HTTP Basic Auth. They are read from
// the environment only: a built-in fallback would let anyone who has seen the
// default values authenticate whenever the environment is not configured.
$WEBHOOK_USER = getenv('WEBHOOK_USER');
$WEBHOOK_PASS = getenv('WEBHOOK_PASS');

if (!is_string($WEBHOOK_USER) || $WEBHOOK_USER === '' || !is_string($WEBHOOK_PASS) || $WEBHOOK_PASS === '') {
    // Misconfiguration: refuse to serve rather than accept guessable credentials.
    error_log('WEBHOOK_USER / WEBHOOK_PASS are not configured; refusing to handle webhooks');
    http_response_code(500);
    exit;
}

// JSON file holding the ids of transactions that were already processed.
$PROCESSED_FILE = '/tmp/processed_transactions.json';

/**
 * Checks the request's HTTP Basic credentials against the expected pair.
 *
 * Reads the header from $_SERVER['HTTP_AUTHORIZATION'].
 *
 * @param string $user Expected username.
 * @param string $pass Expected password.
 * @return bool True only when the header is a well-formed Basic header whose
 *              username and password both match. Always false when either
 *              expected value is not a non-empty string.
 */
function validateBasicAuth($user, $pass): bool {
    // Fail closed: unusable expected credentials must never match anything.
    if (!is_string($user) || $user === '' || !is_string($pass) || $pass === '') {
        return false;
    }

    $authHeader = $_SERVER['HTTP_AUTHORIZATION'] ?? '';
    if (!is_string($authHeader) || !str_starts_with($authHeader, 'Basic ')) {
        return false;
    }

    $credentials = base64_decode(substr($authHeader, 6));
    // Without a colon there is no password part to compare; bail out before
    // destructuring so no undefined-offset warning is raised.
    if ($credentials === false || !str_contains($credentials, ':')) {
        return false;
    }

    // Limit 2: everything after the first colon belongs to the password.
    [$u, $p] = explode(':', $credentials, 2);

    // hash_equals compares in constant time; evaluate both before combining
    // so a wrong username does not skip the password comparison.
    $userOk = hash_equals($user, $u);
    $passOk = hash_equals($pass, $p);
    return $userOk && $passOk;
}

/**
 * Tells whether a transaction id is recorded in the processed-transactions file.
 *
 * @param mixed $txId Transaction id to look up.
 * @return bool True when the id is present in the file; false when the file
 *              is missing, unreadable, or does not contain a JSON array.
 */
function isProcessed($txId): bool {
    global $PROCESSED_FILE;
    if (!file_exists($PROCESSED_FILE)) return false;

    $processed = json_decode((string) file_get_contents($PROCESSED_FILE), true);
    if (!is_array($processed)) {
        // Corrupt or unexpected content: treat as "nothing processed yet".
        return false;
    }

    // Strict comparison: loose in_array() treats numeric-looking strings such
    // as "1e1" and "10" as equal, which would wrongly flag a new id as seen.
    return in_array($txId, $processed, true);
}

/**
 * Appends a transaction id to the processed-transactions file.
 *
 * Only the most recent 10000 ids are kept. Existing content that is not a
 * JSON array is discarded instead of causing a fatal error.
 *
 * @param mixed $txId Transaction id to record.
 */
function markProcessed($txId): void {
    global $PROCESSED_FILE;

    $processed = [];
    if (file_exists($PROCESSED_FILE)) {
        $decoded = json_decode((string) file_get_contents($PROCESSED_FILE), true);
        if (is_array($decoded)) {
            $processed = $decoded;
        }
    }

    $processed[] = $txId;

    // LOCK_EX takes an exclusive lock for the duration of the write so two
    // requests do not write the file at the same time.
    // NOTE(review): the read above is not covered by this lock, so concurrent
    // requests can still lose each other's ids — use a real store in production.
    file_put_contents(
        $PROCESSED_FILE,
        json_encode(array_slice($processed, -10000)),
        LOCK_EX
    );
}

// Only POST deliveries are accepted.
if ($_SERVER['REQUEST_METHOD'] !== 'POST') {
    http_response_code(405);
    header('Allow: POST');
    exit;
}

if (!validateBasicAuth($WEBHOOK_USER, $WEBHOOK_PASS)) {
    http_response_code(401);
    header('Content-Type: application/json');
    echo json_encode(['error' => 'Unauthorized']);
    exit;
}

$payload = json_decode(file_get_contents('php://input'), true);

// Reject bodies that are not a JSON object with a usable transactionId:
// without it the delivery cannot be deduplicated, and indexing a null
// payload would only produce warnings further down.
if (
    !is_array($payload)
    || !isset($payload['transactionId'])
    || !is_string($payload['transactionId'])
    || $payload['transactionId'] === ''
) {
    http_response_code(400);
    header('Content-Type: application/json');
    echo json_encode(['error' => 'Invalid payload']);
    exit;
}

// Acknowledge first, then do the work.
http_response_code(200);
header('Content-Type: application/json');
echo json_encode(['acknowledged' => true]);

// Under PHP-FPM this flushes the response to the client while the script
// keeps running; on other SAPIs the function does not exist and the response
// is sent when the script ends.
if (function_exists('fastcgi_finish_request')) {
    fastcgi_finish_request();
}

// Idempotency: skip transactions that were already recorded.
if (isProcessed($payload['transactionId'])) {
    exit;
}

markProcessed($payload['transactionId']);

switch ($payload['event'] ?? null) {
    case 'CashIn':
        error_log("[CashIn] R$ " . $payload['finalAmount']);
        break;
    case 'CashOut':
        error_log("[CashOut] R$ " . $payload['originalAmount']);
        break;
    case 'CashInReversal':
        error_log("[CashInReversal] R$ " . $payload['originalAmount']);
        break;
    case 'CashOutReversal':
        error_log("[CashOutReversal] R$ " . $payload['finalAmount']);
        break;
    default:
        // Surface unknown event types instead of dropping them silently.
        error_log("Unhandled event type: " . json_encode($payload['event'] ?? null));
        break;
}

幂等性

Webhooks 可能会发送多次(在重试的情况下)。实现幂等性处理以避免重复处理。

使用 transactionId 字段作为唯一键:

// Claim the transaction atomically: SET with NX only creates the key when it
// does not exist yet, so two concurrent deliveries of the same transactionId
// cannot both pass the check (a separate GET followed by SET can). EX 86400
// expires the claim after 24 hours.
const idempotencyKey = `webhook:${payload.transactionId}`;
const claimed = await redis.set(idempotencyKey, '1', 'EX', 86400, 'NX');

if (claimed !== 'OK') {
  console.log('Webhook already processed, ignoring');
  return;
}

try {
  await processWebhook(payload);
} catch (error) {
  // Release the claim so a later delivery is processed instead of being
  // discarded as a duplicate, then let the caller see the failure.
  await redis.del(idempotencyKey);
  throw error;
}


最佳实践


重试机制

如果您的端点未在 10 秒内返回 2xx 响应,系统将按以下计划重试:

| 尝试次数 | 间隔 | 累计时间 |
| --- | --- | --- |
| 第 1 次 | 立即 | 0 分钟 |
| 第 2 次(第 1 次重试) | 5 分钟 | 5 分钟 |
| 第 3 次(第 2 次重试) | 5 分钟 | 10 分钟 |
| 第 4 次(第 3 次重试) | 15 分钟 | 25 分钟 |

在 4 次不成功的尝试后(总时间约 25 分钟),webhook 将被移至死信队列(DLQ)。请实现定期轮询作为后备方案,以确保不会遗漏任何交易。


响应码

您的端点应返回适当的 HTTP 状态码:

| 状态码 | 描述 | 系统行为 |
| --- | --- | --- |
| 2xx | 成功 (200, 201, 204 等) | Webhook 已确认,不会重试 |
| 3xx | 重定向 | 视为失败,将重试 |
| 4xx | 客户端错误 | 视为失败,将重试 |
| 5xx | 服务器错误 | 视为失败,将重试 |

系统仅验证 HTTP 状态码。任何 2xx 响应(200-299)都被视为成功,无论响应体内容如何。

本页目录