# -*- coding: utf-8 -*-
"""
Zero Intelligence Traders

Author: Dale K. Brearcliffe
Email: dbrearcl@masonlive.gmu.edu
Date: 1 September 2016
Version: 1.0
Inspired by: Dr. Robert Axtell's C pThread version
Based on: Gode and Sunder, QJE, 1993

Copyright 2016 Dale K. Brearcliffe

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

# Get library routines
import random
import timeit
from functools import partial

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import IntegerType

# Python 3 has no xrange; alias it so the script runs on either interpreter.
try:
    xrange
except NameError:
    xrange = range

conf = SparkConf().setAppName("ZIT")
sc = SparkContext(conf=conf)

# RDD.toDF() is only available once an SQLContext/SparkSession exists.
# Create it explicitly instead of relying on a side effect of udf creation.
sql_context = SQLContext.getOrCreate(sc)

# Hide annoying messages
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)


def _required_int_setting(spark_conf, key):
    """Return the integer value of Spark setting *key*.

    Raises:
        ValueError: if the setting is absent, instead of the opaque
            TypeError that int(None) would produce.
    """
    value = spark_conf.get(key)
    if value is None:
        raise ValueError(
            "Spark setting '{0}' is not set; pass it to spark-submit "
            "(e.g. --conf {0}=N)".format(key))
    return int(value)


# Initialize variables
max_buyer_value = 30
max_seller_value = 30
max_number_of_trades = 10000000
number_of_buyers = 100000
number_of_sellers = 100000
cores = _required_int_setting(conf, 'spark.executor.cores')
memory = conf.get('spark.executor.memory')
executors = _required_int_setting(conf, 'spark.executor.instances')
number_of_partitions = executors * 4

start_time = timeit.default_timer()


# Function to properly create a random integer between 1 and some max
# Note: random.randint(s, e) often repeats values across range
def initialize(p, max_value):
    """Return a random integer in [1, max_value] drawn from OS entropy.

    p is ignored; it is accepted only so the function can be applied to
    each element of an RDD. A fresh SystemRandom is built on every call
    rather than shared at module level (presumably so nothing stateful
    has to be shipped to the workers -- confirm before hoisting).
    """
    return random.SystemRandom().randint(1, max_value)


# Create data initialization functions for each type of dataset
initialize_buyers = partial(initialize, max_value=max_buyer_value)
initialize_sellers = partial(initialize, max_value=max_seller_value)
initialize_trades_buyers = partial(initialize, max_value=number_of_buyers)
initialize_trades_sellers = partial(initialize, max_value=number_of_sellers)


def _trade_price(a_ask, a_bid):
    """Return a random price in [a_ask, a_bid], or -1 if ask exceeds bid."""
    if a_ask > a_bid:
        return -1
    return random.SystemRandom().randint(a_ask, a_bid)


# SQL function to set transaction price or '-1' if impossible
make_a_trade = sqlfunc.udf(_trade_price, IntegerType())

# Create cached RDDs of random buyers and sellers using format:
# (ID, quantity, bid/ask price). The RDD handles are kept so the cached
# data can be released at the end of the run.
RDD_Buyers = (
    sc.parallelize(xrange(0, number_of_buyers), number_of_partitions)
    .map(lambda x: (x, 0, initialize_buyers(x)))
    .cache()
)
RDD_Sellers = (
    sc.parallelize(xrange(0, number_of_sellers), number_of_partitions)
    .map(lambda x: (x, 1, initialize_sellers(x)))
    .cache()
)
DF_Buyers = RDD_Buyers.toDF(["Agent_Buyer", "Quantity_Buyer", "Bid"])
DF_Sellers = RDD_Sellers.toDF(["Agent_Seller", "Quantity_Seller", "Ask"])

# Create a dataframe of random trades using format: (ID, Buyer ID, Seller ID)
# initialize() returns 1..max, so subtract 1 to get zero-based agent IDs.
RDD_Trades = (
    sc.parallelize(xrange(0, max_number_of_trades), number_of_partitions)
    .map(lambda x: (x,
                    initialize_trades_buyers(x) - 1,
                    initialize_trades_sellers(x) - 1))
    .cache()
)
DF_Trades = RDD_Trades.toDF(["Trade", "Buyer", "Seller"])

# Create a new dataframe by joining the Buyers and Sellers to the Trades. This
# is a series of all data needed to judge the potential of each trade.
DF_Trans_Potential = (
    DF_Trades
    .join(DF_Buyers, DF_Trades.Buyer == DF_Buyers.Agent_Buyer)
    .join(DF_Sellers, DF_Trades.Seller == DF_Sellers.Agent_Seller)
    .select(DF_Trades.Trade,
            DF_Trades.Buyer,
            DF_Buyers.Quantity_Buyer,
            DF_Buyers.Bid,
            DF_Trades.Seller,
            DF_Sellers.Quantity_Seller,
            DF_Sellers.Ask)
)

# For each potential transaction see if the Bid price is at least the Ask
# price. If so, calculate a random transaction value between the Ask and Bid
# values. If not so, use -1 as the transaction value. Append the transaction
# value in the new column 'Transaction'.
DF_Trans_Possible = DF_Trans_Potential.withColumn(
    "Transaction",
    make_a_trade(DF_Trans_Potential.Ask, DF_Trans_Potential.Bid))

# Filter out the impossible trades designated with a -1 transaction.
DF_Trans_Complete = DF_Trans_Possible.filter(
    DF_Trans_Possible.Transaction > -1)

# Keep one trade per buyer, then one trade per seller among those.
DF_Trans_Final = (
    DF_Trans_Complete
    .dropDuplicates(['Buyer'])
    .dropDuplicates(['Seller'])
)

# Uncomment to inspect individual trades:
# DF_Trans_Final.show(100)
DF_Trans_Final.describe('Transaction').show()

end_time = timeit.default_timer()

print("Sellers: {0}".format(number_of_sellers))
print("Buyers: {0}".format(number_of_buyers))
print("Trades: {0}".format(max_number_of_trades))
print("Executors: {0}".format(executors))
print("Cores: {0}".format(cores))
print("Memory: {0}".format(memory))
print("Partitions: {0}".format(number_of_partitions))
print("===========================")
print("Total Time: {0} seconds\n\n".format(end_time - start_time))

# The cache() calls were made on the RDDs, so release those; calling
# unpersist() on the derived DataFrames would leave the RDDs cached.
RDD_Buyers.unpersist()
RDD_Sellers.unpersist()
RDD_Trades.unpersist()