2024-08-10



import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}
import play.api.libs.ws._
import play.api.libs.json._
import scala.concurrent.duration._
 
// 假设以下方法用于获取亚马逊商品页面的HTML
def fetchProductPage(asin: String, proxy: Option[String] = None): Future[String] = {
  val url = s"http://www.amazon.com/dp/$asin/?tag=yourtag-20"
  val request = WS.url(url)
  proxy.foreach(request.withProxy)
  val response = request.get()
 
  response.map { res =>
    if (res.status == 200) {
      res.body
    } else {
      throw new Exception(s"Failed to fetch product page for ASIN: $asin, status: ${res.status}")
    }
  }
}
 
// 使用示例
val asin = "B01M8L5Z3Q" // 示例ASIN
val proxyOption = Some("http://user:password@proxyserver:port") // 代理服务器(如有需要)
 
val pageFuture = fetchProductPage(asin, proxyOption)
 
pageFuture.onComplete {
  case Success(html) => println(s"Success: $html")
  case Failure(e) => println(s"Failed: ${e.getMessage}")
}
 
// 等待响应,如果需要同步执行
import scala.concurrent.Await
Await.result(pageFuture, 30.seconds) match {
  case html => println(s"Success: $html")
}

这个代码示例展示了如何使用Scala和Play WS库来异步获取亚马逊商品页面的HTML内容。它使用Future来处理异步操作,并且可以通过可选的代理服务器参数来绕过反爬虫措施。这个例子简洁地展示了如何应对代理和反爬虫的挑战,同时保持代码的简洁性和可读性。

2024-08-07

Spark的"经典demo"通常指的是WordCount程序,它用于统计文本文件中每个单词出现的次数。以下是Scala和Java两种语言的实现。

Scala版本:




import org.apache.spark.{SparkConf, SparkContext}
 
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
 
    val textFile = sc.textFile("hdfs://path/to/input/file.txt")
    val wordCounts = textFile.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.saveAsTextFile("hdfs://path/to/output/directory")
 
    sc.stop()
  }
}

Java版本:




import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
 
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
 
        JavaRDD<String> textFile = sc.textFile("hdfs://path/to/input/file.txt");
        JavaRDD<String> words = textFile.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator());
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((x, y) -> x + y);
        wordCounts.saveAsTextFile("hdfs://path/to/output/directory");
 
        sc.stop();
    }
}

在这两个例子中,我们首先设置了Spark的配置并创建了一个SparkContext对象。然后,我们读取了一个文本文件,将其分割成单词,并为每个单词设置了计数1,接着通过reduceByKey来累加相同单词的计数。最后,我们将结果保存到指定的文件系统路径。代码中的HDFS路径需要替换为实际的输入文件路径和输出目录路径。