电商数据分析师面试题库:GMV诊断/用户分层/SQL实战(含答案框架)

本文来源于数据从业者全栈知识库,更多体系化内容请访问知识库。

电商行业数据工作特点

行业特色与挑战

电商数据工作核心特征

%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#f8f9fa", "primaryTextColor": "#2c3e50", "primaryBorderColor": "#c1c8cd", "lineColor": "#6c757d", "secondaryColor": "#e8f4f7", "tertiaryColor": "#ffffff", "background": "#fafafa", "mainBkg": "#ffffff", "secondBkg": "#f1f3f4", "nodeBorder": "#c1c8cd", "clusterBkg": "#f8f9fa", "defaultLinkColor": "#495057", "titleColor": "#212529", "nodeTextColor": "#343a40"}, "flowchart": {"curve": "stepAfter"}}}%%
flowchart TD
    A[电商数据工作特色] --> B[全链路数据闭环]
    A --> C[实时性要求高]
    A --> D[多维度复杂分析]
    A --> E[强调商业转化]
    B --> B1[用户行为追踪]
    B --> B2[商品全生命周期]
    B --> B3[交易完整链路]
    C --> C1[实时推荐]
    C --> C2[动态定价]
    C --> C3[库存监控]
    D --> D1[用户维度分析]
    D --> D2[商品维度分析]
    D --> D3[渠道维度分析]
    E --> E1[GMV提升]
    E --> E2[转化率优化]
    E --> E3[用户价值最大化]
    style A fill:#e1f5fe
    style B fill:#e8f5e8
    style C fill:#fff3e0
    style D fill:#f3e5f5
    style E fill:#fce4ec

核心业务指标体系

  • 流量指标:UV、PV、跳出率、停留时长、流量转化
  • 交易指标:GMV、订单量、客单价、转化率、复购率
  • 用户指标:新客获取、用户留存、用户价值、生命周期
  • 商品指标:商品转化、库存周转、价格弹性、销售预测
  • 运营指标:营销ROI、渠道效果、活动效果、客服效率
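上面几类指标的计算口径可以用一个极简的假想示例说明(字段与数值均为演示假设,非真实业务数据):

```python
import pandas as pd

# 假想的当日订单明细
orders = pd.DataFrame({
    'user_id':      [1, 1, 2, 3, 3, 3],
    'order_amount': [100.0, 50.0, 200.0, 80.0, 120.0, 40.0],
})
uv = 10  # 假设当日访问用户数(UV)为 10

order_count = len(orders)                     # 订单量
gmv = orders['order_amount'].sum()            # GMV
aov = gmv / order_count                       # 客单价 = GMV / 订单量
buyers = orders['user_id'].nunique()          # 购买用户数
conversion_rate = buyers / uv                 # 访问-购买转化率
repurchase_rate = (orders['user_id'].value_counts() > 1).mean()  # 复购用户占比

print(gmv, aov, buyers, conversion_rate, repurchase_rate)
```

实际工作中这些口径(如转化率的分母取 UV 还是 PV、复购的时间窗口)都需要与业务方对齐,面试中主动说明口径假设是加分项。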

数据分析师面试题库

电商核心业务分析

题目1:GMV异常诊断分析(高频核心题)

场景:电商平台双11活动期间GMV同比下降15%,需要快速定位原因并制定应对策略。

期望回答框架

  1. 问题拆解分析(5分钟):
## GMV拆解分析框架
GMV = 访问用户数 × 转化率 × 客单价
### 第一层拆解
1. 流量分析:
- 总访问量:UV、PV变化
- 流量质量:跳出率、停留时长
- 流量结构:新老用户占比
2. 转化分析:
- 整体转化率:浏览→下单→支付
- 分渠道转化:APP、H5、小程序
- 分品类转化:不同商品类别
3. 客单价分析:
- 平均订单金额变化
- 购买商品数量变化
- 商品价格带分布
### 第二层拆解
#### 流量维度深入
- 渠道分析:自然流量、付费流量、社交流量
- 地域分析:一二三线城市表现差异
- 设备分析:移动端、PC端流量变化
- 时间分析:活动期间不同时段表现
#### 用户维度深入
- 用户分层:新用户、老用户、VIP用户
- 用户画像:年龄、性别、消费偏好
- 用户行为:浏览深度、加购行为、收藏行为
#### 商品维度深入
- 品类分析:服装、3C、家电等各品类表现
- 价格带分析:不同价格区间商品销售
- 品牌分析:自营vs第三方、品牌vs白牌
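第一层拆解的乘法恒等式 GMV = 访问用户数 × 转化率 × 客单价,可以用逐项替换法把 GMV 变化精确分摊到三个因子上(下面的数字为演示假设):

```python
# 假设数字仅作演示:基期(去年双11)与当期(今年双11)的 用户数u、转化率c、客单价a
u0, c0, a0 = 100_000, 0.030, 120.0
u1, c1, a1 = 105_000, 0.024, 121.0

gmv0, gmv1 = u0 * c0 * a0, u1 * c1 * a1
total_change = gmv1 - gmv0

# 逐项替换法:三项贡献之和与总变化严格相等(可逐项消去验证)
user_impact = (u1 - u0) * c0 * a0   # 用户数变化的贡献
conv_impact = u1 * (c1 - c0) * a0   # 转化率变化的贡献
aov_impact  = u1 * c1 * (a1 - a0)   # 客单价变化的贡献

print(total_change, user_impact + conv_impact + aov_impact)
```

在这组假设数字下 GMV 同比约 -15.3%,且主要由转化率下滑贡献,用户数和客单价实际在正向拉动——这正是拆解分析要回答的"降在哪里"。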
  2. 数据分析实现(8分钟):
-- GMV多维度分析SQL
WITH gmv_analysis AS (
SELECT
DATE(order_time) as order_date,
channel,
user_type,
category,
city_tier,
-- 基础指标
COUNT(DISTINCT user_id) as uv,
COUNT(DISTINCT order_id) as order_count,
SUM(order_amount) as gmv,
AVG(order_amount) as avg_order_value,
-- 转化相关
COUNT(DISTINCT CASE WHEN order_status = 'paid' THEN order_id END) as paid_orders,
SUM(CASE WHEN order_status = 'paid' THEN order_amount ELSE 0 END) as paid_gmv
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN products p ON o.product_id = p.product_id
WHERE DATE(order_time) BETWEEN '2024-11-01' AND '2024-11-15'
GROUP BY 1,2,3,4,5
),
-- 同比分析
yoy_comparison AS (
SELECT
channel,
user_type,
category,
-- 今年数据
SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) as gmv_2024,
SUM(CASE WHEN order_date >= '2024-11-11' THEN uv ELSE 0 END) as uv_2024,
-- 去年数据需关联去年同期表,此处用当期 GMV × 1.15 占位模拟
SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15 as gmv_2023_est,
-- 同比变化 = (今年 - 去年) / 去年;占位口径下该值恒为 -13%,仅示意计算方式
(SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) -
SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15) /
(SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15) as gmv_yoy_change
FROM gmv_analysis
GROUP BY 1,2,3
),
-- 漏斗转化分析
funnel_analysis AS (
SELECT
DATE(event_time) as event_date,
channel,
-- 流量漏斗
COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END) as pv_users,
COUNT(DISTINCT CASE WHEN event_type = 'add_to_cart' THEN user_id END) as cart_users,
COUNT(DISTINCT CASE WHEN event_type = 'checkout' THEN user_id END) as checkout_users,
COUNT(DISTINCT CASE WHEN event_type = 'payment' THEN user_id END) as payment_users,
-- 转化率计算
COUNT(DISTINCT CASE WHEN event_type = 'add_to_cart' THEN user_id END) * 1.0 /
NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END), 0) as pv_to_cart_rate,
COUNT(DISTINCT CASE WHEN event_type = 'payment' THEN user_id END) * 1.0 /
NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END), 0) as pv_to_payment_rate
FROM user_behavior_logs
WHERE DATE(event_time) BETWEEN '2024-11-01' AND '2024-11-15'
GROUP BY 1,2
)
-- 主查询:综合分析结果
SELECT
g.channel,
g.user_type,
g.category,
-- GMV表现
SUM(g.gmv) as total_gmv,
AVG(g.avg_order_value) as avg_order_value,
SUM(g.uv) as total_uv,
-- 同比变化
y.gmv_yoy_change,
-- 转化表现
AVG(f.pv_to_cart_rate) as avg_pv_to_cart_rate,
AVG(f.pv_to_payment_rate) as avg_pv_to_payment_rate
FROM gmv_analysis g
LEFT JOIN yoy_comparison y ON g.channel = y.channel
AND g.user_type = y.user_type
AND g.category = y.category
LEFT JOIN funnel_analysis f ON g.order_date = f.event_date
AND g.channel = f.channel
GROUP BY 1,2,3, y.gmv_yoy_change
ORDER BY total_gmv DESC;
  3. Python数据分析(5分钟):
# 电商GMV分析工具
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
class EcommerceGMVAnalyzer:
def __init__(self):
self.metrics = ['gmv', 'orders', 'users', 'aov']
def load_and_prepare_data(self, start_date, end_date):
"""加载和准备数据"""
# 这里模拟数据加载
# 实际场景中会从数据库或数据仓库加载
np.random.seed(42)
dates = pd.date_range(start_date, end_date, freq='D')
data = []
channels = ['organic', 'paid_search', 'social', 'direct']
categories = ['electronics', 'clothing', 'home', 'books']
for date in dates:
for channel in channels:
for category in categories:
# 模拟双11效应
is_1111 = date.strftime('%m-%d') == '11-11'
base_multiplier = 5 if is_1111 else 1
data.append({
'date': date,
'channel': channel,
'category': category,
'gmv': np.random.normal(10000, 2000) * base_multiplier,
'orders': np.random.poisson(100) * base_multiplier,
'users': np.random.poisson(80) * base_multiplier,
'pv': np.random.poisson(1000) * base_multiplier
})
df = pd.DataFrame(data)
df['aov'] = df['gmv'] / df['orders']
df['conversion_rate'] = df['orders'] / df['pv']
return df
def decompose_gmv_change(self, current_data, baseline_data):
"""GMV变化拆解分析"""
# 计算各组成部分的贡献
# 当前期间指标
current_gmv = current_data['gmv'].sum()
current_users = current_data['users'].sum()
current_orders = current_data['orders'].sum()
current_aov = current_data['gmv'].sum() / current_data['orders'].sum()
# 转化率取 订单数/访客数 口径,使 GMV = 用户数 × 转化率 × 客单价 恒等成立
current_conversion = current_data['orders'].sum() / current_data['users'].sum()
# 基准期间指标
baseline_gmv = baseline_data['gmv'].sum()
baseline_users = baseline_data['users'].sum()
baseline_orders = baseline_data['orders'].sum()
baseline_aov = baseline_data['gmv'].sum() / baseline_data['orders'].sum()
baseline_conversion = baseline_data['orders'].sum() / baseline_data['users'].sum()
# GMV变化拆解
gmv_change = current_gmv - baseline_gmv
gmv_change_pct = gmv_change / baseline_gmv
# 用户数变化影响
user_impact = (current_users - baseline_users) * baseline_conversion * baseline_aov
# 转化率变化影响
conversion_impact = current_users * (current_conversion - baseline_conversion) * baseline_aov
# 客单价变化影响
aov_impact = current_users * current_conversion * (current_aov - baseline_aov)
decomposition = {
'total_change': gmv_change,
'total_change_pct': gmv_change_pct,
'user_impact': user_impact,
'conversion_impact': conversion_impact,
'aov_impact': aov_impact,
'user_impact_pct': user_impact / abs(gmv_change) if gmv_change != 0 else 0,
'conversion_impact_pct': conversion_impact / abs(gmv_change) if gmv_change != 0 else 0,
'aov_impact_pct': aov_impact / abs(gmv_change) if gmv_change != 0 else 0
}
return decomposition
def channel_performance_analysis(self, data):
"""渠道效果分析"""
channel_summary = data.groupby('channel').agg({
'gmv': 'sum',
'orders': 'sum',
'users': 'sum',
'pv': 'sum'
}).reset_index()
channel_summary['aov'] = channel_summary['gmv'] / channel_summary['orders']
channel_summary['conversion_rate'] = channel_summary['orders'] / channel_summary['pv']
channel_summary['gmv_per_user'] = channel_summary['gmv'] / channel_summary['users']
# 渠道效率排名
channel_summary['efficiency_score'] = (
channel_summary['conversion_rate'] * 0.4 +
channel_summary['aov'] / channel_summary['aov'].max() * 0.3 +
channel_summary['gmv_per_user'] / channel_summary['gmv_per_user'].max() * 0.3
)
return channel_summary.sort_values('efficiency_score', ascending=False)
def cohort_analysis(self, data, metric='gmv'):
"""队列分析"""
# 按注册时间分组用户,分析后续表现
# 这里简化处理,实际需要用户注册数据
weekly_data = data.groupby([
data['date'].dt.to_period('W'),
'channel'
])[metric].sum().unstack(fill_value=0)
# 计算环比变化
weekly_change = weekly_data.pct_change().fillna(0)
return weekly_data, weekly_change
def anomaly_detection(self, data, metric='gmv', threshold=2):
"""异常检测"""
# 使用Z-score方法检测异常
data_copy = data.copy()
# 按渠道和品类分组计算Z-score
for channel in data['channel'].unique():
for category in data['category'].unique():
mask = (data_copy['channel'] == channel) & (data_copy['category'] == category)
values = data_copy.loc[mask, metric]
mean_val = values.mean()
std_val = values.std()
if std_val > 0:
z_scores = np.abs((values - mean_val) / std_val)
data_copy.loc[mask, f'{metric}_zscore'] = z_scores
data_copy.loc[mask, f'{metric}_anomaly'] = z_scores > threshold
return data_copy
def generate_insights(self, decomposition, channel_performance):
"""生成业务洞察"""
insights = []
# GMV变化洞察
if decomposition['total_change_pct'] < -0.1:
insights.append(f"GMV同比下降{abs(decomposition['total_change_pct']):.1%},需要紧急关注")
# 找出主要影响因素
impacts = {
'用户数': decomposition['user_impact_pct'],
'转化率': decomposition['conversion_impact_pct'],
'客单价': decomposition['aov_impact_pct']
}
main_factor = max(impacts.items(), key=lambda x: abs(x[1]))
insights.append(f"主要影响因素是{main_factor[0]},贡献了{abs(main_factor[1]):.1%}的变化")
# 渠道表现洞察
best_channel = channel_performance.iloc[0]['channel']
worst_channel = channel_performance.iloc[-1]['channel']
insights.append(f"表现最好的渠道是{best_channel},效率分数{channel_performance.iloc[0]['efficiency_score']:.2f}")
insights.append(f"表现最差的渠道是{worst_channel},需要优化策略")
return insights
# 使用示例
def analyze_gmv_decline():
"""GMV下降分析示例"""
analyzer = EcommerceGMVAnalyzer()
# 加载数据
current_data = analyzer.load_and_prepare_data('2024-11-01', '2024-11-15')
baseline_data = analyzer.load_and_prepare_data('2023-11-01', '2023-11-15')
# 拆解分析
decomposition = analyzer.decompose_gmv_change(current_data, baseline_data)
# 渠道分析
channel_perf = analyzer.channel_performance_analysis(current_data)
# 异常检测
anomaly_data = analyzer.anomaly_detection(current_data)
# 生成洞察
insights = analyzer.generate_insights(decomposition, channel_perf)
return {
'decomposition': decomposition,
'channel_performance': channel_perf,
'anomaly_data': anomaly_data,
'insights': insights
}
  4. 解决方案建议(2分钟):
## 应对策略制定
### 短期应急措施(24小时内)
1. 流量补偿:
- 增加付费投放预算
- 调整推荐算法权重
- 启动站内流量位支持
2. 转化提升:
- 优化商品详情页
- 调整价格策略
- 增加优惠券发放
3. 用户召回:
- Push消息推送
- 短信营销触达
- 社群运营激活
### 中期优化措施(7天内)
1. 深度分析:
- 用户调研了解流失原因
- 竞品分析对比策略
- A/B测试验证优化方案
2. 产品优化:
- 页面加载速度优化
- 购买流程简化
- 个性化推荐精准度提升
### 长期建设措施(30天内)
1. 数据体系:
- 完善实时监控体系
- 建立异常预警机制
- 优化归因分析模型
2. 用户运营:
- 精细化用户分层
- 生命周期管理优化
- 用户价值提升策略

评分要点

  • 分析思路的系统性和逻辑性
  • SQL和Python代码的实用性
  • 业务理解的深度和准确性
  • 解决方案的可行性和针对性

题目2:用户生命周期价值分析(LTV)

场景:电商平台需要建立用户生命周期价值模型,指导获客投入和用户运营策略。

期望回答

  1. LTV模型设计(8分钟):
# 用户生命周期价值(LTV)分析模型
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
class CustomerLTVAnalyzer:
def __init__(self):
self.rfm_weights = {'recency': 0.2, 'frequency': 0.3, 'monetary': 0.5}
self.ltv_model = None
def calculate_rfm_features(self, transaction_data, analysis_date=None):
"""计算RFM特征"""
if analysis_date is None:
analysis_date = transaction_data['order_date'].max()
rfm_data = transaction_data.groupby('customer_id').agg({
'order_date': lambda x: (analysis_date - x.max()).days, # Recency
'order_id': 'count', # Frequency
'order_amount': ['sum', 'mean'] # Monetary
}).round(2)
rfm_data.columns = ['recency', 'frequency', 'monetary_total', 'monetary_avg']
rfm_data['monetary'] = rfm_data['monetary_total'] # 使用总金额作为M值
return rfm_data.reset_index()
def calculate_basic_ltv(self, transaction_data, prediction_period=365):
"""计算基础LTV(历史法)"""
customer_metrics = transaction_data.groupby('customer_id').agg({
'order_date': ['min', 'max', 'count'],
'order_amount': ['sum', 'mean'],
'order_id': 'count'
}).round(2)
customer_metrics.columns = [
'first_order_date', 'last_order_date', 'date_count',
'total_spent', 'avg_order_value', 'order_frequency'
]
# 计算生命周期长度(天)
customer_metrics['lifetime_days'] = (
customer_metrics['last_order_date'] - customer_metrics['first_order_date']
).dt.days + 1
# 计算年化指标
customer_metrics['orders_per_year'] = (
customer_metrics['order_frequency'] * 365 / customer_metrics['lifetime_days']
)
customer_metrics['annual_value'] = (
customer_metrics['avg_order_value'] * customer_metrics['orders_per_year']
)
# 简单LTV预测(假设行为延续)
customer_metrics['predicted_ltv'] = (
customer_metrics['annual_value'] * prediction_period / 365
)
return customer_metrics.reset_index()
def calculate_probabilistic_ltv(self, transaction_data):
"""概率型LTV模型(BG/NBD + Gamma-Gamma)"""
# 这里提供简化版本的概率模型
customer_summary = transaction_data.groupby('customer_id').agg({
'order_date': ['min', 'max', 'count'],
'order_amount': ['sum', 'mean', 'std']
}).round(2)
customer_summary.columns = [
'first_purchase', 'last_purchase', 'frequency',
'total_spent', 'avg_order_value', 'order_std'
]
# 计算关键参数
analysis_date = transaction_data['order_date'].max()
customer_summary['T'] = (analysis_date - customer_summary['first_purchase']).dt.days
customer_summary['recency'] = (customer_summary['last_purchase'] - customer_summary['first_purchase']).dt.days
# 简化的存活概率计算
customer_summary['survival_prob'] = np.exp(-customer_summary['recency'] / customer_summary['T'].clip(lower=1))
# 预期频率(简化)
customer_summary['expected_frequency'] = (
customer_summary['frequency'] * customer_summary['survival_prob'] / customer_summary['T'].clip(lower=1) * 365
)
# 概率型LTV
customer_summary['probabilistic_ltv'] = (
customer_summary['expected_frequency'] *
customer_summary['avg_order_value'] *
customer_summary['survival_prob']
)
return customer_summary.reset_index()
def segment_customers_by_ltv(self, ltv_data, n_segments=5):
"""基于LTV进行客户分层"""
# 使用K-means聚类
features = ['predicted_ltv', 'order_frequency', 'avg_order_value', 'recency']
# 数据标准化
scaler = StandardScaler()
scaled_features = scaler.fit_transform(ltv_data[features].fillna(0))
# K-means聚类
kmeans = KMeans(n_clusters=n_segments, random_state=42)
ltv_data['ltv_segment'] = kmeans.fit_predict(scaled_features)
# 计算各段特征
segment_summary = ltv_data.groupby('ltv_segment').agg({
'predicted_ltv': ['count', 'mean', 'median'],
'order_frequency': 'mean',
'avg_order_value': 'mean',
'recency': 'mean'
}).round(2)
# 分段命名在按 LTV 重新排序后统一映射(KMeans 原始标签顺序无业务含义)
# 重新排序分段(按LTV从高到低)
segment_avg_ltv = ltv_data.groupby('ltv_segment')['predicted_ltv'].mean().sort_values(ascending=False)
segment_mapping = {old_id: new_id for new_id, (old_id, _) in enumerate(segment_avg_ltv.items())}
ltv_data['ltv_segment_ordered'] = ltv_data['ltv_segment'].map(segment_mapping)
ltv_data['segment_name'] = ltv_data['ltv_segment_ordered'].map(
{i: name for i, name in enumerate(['Champion', 'Loyal', 'Potential', 'At Risk', 'Lost'])}
)
return ltv_data, segment_summary
def calculate_clv_cohort(self, transaction_data):
"""队列LTV分析"""
# 按用户首次购买时间分组
first_purchase = transaction_data.groupby('customer_id')['order_date'].min().reset_index()
first_purchase.columns = ['customer_id', 'cohort_month']
first_purchase['cohort_month'] = first_purchase['cohort_month'].dt.to_period('M')
# 合并队列信息
transaction_with_cohort = transaction_data.merge(first_purchase, on='customer_id')
transaction_with_cohort['period_number'] = (
transaction_with_cohort['order_date'].dt.to_period('M') -
transaction_with_cohort['cohort_month']
).apply(lambda offset: offset.n)  # 取月份差的整数值(避免依赖 operator.attrgetter)
# 计算累积LTV
cohort_ltv = transaction_with_cohort.groupby(['cohort_month', 'period_number']).agg({
'customer_id': 'nunique',
'order_amount': 'sum'
}).reset_index()
cohort_ltv['cumulative_ltv'] = cohort_ltv.groupby('cohort_month')['order_amount'].cumsum()
cohort_ltv['avg_ltv'] = cohort_ltv['cumulative_ltv'] / cohort_ltv['customer_id']
return cohort_ltv
def ltv_prediction_model(self, features, target_ltv):
"""LTV预测模型"""
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
# 特征工程
X = features[['recency', 'frequency', 'monetary', 'avg_order_value',
'order_frequency', 'lifetime_days']].fillna(0)
y = target_ltv
# 训练测试分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 模型训练
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 模型评估
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# 特征重要性
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
self.ltv_model = model
return {
'model': model,
'mse': mse,
'r2': r2,
'feature_importance': feature_importance,
'predictions': y_pred
}
def calculate_cac_ltv_ratio(self, ltv_data, acquisition_cost_data):
"""计算CAC/LTV比率"""
# 合并获客成本数据
merged_data = ltv_data.merge(
acquisition_cost_data,
on=['customer_id'],
how='left'
)
# 计算CAC/LTV比率
merged_data['cac_ltv_ratio'] = merged_data['acquisition_cost'] / merged_data['predicted_ltv']
merged_data['ltv_cac_ratio'] = merged_data['predicted_ltv'] / merged_data['acquisition_cost']
# 按渠道分析
channel_analysis = merged_data.groupby('acquisition_channel').agg({
'acquisition_cost': 'mean',
'predicted_ltv': 'mean',
'cac_ltv_ratio': 'mean',
'ltv_cac_ratio': 'mean',
'customer_id': 'count'
}).round(2)
# 渠道健康度评估
channel_analysis['channel_health'] = np.where(
channel_analysis['ltv_cac_ratio'] > 3, 'Healthy',
np.where(channel_analysis['ltv_cac_ratio'] > 1, 'Acceptable', 'Unhealthy')
)
return merged_data, channel_analysis
# 使用示例和业务应用
def ltv_business_application():
"""LTV业务应用示例"""
analyzer = CustomerLTVAnalyzer()
# 模拟交易数据
np.random.seed(42)
# 生成模拟客户交易数据
customers = range(1, 1001)
transactions = []
for customer_id in customers:
# 模拟客户行为
first_order_date = pd.Timestamp('2023-01-01') + pd.Timedelta(days=np.random.randint(0, 365))
n_orders = np.random.poisson(5) + 1
for order_num in range(n_orders):
order_date = first_order_date + pd.Timedelta(days=np.random.exponential(30) * order_num)
order_amount = np.random.lognormal(mean=4, sigma=0.5)
transactions.append({
'customer_id': customer_id,
'order_id': f'ORD_{customer_id}_{order_num}',
'order_date': order_date,
'order_amount': order_amount
})
transaction_df = pd.DataFrame(transactions)
# 计算LTV
basic_ltv = analyzer.calculate_basic_ltv(transaction_df)
prob_ltv = analyzer.calculate_probabilistic_ltv(transaction_df)
# 客户分层
ltv_segments, segment_summary = analyzer.segment_customers_by_ltv(basic_ltv)
# 队列分析
cohort_ltv = analyzer.calculate_clv_cohort(transaction_df)
return {
'basic_ltv': basic_ltv,
'probabilistic_ltv': prob_ltv,
'segments': ltv_segments,
'segment_summary': segment_summary,
'cohort_ltv': cohort_ltv
}
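上面类中 RFM 特征的口径,可以用一个迷你交易表单独演示(数据为假设,仅说明 Recency/Frequency/Monetary 的计算方式):

```python
import pandas as pd

# 假想的迷你交易表
tx = pd.DataFrame({
    'customer_id': [1, 1, 2],
    'order_date': pd.to_datetime(['2024-10-01', '2024-11-01', '2024-09-15']),
    'order_amount': [100.0, 60.0, 300.0],
})
analysis_date = pd.Timestamp('2024-11-11')

rfm = tx.groupby('customer_id').agg(
    recency=('order_date', lambda s: (analysis_date - s.max()).days),  # 距最近一单天数
    frequency=('order_date', 'count'),                                 # 下单次数
    monetary=('order_amount', 'sum'),                                  # 累计消费金额
).reset_index()
print(rfm)
```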
  2. LTV应用策略(5分钟):
## LTV驱动的运营策略
### 获客策略优化
1. 渠道投放策略:
- 高LTV渠道:增加投放预算
- 中LTV渠道:优化投放精准度
- 低LTV渠道:减少投放或停止
2. 获客成本控制:
- CAC < LTV/3:健康获客(LTV/CAC ≥ 3)
- LTV/3 ≤ CAC < LTV:可接受,需持续优化投放效率
- CAC ≥ LTV:亏损获客,应收缩或叫停
### 用户运营策略
1. Champion客户(高LTV):
- VIP专享服务
- 优先新品推荐
- 个性化定制服务
2. Loyal客户(中高LTV):
- 会员权益升级
- 交叉销售机会
- 推荐奖励计划
3. Potential客户(中LTV):
- 个性化推荐
- 使用教育内容
- 购买频次提升
4. At Risk客户(低LTV):
- 流失预警干预
- 专属优惠券
- 客服主动关怀
5. Lost客户(极低LTV):
- 召回活动
- 重新激活
- 成本控制
### 产品策略指导
1. 商品组合优化:
- 基于高LTV用户偏好调整SKU
- 开发高复购率商品
- 优化价格策略
2. 功能开发优先级:
- 提升高LTV用户体验功能
- 促进用户粘性的功能
- 降低流失风险的功能
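获客策略中的 LTV/CAC 阈值判断可以浓缩成一个小函数(阈值为行业常用经验值,与前文代码中的渠道健康度口径一致,并非唯一标准):

```python
def channel_health(ltv: float, cac: float) -> str:
    """按 LTV/CAC 经验阈值评估获客渠道健康度"""
    if cac <= 0:
        raise ValueError("CAC 必须为正数")
    ratio = ltv / cac
    if ratio >= 3:
        return 'Healthy'     # LTV/CAC >= 3:健康获客
    if ratio >= 1:
        return 'Acceptable'  # 1 <= LTV/CAC < 3:可接受,需优化
    return 'Unhealthy'       # LTV/CAC < 1:亏损获客

print(channel_health(300, 80), channel_health(300, 200), channel_health(300, 400))
```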

题目3:推荐系统效果评估

场景:电商平台推荐系统上线后,如何评估推荐效果并进行优化?

评估框架

# 推荐系统效果评估框架
class RecommendationEvaluator:
def __init__(self):
self.metrics = {}
def calculate_accuracy_metrics(self, recommendations, actual_purchases):
"""计算准确性指标"""
metrics = {}
for k in [5, 10, 20]:
# Precision@K
precision_k = self.precision_at_k(recommendations, actual_purchases, k)
# Recall@K
recall_k = self.recall_at_k(recommendations, actual_purchases, k)
# F1@K
if precision_k + recall_k > 0:
f1_k = 2 * precision_k * recall_k / (precision_k + recall_k)
else:
f1_k = 0
metrics[f'precision@{k}'] = precision_k
metrics[f'recall@{k}'] = recall_k
metrics[f'f1@{k}'] = f1_k
# NDCG@K
for k in [5, 10, 20]:
metrics[f'ndcg@{k}'] = self.ndcg_at_k(recommendations, actual_purchases, k)
return metrics
def calculate_business_metrics(self, experiment_data, control_data):
"""计算业务指标"""
business_metrics = {}
# 点击率提升
exp_ctr = experiment_data['clicks'].sum() / experiment_data['impressions'].sum()
ctrl_ctr = control_data['clicks'].sum() / control_data['impressions'].sum()
business_metrics['ctr_lift'] = (exp_ctr - ctrl_ctr) / ctrl_ctr
# 转化率提升
exp_cvr = experiment_data['purchases'].sum() / experiment_data['clicks'].sum()
ctrl_cvr = control_data['purchases'].sum() / control_data['clicks'].sum()
business_metrics['cvr_lift'] = (exp_cvr - ctrl_cvr) / ctrl_cvr
# GMV提升
exp_gmv = experiment_data['purchase_amount'].sum()
ctrl_gmv = control_data['purchase_amount'].sum()
business_metrics['gmv_lift'] = (exp_gmv - ctrl_gmv) / ctrl_gmv
# 用户参与度
exp_engagement = experiment_data['session_length'].mean()
ctrl_engagement = control_data['session_length'].mean()
business_metrics['engagement_lift'] = (exp_engagement - ctrl_engagement) / ctrl_engagement
return business_metrics
def diversity_coverage_analysis(self, recommendations, item_catalog):
"""多样性和覆盖度分析"""
metrics = {}
# 推荐覆盖度:被推荐的商品占总商品的比例
recommended_items = set()
for user_recs in recommendations.values():
recommended_items.update(user_recs)
metrics['catalog_coverage'] = len(recommended_items) / len(item_catalog)
# 推荐多样性:用户推荐列表的多样性
user_diversity_scores = []
for user_id, user_recs in recommendations.items():
if len(user_recs) > 1:
diversity_score = self.calculate_intra_list_diversity(user_recs, item_catalog)
user_diversity_scores.append(diversity_score)
metrics['avg_diversity'] = np.mean(user_diversity_scores) if user_diversity_scores else 0.0
# 新颖性:推荐不热门商品的程度
item_popularity = item_catalog['purchase_count'] / item_catalog['purchase_count'].sum()
novelty_scores = []
for user_recs in recommendations.values():
rec_popularity = item_popularity[user_recs].mean()
novelty_scores.append(1 - rec_popularity) # 越不热门,新颖性越高
metrics['avg_novelty'] = np.mean(novelty_scores)
return metrics
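上面的评估框架调用了 precision_at_k / recall_at_k 但未给出定义,这里补一个单用户版本的最小实现草稿(基于集合交集的常见口径,函数签名为本文假设):

```python
def precision_at_k(recommended: list, relevant: set, k: int) -> float:
    """Top-K 推荐中命中真实购买的比例"""
    top_k = recommended[:k]
    if not top_k:
        return 0.0
    return len(set(top_k) & relevant) / len(top_k)

def recall_at_k(recommended: list, relevant: set, k: int) -> float:
    """真实购买中被 Top-K 推荐覆盖的比例"""
    if not relevant:
        return 0.0
    return len(set(recommended[:k]) & relevant) / len(relevant)

recs = ['A', 'B', 'C', 'D', 'E']   # 推荐列表(按排序分从高到低)
bought = {'B', 'E', 'F'}           # 该用户实际购买的商品
print(precision_at_k(recs, bought, 5), recall_at_k(recs, bought, 5))
```

全量评估时通常对所有用户的 P@K / R@K 取均值;NDCG@K 在此基础上再对命中位置做对数折扣加权。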

商品运营分析

题目4:商品销售预测模型

场景:双11前需要预测各商品的销量,指导库存准备和促销策略。

预测模型设计

# 商品销售预测模型
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
import xgboost as xgb
class ProductSalesForecast:
def __init__(self):
self.models = {}
self.feature_importance = {}
def prepare_features(self, sales_data, product_data, promotion_data, external_data):
"""特征工程"""
# 合并基础数据
features_df = sales_data.merge(product_data, on='product_id', how='left')
features_df = features_df.merge(promotion_data, on=['product_id', 'date'], how='left')
features_df = features_df.merge(external_data, on='date', how='left')
# 时间特征
features_df['year'] = features_df['date'].dt.year
features_df['month'] = features_df['date'].dt.month
features_df['day'] = features_df['date'].dt.day
features_df['weekday'] = features_df['date'].dt.weekday
features_df['is_weekend'] = features_df['weekday'].isin([5, 6]).astype(int)
features_df['is_holiday'] = features_df['date'].isin(self.get_holidays()).astype(int)
# 滞后特征
for lag in [1, 7, 14, 30]:
features_df[f'sales_lag_{lag}'] = features_df.groupby('product_id')['sales'].shift(lag)
# 滚动统计特征
for window in [7, 14, 30]:
features_df[f'sales_mean_{window}d'] = features_df.groupby('product_id')['sales'].rolling(window).mean().reset_index(0, drop=True)
features_df[f'sales_std_{window}d'] = features_df.groupby('product_id')['sales'].rolling(window).std().reset_index(0, drop=True)
# 商品特征
features_df['price_change'] = features_df.groupby('product_id')['price'].pct_change()
features_df['days_since_launch'] = (features_df['date'] - features_df['launch_date']).dt.days
# 促销特征
features_df['has_promotion'] = features_df['promotion_type'].notna().astype(int)
features_df['discount_rate'] = features_df['discount_rate'].fillna(0)
# 竞争对手特征
features_df['competitor_avg_price'] = features_df.groupby(['category', 'date'])['price'].transform('mean')
features_df['price_competitiveness'] = features_df['price'] / features_df['competitor_avg_price']
# 外部特征
features_df['weather_score'] = features_df['temperature'] * 0.3 + features_df['humidity'] * 0.7
return features_df
def create_ensemble_model(self, X_train, y_train, X_val, y_val):
"""集成模型"""
models = {
'rf': RandomForestRegressor(n_estimators=100, random_state=42),
'gbm': GradientBoostingRegressor(n_estimators=100, random_state=42),
'xgb': xgb.XGBRegressor(n_estimators=100, random_state=42)
}
predictions = {}
model_weights = {}
# 训练各个模型
for name, model in models.items():
model.fit(X_train, y_train)
val_pred = model.predict(X_val)
val_mae = mean_absolute_error(y_val, val_pred)
predictions[name] = val_pred
model_weights[name] = 1 / (val_mae + 1e-6) # 权重与误差成反比
self.models[name] = model
# 归一化权重
total_weight = sum(model_weights.values())
model_weights = {k: v/total_weight for k, v in model_weights.items()}
# 加权平均预测
ensemble_pred = sum(predictions[name] * weight for name, weight in model_weights.items())
return ensemble_pred, model_weights
def predict_campaign_impact(self, base_forecast, campaign_features):
"""预测营销活动影响"""
# 基于历史活动数据标定的提升系数(此处为示例值)
impact_multipliers = {
'flash_sale': 2.5,
'coupon': 1.8,
'bundle': 1.3
}
# 应用影响因子
adjusted_forecast = base_forecast.copy()
for idx, row in campaign_features.iterrows():
if row['campaign_type'] in impact_multipliers:
multiplier = impact_multipliers[row['campaign_type']]
# 考虑折扣力度
discount_factor = 1 + (row['discount_rate'] * 0.5)
adjusted_forecast[idx] *= multiplier * discount_factor
return adjusted_forecast
def demand_sensing(self, real_time_data):
"""实时需求感知"""
# 基于实时数据调整预测
# 计算实时转化指标
current_ctr = real_time_data['clicks'] / real_time_data['impressions']
current_cvr = real_time_data['orders'] / real_time_data['clicks']
# 与历史基准对比
historical_ctr = 0.05 # 历史平均CTR
historical_cvr = 0.08 # 历史平均CVR
ctr_factor = current_ctr / historical_ctr
cvr_factor = current_cvr / historical_cvr
# 调整因子
adjustment_factor = (ctr_factor * 0.4 + cvr_factor * 0.6)
return adjustment_factor
def inventory_optimization(self, forecast_data, inventory_constraints):
"""库存优化建议"""
optimization_results = []
for product_id in forecast_data['product_id'].unique():
product_forecast = forecast_data[forecast_data['product_id'] == product_id]
# 安全库存计算
avg_daily_sales = product_forecast['predicted_sales'].mean()
sales_std = product_forecast['predicted_sales'].std()
lead_time = inventory_constraints.get(product_id, {}).get('lead_time', 7)
service_level = 0.95 # 95%服务水平
from scipy.stats import norm
z_score = norm.ppf(service_level)
safety_stock = z_score * sales_std * np.sqrt(lead_time)
# 推荐库存量
forecast_period = len(product_forecast)
total_forecast = product_forecast['predicted_sales'].sum()
recommended_inventory = total_forecast + safety_stock
optimization_results.append({
'product_id': product_id,
'forecast_sales': total_forecast,
'safety_stock': safety_stock,
'recommended_inventory': recommended_inventory,
'current_inventory': inventory_constraints.get(product_id, {}).get('current_stock', 0),
'reorder_point': avg_daily_sales * lead_time + safety_stock
})
return pd.DataFrame(optimization_results)
# 价格弹性分析
class PriceElasticityAnalyzer:
def __init__(self):
self.elasticity_models = {}
def calculate_price_elasticity(self, sales_data):
"""计算价格弹性"""
elasticity_results = []
for product_id in sales_data['product_id'].unique():
product_data = sales_data[sales_data['product_id'] == product_id].copy()
if len(product_data) < 30: # 数据点太少
continue
# 对数线性回归计算弹性
product_data['log_sales'] = np.log(product_data['sales'] + 1)
product_data['log_price'] = np.log(product_data['price'])
# 控制其他变量
from sklearn.linear_model import LinearRegression
X = product_data[['log_price', 'is_weekend', 'has_promotion']].fillna(0)
y = product_data['log_sales']
model = LinearRegression()
model.fit(X, y)
price_elasticity = model.coef_[0] # 价格系数即为弹性
elasticity_results.append({
'product_id': product_id,
'price_elasticity': price_elasticity,
'elasticity_interpretation': self.interpret_elasticity(price_elasticity),
'r_squared': model.score(X, y)
})
return pd.DataFrame(elasticity_results)
def interpret_elasticity(self, elasticity):
"""解释价格弹性"""
if abs(elasticity) < 0.5:
return 'inelastic' # 缺乏弹性
elif abs(elasticity) < 1.0:
return 'moderately_elastic' # 中度弹性
else:
return 'highly_elastic' # 高弹性
def optimal_pricing_strategy(self, elasticity_data, cost_data):
"""最优定价策略"""
pricing_recommendations = []
for _, row in elasticity_data.iterrows():
product_id = row['product_id']
elasticity = row['price_elasticity']
# 获取成本信息
cost = cost_data.get(product_id, {}).get('unit_cost', 0)
current_price = cost_data.get(product_id, {}).get('current_price', 0)
if elasticity < -1: # 富有弹性
# 降价策略:需求增加幅度大于价格下降幅度
recommended_change = -0.05 # 降价5%
strategy = 'reduce_price'
elif elasticity > -0.5: # 缺乏弹性
# 涨价策略:需求下降幅度小于价格上涨幅度
recommended_change = 0.08 # 涨价8%
strategy = 'increase_price'
else: # 中度弹性
# 维持策略
recommended_change = 0
strategy = 'maintain_price'
new_price = current_price * (1 + recommended_change)
expected_demand_change = elasticity * recommended_change
pricing_recommendations.append({
'product_id': product_id,
'current_price': current_price,
'recommended_price': new_price,
'price_change_pct': recommended_change,
'expected_demand_change_pct': expected_demand_change,
'strategy': strategy,
'elasticity': elasticity
})
return pd.DataFrame(pricing_recommendations)
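上面弹性分析的核心假设——对数-对数回归的价格系数即弹性——可以用合成数据做一次最小验证(真实弹性设为 -1.5 的假设值,看回归能否还原):

```python
import numpy as np

# 合成数据:sales = A * price^(-1.5) * 对数正态噪声
rng = np.random.default_rng(0)
price = rng.uniform(50, 150, size=200)
sales = 1e6 * price ** -1.5 * rng.lognormal(0, 0.05, size=200)

# log(sales) = alpha + beta * log(price),斜率 beta 即价格弹性的估计
beta, alpha = np.polyfit(np.log(price), np.log(sales), 1)
print(round(beta, 2))  # 估计值应接近 -1.5
```

实际数据中价格往往与促销、季节等因素同时变动,需要像前文那样把这些变量一并放入回归做控制,否则弹性估计会有偏。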

推荐算法工程师面试题库

推荐系统设计

题目5:电商推荐系统架构设计(核心题目)

场景:为大型电商平台设计支持千万级用户的个性化推荐系统。

系统架构设计

# 电商推荐系统架构
class EcommerceRecommendationSystem:
def __init__(self):
self.user_profiles = {}
self.item_profiles = {}
self.models = {}
def multi_stage_recommendation(self, user_id, context=None):
"""多阶段推荐流程"""
# Stage 1: 召回层 (Recall)
candidate_items = self.recall_stage(user_id, context)
# Stage 2: 粗排层 (Coarse Ranking)
coarse_ranked_items = self.coarse_ranking_stage(user_id, candidate_items, context)
# Stage 3: 精排层 (Fine Ranking)
fine_ranked_items = self.fine_ranking_stage(user_id, coarse_ranked_items, context)
# Stage 4: 重排层 (Re-ranking)
final_recommendations = self.reranking_stage(user_id, fine_ranked_items, context)
return final_recommendations
def recall_stage(self, user_id, context=None, top_k=1000):
"""召回阶段:从全量商品中召回候选集"""
candidates = set()
# 1. 协同过滤召回
cf_candidates = self.collaborative_filtering_recall(user_id, top_k//4)
candidates.update(cf_candidates)
# 2. 内容召回
content_candidates = self.content_based_recall(user_id, top_k//4)
candidates.update(content_candidates)
# 3. 热门商品召回
popular_candidates = self.popularity_recall(user_id, context, top_k//4)
candidates.update(popular_candidates)
# 4. 深度学习召回
dl_candidates = self.deep_learning_recall(user_id, top_k//4)
candidates.update(dl_candidates)
return list(candidates)[:top_k]
def collaborative_filtering_recall(self, user_id, top_k):
"""协同过滤召回"""
# UserCF + ItemCF 混合
# UserCF: 基于用户相似度
similar_users = self.find_similar_users(user_id, top_k=100)
user_cf_items = []
for similar_user, similarity in similar_users:
user_items = self.get_user_items(similar_user)
current_user_items = set(self.get_user_items(user_id))
for item_id, rating in user_items:
if item_id not in current_user_items:
score = similarity * rating
user_cf_items.append((item_id, score))
# ItemCF: 基于物品相似度
user_items = self.get_user_items(user_id)
item_cf_items = []
for item_id, rating in user_items[-10:]: # 最近10个商品
similar_items = self.find_similar_items(item_id, top_k=20)
for similar_item, similarity in similar_items:
score = similarity * rating
item_cf_items.append((similar_item, score))
# 合并和排序
all_cf_items = user_cf_items + item_cf_items
cf_scores = {}
for item_id, score in all_cf_items:
if item_id in cf_scores:
cf_scores[item_id] += score
else:
cf_scores[item_id] = score
# 返回top_k商品
sorted_items = sorted(cf_scores.items(), key=lambda x: x[1], reverse=True)
return [item_id for item_id, score in sorted_items[:top_k]]
    def deep_learning_recall(self, user_id, top_k):
        """Deep-learning recall with a two-tower model."""
        user_embedding = self.get_user_embedding(user_id)
        # Item embeddings are precomputed and stored offline
        item_embeddings = self.load_item_embeddings()
        # Inner-product similarity (at scale, use an ANN index instead of a full scan)
        similarities = np.dot(item_embeddings, user_embedding)
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [self.index_to_item_id[idx] for idx in top_indices]
    def fine_ranking_stage(self, user_id, candidate_items, context):
        """Fine-ranking stage: deep CTR/CVR estimation."""
        features = [self.extract_ranking_features(user_id, item_id, context)
                    for item_id in candidate_items]
        # Predicted click and conversion probabilities
        click_probs = self.ctr_model.predict(features)
        conversion_probs = self.cvr_model.predict(features)
        ranking_scores = []
        for item_id, click_prob, cvr_prob in zip(candidate_items, click_probs, conversion_probs):
            # CTCVR = CTR x CVR: probability of a click followed by a purchase
            ctcvr = click_prob * cvr_prob
            # Blend relevance with business value (multi-objective; weights are tunable)
            item_price = self.get_item_price(item_id)
            item_margin = self.get_item_margin(item_id)
            final_score = ctcvr * 0.7 + (item_price * item_margin * ctcvr) * 0.3
            ranking_scores.append((item_id, final_score))
        ranked_items = sorted(ranking_scores, key=lambda x: x[1], reverse=True)
        return [item_id for item_id, _ in ranked_items]
    def reranking_stage(self, user_id, ranked_items, context):
        """Re-ranking stage: diversity and business rules."""
        # 1. Diversity optimization
        diversified_items = self.diversification(ranked_items, user_id)
        # 2. Business-rule filtering
        filtered_items = self.apply_business_rules(diversified_items, user_id, context)
        # 3. Real-time adjustment
        return self.real_time_adjustment(filtered_items, user_id, context)
    def diversification(self, ranked_items, user_id, lambda_param=0.3):
        """MMR (Maximal Marginal Relevance) diversity optimization."""
        selected_items = []
        remaining_items = ranked_items.copy()
        # Seed with the top-ranked item
        if remaining_items:
            selected_items.append(remaining_items.pop(0))
        # Each later pick trades relevance against similarity to earlier picks
        while remaining_items and len(selected_items) < 20:
            max_mmr_score = float('-inf')
            best_item, best_index = None, -1
            for i, candidate in enumerate(remaining_items):
                # Relevance score (original ranking score)
                relevance_score = self.get_item_relevance_score(candidate, user_id)
                # Highest similarity to any already-selected item
                max_similarity = max(
                    (self.calculate_item_similarity(candidate, s) for s in selected_items),
                    default=0,
                )
                # MMR score
                mmr_score = lambda_param * relevance_score - (1 - lambda_param) * max_similarity
                if mmr_score > max_mmr_score:
                    max_mmr_score, best_item, best_index = mmr_score, candidate, i
            if best_item is not None:
                selected_items.append(best_item)
                remaining_items.pop(best_index)
        return selected_items
    def extract_ranking_features(self, user_id, item_id, context):
        """Assemble the ranking feature vector."""
        features = {}
        # User features
        user_profile = self.get_user_profile(user_id)
        features.update({
            'user_age': user_profile.get('age', 0),
            'user_gender': user_profile.get('gender', 0),
            'user_city_tier': user_profile.get('city_tier', 0),
            'user_purchase_power': user_profile.get('purchase_power', 0),
        })
        # Item features
        item_profile = self.get_item_profile(item_id)
        features.update({
            'item_category': item_profile.get('category', 0),
            'item_price': item_profile.get('price', 0),
            'item_brand': item_profile.get('brand', 0),
            'item_rating': item_profile.get('rating', 0),
            'item_sales_volume': item_profile.get('sales_volume', 0),
        })
        # User-item interaction features
        features.update({
            'user_item_category_preference': self.get_category_preference(user_id, item_profile.get('category')),
            'user_item_brand_preference': self.get_brand_preference(user_id, item_profile.get('brand')),
            'user_item_price_match': self.calculate_price_match(user_profile, item_profile),
        })
        # Context features
        if context:
            features.update({
                'hour_of_day': context.get('hour', 0),
                'day_of_week': context.get('weekday', 0),
                'is_weekend': context.get('is_weekend', 0),
                'device_type': context.get('device', 0),
                'page_type': context.get('page_type', 0),
            })
        # Statistical features
        features.update({
            'item_ctr_7d': self.get_item_ctr(item_id, days=7),
            'item_cvr_7d': self.get_item_cvr(item_id, days=7),
            'user_category_ctr': self.get_user_category_ctr(user_id, item_profile.get('category')),
        })
        return features
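The MMR loop above can be exercised on a toy example. The sketch below is self-contained (hypothetical relevance scores and a hand-written similarity table, not the class methods): with lambda = 0.3, the less relevant but dissimilar item C beats the near-duplicate B.

```python
def mmr_select(relevance, similarity, k, lam=0.3):
    """Greedy MMR: trade relevance against similarity to already-picked items."""
    selected = []
    remaining = sorted(relevance, key=relevance.get, reverse=True)
    while remaining and len(selected) < k:
        best = max(
            remaining,
            key=lambda c: lam * relevance[c]
            - (1 - lam) * max((similarity[frozenset((c, s))] for s in selected), default=0.0),
        )
        selected.append(best)
        remaining.remove(best)
    return selected

relevance = {"A": 0.9, "B": 0.85, "C": 0.3}
similarity = {frozenset(("A", "B")): 0.95,   # B is nearly a duplicate of A
              frozenset(("A", "C")): 0.1,
              frozenset(("B", "C")): 0.2}
picks = mmr_select(relevance, similarity, k=2)  # A first, then diverse C over similar B
```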
# CTR model: DeepFM implementation
class DeepFMCTRModel:
    def __init__(self, feature_dims, embedding_dim=8, hidden_dims=(256, 128, 64)):
        self.feature_dims = feature_dims
        self.embedding_dim = embedding_dim
        self.hidden_dims = hidden_dims
        self.model = self.build_model()

    def build_model(self):
        """Build the DeepFM network."""
        import tensorflow as tf
        from tensorflow.keras import layers, Model
        # One input (and embedding) per categorical feature field
        feature_inputs = []
        embeddings = []
        for i, dim in enumerate(self.feature_dims):
            input_layer = layers.Input(shape=(1,), name=f'feature_{i}')
            feature_inputs.append(input_layer)
            embedding = layers.Embedding(dim, self.embedding_dim)(input_layer)
            embeddings.append(layers.Flatten()(embedding))
        # FM part: first-order terms
        first_order = layers.Concatenate()(feature_inputs)
        first_order_output = layers.Dense(1, activation=None)(first_order)
        # FM part: second-order interactions via
        # 0.5 * [(sum of embeddings)^2 - sum of squared embeddings]
        embeddings_concat = layers.Concatenate()(embeddings)
        sum_square = layers.Lambda(lambda x: tf.square(tf.reduce_sum(
            tf.reshape(x, (-1, len(self.feature_dims), self.embedding_dim)), axis=1
        )))(embeddings_concat)
        square_sum = layers.Lambda(lambda x: tf.reduce_sum(
            tf.square(tf.reshape(x, (-1, len(self.feature_dims), self.embedding_dim))), axis=1
        ))(embeddings_concat)
        cross_term = layers.Lambda(
            lambda x: 0.5 * tf.reduce_sum(x[0] - x[1], axis=1, keepdims=True)
        )([sum_square, square_sum])
        # Deep part: MLP over the concatenated embeddings
        deep_input = embeddings_concat
        for hidden_dim in self.hidden_dims:
            deep_input = layers.Dense(hidden_dim, activation='relu')(deep_input)
            deep_input = layers.Dropout(0.3)(deep_input)
        deep_output = layers.Dense(1, activation=None)(deep_input)
        # Sum the three components and squash to a click probability
        output = layers.Add()([first_order_output, cross_term, deep_output])
        output = layers.Activation('sigmoid')(output)
        return Model(inputs=feature_inputs, outputs=output)

    def train(self, X_train, y_train, X_val, y_val):
        """Train with binary cross-entropy; monitor AUC."""
        import tensorflow as tf
        self.model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=[tf.keras.metrics.AUC()],
        )
        history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=10,
            batch_size=1024,
            verbose=1,
        )
        return history

    def predict(self, X):
        """Predict click probabilities."""
        return self.model.predict(X)
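The two Lambda layers implement the classic FM identity: summed over embedding dimensions, 0.5 * [(sum of the field embeddings)^2 - sum of their squares] equals the sum of all pairwise dot products. A quick NumPy check of that identity, independent of TensorFlow:

```python
import numpy as np

rng = np.random.default_rng(0)
n_fields, dim = 5, 8
v = rng.normal(size=(n_fields, dim))  # embedded feature vectors v_i * x_i

# Explicit pairwise form: sum over i < j of <v_i, v_j>
pairwise = sum(float(v[i] @ v[j])
               for i in range(n_fields) for j in range(i + 1, n_fields))

# FM trick: 0.5 * sum_d [ (sum_i v_id)^2 - sum_i v_id^2 ]
trick = 0.5 * float(np.sum(np.sum(v, axis=0) ** 2 - np.sum(v ** 2, axis=0)))
```

The trick turns an O(n^2) pairwise sum into an O(n) computation, which is why DeepFM can afford second-order interactions over many fields.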
# Multi-task learning model
class MultiTaskLearningModel:
    def __init__(self):
        self.shared_layers = None
        self.task_specific_layers = {}

    def build_mmoe_model(self, feature_dims, num_experts=4, expert_dim=64):
        """Multi-gate Mixture-of-Experts (MMoE) for joint CTR/CVR learning."""
        import tensorflow as tf
        from tensorflow.keras import layers, Model
        inputs = layers.Input(shape=(sum(feature_dims),))
        # Expert networks shared across tasks
        experts = [layers.Dense(expert_dim, activation='relu', name=f'expert_{i}')(inputs)
                   for i in range(num_experts)]
        experts_concat = layers.Lambda(lambda x: tf.stack(x, axis=1))(experts)

        def build_gate(task_name):
            # Per-task softmax gate that weights the shared experts
            gate = layers.Dense(num_experts, activation='softmax', name=f'gate_{task_name}')(inputs)
            gate = layers.Reshape((num_experts, 1))(gate)
            return layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1))([experts_concat, gate])

        # CTR tower
        ctr_output = layers.Dense(64, activation='relu')(build_gate('ctr'))
        ctr_output = layers.Dense(1, activation='sigmoid', name='ctr_output')(ctr_output)
        # CVR tower
        cvr_output = layers.Dense(64, activation='relu')(build_gate('cvr'))
        cvr_output = layers.Dense(1, activation='sigmoid', name='cvr_output')(cvr_output)
        return Model(inputs=inputs, outputs=[ctr_output, cvr_output])

Question 6: Cold-Start Solutions

Scenario: design recommendation strategies for new users and new items.

Solution

# Cold-start solutions
class ColdStartSolver:
    def __init__(self):
        self.user_onboarding_model = None
        self.item_content_model = None

    def new_user_recommendation(self, user_basic_info, onboarding_behavior=None):
        """Recommendation strategy for new users."""
        recommendations = []
        # 1. Demographic-based recommendations
        recommendations.extend(self.demographic_based_recommendation(user_basic_info))
        # 2. Location-based recommendations
        recommendations.extend(self.location_based_recommendation(user_basic_info.get('city')))
        # 3. Popular items
        recommendations.extend(self.popularity_based_recommendation(user_basic_info))
        # 4. Content-based, if onboarding behavior is available
        if onboarding_behavior:
            recommendations.extend(self.onboarding_based_recommendation(onboarding_behavior))
        # Deduplicate while preserving ranking order (a plain set() would scramble it)
        unique_recs = list(dict.fromkeys(recommendations))
        return unique_recs[:20]

    def new_item_recommendation(self, item_info):
        """Recommendation strategy for new items: find receptive users."""
        # 1. Content-similar existing items
        similar_items = self.find_similar_items_by_content(item_info)
        # 2. Users who engaged with those similar items
        target_users = []
        for similar_item in similar_items:
            target_users.extend(self.get_item_users(similar_item))
        # 3. Score and rank the candidate users
        user_scores = {}
        for user_id in target_users:
            user_scores[user_id] = self.calculate_user_item_match_score(user_id, item_info)
        sorted_users = sorted(user_scores.items(), key=lambda x: x[1], reverse=True)
        return [user_id for user_id, _ in sorted_users[:100]]

    def active_learning_strategy(self, user_id, candidate_items):
        """Active learning: ask the user to rate the most informative items."""
        # 1. Uncertainty sampling: highest prediction uncertainty first
        uncertainties = []
        for item_id in candidate_items:
            prediction_variance = self.calculate_prediction_uncertainty(user_id, item_id)
            uncertainties.append((item_id, prediction_variance))
        # 2. Diversity sampling: ensure different categories are covered
        diverse_items = self.ensure_diversity_sampling(uncertainties)
        # 3. Surface the 5 most informative items for feedback
        return diverse_items[:5]
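Uncertainty sampling, as used in `active_learning_strategy`, can be approximated with an ensemble: the variance of scores across models is a proxy for prediction uncertainty. A minimal sketch on hypothetical ensemble scores (the item names and numbers are made up for illustration):

```python
import statistics

# Hypothetical ensemble predictions: each item scored by 3 models
ensemble_scores = {
    "item_a": [0.9, 0.88, 0.91],  # models agree -> low information value
    "item_b": [0.2, 0.8, 0.5],    # models disagree -> most informative
    "item_c": [0.4, 0.45, 0.5],
}

def rank_by_uncertainty(scores):
    """Higher prediction variance = more informative to ask the user about."""
    return sorted(scores, key=lambda i: statistics.pvariance(scores[i]), reverse=True)

order = rank_by_uncertainty(ensemble_scores)  # item_b first
```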

Supply-Chain Data Scientist Interview Questions

Demand Forecasting and Inventory Optimization

Question 7: Inventory Optimization Model (key question)

Scenario: the e-commerce platform needs to optimize inventory, balancing stockout risk against holding cost.

Optimization Model Design

# Inventory optimization models
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.stats import norm, poisson

class InventoryOptimization:
    def __init__(self):
        self.demand_models = {}
        self.cost_parameters = {}

    def newsvendor_model(self, demand_forecast, demand_std, unit_cost, selling_price, salvage_value=0):
        """Newsvendor model: single-period inventory optimization."""
        # Unit costs of over- and under-stocking
        overage_cost = unit_cost - salvage_value   # cost of a leftover unit
        underage_cost = selling_price - unit_cost  # margin lost on a missed sale
        # Optimal service level (critical ratio)
        optimal_service_level = underage_cost / (underage_cost + overage_cost)
        # Optimal order quantity under a normal-demand assumption
        z_score = norm.ppf(optimal_service_level)
        optimal_order_quantity = demand_forecast + z_score * demand_std

        def expected_profit(order_qty):
            """Monte Carlo estimate of expected profit at a given order quantity."""
            simulations = 10000
            profits = []
            for _ in range(simulations):
                demand = max(0, np.random.normal(demand_forecast, demand_std))  # demand cannot be negative
                sales = min(demand, order_qty)
                leftover = max(0, order_qty - demand)
                revenue = sales * selling_price + leftover * salvage_value
                profits.append(revenue - order_qty * unit_cost)
            return np.mean(profits)

        return {
            'optimal_order_quantity': optimal_order_quantity,
            'optimal_service_level': optimal_service_level,
            'expected_profit': expected_profit(optimal_order_quantity),
            'critical_ratio': optimal_service_level,
        }
    def multi_period_inventory(self, demand_forecasts, holding_cost, stockout_cost, order_cost):
        """Multi-period inventory optimization via dynamic programming.

        Note: this plain recursion is exponential in the horizon; memoize on
        (period, current_inventory) before using it on realistic instances.
        """
        periods = len(demand_forecasts)

        def dp_inventory(period, current_inventory):
            if period >= periods:
                return 0
            min_cost = float('inf')
            # Try order quantities up to a reasonable upper bound
            max_order = max(demand_forecasts) * 2
            for order_qty in range(0, int(max_order) + 1, 10):  # step of 10 to limit the search
                inventory_before_demand = current_inventory + order_qty
                # Costs incurred this period
                period_order_cost = order_cost if order_qty > 0 else 0
                period_holding_cost = holding_cost * inventory_before_demand
                # Inventory and stockout once demand is realized
                demand = demand_forecasts[period]
                inventory_after_demand = max(0, inventory_before_demand - demand)
                stockout = max(0, demand - inventory_before_demand)
                period_stockout_cost = stockout_cost * stockout
                period_total_cost = period_order_cost + period_holding_cost + period_stockout_cost
                # Recurse into the remaining periods
                total_cost = period_total_cost + dp_inventory(period + 1, inventory_after_demand)
                min_cost = min(min_cost, total_cost)
            return min_cost

        # Minimal total cost over the horizon, starting from zero inventory
        return dp_inventory(0, 0)
    def abc_xyz_analysis(self, sales_data):
        """ABC-XYZ analysis for inventory classification."""
        # Revenue and demand-variability statistics per product
        product_analysis = sales_data.groupby('product_id').agg({
            'sales_amount': 'sum',
            'sales_quantity': ['mean', 'std'],
        }).round(2)
        product_analysis.columns = ['total_sales', 'avg_quantity', 'std_quantity']
        # Coefficient of variation (demand stability)
        product_analysis['cv'] = (product_analysis['std_quantity'] /
                                  product_analysis['avg_quantity']).fillna(0)
        # ABC classification by cumulative revenue share
        sales_sorted = product_analysis.sort_values('total_sales', ascending=False)
        sales_cumsum_pct = sales_sorted['total_sales'].cumsum() / sales_sorted['total_sales'].sum()
        # A covers the top 80% of revenue, B the next 15%, C the remaining 5%
        product_analysis['abc_category'] = 'C'
        a_mask = sales_cumsum_pct <= 0.8
        b_mask = (sales_cumsum_pct > 0.8) & (sales_cumsum_pct <= 0.95)
        product_analysis.loc[sales_sorted[a_mask].index, 'abc_category'] = 'A'
        product_analysis.loc[sales_sorted[b_mask].index, 'abc_category'] = 'B'
        # XYZ classification by coefficient of variation
        product_analysis['xyz_category'] = 'Z'
        product_analysis.loc[product_analysis['cv'] <= 0.5, 'xyz_category'] = 'X'  # stable demand
        product_analysis.loc[(product_analysis['cv'] > 0.5) & (product_analysis['cv'] <= 1.0), 'xyz_category'] = 'Y'  # moderate
        # Combined class and suggested inventory policy
        product_analysis['abc_xyz_category'] = product_analysis['abc_category'] + product_analysis['xyz_category']
        inventory_strategies = {
            'AX': 'Frequent counts, precise forecasting, modest safety stock',
            'AY': 'Moderate counts, forecast plus buffer, medium safety stock',
            'AZ': 'Infrequent counts, large buffer, high safety stock',
            'BX': 'Moderate counts, standard inventory management',
            'BY': 'Infrequent counts, medium buffer',
            'BZ': 'Infrequent counts, high buffer',
            'CX': 'Infrequent counts, minimal stock',
            'CY': 'Rare counts, order on demand',
            'CZ': 'Consider delisting or outsourcing',
        }
        product_analysis['inventory_strategy'] = product_analysis['abc_xyz_category'].map(inventory_strategies)
        return product_analysis
    def safety_stock_calculation(self, demand_data, lead_time, service_level=0.95):
        """Safety stock and reorder point per product."""
        results = []
        for product_id in demand_data['product_id'].unique():
            product_demand = demand_data[demand_data['product_id'] == product_id]['daily_demand']
            # Demand statistics
            avg_demand = product_demand.mean()
            demand_std = product_demand.std()
            # Safety stock: SS = z * sigma * sqrt(L)
            z_score = norm.ppf(service_level)
            safety_stock = z_score * demand_std * np.sqrt(lead_time)
            # Reorder point: ROP = average demand * lead time + safety stock
            reorder_point = avg_demand * lead_time + safety_stock
            # Probability that lead-time demand is positive (a crude sanity check,
            # not a true service level)
            positive_demand_prob = 1 - norm.cdf(0, avg_demand * lead_time, demand_std * np.sqrt(lead_time))
            results.append({
                'product_id': product_id,
                'avg_daily_demand': avg_demand,
                'demand_std': demand_std,
                'safety_stock': safety_stock,
                'reorder_point': reorder_point,
                'target_service_level': service_level,
                'positive_demand_prob': positive_demand_prob,
            })
        return pd.DataFrame(results)
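A small worked example of the two closed forms above: the newsvendor critical ratio and SS = z * sigma * sqrt(L). All numbers are illustrative, and the standard library's `statistics.NormalDist` stands in for `scipy.stats.norm`:

```python
from statistics import NormalDist

# Newsvendor: unit cost 6, selling price 10, no salvage value
cu, co = 10.0 - 6.0, 6.0 - 0.0           # underage and overage costs
service_level = cu / (cu + co)           # critical ratio = 4 / 10 = 0.4
z = NormalDist().inv_cdf(service_level)  # negative: order below the mean
q_star = 100.0 + z * 20.0                # forecast 100, std 20 -> roughly 95 units

# Safety stock and reorder point at a 95% cycle service level
z95 = NormalDist().inv_cdf(0.95)          # about 1.645
safety_stock = z95 * 10.0 * 4 ** 0.5      # SS = z * sigma * sqrt(L), sigma=10, L=4
reorder_point = 50.0 * 4 + safety_stock   # ROP = mu * L + SS, mu=50/day
```

Note how a critical ratio below 0.5 pushes the order quantity below the demand forecast: understocking is cheaper than overstocking here.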
# Supply-chain network optimization
class SupplyChainNetworkOptimization:
    def __init__(self):
        self.facilities = {}
        self.transportation_costs = {}

    def facility_location_optimization(self, demand_points, potential_facilities, fixed_costs, variable_costs):
        """Facility-location optimization as an LP relaxation.

        Decision variables: x_ij (flow from facility i to customer j) and
        y_i (whether facility i is opened). The exact formulation makes y_i
        binary (a MILP); linprog only solves the continuous 0-1 relaxation.
        """
        from scipy.optimize import linprog

        n_facilities = len(potential_facilities)
        n_customers = len(demand_points)
        # Objective: transport costs for the x_ij, then fixed costs for the y_i
        c = []
        for i in range(n_facilities):
            for j in range(n_customers):
                c.append(self.calculate_transport_cost(potential_facilities[i], demand_points[j]))
        for i in range(n_facilities):
            c.append(fixed_costs[i])
        # Equality constraints: each customer's demand must be fully met
        A_eq, b_eq = [], []
        for j in range(n_customers):
            constraint = [0] * len(c)
            for i in range(n_facilities):
                constraint[i * n_customers + j] = 1
            A_eq.append(constraint)
            b_eq.append(demand_points[j]['demand'])
        # Linking constraints: total flow out of facility i cannot exceed
        # its capacity times y_i
        A_ub, b_ub = [], []
        for i in range(n_facilities):
            constraint = [0] * len(c)
            for j in range(n_customers):
                constraint[i * n_customers + j] = 1
            constraint[n_facilities * n_customers + i] = -potential_facilities[i]['capacity']
            A_ub.append(constraint)
            b_ub.append(0)
        # Bounds: non-negative flows; y_i relaxed from {0, 1} to [0, 1]
        bounds = [(0, None)] * (n_facilities * n_customers) + [(0, 1)] * n_facilities
        result = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq,
                         bounds=bounds, method='highs')
        if not result.success:
            return {'optimization_successful': False, 'message': 'Optimization failed'}
        solution = result.x
        # Facilities considered "open" (thresholding the relaxed y_i)
        facility_vars_start = n_facilities * n_customers
        opened_facilities = [i for i in range(n_facilities)
                             if solution[facility_vars_start + i] > 0.5]
        # Allocation plan, ignoring negligible flows
        allocation = {}
        for i in opened_facilities:
            allocation[i] = {}
            for j in range(n_customers):
                supply = solution[i * n_customers + j]
                if supply > 0.01:
                    allocation[i][j] = supply
        return {
            'opened_facilities': opened_facilities,
            'allocation': allocation,
            'total_cost': result.fun,
            'optimization_successful': True,
        }

    def calculate_transport_cost(self, facility, customer):
        """Transport cost between a facility and a customer."""
        # Simplified Euclidean distance on lat/lng (use a geographic
        # distance such as haversine in practice)
        distance = np.sqrt(
            (facility['lat'] - customer['lat']) ** 2 +
            (facility['lng'] - customer['lng']) ** 2
        )
        # Cost = distance * unit cost per km * demand volume
        cost_per_km = 0.5
        return distance * cost_per_km * customer['demand']
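With the open/close variables fixed, the LP above reduces to a plain transportation problem. A minimal, self-contained sketch with two facilities and two customers (illustrative costs and demands; both facilities assumed already open, so there are no y variables):

```python
from scipy.optimize import linprog

# Variable order: x = [x00, x01, x10, x11] (facility i -> customer j)
cost = [1.0, 2.0, 3.0, 1.0]                  # unit shipping costs
A_eq = [[1, 0, 1, 0],                        # customer 0's demand met exactly
        [0, 1, 0, 1]]                        # customer 1's demand met exactly
b_eq = [5, 5]
A_ub = [[1, 1, 0, 0],                        # facility 0 capacity
        [0, 0, 1, 1]]                        # facility 1 capacity
b_ub = [10, 10]
res = linprog(cost, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq,
              bounds=[(0, None)] * 4, method="highs")
# Optimum: each customer served by its cheapest facility, total cost 10
```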

Question 8: Supplier Evaluation and Selection

Scenario: build a comprehensive supplier evaluation system to support sourcing decisions.

Evaluation Model

# Supplier evaluation model
class SupplierEvaluationSystem:
    def __init__(self):
        self.evaluation_criteria = {
            'quality': 0.3,         # quality weight
            'cost': 0.25,           # cost weight
            'delivery': 0.2,        # delivery weight
            'service': 0.15,        # service weight
            'sustainability': 0.1,  # sustainability weight
        }

    def ahp_supplier_selection(self, suppliers_data, criteria_comparison_matrix):
        """Supplier selection with the Analytic Hierarchy Process (AHP)."""
        # 1. Criterion weights from the pairwise comparison matrix
        #    (zipped into a dict so they can be looked up by criterion name)
        criteria_weights = dict(zip(
            self.evaluation_criteria.keys(),
            self.calculate_ahp_weights(criteria_comparison_matrix),
        ))
        # 2. Per-criterion supplier scores
        supplier_scores = {}
        for criterion in self.evaluation_criteria:
            # Pairwise comparison of the suppliers under this criterion
            supplier_comparison = self.build_supplier_comparison_matrix(suppliers_data, criterion)
            supplier_scores[criterion] = dict(zip(
                suppliers_data.keys(),
                self.calculate_ahp_weights(supplier_comparison),
            ))
        # 3. Weighted overall score per supplier
        final_scores = {}
        for supplier in suppliers_data:
            final_scores[supplier] = sum(
                weight * supplier_scores[criterion][supplier]
                for criterion, weight in criteria_weights.items()
            )
        ranked_suppliers = sorted(final_scores.items(), key=lambda x: x[1], reverse=True)
        return {
            'criteria_weights': criteria_weights,
            'supplier_scores': supplier_scores,
            'final_ranking': ranked_suppliers,
        }

    def calculate_ahp_weights(self, comparison_matrix):
        """AHP weights: principal eigenvector, normalized to sum to 1."""
        eigenvalues, eigenvectors = np.linalg.eig(comparison_matrix)
        # Eigenvector of the largest eigenvalue
        max_eigenvalue_index = np.argmax(eigenvalues.real)
        principal_eigenvector = eigenvectors[:, max_eigenvalue_index].real
        return principal_eigenvector / np.sum(principal_eigenvector)

    def supplier_risk_assessment(self, supplier_data, market_data):
        """Composite supplier risk assessment."""
        risk_factors = {}
        for supplier_id, data in supplier_data.items():
            # 1. Financial risk
            financial_risk = self.calculate_financial_risk(data['financial_metrics'])
            # 2. Operational risk
            operational_risk = self.calculate_operational_risk(data['operational_metrics'])
            # 3. Geographic risk
            geographic_risk = self.calculate_geographic_risk(data['location'], market_data)
            # 4. Market risk
            market_risk = self.calculate_market_risk(data['market_position'], market_data)
            # Weighted composite score
            total_risk = (
                financial_risk * 0.3 +
                operational_risk * 0.3 +
                geographic_risk * 0.2 +
                market_risk * 0.2
            )
            risk_factors[supplier_id] = {
                'financial_risk': financial_risk,
                'operational_risk': operational_risk,
                'geographic_risk': geographic_risk,
                'market_risk': market_risk,
                'total_risk_score': total_risk,
                'risk_level': self.categorize_risk_level(total_risk),
            }
        return risk_factors

    def calculate_financial_risk(self, financial_metrics):
        """Financial risk from leverage, liquidity and profitability."""
        debt_ratio = financial_metrics.get('debt_ratio', 0)
        current_ratio = financial_metrics.get('current_ratio', 1)
        profit_margin = financial_metrics.get('profit_margin', 0)
        # Each signal scored 0-1 (higher = riskier)
        debt_risk = min(debt_ratio / 0.7, 1)                        # debt ratio above 70% is high risk
        liquidity_risk = max(0, (2 - current_ratio) / 2)            # current ratio below 2 is risky
        profitability_risk = max(0, (0.05 - profit_margin) / 0.05)  # margin below 5% is risky
        return (debt_risk + liquidity_risk + profitability_risk) / 3
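The scoring thresholds above (70% leverage, current ratio of 2, 5% margin) are illustrative, not industry standards. Here is a standalone version of the same financial-risk calculation applied to a sample supplier:

```python
def financial_risk(debt_ratio, current_ratio, profit_margin):
    """Average of three 0-1 risk signals (thresholds are illustrative)."""
    debt_risk = min(debt_ratio / 0.7, 1.0)                        # leverage above 70% caps out at 1
    liquidity_risk = max(0.0, (2.0 - current_ratio) / 2.0)        # current ratio below 2
    profitability_risk = max(0.0, (0.05 - profit_margin) / 0.05)  # margin below 5%
    return (debt_risk + liquidity_risk + profitability_risk) / 3.0

# Moderately leveraged, slightly illiquid, healthy margin
risk = financial_risk(debt_ratio=0.35, current_ratio=1.5, profit_margin=0.08)
```

With these inputs the three signals are 0.5, 0.25 and 0, giving a composite of 0.25: the leverage and liquidity terms drive the score while the strong margin contributes no risk.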

Data Product Manager Interview Questions

E-commerce Data Product Design

Question 9: User Profiling Product Design (core question)

Scenario: design a user profiling product for an e-commerce platform to support precision marketing and personalized recommendation.

Product Design Essentials

## User Profiling Product Design
### 1. Product Goals and Value
#### Business goals
- Improve marketing ROI: precise targeting lifts conversion by 30%
- Improve user experience: personalized recommendation lifts click-through by 25%
- Cut acquisition cost: precise targeting lowers CAC by 20%
- Grow user value: lifts lifetime value by 15%
#### User value
- Marketers: quickly locate target user segments
- Product managers: understand user needs to guide product optimization
- Operations: design personalized operational strategies
- Algorithm engineers: supply features for model optimization
### 2. Core Feature Design
#### Profile-building module
- Tag taxonomy management: layered tag-system design
- Data-source management: multi-source data integration and cleansing
- Feature engineering: automated feature extraction and computation
- Profile refresh: real-time and offline update mechanisms
#### Profile-query module
- User lookup: single-user profile detail view
- Cohort analysis: segment-level characteristics
- Tag filtering: multi-dimensional combined queries
- Profile comparison: contrasts between user segments
#### Application-service module
- API service: real-time profile data interface
- Marketing delivery: audience-pack generation and delivery
- Personalized recommendation: feature data support
- Effect tracking: monitoring and analysis of application impact
### 3. Technical Architecture
#### Data layer
- Behavioral data: clicks, views, purchases, searches
- Transaction data: orders, payments, refunds
- Content data: item views, favorites, reviews
- External data: third-party supplements
#### Computation layer
- Offline computation: T+1 batch profile refresh
- Real-time computation: key tags updated in real time
- Feature engineering: automated feature extraction
- Model serving: machine-learning predictions
#### Service layer
- Profile service: RESTful API
- Query service: high-performance query engine
- Push service: proactive data pushes
- Monitoring service: system health monitoring
#### Application layer
- Admin console: tag management and configuration
- Query platform: self-service analysis
- Open platform: third-party integration
- Mobile app: mobile access
### 4. Tag Taxonomy Design
#### Basic attribute tags
- Demographics: age, gender, region, occupation
- Device: device type, operating system, network
- Registration: sign-up time, acquisition channel
#### Behavioral tags
- Visit behavior: frequency, duration, navigation paths
- Purchase behavior: frequency, amount, category
- Engagement: favorites, shares, reviews
#### Preference and interest tags
- Category preference: affinity for apparel, electronics, home goods
- Brand preference: brand loyalty, price sensitivity
- Content preference: content types and topics followed
#### Value and risk tags
- Value tags: RFM value, lifetime value
- Risk tags: credit risk, churn risk
- Potential tags: growth potential, referral value
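The RFM value tag above reduces to three numbers per user: days since the last order (recency), order count (frequency), and total spend (monetary). A minimal sketch on hypothetical order history (user IDs, dates and amounts are made up):

```python
from datetime import date

# Hypothetical order history: user -> list of (order_date, amount)
orders = {
    "u1": [(date(2024, 1, 5), 120.0), (date(2024, 3, 1), 80.0)],
    "u2": [(date(2023, 6, 10), 500.0)],
}
today = date(2024, 3, 10)  # profile computation date

def rfm(user_orders):
    recency = (today - max(d for d, _ in user_orders)).days  # days since last order
    frequency = len(user_orders)                             # number of orders
    monetary = sum(a for _, a in user_orders)                # total spend
    return recency, frequency, monetary

profiles = {u: rfm(o) for u, o in orders.items()}
```

In production these raw values are typically bucketed into quantile scores (for example 1-5 per dimension) so segments such as "high value, at churn risk" can be defined by simple thresholds.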
### 5. 产品迭代规划
#### MVP版本(3个月)
- 基础画像标签体系
- 单用户画像查询
- 简单的人群筛选
- 基础API服务
#### V1.0版本(6个月)
- 完整标签体系
- 高级查询和分析
- 营销应用集成
- 效果监控体系
#### V2.0版本(12个月)
- 实时画像更新
- 智能标签推荐
- 多场景应用优化
- 开放平台建设

Question 10: Data Product Commercialization Strategy

Scenario: how do you commercialize an internal data product and open it to external customers?

Commercialization Strategy

## Data Product Commercialization Strategy
### 1. Market Opportunity Analysis
#### Target markets
- SMB e-commerce: small and mid-size merchants lacking data capabilities
- Brands: brand owners needing consumer insight
- Agencies: advertising and marketing service providers
- Developers: developers needing data APIs
#### Market sizing
- TAM: total addressable market for data services
- SAM: serviceable addressable market
- SOM: serviceable obtainable market share
### 2. Product Positioning
#### Core value proposition
- Data richness: covers data on hundreds of millions of users
- Freshness: millisecond-level data updates
- Accuracy: precision backed by algorithmic models
- Usability: out-of-the-box product experience
#### Differentiation
- vs third-party data vendors: fresher, more accurate data
- vs building in-house: lower cost, faster deployment
- vs generic solutions: stronger industry specialization
### 3. Business Model Design
#### SaaS subscriptions
- Basic: free, with call-volume limits
- Professional: 999 RMB/month, standard call volume
- Enterprise: 19,999 RMB/year, unlimited calls
#### Per-call API pricing
- Pay per call: 0.01 RMB per call
- Volume packs: 10,000 calls/month for 50 RMB
- Traffic-based: billed by data transferred
#### Custom services
- Custom data: on-demand data collection and processing
- Custom models: bespoke algorithm development
- Custom deployment: on-premise deployment services
### 4. Productizing the Technology
#### API standardization
- RESTful design: standard HTTP interfaces
- Complete documentation: detailed API docs
- SDK support: SDKs in multiple languages
- Test environment: sandbox for integration testing
#### Platform buildout
- Developer portal: registration, authentication, management
- Console: usage monitoring, billing management
- Technical support: live chat, ticketing system
- Community: developer community and forum
### 5. Go-to-Market Execution
#### Product packaging
- Naming: a data-insight cloud platform
- Brand design: professional visual identity
- Value packaging: quantified ROI and case studies
- Marketing collateral: product brochures, demo videos
#### Sales strategy
- Online self-serve: sign up and purchase online
- Direct sales: key-account sales team
- Channel partners: reseller distribution
- Ecosystem: partnerships with system integrators (SIs)
#### Customer success
- Onboarding: dedicated implementation team
- Training: product usage training
- Customer operations: periodic reviews and optimization
- Renewals: renewal reminders and retention
Interview Preparation Advice

Core Competencies for E-commerce

Depth of Business Understanding

# Core knowledge checklist for e-commerce data roles
## E-commerce business models
- [ ] Marketplace platforms: the Taobao and JD models
- [ ] First-party retail: NetEase Yanxuan, Xiaomi Store
- [ ] Social commerce: Pinduoduo, WeChat-commerce models
- [ ] Cross-border: Amazon, Alibaba international
## Core business flows
- [ ] User lifecycle: acquisition → activation → retention → monetization → referral
- [ ] Item lifecycle: listing → promotion → sales → delisting
- [ ] Order flow: order → payment → shipment → receipt → review
- [ ] Supply-chain flow: procurement → inbound → sortation → delivery
## Key metric systems
- [ ] Traffic: UV, PV, bounce rate, conversion rate
- [ ] Transactions: GMV, average order value, repurchase rate
- [ ] Users: LTV, CAC, retention rate
- [ ] Operations: marketing ROI, inventory turnover
## Technical application scenarios
- [ ] Personalized recommendation: collaborative filtering, deep learning
- [ ] Search optimization: ranking, query understanding
- [ ] Risk control and anti-fraud: anomaly detection, graph algorithms
- [ ] Supply-chain optimization: demand forecasting, inventory management

Technical Skill Requirements

Core Tech Stack

# E-commerce data tech stack
## Data processing
- [ ] Stream processing: Flink, Kafka, Storm
- [ ] Batch processing: Spark, Hive, MapReduce
- [ ] Storage: HBase, Redis, Elasticsearch
- [ ] Data sync: DataX, Canal, Sqoop
## Algorithms and models
- [ ] Recommendation: collaborative filtering, deep learning, reinforcement learning
- [ ] Search: information retrieval, learning to rank
- [ ] Forecasting: time series, regression models
- [ ] Optimization: linear programming, heuristics
## Engineering
- [ ] System design: high-concurrency, high-availability architectures
- [ ] Performance: caching, indexing, sharding
- [ ] Monitoring and alerting: system and business monitoring
- [ ] Deployment and operations: containerization, automated deployment

Building Project Experience

Recommended Hands-on Projects

  1. E-commerce recommender system: end-to-end recommender development
  2. User profiling platform: multi-source data integration and tag taxonomy
  3. A/B testing platform: experiment design and effect evaluation
  4. Supply-chain optimization: demand forecasting and inventory optimization
  5. Real-time risk control: anomaly detection and risk management

Recommended Learning Resources

  • Tech blogs: Meituan Tech, Alibaba Tech
  • Open-source projects: recommender-system and search-engine projects
  • Online courses: machine learning and deep learning courses
  • Conferences: e-commerce tech summits, recommender-system conferences

Learning Links

Prerequisites

  • Recommender system fundamentals - theoretical foundations of recommendation algorithms
  • E-commerce business analysis - understanding the e-commerce business

Related Concepts

  • Recommender system tech stack - implementation details
  • User operations analytics - user analysis methods

Next Steps

  • Manufacturing industry question bank - data applications in traditional industries
  • Skill development guide - continuous capability building


This article is excerpted from the Data Practitioner Full-Stack Knowledge Base, which contains 2,300+ systematically organized technical documents covering data analysis, data engineering, data governance, AI, and the rest of the data stack.

Author: Elazer (Shitou), an 11-year data veteran who went from analyst to architecture expert.
