## Characteristics of Data Work in the E-commerce Industry

### Industry Characteristics and Challenges

Core characteristics of e-commerce data work:
```mermaid
%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#f8f9fa", "primaryTextColor": "#2c3e50", "primaryBorderColor": "#c1c8cd", "lineColor": "#6c757d", "secondaryColor": "#e8f4f7", "tertiaryColor": "#ffffff", "background": "#fafafa", "mainBkg": "#ffffff", "secondBkg": "#f1f3f4", "nodeBorder": "#c1c8cd", "clusterBkg": "#f8f9fa", "defaultLinkColor": "#495057", "titleColor": "#212529", "nodeTextColor": "#343a40"}, "flowchart": {"curve": "stepAfter"}}}%%
flowchart TD
    A[E-commerce data work characteristics] --> B[Full-funnel closed data loop]
    A --> C[High real-time requirements]
    A --> D[Complex multi-dimensional analysis]
    A --> E[Focus on commercial conversion]
    B --> B1[User behavior tracking]
    B --> B2[Full product lifecycle]
    B --> B3[End-to-end transaction chain]
    C --> C1[Real-time recommendation]
    C --> C2[Dynamic pricing]
    C --> C3[Inventory monitoring]
    D --> D1[User-dimension analysis]
    D --> D2[Product-dimension analysis]
    D --> D3[Channel-dimension analysis]
    E --> E1[GMV growth]
    E --> E2[Conversion rate optimization]
    E --> E3[Maximizing user value]
    style A fill:#e1f5fe
    style B fill:#e8f5e8
    style C fill:#fff3e0
    style D fill:#f3e5f5
    style E fill:#fce4ec
```
Core business metric system:

- Traffic metrics: UV, PV, bounce rate, dwell time, traffic conversion
- Transaction metrics: GMV, order volume, average order value (AOV), conversion rate, repurchase rate
- User metrics: new customer acquisition, user retention, user value, lifecycle
- Product metrics: product conversion, inventory turnover, price elasticity, sales forecasting
- Operations metrics: marketing ROI, channel performance, campaign performance, customer-service efficiency
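As a concrete anchor for these definitions, the following is a minimal sketch of how the core transaction metrics could be computed from an orders table; the `orders` DataFrame and its column names (`order_date`, `order_id`, `user_id`, `order_amount`) are illustrative assumptions, not a prescribed schema:

```python
# Minimal sketch: daily core-metric snapshot from an assumed orders table.
import pandas as pd

def daily_core_metrics(orders: pd.DataFrame) -> pd.DataFrame:
    # Assumed columns: order_date, order_id, user_id, order_amount
    daily = orders.groupby('order_date').agg(
        gmv=('order_amount', 'sum'),
        order_count=('order_id', 'nunique'),
        buyers=('user_id', 'nunique')
    )
    daily['aov'] = daily['gmv'] / daily['order_count']  # average order value
    return daily.reset_index()
```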
## Data Analyst Interview Question Bank

### Core E-commerce Business Analysis

#### Question 1: GMV Anomaly Diagnosis (High-Frequency Core Question)

Scenario: during the Double 11 campaign, the platform's GMV drops 15% year over year. You need to locate the cause quickly and formulate a response strategy.
Expected answer framework:

- Problem decomposition (5 min):

```markdown
## GMV decomposition framework
GMV = visiting users × conversion rate × average order value

### First-level decomposition
1. Traffic analysis:
   - Total traffic: UV and PV changes
   - Traffic quality: bounce rate, dwell time
   - Traffic mix: share of new vs. returning users
2. Conversion analysis:
   - Overall conversion: browse → order → pay
   - By channel: app, H5, mini-program
   - By category: different product categories
3. AOV analysis:
   - Change in average order amount
   - Change in items per order
   - Price-band distribution of products

### Second-level decomposition
#### Deeper traffic dimensions
- Channel: organic, paid, and social traffic
- Geography: tier-1/2/3 city differences
- Device: mobile vs. PC traffic changes
- Time: performance by time slot during the campaign

#### Deeper user dimensions
- Segmentation: new users, returning users, VIP users
- Profile: age, gender, spending preferences
- Behavior: browse depth, add-to-cart, favorites

#### Deeper product dimensions
- Category: apparel, 3C, appliances, and other category performance
- Price band: sales across price ranges
- Brand: first-party vs. third-party, branded vs. white-label
```

- Data analysis implementation (8 min):
```sql
-- Multi-dimensional GMV analysis
WITH gmv_analysis AS (
    SELECT
        DATE(order_time) AS order_date,
        channel,
        user_type,
        category,
        city_tier,
        -- Base metrics
        COUNT(DISTINCT user_id) AS uv,
        COUNT(DISTINCT order_id) AS order_count,
        SUM(order_amount) AS gmv,
        AVG(order_amount) AS avg_order_value,
        -- Conversion-related
        COUNT(DISTINCT CASE WHEN order_status = 'paid' THEN order_id END) AS paid_orders,
        SUM(CASE WHEN order_status = 'paid' THEN order_amount ELSE 0 END) AS paid_gmv
    FROM orders o
    JOIN users u ON o.user_id = u.user_id
    JOIN products p ON o.product_id = p.product_id
    WHERE DATE(order_time) BETWEEN '2024-11-01' AND '2024-11-15'
    GROUP BY 1, 2, 3, 4, 5
),

-- Year-over-year comparison
yoy_comparison AS (
    SELECT
        channel,
        user_type,
        category,
        -- This year
        SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) AS gmv_2024,
        SUM(CASE WHEN order_date >= '2024-11-11' THEN uv ELSE 0 END) AS uv_2024,
        -- Last year (requires joining a history table; simplified here with
        -- an estimate -- in practice, JOIN the historical data)
        SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15 AS gmv_2023_est,
        -- Year-over-year change
        (SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END)
         - SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15)
        / (SUM(CASE WHEN order_date >= '2024-11-11' THEN gmv ELSE 0 END) * 1.15) AS gmv_yoy_change
    FROM gmv_analysis
    GROUP BY 1, 2, 3
),

-- Funnel conversion analysis
funnel_analysis AS (
    SELECT
        DATE(event_time) AS event_date,
        channel,
        -- Traffic funnel
        COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END) AS pv_users,
        COUNT(DISTINCT CASE WHEN event_type = 'add_to_cart' THEN user_id END) AS cart_users,
        COUNT(DISTINCT CASE WHEN event_type = 'checkout' THEN user_id END) AS checkout_users,
        COUNT(DISTINCT CASE WHEN event_type = 'payment' THEN user_id END) AS payment_users,
        -- Conversion rates
        COUNT(DISTINCT CASE WHEN event_type = 'add_to_cart' THEN user_id END) * 1.0
            / NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END), 0) AS pv_to_cart_rate,
        COUNT(DISTINCT CASE WHEN event_type = 'payment' THEN user_id END) * 1.0
            / NULLIF(COUNT(DISTINCT CASE WHEN event_type = 'page_view' THEN user_id END), 0) AS pv_to_payment_rate
    FROM user_behavior_logs
    WHERE DATE(event_time) BETWEEN '2024-11-01' AND '2024-11-15'
    GROUP BY 1, 2
)

-- Main query: combined analysis results
SELECT
    g.channel,
    g.user_type,
    g.category,
    -- GMV performance
    SUM(g.gmv) AS total_gmv,
    AVG(g.avg_order_value) AS avg_order_value,
    SUM(g.uv) AS total_uv,
    -- Year-over-year change
    y.gmv_yoy_change,
    -- Conversion performance
    AVG(f.pv_to_cart_rate) AS avg_pv_to_cart_rate,
    AVG(f.pv_to_payment_rate) AS avg_pv_to_payment_rate
FROM gmv_analysis g
LEFT JOIN yoy_comparison y
    ON g.channel = y.channel AND g.user_type = y.user_type AND g.category = y.category
LEFT JOIN funnel_analysis f
    ON g.order_date = f.event_date AND g.channel = f.channel
GROUP BY 1, 2, 3, y.gmv_yoy_change
ORDER BY total_gmv DESC;
```

- Python data analysis (5 min):
```python
# E-commerce GMV analysis toolkit
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

class EcommerceGMVAnalyzer:
    def __init__(self):
        self.metrics = ['gmv', 'orders', 'users', 'aov']

    def load_and_prepare_data(self, start_date, end_date):
        """Load and prepare data."""
        # Simulated data loading; in practice this would read from a
        # database or data warehouse.
        np.random.seed(42)
        dates = pd.date_range(start_date, end_date, freq='D')

        data = []
        channels = ['organic', 'paid_search', 'social', 'direct']
        categories = ['electronics', 'clothing', 'home', 'books']

        for date in dates:
            for channel in channels:
                for category in categories:
                    # Simulate the Double 11 effect
                    is_1111 = date.strftime('%m-%d') == '11-11'
                    base_multiplier = 5 if is_1111 else 1

                    data.append({
                        'date': date,
                        'channel': channel,
                        'category': category,
                        'gmv': np.random.normal(10000, 2000) * base_multiplier,
                        'orders': np.random.poisson(100) * base_multiplier,
                        'users': np.random.poisson(80) * base_multiplier,
                        'pv': np.random.poisson(1000) * base_multiplier
                    })

        df = pd.DataFrame(data)
        df['aov'] = df['gmv'] / df['orders']
        df['conversion_rate'] = df['orders'] / df['pv']

        return df

    def decompose_gmv_change(self, current_data, baseline_data):
        """Decompose the GMV change into its drivers."""
        # Contribution of each component

        # Current-period metrics
        current_gmv = current_data['gmv'].sum()
        current_users = current_data['users'].sum()
        current_orders = current_data['orders'].sum()
        current_aov = current_data['gmv'].sum() / current_data['orders'].sum()
        current_conversion = current_data['orders'].sum() / current_data['pv'].sum()

        # Baseline-period metrics
        baseline_gmv = baseline_data['gmv'].sum()
        baseline_users = baseline_data['users'].sum()
        baseline_orders = baseline_data['orders'].sum()
        baseline_aov = baseline_data['gmv'].sum() / baseline_data['orders'].sum()
        baseline_conversion = baseline_data['orders'].sum() / baseline_data['pv'].sum()

        # GMV change decomposition
        gmv_change = current_gmv - baseline_gmv
        gmv_change_pct = gmv_change / baseline_gmv

        # Impact of the change in user count
        user_impact = (current_users - baseline_users) * baseline_conversion * baseline_aov

        # Impact of the change in conversion rate
        conversion_impact = current_users * (current_conversion - baseline_conversion) * baseline_aov

        # Impact of the change in AOV
        aov_impact = current_users * current_conversion * (current_aov - baseline_aov)

        decomposition = {
            'total_change': gmv_change,
            'total_change_pct': gmv_change_pct,
            'user_impact': user_impact,
            'conversion_impact': conversion_impact,
            'aov_impact': aov_impact,
            'user_impact_pct': user_impact / abs(gmv_change) if gmv_change != 0 else 0,
            'conversion_impact_pct': conversion_impact / abs(gmv_change) if gmv_change != 0 else 0,
            'aov_impact_pct': aov_impact / abs(gmv_change) if gmv_change != 0 else 0
        }

        return decomposition

    def channel_performance_analysis(self, data):
        """Channel performance analysis."""
        channel_summary = data.groupby('channel').agg({
            'gmv': 'sum',
            'orders': 'sum',
            'users': 'sum',
            'pv': 'sum'
        }).reset_index()

        channel_summary['aov'] = channel_summary['gmv'] / channel_summary['orders']
        channel_summary['conversion_rate'] = channel_summary['orders'] / channel_summary['pv']
        channel_summary['gmv_per_user'] = channel_summary['gmv'] / channel_summary['users']

        # Channel efficiency ranking
        channel_summary['efficiency_score'] = (
            channel_summary['conversion_rate'] * 0.4 +
            channel_summary['aov'] / channel_summary['aov'].max() * 0.3 +
            channel_summary['gmv_per_user'] / channel_summary['gmv_per_user'].max() * 0.3
        )

        return channel_summary.sort_values('efficiency_score', ascending=False)

    def cohort_analysis(self, data, metric='gmv'):
        """Cohort analysis."""
        # Group users by acquisition time and track later performance.
        # Simplified here; in practice this needs user registration data.
        weekly_data = data.groupby([
            data['date'].dt.to_period('W'),
            'channel'
        ])[metric].sum().unstack(fill_value=0)

        # Week-over-week change
        weekly_change = weekly_data.pct_change().fillna(0)

        return weekly_data, weekly_change

    def anomaly_detection(self, data, metric='gmv', threshold=2):
        """Anomaly detection."""
        # Z-score based detection
        data_copy = data.copy()

        # Compute Z-scores per channel and category
        for channel in data['channel'].unique():
            for category in data['category'].unique():
                mask = (data_copy['channel'] == channel) & (data_copy['category'] == category)
                values = data_copy.loc[mask, metric]

                mean_val = values.mean()
                std_val = values.std()

                if std_val > 0:
                    z_scores = np.abs((values - mean_val) / std_val)
                    data_copy.loc[mask, f'{metric}_zscore'] = z_scores
                    data_copy.loc[mask, f'{metric}_anomaly'] = z_scores > threshold

        return data_copy

    def generate_insights(self, decomposition, channel_performance):
        """Generate business insights."""
        insights = []

        # GMV change insight
        if decomposition['total_change_pct'] < -0.1:
            insights.append(f"GMV is down {abs(decomposition['total_change_pct']):.1%} year over year; urgent attention needed")

        # Identify the main driver
        impacts = {
            'user count': decomposition['user_impact_pct'],
            'conversion rate': decomposition['conversion_impact_pct'],
            'AOV': decomposition['aov_impact_pct']
        }

        main_factor = max(impacts.items(), key=lambda x: abs(x[1]))
        insights.append(f"The main driver is {main_factor[0]}, contributing {abs(main_factor[1]):.1%} of the change")

        # Channel performance insights
        best_channel = channel_performance.iloc[0]['channel']
        worst_channel = channel_performance.iloc[-1]['channel']

        insights.append(f"The best-performing channel is {best_channel}, efficiency score {channel_performance.iloc[0]['efficiency_score']:.2f}")
        insights.append(f"The worst-performing channel is {worst_channel}; its strategy needs optimization")

        return insights

# Usage example
def analyze_gmv_decline():
    """Example GMV-decline analysis."""
    analyzer = EcommerceGMVAnalyzer()

    # Load data
    current_data = analyzer.load_and_prepare_data('2024-11-01', '2024-11-15')
    baseline_data = analyzer.load_and_prepare_data('2023-11-01', '2023-11-15')

    # Decomposition
    decomposition = analyzer.decompose_gmv_change(current_data, baseline_data)

    # Channel analysis
    channel_perf = analyzer.channel_performance_analysis(current_data)

    # Anomaly detection
    anomaly_data = analyzer.anomaly_detection(current_data)

    # Insights
    insights = analyzer.generate_insights(decomposition, channel_perf)

    return {
        'decomposition': decomposition,
        'channel_performance': channel_perf,
        'anomaly_data': anomaly_data,
        'insights': insights
    }
```

- Solution recommendations (2 min):
```markdown
## Response strategy

### Short-term emergency measures (within 24 hours)
1. Traffic compensation:
   - Increase paid-media budget
   - Adjust recommendation-algorithm weights
   - Activate on-site traffic slots
2. Conversion lift:
   - Optimize product detail pages
   - Adjust pricing strategy
   - Issue more coupons
3. User win-back:
   - Push notifications
   - SMS marketing outreach
   - Community-operations activation

### Mid-term optimization (within 7 days)
1. Deep analysis:
   - User research into churn causes
   - Competitor benchmarking
   - A/B tests to validate optimizations
2. Product optimization:
   - Faster page loads
   - Simplified checkout flow
   - More precise personalized recommendations

### Long-term build-out (within 30 days)
1. Data infrastructure:
   - Complete real-time monitoring
   - Anomaly-alerting mechanisms
   - Better attribution models
2. User operations:
   - Fine-grained user segmentation
   - Lifecycle-management optimization
   - User-value growth strategies
```

Scoring points:
- Systematic, logical analysis approach
- Practical SQL and Python code
- Depth and accuracy of business understanding
- Feasibility and specificity of the proposed solutions
#### Question 2: Customer Lifetime Value (LTV) Analysis

Scenario: the platform needs to build a customer lifetime value model to guide acquisition spend and user-operations strategy.

Expected answer:

- LTV model design (8 min):
```python
# Customer lifetime value (LTV) analysis model
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from operator import attrgetter  # needed by the cohort analysis below
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

class CustomerLTVAnalyzer:
    def __init__(self):
        self.rfm_weights = {'recency': 0.2, 'frequency': 0.3, 'monetary': 0.5}
        self.ltv_model = None

    def calculate_rfm_features(self, transaction_data, analysis_date=None):
        """Compute RFM features."""
        if analysis_date is None:
            analysis_date = transaction_data['order_date'].max()

        rfm_data = transaction_data.groupby('customer_id').agg({
            'order_date': lambda x: (analysis_date - x.max()).days,  # Recency
            'order_id': 'count',                                     # Frequency
            'order_amount': ['sum', 'mean']                          # Monetary
        }).round(2)

        rfm_data.columns = ['recency', 'frequency', 'monetary_total', 'monetary_avg']
        rfm_data['monetary'] = rfm_data['monetary_total']  # use total spend as the M value

        return rfm_data.reset_index()

    def calculate_basic_ltv(self, transaction_data, prediction_period=365):
        """Basic (historical) LTV."""
        customer_metrics = transaction_data.groupby('customer_id').agg({
            'order_date': ['min', 'max', 'count'],
            'order_amount': ['sum', 'mean'],
            'order_id': 'count'
        }).round(2)

        customer_metrics.columns = [
            'first_order_date', 'last_order_date', 'date_count',
            'total_spent', 'avg_order_value', 'order_frequency'
        ]

        # Lifetime length in days
        customer_metrics['lifetime_days'] = (
            customer_metrics['last_order_date'] - customer_metrics['first_order_date']
        ).dt.days + 1

        # Annualized metrics
        customer_metrics['orders_per_year'] = (
            customer_metrics['order_frequency'] * 365 / customer_metrics['lifetime_days']
        )
        customer_metrics['annual_value'] = (
            customer_metrics['avg_order_value'] * customer_metrics['orders_per_year']
        )

        # Simple LTV projection (assumes behavior persists)
        customer_metrics['predicted_ltv'] = (
            customer_metrics['annual_value'] * prediction_period / 365
        )

        return customer_metrics.reset_index()

    def calculate_probabilistic_ltv(self, transaction_data):
        """Probabilistic LTV (a simplified stand-in for BG/NBD + Gamma-Gamma)."""
        customer_summary = transaction_data.groupby('customer_id').agg({
            'order_date': ['min', 'max', 'count'],
            'order_amount': ['sum', 'mean', 'std']
        }).round(2)

        customer_summary.columns = [
            'first_purchase', 'last_purchase', 'frequency',
            'total_spent', 'avg_order_value', 'order_std'
        ]

        # Key parameters
        analysis_date = transaction_data['order_date'].max()
        customer_summary['T'] = (analysis_date - customer_summary['first_purchase']).dt.days
        customer_summary['recency'] = (customer_summary['last_purchase'] - customer_summary['first_purchase']).dt.days

        # Simplified survival probability
        customer_summary['survival_prob'] = np.exp(-customer_summary['recency'] / customer_summary['T'].clip(lower=1))

        # Expected frequency (simplified)
        customer_summary['expected_frequency'] = (
            customer_summary['frequency'] * customer_summary['survival_prob']
            / customer_summary['T'].clip(lower=1) * 365
        )

        # Probabilistic LTV
        customer_summary['probabilistic_ltv'] = (
            customer_summary['expected_frequency'] *
            customer_summary['avg_order_value'] *
            customer_summary['survival_prob']
        )

        return customer_summary.reset_index()

    def segment_customers_by_ltv(self, ltv_data, n_segments=5):
        """Segment customers by LTV."""
        # K-means clustering. Note: the basic-LTV output has no 'recency'
        # column, so 'lifetime_days' is used here instead.
        features = ['predicted_ltv', 'order_frequency', 'avg_order_value', 'lifetime_days']

        # Standardize
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(ltv_data[features].fillna(0))

        # K-means clustering
        kmeans = KMeans(n_clusters=n_segments, random_state=42)
        ltv_data['ltv_segment'] = kmeans.fit_predict(scaled_features)

        # Per-segment characteristics
        segment_summary = ltv_data.groupby('ltv_segment').agg({
            'predicted_ltv': ['count', 'mean', 'median'],
            'order_frequency': 'mean',
            'avg_order_value': 'mean',
            'lifetime_days': 'mean'
        }).round(2)

        # Reference segment names (final mapping is assigned after ordering below)
        segment_names = {
            0: 'Champion',   # high value, high frequency
            1: 'Loyal',      # loyal customers
            2: 'Potential',  # potential customers
            3: 'At Risk',    # at-risk customers
            4: 'Lost'        # churned customers
        }

        # Re-order segments by mean LTV (descending)
        segment_avg_ltv = ltv_data.groupby('ltv_segment')['predicted_ltv'].mean().sort_values(ascending=False)
        segment_mapping = {old_id: new_id for new_id, (old_id, _) in enumerate(segment_avg_ltv.items())}

        ltv_data['ltv_segment_ordered'] = ltv_data['ltv_segment'].map(segment_mapping)
        ltv_data['segment_name'] = ltv_data['ltv_segment_ordered'].map(
            {i: name for i, name in enumerate(['Champion', 'Loyal', 'Potential', 'At Risk', 'Lost'])}
        )

        return ltv_data, segment_summary

    def calculate_clv_cohort(self, transaction_data):
        """Cohort LTV analysis."""
        # Group users by first purchase month
        first_purchase = transaction_data.groupby('customer_id')['order_date'].min().reset_index()
        first_purchase.columns = ['customer_id', 'cohort_month']
        first_purchase['cohort_month'] = first_purchase['cohort_month'].dt.to_period('M')

        # Attach cohort info
        transaction_with_cohort = transaction_data.merge(first_purchase, on='customer_id')
        transaction_with_cohort['period_number'] = (
            transaction_with_cohort['order_date'].dt.to_period('M') -
            transaction_with_cohort['cohort_month']
        ).apply(attrgetter('n'))

        # Cumulative LTV
        cohort_ltv = transaction_with_cohort.groupby(['cohort_month', 'period_number']).agg({
            'customer_id': 'nunique',
            'order_amount': 'sum'
        }).reset_index()

        cohort_ltv['cumulative_ltv'] = cohort_ltv.groupby('cohort_month')['order_amount'].cumsum()
        cohort_ltv['avg_ltv'] = cohort_ltv['cumulative_ltv'] / cohort_ltv['customer_id']

        return cohort_ltv

    def ltv_prediction_model(self, features, target_ltv):
        """LTV prediction model."""
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import mean_squared_error, r2_score

        # Feature selection
        X = features[['recency', 'frequency', 'monetary', 'avg_order_value',
                      'order_frequency', 'lifetime_days']].fillna(0)
        y = target_ltv

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Train
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        # Feature importance
        feature_importance = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)

        self.ltv_model = model

        return {
            'model': model,
            'mse': mse,
            'r2': r2,
            'feature_importance': feature_importance,
            'predictions': y_pred
        }

    def calculate_cac_ltv_ratio(self, ltv_data, acquisition_cost_data):
        """Compute CAC/LTV ratios."""
        # Join acquisition-cost data
        merged_data = ltv_data.merge(
            acquisition_cost_data,
            on=['customer_id'],
            how='left'
        )

        # Ratios
        merged_data['cac_ltv_ratio'] = merged_data['acquisition_cost'] / merged_data['predicted_ltv']
        merged_data['ltv_cac_ratio'] = merged_data['predicted_ltv'] / merged_data['acquisition_cost']

        # Per-channel analysis
        channel_analysis = merged_data.groupby('acquisition_channel').agg({
            'acquisition_cost': 'mean',
            'predicted_ltv': 'mean',
            'cac_ltv_ratio': 'mean',
            'ltv_cac_ratio': 'mean',
            'customer_id': 'count'
        }).round(2)

        # Channel health assessment
        channel_analysis['channel_health'] = np.where(
            channel_analysis['ltv_cac_ratio'] > 3, 'Healthy',
            np.where(channel_analysis['ltv_cac_ratio'] > 1, 'Acceptable', 'Unhealthy')
        )

        return merged_data, channel_analysis

# Usage example and business application
def ltv_business_application():
    """Example LTV business application."""
    analyzer = CustomerLTVAnalyzer()

    # Simulated transaction data
    np.random.seed(42)

    customers = range(1, 1001)
    transactions = []

    for customer_id in customers:
        # Simulate customer behavior
        first_order_date = pd.Timestamp('2023-01-01') + pd.Timedelta(days=np.random.randint(0, 365))
        n_orders = np.random.poisson(5) + 1

        for order_num in range(n_orders):
            order_date = first_order_date + pd.Timedelta(days=np.random.exponential(30) * order_num)
            order_amount = np.random.lognormal(mean=4, sigma=0.5)

            transactions.append({
                'customer_id': customer_id,
                'order_id': f'ORD_{customer_id}_{order_num}',
                'order_date': order_date,
                'order_amount': order_amount
            })

    transaction_df = pd.DataFrame(transactions)

    # LTV
    basic_ltv = analyzer.calculate_basic_ltv(transaction_df)
    prob_ltv = analyzer.calculate_probabilistic_ltv(transaction_df)

    # Segmentation
    ltv_segments, segment_summary = analyzer.segment_customers_by_ltv(basic_ltv)

    # Cohort analysis
    cohort_ltv = analyzer.calculate_clv_cohort(transaction_df)

    return {
        'basic_ltv': basic_ltv,
        'probabilistic_ltv': prob_ltv,
        'segments': ltv_segments,
        'segment_summary': segment_summary,
        'cohort_ltv': cohort_ltv
    }
```
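The probabilistic method above is a deliberate heuristic simplification. For the actual BG/NBD + Gamma-Gamma approach it names, a minimal sketch using the open-source `lifetimes` library might look like the following; the column names match the simulated `transaction_df` above, and the library calls follow `lifetimes`' documented API (an assumption worth verifying against your installed version):

```python
# Hedged sketch: BG/NBD + Gamma-Gamma via the `lifetimes` package,
# applied to the transaction_df generated above.
from lifetimes import BetaGeoFitter, GammaGammaFitter
from lifetimes.utils import summary_data_from_transaction_data

def probabilistic_ltv_with_lifetimes(transaction_df, months=12):
    # Build the per-customer frequency/recency/T/monetary_value summary
    summary = summary_data_from_transaction_data(
        transaction_df, 'customer_id', 'order_date',
        monetary_value_col='order_amount'
    )

    # BG/NBD models purchase frequency and "aliveness"
    bgf = BetaGeoFitter(penalizer_coef=0.001)
    bgf.fit(summary['frequency'], summary['recency'], summary['T'])

    # Gamma-Gamma models monetary value; it requires repeat purchasers
    repeat = summary[summary['frequency'] > 0]
    ggf = GammaGammaFitter(penalizer_coef=0.001)
    ggf.fit(repeat['frequency'], repeat['monetary_value'])

    # Discounted expected LTV over the next `months` months
    clv = ggf.customer_lifetime_value(
        bgf,
        repeat['frequency'], repeat['recency'], repeat['T'],
        repeat['monetary_value'],
        time=months, discount_rate=0.01
    )
    return clv.rename('probabilistic_ltv')
```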
- LTV application strategy (5 min):

```markdown
## LTV-driven operations strategy

### Acquisition strategy optimization
1. Channel spend:
   - High-LTV channels: increase budget
   - Mid-LTV channels: improve targeting precision
   - Low-LTV channels: reduce or stop spend
2. Acquisition cost control:
   - CAC < LTV/3: healthy acquisition
   - CAC = LTV/3: break-even
   - CAC > LTV/3: loss-making acquisition

### User operations strategy
1. Champion customers (high LTV):
   - VIP-exclusive services
   - Priority access to new products
   - Personalized, bespoke services
2. Loyal customers (upper-mid LTV):
   - Upgraded membership benefits
   - Cross-sell opportunities
   - Referral reward programs
3. Potential customers (mid LTV):
   - Personalized recommendations
   - Educational content
   - Purchase-frequency nudges
4. At-risk customers (low LTV):
   - Churn-warning interventions
   - Exclusive coupons
   - Proactive customer-care outreach
5. Lost customers (very low LTV):
   - Win-back campaigns
   - Reactivation
   - Cost control

### Product strategy guidance
1. Assortment optimization:
   - Adjust SKUs based on high-LTV users' preferences
   - Develop high-repurchase products
   - Optimize pricing strategy
2. Feature prioritization:
   - Features that improve the high-LTV user experience
   - Features that increase stickiness
   - Features that reduce churn risk
```

#### Question 3: Recommendation System Evaluation
Scenario: after the platform's recommendation system goes live, how do you evaluate its effectiveness and optimize it?

Evaluation framework:
```python
# Recommendation system evaluation framework
import numpy as np  # used by the diversity metrics below

class RecommendationEvaluator:
    def __init__(self):
        self.metrics = {}

    def calculate_accuracy_metrics(self, recommendations, actual_purchases):
        """Accuracy metrics."""
        metrics = {}

        for k in [5, 10, 20]:
            # Precision@K
            precision_k = self.precision_at_k(recommendations, actual_purchases, k)

            # Recall@K
            recall_k = self.recall_at_k(recommendations, actual_purchases, k)

            # F1@K
            if precision_k + recall_k > 0:
                f1_k = 2 * precision_k * recall_k / (precision_k + recall_k)
            else:
                f1_k = 0

            metrics[f'precision@{k}'] = precision_k
            metrics[f'recall@{k}'] = recall_k
            metrics[f'f1@{k}'] = f1_k

        # NDCG@K
        for k in [5, 10, 20]:
            metrics[f'ndcg@{k}'] = self.ndcg_at_k(recommendations, actual_purchases, k)

        return metrics
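    # Hedged sketch: the three helpers called above are not defined in the
    # original; these are minimal assumed implementations, treating
    # `recommendations` and `actual_purchases` as dicts of
    # user_id -> ordered list (or set) of item_ids.
    def precision_at_k(self, recommendations, actual_purchases, k):
        scores = []
        for user_id, recs in recommendations.items():
            relevant = set(actual_purchases.get(user_id, []))
            hits = len([i for i in recs[:k] if i in relevant])
            scores.append(hits / k)
        return np.mean(scores) if scores else 0.0

    def recall_at_k(self, recommendations, actual_purchases, k):
        scores = []
        for user_id, recs in recommendations.items():
            relevant = set(actual_purchases.get(user_id, []))
            if relevant:
                hits = len([i for i in recs[:k] if i in relevant])
                scores.append(hits / len(relevant))
        return np.mean(scores) if scores else 0.0

    def ndcg_at_k(self, recommendations, actual_purchases, k):
        scores = []
        for user_id, recs in recommendations.items():
            relevant = set(actual_purchases.get(user_id, []))
            # Binary relevance DCG against the ideal ordering
            dcg = sum(1 / np.log2(pos + 2)
                      for pos, item in enumerate(recs[:k]) if item in relevant)
            ideal_hits = min(len(relevant), k)
            idcg = sum(1 / np.log2(pos + 2) for pos in range(ideal_hits))
            if idcg > 0:
                scores.append(dcg / idcg)
        return np.mean(scores) if scores else 0.0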
    def calculate_business_metrics(self, experiment_data, control_data):
        """Business metrics for an A/B experiment."""
        business_metrics = {}

        # CTR lift
        exp_ctr = experiment_data['clicks'].sum() / experiment_data['impressions'].sum()
        ctrl_ctr = control_data['clicks'].sum() / control_data['impressions'].sum()
        business_metrics['ctr_lift'] = (exp_ctr - ctrl_ctr) / ctrl_ctr

        # CVR lift
        exp_cvr = experiment_data['purchases'].sum() / experiment_data['clicks'].sum()
        ctrl_cvr = control_data['purchases'].sum() / control_data['clicks'].sum()
        business_metrics['cvr_lift'] = (exp_cvr - ctrl_cvr) / ctrl_cvr

        # GMV lift
        exp_gmv = experiment_data['purchase_amount'].sum()
        ctrl_gmv = control_data['purchase_amount'].sum()
        business_metrics['gmv_lift'] = (exp_gmv - ctrl_gmv) / ctrl_gmv

        # Engagement lift
        exp_engagement = experiment_data['session_length'].mean()
        ctrl_engagement = control_data['session_length'].mean()
        business_metrics['engagement_lift'] = (exp_engagement - ctrl_engagement) / ctrl_engagement

        return business_metrics
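    # Hedged addition (not in the original): the lift numbers above say
    # nothing about statistical significance. A two-proportion z-test from
    # statsmodels is one standard way to check whether a CTR difference is
    # real; column names are the same assumed ones as above.
    def ctr_lift_significance(self, experiment_data, control_data, alpha=0.05):
        from statsmodels.stats.proportion import proportions_ztest

        successes = [experiment_data['clicks'].sum(), control_data['clicks'].sum()]
        trials = [experiment_data['impressions'].sum(), control_data['impressions'].sum()]
        stat, p_value = proportions_ztest(successes, trials)
        return {'z_stat': stat, 'p_value': p_value, 'significant': p_value < alpha}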
    def diversity_coverage_analysis(self, recommendations, item_catalog):
        """Diversity and coverage analysis."""
        metrics = {}

        # Catalog coverage: share of catalog items that get recommended
        recommended_items = set()
        for user_recs in recommendations.values():
            recommended_items.update(user_recs)

        metrics['catalog_coverage'] = len(recommended_items) / len(item_catalog)

        # Diversity within each user's recommendation list
        user_diversity_scores = []
        for user_id, user_recs in recommendations.items():
            if len(user_recs) > 1:
                # calculate_intra_list_diversity is assumed defined elsewhere
                diversity_score = self.calculate_intra_list_diversity(user_recs, item_catalog)
                user_diversity_scores.append(diversity_score)

        metrics['avg_diversity'] = np.mean(user_diversity_scores)

        # Novelty: degree to which unpopular items get recommended
        item_popularity = item_catalog['purchase_count'] / item_catalog['purchase_count'].sum()
        novelty_scores = []

        for user_recs in recommendations.values():
            rec_popularity = item_popularity[user_recs].mean()
            novelty_scores.append(1 - rec_popularity)  # less popular = more novel

        metrics['avg_novelty'] = np.mean(novelty_scores)

        return metrics
```

### Product Operations Analysis
#### Question 4: Product Sales Forecasting Model

Scenario: ahead of Double 11, you need to forecast sales for each product to guide inventory preparation and promotion strategy.

Forecast model design:
```python
# Product sales forecasting model
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
import xgboost as xgb

class ProductSalesForecast:
    def __init__(self):
        self.models = {}
        self.feature_importance = {}

    def prepare_features(self, sales_data, product_data, promotion_data, external_data):
        """Feature engineering."""
        # Join base tables
        features_df = sales_data.merge(product_data, on='product_id', how='left')
        features_df = features_df.merge(promotion_data, on=['product_id', 'date'], how='left')
        features_df = features_df.merge(external_data, on='date', how='left')

        # Time features
        features_df['year'] = features_df['date'].dt.year
        features_df['month'] = features_df['date'].dt.month
        features_df['day'] = features_df['date'].dt.day
        features_df['weekday'] = features_df['date'].dt.weekday
        features_df['is_weekend'] = features_df['weekday'].isin([5, 6]).astype(int)
        features_df['is_holiday'] = features_df['date'].isin(self.get_holidays()).astype(int)

        # Lag features
        for lag in [1, 7, 14, 30]:
            features_df[f'sales_lag_{lag}'] = features_df.groupby('product_id')['sales'].shift(lag)

        # Rolling statistics
        for window in [7, 14, 30]:
            features_df[f'sales_mean_{window}d'] = features_df.groupby('product_id')['sales'].rolling(window).mean().reset_index(0, drop=True)
            features_df[f'sales_std_{window}d'] = features_df.groupby('product_id')['sales'].rolling(window).std().reset_index(0, drop=True)

        # Product features
        features_df['price_change'] = features_df.groupby('product_id')['price'].pct_change()
        features_df['days_since_launch'] = (features_df['date'] - features_df['launch_date']).dt.days

        # Promotion features
        features_df['has_promotion'] = features_df['promotion_type'].notna().astype(int)
        features_df['discount_rate'] = features_df['discount_rate'].fillna(0)

        # Competitor features
        features_df['competitor_avg_price'] = features_df.groupby(['category', 'date'])['price'].transform('mean')
        features_df['price_competitiveness'] = features_df['price'] / features_df['competitor_avg_price']

        # External features
        features_df['weather_score'] = features_df['temperature'] * 0.3 + features_df['humidity'] * 0.7

        return features_df
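    # Hedged sketch: get_holidays() is called above but never defined in the
    # original. A minimal stand-in; the dates are illustrative assumptions,
    # not an official holiday calendar.
    def get_holidays(self):
        holidays = ['2024-01-01', '2024-02-10', '2024-05-01',
                    '2024-06-18', '2024-10-01', '2024-11-11', '2024-12-12']
        return pd.to_datetime(holidays)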
    def create_ensemble_model(self, X_train, y_train, X_val, y_val):
        """Ensemble model."""
        models = {
            'rf': RandomForestRegressor(n_estimators=100, random_state=42),
            'gbm': GradientBoostingRegressor(n_estimators=100, random_state=42),
            'xgb': xgb.XGBRegressor(n_estimators=100, random_state=42)
        }

        predictions = {}
        model_weights = {}

        # Train each model
        for name, model in models.items():
            model.fit(X_train, y_train)
            val_pred = model.predict(X_val)
            val_mae = mean_absolute_error(y_val, val_pred)

            predictions[name] = val_pred
            model_weights[name] = 1 / (val_mae + 1e-6)  # weight inversely proportional to error

            self.models[name] = model

        # Normalize the weights
        total_weight = sum(model_weights.values())
        model_weights = {k: v / total_weight for k, v in model_weights.items()}

        # Weighted-average prediction
        ensemble_pred = sum(predictions[name] * weight for name, weight in model_weights.items())

        return ensemble_pred, model_weights

    def predict_campaign_impact(self, base_forecast, campaign_features):
        """Forecast the impact of marketing campaigns."""
        # Impact multipliers would be trained on historical campaign data;
        # stubbed here with fixed lift factors per campaign type
        impact_multipliers = {}

        for campaign_type in campaign_features['campaign_type'].unique():
            if campaign_type in ['flash_sale', 'coupon', 'bundle']:
                impact_multipliers[campaign_type] = {
                    'flash_sale': 2.5,
                    'coupon': 1.8,
                    'bundle': 1.3
                }[campaign_type]

        # Apply the multipliers
        adjusted_forecast = base_forecast.copy()
        for idx, row in campaign_features.iterrows():
            if row['campaign_type'] in impact_multipliers:
                multiplier = impact_multipliers[row['campaign_type']]
                # Account for discount depth
                discount_factor = 1 + (row['discount_rate'] * 0.5)
                adjusted_forecast[idx] *= multiplier * discount_factor

        return adjusted_forecast

    def demand_sensing(self, real_time_data):
        """Real-time demand sensing."""
        # Adjust the forecast using real-time signals

        # Real-time conversion metrics
        current_ctr = real_time_data['clicks'] / real_time_data['impressions']
        current_cvr = real_time_data['orders'] / real_time_data['clicks']

        # Compare against historical baselines
        historical_ctr = 0.05  # historical average CTR
        historical_cvr = 0.08  # historical average CVR

        ctr_factor = current_ctr / historical_ctr
        cvr_factor = current_cvr / historical_cvr

        # Adjustment factor
        adjustment_factor = (ctr_factor * 0.4 + cvr_factor * 0.6)

        return adjustment_factor

    def inventory_optimization(self, forecast_data, inventory_constraints):
        """Inventory recommendations."""
        from scipy.stats import norm

        optimization_results = []

        for product_id in forecast_data['product_id'].unique():
            product_forecast = forecast_data[forecast_data['product_id'] == product_id]

            # Safety stock
            avg_daily_sales = product_forecast['predicted_sales'].mean()
            sales_std = product_forecast['predicted_sales'].std()
            lead_time = inventory_constraints.get(product_id, {}).get('lead_time', 7)
            service_level = 0.95  # 95% service level

            z_score = norm.ppf(service_level)
            safety_stock = z_score * sales_std * np.sqrt(lead_time)

            # Recommended stock level
            forecast_period = len(product_forecast)
            total_forecast = product_forecast['predicted_sales'].sum()
            recommended_inventory = total_forecast + safety_stock

            optimization_results.append({
                'product_id': product_id,
                'forecast_sales': total_forecast,
                'safety_stock': safety_stock,
                'recommended_inventory': recommended_inventory,
                'current_inventory': inventory_constraints.get(product_id, {}).get('current_stock', 0),
                'reorder_point': avg_daily_sales * lead_time + safety_stock
            })

        return pd.DataFrame(optimization_results)
# Price elasticity analysis
class PriceElasticityAnalyzer:
    def __init__(self):
        self.elasticity_models = {}

    def calculate_price_elasticity(self, sales_data):
        """Estimate price elasticity per product."""
        from sklearn.linear_model import LinearRegression

        elasticity_results = []

        for product_id in sales_data['product_id'].unique():
            product_data = sales_data[sales_data['product_id'] == product_id].copy()

            if len(product_data) < 30:  # too few data points
                continue

            # Log-log regression to estimate elasticity
            product_data['log_sales'] = np.log(product_data['sales'] + 1)
            product_data['log_price'] = np.log(product_data['price'])

            # Control for other variables
            X = product_data[['log_price', 'is_weekend', 'has_promotion']].fillna(0)
            y = product_data['log_sales']

            model = LinearRegression()
            model.fit(X, y)

            price_elasticity = model.coef_[0]  # the price coefficient is the elasticity

            elasticity_results.append({
                'product_id': product_id,
                'price_elasticity': price_elasticity,
                'elasticity_interpretation': self.interpret_elasticity(price_elasticity),
                'r_squared': model.score(X, y)
            })

        return pd.DataFrame(elasticity_results)

    def interpret_elasticity(self, elasticity):
        """Interpret a price-elasticity value."""
        if abs(elasticity) < 0.5:
            return 'inelastic'
        elif abs(elasticity) < 1.0:
            return 'moderately_elastic'
        else:
            return 'highly_elastic'

    def optimal_pricing_strategy(self, elasticity_data, cost_data):
        """Pricing recommendations."""
        pricing_recommendations = []

        for _, row in elasticity_data.iterrows():
            product_id = row['product_id']
            elasticity = row['price_elasticity']

            # Cost information
            cost = cost_data.get(product_id, {}).get('unit_cost', 0)
            current_price = cost_data.get(product_id, {}).get('current_price', 0)

            if elasticity < -1:  # elastic
                # Price cut: the demand gain outweighs the price decrease
                recommended_change = -0.05  # cut price 5%
                strategy = 'reduce_price'
            elif elasticity > -0.5:  # inelastic
                # Price increase: the demand loss is smaller than the price gain
                recommended_change = 0.08  # raise price 8%
                strategy = 'increase_price'
            else:  # moderately elastic
                recommended_change = 0
                strategy = 'maintain_price'

            new_price = current_price * (1 + recommended_change)
            expected_demand_change = elasticity * recommended_change

            pricing_recommendations.append({
                'product_id': product_id,
                'current_price': current_price,
                'recommended_price': new_price,
                'price_change_pct': recommended_change,
                'expected_demand_change_pct': expected_demand_change,
                'strategy': strategy,
                'elasticity': elasticity
            })

        return pd.DataFrame(pricing_recommendations)
```

## Recommendation Algorithm Engineer Interview Question Bank
### Recommendation System Design

#### Question 5: E-commerce Recommendation System Architecture Design (Core Question)

Scenario: design a personalized recommendation system for a large e-commerce platform that supports tens of millions of users.

System architecture design:
```python
# E-commerce recommendation system architecture
import numpy as np

class EcommerceRecommendationSystem:
    def __init__(self):
        self.user_profiles = {}
        self.item_profiles = {}
        self.models = {}

    def multi_stage_recommendation(self, user_id, context=None):
        """Multi-stage recommendation pipeline."""

        # Stage 1: Recall
        candidate_items = self.recall_stage(user_id, context)

        # Stage 2: Coarse ranking
        coarse_ranked_items = self.coarse_ranking_stage(user_id, candidate_items, context)

        # Stage 3: Fine ranking
        fine_ranked_items = self.fine_ranking_stage(user_id, coarse_ranked_items, context)

        # Stage 4: Re-ranking
        final_recommendations = self.reranking_stage(user_id, fine_ranked_items, context)

        return final_recommendations

    def recall_stage(self, user_id, context=None, top_k=1000):
        """Recall stage: pull a candidate set from the full catalog."""
        candidates = set()

        # 1. Collaborative-filtering recall
        cf_candidates = self.collaborative_filtering_recall(user_id, top_k // 4)
        candidates.update(cf_candidates)

        # 2. Content-based recall
        content_candidates = self.content_based_recall(user_id, top_k // 4)
        candidates.update(content_candidates)

        # 3. Popularity recall
        popular_candidates = self.popularity_recall(user_id, context, top_k // 4)
        candidates.update(popular_candidates)

        # 4. Deep-learning recall
        dl_candidates = self.deep_learning_recall(user_id, top_k // 4)
        candidates.update(dl_candidates)

        return list(candidates)[:top_k]

    def collaborative_filtering_recall(self, user_id, top_k):
        """Collaborative-filtering recall (UserCF + ItemCF blend)."""

        # UserCF: based on user similarity
        similar_users = self.find_similar_users(user_id, top_k=100)
        user_cf_items = []

        for similar_user, similarity in similar_users:
            user_items = self.get_user_items(similar_user)
            current_user_items = set(self.get_user_items(user_id))

            for item_id, rating in user_items:
                if item_id not in current_user_items:
                    score = similarity * rating
                    user_cf_items.append((item_id, score))

        # ItemCF: based on item similarity
        user_items = self.get_user_items(user_id)
        item_cf_items = []

        for item_id, rating in user_items[-10:]:  # the 10 most recent items
            similar_items = self.find_similar_items(item_id, top_k=20)
            for similar_item, similarity in similar_items:
                score = similarity * rating
                item_cf_items.append((similar_item, score))

        # Merge and sort
        all_cf_items = user_cf_items + item_cf_items
        cf_scores = {}
        for item_id, score in all_cf_items:
            if item_id in cf_scores:
                cf_scores[item_id] += score
            else:
                cf_scores[item_id] = score

        # Return the top_k items
        sorted_items = sorted(cf_scores.items(), key=lambda x: x[1], reverse=True)
        return [item_id for item_id, score in sorted_items[:top_k]]

    def deep_learning_recall(self, user_id, top_k):
        """Deep-learning recall with a two-tower model."""

        # User vector
        user_embedding = self.get_user_embedding(user_id)

        # Item vectors (precomputed and stored)
        item_embeddings = self.load_item_embeddings()

        # Similarities
        similarities = np.dot(item_embeddings, user_embedding)

        # Top-k
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        top_items = [self.index_to_item_id[idx] for idx in top_indices]

        return top_items
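    # Hedged sketch (not in the original): at the stated scale, a brute-force
    # np.dot over every item embedding is usually replaced by an approximate
    # nearest-neighbor index. One common choice is faiss; the variant below
    # assumes the same embedding helpers as deep_learning_recall above.
    def deep_learning_recall_ann(self, user_id, top_k):
        import faiss

        user_embedding = self.get_user_embedding(user_id).astype('float32')
        item_embeddings = self.load_item_embeddings().astype('float32')

        # Inner-product index; in production this would be built once
        # offline (e.g., an IVF/HNSW variant), not per request.
        index = faiss.IndexFlatIP(item_embeddings.shape[1])
        index.add(item_embeddings)

        _, top_indices = index.search(user_embedding.reshape(1, -1), top_k)
        return [self.index_to_item_id[idx] for idx in top_indices[0]]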
    def fine_ranking_stage(self, user_id, candidate_items, context):
        """Fine-ranking stage: deep CTR estimation."""

        features = []
        for item_id in candidate_items:
            feature_vector = self.extract_ranking_features(user_id, item_id, context)
            features.append(feature_vector)

        # Predicted click probability from a deep CTR model
        click_probs = self.ctr_model.predict(features)

        # Predicted conversion probability from a CVR model
        conversion_probs = self.cvr_model.predict(features)

        # Combined ranking score
        ranking_scores = []
        for i, (click_prob, cvr_prob) in enumerate(zip(click_probs, conversion_probs)):
            # CTCVR = CTR × CVR
            ctcvr = click_prob * cvr_prob

            # Multi-objective considerations
            item_id = candidate_items[i]
            item_price = self.get_item_price(item_id)
            item_margin = self.get_item_margin(item_id)

            # Final score: click-to-conversion probability plus business value
            final_score = ctcvr * 0.7 + (item_price * item_margin * ctcvr) * 0.3
            ranking_scores.append((item_id, final_score))

        # Sort by score
        ranked_items = sorted(ranking_scores, key=lambda x: x[1], reverse=True)
        return [item_id for item_id, score in ranked_items]

    def reranking_stage(self, user_id, ranked_items, context):
        """Re-ranking stage: diversity and business rules."""

        # 1. Diversity optimization
        diversified_items = self.diversification(ranked_items, user_id)

        # 2. Business-rule filtering
        filtered_items = self.apply_business_rules(diversified_items, user_id, context)

        # 3. Real-time adjustment
        final_items = self.real_time_adjustment(filtered_items, user_id, context)

        return final_items

    def diversification(self, ranked_items, user_id, lambda_param=0.3):
        """MMR-based diversity optimization."""
        selected_items = []
        remaining_items = ranked_items.copy()

        # The first item is simply the highest-scoring one
        if remaining_items:
            selected_items.append(remaining_items.pop(0))

        # Subsequent items trade off relevance against diversity
        while remaining_items and len(selected_items) < 20:
            max_mmr_score = -1
            best_item = None
            best_index = -1

            for i, candidate in enumerate(remaining_items):
                # Relevance score (the original ranking score)
                relevance_score = self.get_item_relevance_score(candidate, user_id)

                # Max similarity to the already-selected items
                max_similarity = 0
                for selected_item in selected_items:
                    similarity = self.calculate_item_similarity(candidate, selected_item)
                    max_similarity = max(max_similarity, similarity)

                # MMR score
                mmr_score = lambda_param * relevance_score - (1 - lambda_param) * max_similarity

                if mmr_score > max_mmr_score:
                    max_mmr_score = mmr_score
                    best_item = candidate
                    best_index = i

            if best_item:
                selected_items.append(best_item)
                remaining_items.pop(best_index)

        return selected_items

    def extract_ranking_features(self, user_id, item_id, context):
        """Extract ranking features."""
        features = {}

        # User features
        user_profile = self.get_user_profile(user_id)
        features.update({
            'user_age': user_profile.get('age', 0),
            'user_gender': user_profile.get('gender', 0),
            'user_city_tier': user_profile.get('city_tier', 0),
            'user_purchase_power': user_profile.get('purchase_power', 0)
        })

        # Item features
        item_profile = self.get_item_profile(item_id)
        features.update({
            'item_category': item_profile.get('category', 0),
            'item_price': item_profile.get('price', 0),
            'item_brand': item_profile.get('brand', 0),
            'item_rating': item_profile.get('rating', 0),
            'item_sales_volume': item_profile.get('sales_volume', 0)
        })

        # User-item interaction features
        features.update({
            'user_item_category_preference': self.get_category_preference(user_id, item_profile.get('category')),
            'user_item_brand_preference': self.get_brand_preference(user_id, item_profile.get('brand')),
            'user_item_price_match': self.calculate_price_match(user_profile, item_profile)
        })

        # Context features
        if context:
            features.update({
                'hour_of_day': context.get('hour', 0),
                'day_of_week': context.get('weekday', 0),
                'is_weekend': context.get('is_weekend', 0),
                'device_type': context.get('device', 0),
                'page_type': context.get('page_type', 0)
            })

        # Statistical features
        features.update({
            'item_ctr_7d': self.get_item_ctr(item_id, days=7),
            'item_cvr_7d': self.get_item_cvr(item_id, days=7),
            'user_category_ctr': self.get_user_category_ctr(user_id, item_profile.get('category'))
        })

        return features
# CTR estimation model: a DeepFM implementation
class DeepFMCTRModel:
    def __init__(self, feature_dims, embedding_dim=8, hidden_dims=[256, 128, 64]):
        self.feature_dims = feature_dims
        self.embedding_dim = embedding_dim
        self.hidden_dims = hidden_dims
        self.model = self.build_model()

    def build_model(self):
        """Build the DeepFM model."""
        import tensorflow as tf
        from tensorflow.keras import layers, Model

        # Input layers
        feature_inputs = []
        embeddings = []

        for i, dim in enumerate(self.feature_dims):
            input_layer = layers.Input(shape=(1,), name=f'feature_{i}')
            feature_inputs.append(input_layer)

            # Embedding layer
            embedding = layers.Embedding(dim, self.embedding_dim)(input_layer)
            embedding = layers.Flatten()(embedding)
            embeddings.append(embedding)

        # FM part
        # First-order terms
        first_order = layers.Concatenate()(feature_inputs)
        first_order_output = layers.Dense(1, activation=None)(first_order)

        # Second-order (interaction) terms
        embeddings_concat = layers.Concatenate()(embeddings)

        # Square of the sum: (Σv)²
        sum_square = layers.Lambda(lambda x: tf.square(tf.reduce_sum(
            tf.reshape(x, (-1, len(self.feature_dims), self.embedding_dim)), axis=1
        )))(embeddings_concat)

        # Sum of the squares: Σv²
        square_sum = layers.Lambda(lambda x: tf.reduce_sum(
            tf.square(tf.reshape(x, (-1, len(self.feature_dims), self.embedding_dim))), axis=1
        ))(embeddings_concat)

        # FM interaction term: 0.5 * ((Σv)² - Σv²)
        cross_term = layers.Lambda(lambda x: 0.5 * tf.reduce_sum(x[0] - x[1], axis=1, keepdims=True))([sum_square, square_sum])

        # Deep part
        deep_input = embeddings_concat
        for hidden_dim in self.hidden_dims:
            deep_input = layers.Dense(hidden_dim, activation='relu')(deep_input)
            deep_input = layers.Dropout(0.3)(deep_input)

        deep_output = layers.Dense(1, activation=None)(deep_input)

        # Final output
        output = layers.Add()([first_order_output, cross_term, deep_output])
        output = layers.Activation('sigmoid')(output)

        model = Model(inputs=feature_inputs, outputs=output)
        return model

    def train(self, X_train, y_train, X_val, y_val):
        """Train the model."""
        self.model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['AUC']
        )

        history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=10,
            batch_size=1024,
            verbose=1
        )

        return history

    def predict(self, X):
        """Predict."""
        return self.model.predict(X)
# Multi-task learning model
class MultiTaskLearningModel:
    def __init__(self):
        self.shared_layers = None
        self.task_specific_layers = {}

    def build_mmoe_model(self, feature_dims, num_experts=4, expert_dim=64):
        """Multi-gate Mixture-of-Experts (MMoE) model."""
        import tensorflow as tf
        from tensorflow.keras import layers, Model

        # Input layer
        inputs = layers.Input(shape=(sum(feature_dims),))

        # Expert networks
        experts = []
        for i in range(num_experts):
            expert = layers.Dense(expert_dim, activation='relu', name=f'expert_{i}')(inputs)
            experts.append(expert)

        experts_concat = layers.Lambda(lambda x: tf.stack(x, axis=1))(experts)

        # Gate networks
        def build_gate(task_name):
            gate = layers.Dense(num_experts, activation='softmax', name=f'gate_{task_name}')(inputs)
            gate = layers.Reshape((num_experts, 1))(gate)

            # Weighted sum of expert outputs
            weighted_expert = layers.Lambda(lambda x: tf.reduce_sum(x[0] * x[1], axis=1))([experts_concat, gate])
            return weighted_expert

        # CTR task
        ctr_gate_output = build_gate('ctr')
        ctr_output = layers.Dense(64, activation='relu')(ctr_gate_output)
        ctr_output = layers.Dense(1, activation='sigmoid', name='ctr_output')(ctr_output)

        # CVR task
        cvr_gate_output = build_gate('cvr')
        cvr_output = layers.Dense(64, activation='relu')(cvr_gate_output)
        cvr_output = layers.Dense(1, activation='sigmoid', name='cvr_output')(cvr_output)

        # Model
        model = Model(inputs=inputs, outputs=[ctr_output, cvr_output])

        return model
```

#### Question 6: Cold-Start Solutions
Scenario: design recommendation strategies for new users and new items.

Solution:
```python
# Cold-start solutions
class ColdStartSolver:
    def __init__(self):
        self.user_onboarding_model = None
        self.item_content_model = None

    def new_user_recommendation(self, user_basic_info, onboarding_behavior=None):
        """Recommendation strategy for new users."""
        recommendations = []

        # 1. Demographic-based recommendations
        demo_recs = self.demographic_based_recommendation(user_basic_info)
        recommendations.extend(demo_recs)

        # 2. Location-based recommendations
        location_recs = self.location_based_recommendation(user_basic_info.get('city'))
        recommendations.extend(location_recs)

        # 3. Popular items
        popular_recs = self.popularity_based_recommendation(user_basic_info)
        recommendations.extend(popular_recs)

        # 4. Content-based, if onboarding behavior exists
        if onboarding_behavior:
            content_recs = self.onboarding_based_recommendation(onboarding_behavior)
            recommendations.extend(content_recs)

        # Deduplicate and truncate
        unique_recs = list(set(recommendations))
        return unique_recs[:20]

    def new_item_recommendation(self, item_info):
        """Recommendation strategy for new items."""
        # 1. Content-based item matching
        similar_items = self.find_similar_items_by_content(item_info)

        # 2. Find users who liked similar items
        target_users = []
        for similar_item in similar_items:
            users = self.get_item_users(similar_item)
            target_users.extend(users)

        # 3. Score and rank those users
        user_scores = {}
        for user_id in target_users:
            score = self.calculate_user_item_match_score(user_id, item_info)
            user_scores[user_id] = score

        # Return the best-matching users
        sorted_users = sorted(user_scores.items(), key=lambda x: x[1], reverse=True)
        return [user_id for user_id, score in sorted_users[:100]]

    def active_learning_strategy(self, user_id, candidate_items):
        """Active-learning strategy."""
        # Pick the most informative items for the user to rate

        # 1. Uncertainty sampling: items with the highest prediction uncertainty
        uncertainties = []
        for item_id in candidate_items:
            prediction_variance = self.calculate_prediction_uncertainty(user_id, item_id)
            uncertainties.append((item_id, prediction_variance))

        # 2. Diversity sampling: make sure different categories are covered
        diverse_items = self.ensure_diversity_sampling(uncertainties)

        # 3. Return items for active-learning feedback
        return diverse_items[:5]  # the 5 most informative items for user feedback
```

## Supply Chain Data Scientist Interview Question Bank
### Demand Forecasting and Inventory Optimization

#### Question 7: Inventory Optimization Model (Key Question)

Scenario: the platform needs to optimize product inventory, balancing stockout risk against inventory cost.

Optimization model design:
```python
# Inventory optimization models
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.stats import norm, poisson

class InventoryOptimization:
    def __init__(self):
        self.demand_models = {}
        self.cost_parameters = {}

    def newsvendor_model(self, demand_forecast, demand_std, unit_cost, selling_price, salvage_value=0):
        """Newsvendor model: single-period inventory optimization."""

        # Key parameters
        overage_cost = unit_cost - salvage_value   # cost of overstock
        underage_cost = selling_price - unit_cost  # cost of understock

        # Optimal service level (the critical ratio)
        optimal_service_level = underage_cost / (underage_cost + overage_cost)

        # Optimal order quantity
        z_score = norm.ppf(optimal_service_level)
        optimal_order_quantity = demand_forecast + z_score * demand_std

        # Expected profit via Monte Carlo simulation
        def expected_profit(order_qty):
            simulations = 10000
            profits = []

            for _ in range(simulations):
                demand = np.random.normal(demand_forecast, demand_std)
                demand = max(0, demand)  # demand cannot be negative

                sales = min(demand, order_qty)
                leftover = max(0, order_qty - demand)

                revenue = sales * selling_price + leftover * salvage_value
                cost = order_qty * unit_cost
                profit = revenue - cost

                profits.append(profit)

            return np.mean(profits)

        optimal_profit = expected_profit(optimal_order_quantity)

        return {
            'optimal_order_quantity': optimal_order_quantity,
            'optimal_service_level': optimal_service_level,
            'expected_profit': optimal_profit,
            'critical_ratio': optimal_service_level
        }

    def multi_period_inventory(self, demand_forecasts, holding_cost, stockout_cost, order_cost):
        """Multi-period inventory optimization."""

        periods = len(demand_forecasts)

        # Dynamic-programming style solution (exhaustive recursion; in
        # practice this needs memoization to remain tractable)
        def dp_inventory(period, current_inventory, total_cost=0):
            if period >= periods:
                return total_cost

            min_cost = float('inf')
            optimal_order = 0

            # Try different order quantities
            max_order = max(demand_forecasts) * 2  # a reasonable upper bound

            for order_qty in range(0, int(max_order) + 1, 10):  # step of 10 to reduce computation
                inventory_before_demand = current_inventory + order_qty

                # Costs for this period
                period_order_cost = order_cost if order_qty > 0 else 0
                period_holding_cost = holding_cost * inventory_before_demand

                # Inventory and stockout after demand is realized
                demand = demand_forecasts[period]
                inventory_after_demand = max(0, inventory_before_demand - demand)
                stockout = max(0, demand - inventory_before_demand)

                period_stockout_cost = stockout_cost * stockout

                period_total_cost = period_order_cost + period_holding_cost + period_stockout_cost

                # Recurse into later periods
                future_cost = dp_inventory(period + 1, inventory_after_demand, 0)

                total_period_cost = period_total_cost + future_cost

                if total_period_cost < min_cost:
                    min_cost = total_period_cost
                    optimal_order = order_qty

            return min_cost

        # Solve for the optimal policy
        optimal_cost = dp_inventory(0, 0)

        return optimal_cost

    def abc_xyz_analysis(self, sales_data):
        """ABC-XYZ analysis for inventory classification."""

        # Revenue and coefficient of variation per product
        product_analysis = sales_data.groupby('product_id').agg({
            'sales_amount': 'sum',
            'sales_quantity': ['mean', 'std']
        }).round(2)

        product_analysis.columns = ['total_sales', 'avg_quantity', 'std_quantity']

        # Coefficient of variation
        product_analysis['cv'] = product_analysis['std_quantity'] / product_analysis['avg_quantity']
        product_analysis['cv'] = product_analysis['cv'].fillna(0)

        # ABC classification (by revenue)
        sales_sorted = product_analysis.sort_values('total_sales', ascending=False)
        sales_cumsum = sales_sorted['total_sales'].cumsum()
        total_sales = sales_sorted['total_sales'].sum()
        sales_cumsum_pct = sales_cumsum / total_sales

        # ABC thresholds: A = top 80%, B = next 15%, C = last 5%
        product_analysis['abc_category'] = 'C'
        a_threshold = sales_cumsum_pct <= 0.8
        b_threshold = (sales_cumsum_pct > 0.8) & (sales_cumsum_pct <= 0.95)

        product_analysis.loc[sales_sorted[a_threshold].index, 'abc_category'] = 'A'
        product_analysis.loc[sales_sorted[b_threshold].index, 'abc_category'] = 'B'

        # XYZ classification (by coefficient of variation)
        product_analysis['xyz_category'] = 'Z'
        product_analysis.loc[product_analysis['cv'] <= 0.5, 'xyz_category'] = 'X'  # stable
        product_analysis.loc[(product_analysis['cv'] > 0.5) & (product_analysis['cv'] <= 1.0), 'xyz_category'] = 'Y'  # moderate

        # Combined class
        product_analysis['abc_xyz_category'] = product_analysis['abc_category'] + product_analysis['xyz_category']

        # Inventory-strategy suggestions
        inventory_strategies = {
            'AX': 'Frequent counts, precise forecasting, moderate safety stock',
            'AY': 'Medium-frequency counts, forecast plus buffer, medium safety stock',
            'AZ': 'Low-frequency counts, large buffer, high safety stock',
            'BX': 'Medium-frequency counts, standard inventory management',
            'BY': 'Low-frequency counts, medium buffer',
            'BZ': 'Low-frequency counts, high buffer',
            'CX': 'Low-frequency counts, minimal stock',
            'CY': 'Very low-frequency counts, order on demand',
            'CZ': 'Consider phasing out or outsourcing'
        }

        product_analysis['inventory_strategy'] = product_analysis['abc_xyz_category'].map(inventory_strategies)

        return product_analysis

    def safety_stock_calculation(self, demand_data, lead_time, service_level=0.95):
        """Safety-stock calculation."""

        results = []

        for product_id in demand_data['product_id'].unique():
            product_demand = demand_data[demand_data['product_id'] == product_id]['daily_demand']

            # Demand statistics
            avg_demand = product_demand.mean()
            demand_std = product_demand.std()

            # Safety stock: SS = Z * sigma * sqrt(L)
            z_score = norm.ppf(service_level)
            safety_stock = z_score * demand_std * np.sqrt(lead_time)

            # Reorder point: ROP = average demand * lead time + safety stock
            reorder_point = avg_demand * lead_time + safety_stock

            # Service-level analysis
            current_service_level = 1 - norm.cdf(0, avg_demand * lead_time, demand_std * np.sqrt(lead_time))

            results.append({
                'product_id': product_id,
                'avg_daily_demand': avg_demand,
                'demand_std': demand_std,
                'safety_stock': safety_stock,
                'reorder_point': reorder_point,
                'target_service_level': service_level,
                'current_service_level': current_service_level
            })

        return pd.DataFrame(results)
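# Hedged usage sketch (not in the original): the newsvendor model above with
# illustrative numbers -- forecast 1,000 units (std 200), unit cost 60,
# selling price 100, salvage value 20.
def _newsvendor_demo():
    result = InventoryOptimization().newsvendor_model(
        demand_forecast=1000, demand_std=200,
        unit_cost=60, selling_price=100, salvage_value=20
    )
    # Critical ratio = (100-60) / ((100-60) + (60-20)) = 0.5, so here the
    # optimal order quantity is simply the mean forecast (z-score of 0.5 is 0).
    return result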
# Supply chain network optimization
class SupplyChainNetworkOptimization:
    def __init__(self):
        self.facilities = {}
        self.transportation_costs = {}

    def facility_location_optimization(self, demand_points, potential_facilities, fixed_costs, variable_costs):
        """Facility-location optimization."""

        from scipy.optimize import linprog

        n_facilities = len(potential_facilities)
        n_customers = len(demand_points)

        # Decision variables: x_ij (supply from facility i to customer j),
        # y_i (whether facility i is opened)

        # Objective coefficients
        c = []

        # Transport costs (x_ij variables)
        for i in range(n_facilities):
            for j in range(n_customers):
                transport_cost = self.calculate_transport_cost(potential_facilities[i], demand_points[j])
                c.append(transport_cost)

        # Fixed costs (y_i variables)
        for i in range(n_facilities):
            c.append(fixed_costs[i])

        # Constraints
        A_eq = []
        b_eq = []

        # Demand satisfaction: every customer's demand must be met
        for j in range(n_customers):
            constraint = [0] * len(c)
            for i in range(n_facilities):
                constraint[i * n_customers + j] = 1
            A_eq.append(constraint)
            b_eq.append(demand_points[j]['demand'])

        # Capacity constraints: supply cannot exceed facility capacity
        A_ub = []
        b_ub = []

        for i in range(n_facilities):
            constraint = [0] * len(c)
            # All supply out of this facility
            for j in range(n_customers):
                constraint[i * n_customers + j] = 1
            # Minus capacity times the open indicator
            constraint[n_facilities * n_customers + i] = -potential_facilities[i]['capacity']
            A_ub.append(constraint)
            b_ub.append(0)

        # Variable bounds
        bounds = []

        # x_ij bounds (non-negative supply)
        for i in range(n_facilities):
            for j in range(n_customers):
                bounds.append((0, None))

        # y_i bounds (binary variables, relaxed here to continuous 0-1)
        for i in range(n_facilities):
            bounds.append((0, 1))

        # Solve
        result = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq,
                         bounds=bounds, method='highs')

        if result.success:
            # Parse the solution
            solution = result.x

            # Opened facilities
            opened_facilities = []
            facility_vars_start = n_facilities * n_customers

            for i in range(n_facilities):
                if solution[facility_vars_start + i] > 0.5:  # binary-variable threshold
                    opened_facilities.append(i)

            # Allocation plan
            allocation = {}
            for i in range(n_facilities):
                if i in opened_facilities:
                    allocation[i] = {}
                    for j in range(n_customers):
                        supply = solution[i * n_customers + j]
                        if supply > 0.01:  # ignore negligible values
                            allocation[i][j] = supply

            return {
                'opened_facilities': opened_facilities,
                'allocation': allocation,
                'total_cost': result.fun,
                'optimization_successful': True
            }
        else:
            return {
                'optimization_successful': False,
                'message': 'Optimization failed'
            }

    def calculate_transport_cost(self, facility, customer):
        """Transport-cost calculation."""
        # Simplified Euclidean distance (a real system should use geographic distance)
        distance = np.sqrt(
            (facility['lat'] - customer['lat'])**2 +
            (facility['lng'] - customer['lng'])**2
        )

        # Transport cost = distance * cost per unit distance * demand
        cost_per_km = 0.5
        return distance * cost_per_km * customer['demand']
```

#### Question 8: Supplier Evaluation and Selection
Scenario: build a comprehensive supplier evaluation system to support procurement decisions.

Evaluation model:
```python
# Supplier evaluation model
import numpy as np

class SupplierEvaluationSystem:
    def __init__(self):
        self.evaluation_criteria = {
            'quality': 0.3,         # quality weight
            'cost': 0.25,           # cost weight
            'delivery': 0.2,        # delivery weight
            'service': 0.15,        # service weight
            'sustainability': 0.1   # sustainability weight
        }

    def ahp_supplier_selection(self, suppliers_data, criteria_comparison_matrix):
        """Supplier selection with the Analytic Hierarchy Process (AHP)."""

        # 1. Criteria weights (zipped into dicts keyed by name, since
        #    calculate_ahp_weights returns a plain weight vector)
        criteria_weight_vector = self.calculate_ahp_weights(criteria_comparison_matrix)
        criteria_weights = dict(zip(self.evaluation_criteria.keys(), criteria_weight_vector))

        # 2. Per-criterion supplier scores
        supplier_scores = {}

        for criterion in self.evaluation_criteria.keys():
            # Pairwise comparison matrix of suppliers under this criterion
            supplier_comparison = self.build_supplier_comparison_matrix(
                suppliers_data, criterion
            )

            # Weights for this criterion
            supplier_weights = self.calculate_ahp_weights(supplier_comparison)
            supplier_scores[criterion] = dict(zip(suppliers_data.keys(), supplier_weights))

        # 3. Composite scores
        final_scores = {}
        for supplier in suppliers_data.keys():
            score = 0
            for criterion, weight in criteria_weights.items():
                score += weight * supplier_scores[criterion][supplier]
            final_scores[supplier] = score

        # Rank
        ranked_suppliers = sorted(final_scores.items(), key=lambda x: x[1], reverse=True)

        return {
            'criteria_weights': criteria_weights,
            'supplier_scores': supplier_scores,
            'final_ranking': ranked_suppliers
        }

    def calculate_ahp_weights(self, comparison_matrix):
        """Compute AHP weights."""
        eigenvalues, eigenvectors = np.linalg.eig(comparison_matrix)

        # Eigenvector of the largest eigenvalue
        max_eigenvalue_index = np.argmax(eigenvalues.real)
        principal_eigenvector = eigenvectors[:, max_eigenvalue_index].real

        # Normalize to obtain weights
        weights = principal_eigenvector / np.sum(principal_eigenvector)

        return weights
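    # Hedged addition (not in the original): standard AHP practice also checks
    # the consistency of the pairwise matrix. CI = (lambda_max - n) / (n - 1)
    # and CR = CI / RI using Saaty's random-index table; a CR below 0.1 is
    # conventionally considered acceptable.
    def consistency_ratio(self, comparison_matrix):
        n = comparison_matrix.shape[0]
        random_index = {1: 0.0, 2: 0.0, 3: 0.58, 4: 0.90, 5: 1.12,
                        6: 1.24, 7: 1.32, 8: 1.41, 9: 1.45}

        eigenvalues = np.linalg.eigvals(comparison_matrix)
        lambda_max = np.max(eigenvalues.real)

        ci = (lambda_max - n) / (n - 1) if n > 1 else 0.0
        ri = random_index.get(n, 1.45)
        return ci / ri if ri > 0 else 0.0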
    def supplier_risk_assessment(self, supplier_data, market_data):
        """Supplier risk assessment."""

        risk_factors = {}

        for supplier_id, data in supplier_data.items():
            # 1. Financial risk
            financial_risk = self.calculate_financial_risk(data['financial_metrics'])

            # 2. Operational risk
            operational_risk = self.calculate_operational_risk(data['operational_metrics'])

            # 3. Geographic risk
            geographic_risk = self.calculate_geographic_risk(data['location'], market_data)

            # 4. Market risk
            market_risk = self.calculate_market_risk(data['market_position'], market_data)

            # Composite risk score
            total_risk = (
                financial_risk * 0.3 +
                operational_risk * 0.3 +
                geographic_risk * 0.2 +
                market_risk * 0.2
            )

            risk_factors[supplier_id] = {
                'financial_risk': financial_risk,
                'operational_risk': operational_risk,
                'geographic_risk': geographic_risk,
                'market_risk': market_risk,
                'total_risk_score': total_risk,
                'risk_level': self.categorize_risk_level(total_risk)
            }

        return risk_factors

    def calculate_financial_risk(self, financial_metrics):
        """Financial-risk scoring."""
        # Financial indicators
        debt_ratio = financial_metrics.get('debt_ratio', 0)
        current_ratio = financial_metrics.get('current_ratio', 1)
        profit_margin = financial_metrics.get('profit_margin', 0)

        # Risk scores (0-1; higher means riskier)
        debt_risk = min(debt_ratio / 0.7, 1)                        # debt ratio above 70% is high risk
        liquidity_risk = max(0, (2 - current_ratio) / 2)            # current ratio below 2 is risky
        profitability_risk = max(0, (0.05 - profit_margin) / 0.05)  # margin below 5% is risky

        financial_risk = (debt_risk + liquidity_risk + profitability_risk) / 3

        return financial_risk
```

## Data Product Manager Interview Question Bank
### E-commerce Data Product Design

#### Question 9: User Profile Product Design (Core Question)

Scenario: design a user profile product for an e-commerce platform to support precision marketing and personalized recommendation.

Key points of the product design:
```markdown
## User profile product design

### 1. Product goals and value
#### Business goals
- Improve marketing ROI: precise targeting lifts conversion by 30%
- Improve user experience: personalized recommendation lifts CTR by 25%
- Lower acquisition cost: precise acquisition cuts CAC by 20%
- Grow user value: lifetime value up 15%

#### User value
- Marketers: quickly locate target user groups
- Product managers: understand user needs to guide product optimization
- Operations: craft personalized operating strategies
- Algorithm engineers: feature support for model optimization

### 2. Core functional design
#### Profile-building module
- Tag taxonomy management: layered tag-system design
- Data-source management: multi-source data integration and cleansing
- Feature engineering: automated feature extraction and computation
- Profile refresh: real-time and offline update mechanisms

#### Profile-query module
- User lookup: single-user profile detail view
- Group analysis: user-group characteristic analysis
- Tag filtering: multi-dimensional combined queries
- Profile comparison: compare different user groups

#### Application-service module
- API service: real-time profile data interface
- Marketing delivery: audience-package generation and delivery
- Personalized recommendation: feature-data support
- Effect tracking: monitoring and analysis of downstream impact

### 3. Technical architecture
#### Data layer
- Behavioral data: clicks, views, purchases, searches
- Transaction data: orders, payments, refunds
- Content data: product views, favorites, reviews
- External data: third-party enrichment

#### Compute layer
- Offline compute: T+1 batch profile refresh
- Real-time compute: real-time refresh of key tags
- Feature engineering: automated feature extraction
- Model serving: machine-learning predictions

#### Service layer
- Profile service: RESTful API
- Query service: high-performance query engine
- Push service: proactive data push
- Monitoring service: system health monitoring

#### Application layer
- Admin console: tag management and configuration
- Query platform: self-service analysis
- Open platform: third-party integration
- Mobile app: mobile access

### 4. Tag taxonomy design
#### Basic attribute tags
- Demographics: age, gender, region, occupation
- Device: device type, OS, network
- Registration: signup time, acquisition channel

#### Behavioral tags
- Visit behavior: frequency, duration, paths
- Purchase behavior: frequency, amount, categories
- Engagement behavior: favorites, shares, reviews

#### Preference and interest tags
- Category preference: affinity for apparel, electronics, home goods
- Brand preference: brand loyalty, price sensitivity
- Content preference: followed content types and topics

#### Value and risk tags
- Value tags: RFM value, lifetime value
- Risk tags: credit risk, churn risk
- Potential tags: growth potential, referral value

### 5. Product roadmap
#### MVP (3 months)
- Basic tag taxonomy
- Single-user profile query
- Simple audience filtering
- Basic API service

#### V1.0 (6 months)
- Complete tag taxonomy
- Advanced query and analysis
- Marketing-application integration
- Effect-monitoring system

#### V2.0 (12 months)
- Real-time profile refresh
- Intelligent tag recommendation
- Multi-scenario application optimization
- Open-platform build-out
```
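To ground the tag taxonomy above, the following is a minimal sketch of how RFM-based value tags could be derived from raw transactions. The DataFrame and its column names (`customer_id`, `order_id`, `order_date`, `order_amount`), the quintile scoring, and the tag rule are all illustrative assumptions:

```python
# Minimal sketch: deriving RFM-based value tags, one row per user.
import pandas as pd

def build_value_tags(transactions: pd.DataFrame) -> pd.DataFrame:
    as_of = transactions['order_date'].max()
    rfm = transactions.groupby('customer_id').agg(
        recency=('order_date', lambda x: (as_of - x.max()).days),
        frequency=('order_id', 'count'),
        monetary=('order_amount', 'sum')
    )
    # Quintile scores (5 = best) for each dimension; rank(method='first')
    # avoids duplicate bin edges when frequencies tie
    rfm['r_score'] = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1]).astype(int)
    rfm['f_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5]).astype(int)
    rfm['m_score'] = pd.qcut(rfm['monetary'], 5, labels=[1, 2, 3, 4, 5]).astype(int)
    # A simple tag rule: high-value when all three scores are 4 or above
    rfm['value_tag'] = ((rfm[['r_score', 'f_score', 'm_score']] >= 4).all(axis=1)
                        .map({True: 'high_value', False: 'regular'}))
    return rfm.reset_index()
```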
#### Question 10: Data Product Commercialization Strategy

Scenario: how would you commercialize an internal data product and open it to external customers?

Commercialization strategy:
```markdown
## Data product commercialization strategy

### 1. Market opportunity analysis
#### Target market
- Small and mid-size merchants lacking data capabilities
- Brands needing consumer insights
- Ad agencies and marketing service providers
- Developers needing data APIs

#### Market sizing
- TAM: total size of the data-services market
- SAM: serviceable market size
- SOM: obtainable market share

### 2. Positioning strategy
#### Core value proposition
- Data richness: coverage of hundreds of millions of users
- Freshness: millisecond-level data updates
- Accuracy: model-backed precision guarantees
- Usability: out-of-the-box product experience

#### Differentiation
- vs. third-party data vendors: fresher, more accurate data
- vs. building in-house: lower cost, faster deployment
- vs. generic solutions: stronger industry specialization

### 3. Business model design
#### SaaS subscription
- Basic: free, with capped call volume
- Professional: 999 yuan/month, standard call volume
- Enterprise: 19,999 yuan/year, unlimited calls

#### API metering
- Pay per call: 0.01 yuan/call
- Volume packages: 10,000 calls/month for 50 yuan
- Traffic-based: billed by data transferred

#### Custom services
- Data customization: on-demand collection and processing
- Model customization: bespoke algorithm development
- Deployment customization: private deployment

### 4. Productizing the technology
#### API standardization
- RESTful design: standard HTTP interfaces
- Thorough documentation: detailed API docs
- SDK support: multi-language SDKs
- Test environment: sandbox for integration testing

#### Platform build-out
- Developer portal: signup, authentication, management
- Console: usage monitoring, billing management
- Support: online support, ticketing system
- Community: developer community and forums

### 5. Go-to-market execution
#### Packaging
- Naming: a "Data Insight Cloud" platform
- Branding: professional visual identity
- Value packaging: quantified ROI and case studies
- Collateral: product handbook, demo videos

#### Sales strategy
- Online self-service purchase
- Direct sales for large accounts
- Channel partners and resellers
- Ecosystem partnerships with system integrators

#### Customer success
- Onboarding: professional implementation team
- Training: product-usage training
- Customer operations: periodic reviews and optimization
- Renewals: reminders and churn prevention
```

## Interview Preparation Advice
### Core E-commerce Competencies

#### Depth of Business Understanding
```markdown
# Core knowledge checklist for e-commerce data roles

## E-commerce business models
- [ ] Marketplace platforms: understanding the Taobao and JD models
- [ ] First-party e-commerce: NetEase Yanxuan, Xiaomi Store
- [ ] Social commerce: Pinduoduo, WeChat-based selling
- [ ] Cross-border e-commerce: Amazon, Alibaba international

## Core business flows
- [ ] User lifecycle: acquisition → activation → retention → monetization → referral
- [ ] Product lifecycle: listing → promotion → sales → delisting
- [ ] Order flow: order → payment → shipping → receipt → review
- [ ] Supply chain flow: procurement → inbound → sorting → delivery

## Key metric systems
- [ ] Traffic: UV, PV, bounce rate, conversion rate
- [ ] Transactions: GMV, AOV, repurchase rate
- [ ] Users: LTV, CAC, retention
- [ ] Operations: marketing ROI, inventory turnover

## Technology application scenarios
- [ ] Personalized recommendation: collaborative filtering, deep learning
- [ ] Search optimization: search ranking, query understanding
- [ ] Risk control and anti-fraud: anomaly detection, graph algorithms
- [ ] Supply chain optimization: demand forecasting, inventory management
```

### Technical Capability Requirements
#### Core Technology Stack

```markdown
# E-commerce data technology stack

## Data processing
- [ ] Real-time compute: Flink, Kafka, Storm
- [ ] Batch compute: Spark, Hive, MapReduce
- [ ] Storage: HBase, Redis, ES
- [ ] Data sync: DataX, Canal, Sqoop

## Algorithms and models
- [ ] Recommendation: collaborative filtering, deep learning, reinforcement learning
- [ ] Search: information retrieval, learning to rank
- [ ] Forecasting: time series, regression models
- [ ] Optimization: linear programming, heuristics

## Engineering skills
- [ ] System design: high-concurrency, high-availability architecture
- [ ] Performance: caching, indexing, sharding
- [ ] Monitoring and alerting: system and business monitoring
- [ ] Deployment and ops: containerization, automated deployment
```

### Project Experience
推荐项目实践
- 电商推荐系统:端到端推荐系统开发
- 用户画像平台:多源数据整合和标签体系
- A/B测试平台:实验设计和效果评估
- 供应链优化:需求预测和库存优化
- 实时风控系统:异常检测和风险控制
#### Recommended Learning Resources

- Tech blogs: Meituan tech team, Alibaba tech
- Open-source projects: recommender-system and search-engine projects
- Online courses: machine learning, deep learning
- Conferences: e-commerce technology summits, recommender-system conferences
## Learning Links

### Prerequisites

- Recommendation Systems Fundamentals - theoretical foundations of recommendation algorithms
- E-commerce Business Analysis - understanding the e-commerce business

### Related Concepts

- Recommendation System Tech Stack - implementation details
- User Operations Analysis - user-analysis methods

### Next Steps

- Manufacturing Interview Question Bank - data applications in traditional industry
- Skills Improvement Guide - continuous capability building